ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
(Generate patch)

Comparing syncmail/syncmail (file contents):
Revision 1.1 by root, Tue Oct 23 23:52:40 2001 UTC vs.
Revision 1.2 by root, Sat Oct 27 03:40:29 2001 UTC

4use Coro::Handle; 4use Coro::Handle;
5use Coro::Event; 5use Coro::Event;
6use Coro::Semaphore; 6use Coro::Semaphore;
7use Coro::Channel; 7use Coro::Channel;
8use Coro::Signal; 8use Coro::Signal;
9use Coro::RWLock;
9 10
11use Fcntl;
10use MD5; 12use MD5;
11 13
12use constant VERSION => 1; 14use constant PROTVERSION => 1;
13 15
14# TODO: content-length support (HACK!) 16$VERSION = 0.1;
17
18$TIMEOUT = 20;
19$IDLE = $TIMEOUT < 30 ? $TIMEOUT / 2 : $TIMEOUT - 15;
20$MAXTIMEDIFF = 10;
21
22# WARNING:
23# Content-Length headers are deliberately being ignored. They
24# are broken by design and will never be supported
25
15# TODO: real message-id parsing 26# TODO: real message-id parsing
16 27
17$|=1; 28$|=1;
18 29
19$v = 9; 30$v = 9;
20 31
21$SLAVE = 1*($ARGV[0] eq "--slave"); 32$SLAVE = 1*($ARGV[0] eq "--slave");
33
34$MYNAME = $SLAVE ? "slave" : "master";
35$PREFIX = $SLAVE ? "./dst" : "./src";
36
37@OTHERNAMES = qw(third);
22 38
23my $ecnt; 39my $ecnt;
24 40
25sub slog { 41sub slog {
26 if ($_[0] <= $v) { 42 if ($_[0] <= $v) {
27 print STDERR $_[1]; 43 print STDERR "[$SLAVE] ", $_[1];
28 } 44 }
29} 45}
30 46
47my $lockdisk = new Coro::Semaphore;
48
49# give up a/the timeslice
50sub give {
51 Coro::Event::do_timer(after => 0);
52}
53
54package folder;
55
56BEGIN { *slog = \&::slog };
57
58use constant MDIFVERSION => 1;
59
60sub new {
61 my $class = shift;
62 my %arg = @_;
63 bless {
64 path => "$::PREFIX/$arg{name}",
65 %arg,
66 }, $class;
67}
68
69sub dirty {
70 $_[0]{dirty} = 1;
71}
72
73sub DESTROY {
74 $_[0]->write_mdif;
75}
76
31# parse_folder(mbox-file-path, callback) 77# parse_mbox(mbox-file-path, callback)
32# callback gets called with \$header and \$body, 78# callback gets called with \$header and \$body,
33# $header includes the mbox From_ line without 79# $header includes the mbox From_ line without
34# the leading From_ itself. 80# the leading From_ itself.
35sub parse_folder { 81sub parse_mbox {
36 my ($path, $cb) = @_; 82 my ($path, $cb) = @_;
37 83
38 open my $fh, "<", $path 84 open my $fh, "<", $path
39 or die "$path: $!"; 85 or die "$path: $!";
40 86
62 $body = <$fh>; 108 $body = <$fh>;
63 #} 109 #}
64 chomp $body; 110 chomp $body;
65 $cb->($offs, \$head, \$body); 111 $cb->($offs, \$head, \$body);
66 $offs = (tell $fh) - 5; 112 $offs = (tell $fh) - 5;
67 cede unless ++$ecnt & 1023; 113 ::give unless ++$ecnt & 1023;
68 } 114 }
69 115
70 1; 116 1;
71} 117}
72 118
119sub conf_path {
120 (my $conf = $_[0]{path}) =~ s%([^/]+$)%.$1.mdif%;
121 $conf;
122}
123
73sub read_mdif { 124sub read_mdif {
74 my ($path) = @_; 125 my $self = shift;
75 my $fh; 126 my $path = $self->conf_path;
76 my $mdif; 127
128 return if $self->{idx};
129
77 open my $fh, "<", $path 130 open my $fh, "<", $path
78 or return { }; 131 or return;
79 132
80 defined ($_ = <$fh>) 133 defined ($_ = <$fh>)
81 or die "$path: empty mdif file\n"; 134 or die "$path: empty mdif file\n";
82 135
83 do { 136 do {
84 if ($_ eq "[SYNCMAIL]\n") { 137 if ($_ eq "[SYNCMAIL]\n") {
85 while (<$fh>) { 138 while (<$fh>) {
86 last unless /^([a-z]+)\s*=\s*(.*)\n$/; 139 last unless /^([a-z]+)\s*=\s*(.*)\n$/;
87 $mdif->{$1} = $2; 140 $self->{$1} = $2;
88 } 141 }
89 } elsif ($_ eq "[HOSTS]\n") { 142 } elsif ($_ eq "[HOSTS]\n") {
90 while (<$fh>) { 143 while (<$fh>) {
91 last unless /^([^[].*)=(.*)\n$/; 144 last unless /^([^[].*)=(.*)\n$/;
92 $mdif->{host}{$1} = $2; 145 $self->{host}{$1} = $2;
93 } 146 }
94 } elsif (/^\[DIFF(\d+)\.(\d+)\]\n$/) { 147 } elsif (/^\[DIFF (\d+)\]\n$/) {
95 my ($gen, $mtime) = ($1, $2); 148 my $mtime = $1;
96 my @dif; 149 my @dif;
97 while (<$fh>) { 150 while (<$fh>) {
98 last unless /^[+-]/; 151 last unless /^[+-]/;
99 push @dif, substr $_, 0, -1; 152 push @dif, substr $_, 0, -1;
100 } 153 }
101 unshift @{$mdif->{diff}}, [$gen, $mtime, \@dif]; 154 unshift @{$self->{diff}}, [$mtime, \@dif];
102 } elsif ($_ eq "[INDEX]\n") { 155 } elsif ($_ eq "[INDEX]\n") {
103 my @idx; 156 my @idx;
104 while (<$fh>) { 157 while (<$fh>) {
105 last unless /^(\d+)=(.*)\n$/; 158 last unless /^(\d+)=(.*)\n$/;
106 push @idx, [$1, $2]; 159 push @idx, [$1, $2];
107 } 160 }
108 $mdif->{idx} = \@idx; 161 $self->{idx} = \@idx;
109 } elsif (/^#/) { 162 } elsif (/^#/) {
110 $_ = <$fh>; 163 $_ = <$fh>;
111 # nop 164 # nop
112 } else { 165 } else {
113 die "$path: unparseable section '$_'\n"; 166 die "$path: unparseable section '$_'\n";
114 } 167 }
115 } while defined $_; 168 } while defined $_;
116 169
117 $mdif->{version} <= VERSION 170 $self->{version} <= MDIFVERSION
118 or die "$path: version mismatch ($mdif->{version} found, <".VERSION." expected)\n"; 171 or die "$path: version mismatch ($self->{version} found, <".MDIFVERSION." expected)\n";
119
120 $mdif;
121} 172}
122 173
123sub write_mdif { 174sub write_mdif {
124 my ($path, $mdif) = @_; 175 my $self = shift;
125 my $fh; 176 my $path = $self->conf_path;
177
178 return unless $self->{dirty};
126 179
127 open my $fh, ">", "$path~" 180 open my $fh, ">", "$path~"
128 or die "$path~: $!"; 181 or die "$path~: $!";
129 182
130 print $fh "# automatically generated, do NOT edit\n"; 183 print $fh "# automatically generated, do NOT edit\n";
131 184
132 print $fh "[SYNCMAIL]\n"; 185 print $fh "[SYNCMAIL]\n";
133 print $fh "$_=$mdif->{$_}\n" for (qw(fsize mtime gen version)); 186 print $fh "$_=$self->{$_}\n" for (qw(fsize mtime version));
134 187
135 print $fh "[HOSTS]\n"; 188 print $fh "[HOSTS]\n";
136 print $fh "$k=$v\n" while my ($k,$v) = each %{$mdif->{host}}; 189 while (my ($k,$v) = each %{$self->{host}}) {
190 print $fh "$k=$v\n";
191 }
137 192
138 print $fh "[INDEX]\n"; 193 print $fh "[INDEX]\n";
139 print $fh "$_->[0]=$_->[1]\n" for @{$mdif->{idx}}; 194 print $fh "$_->[0]=$_->[1]\n" for @{$self->{idx}};
140 195
141 for (reverse @{$mdif->{diff}}) { 196 for (reverse @{$self->{diff}}) {
142 print $fh "[DIFF$_->[0].$_->[1]]\n"; 197 print $fh "[DIFF $_->[0]]\n";
143 print $fh $_, "\n" for @{$_->[2]}; 198 print $fh $_, "\n" for @{$_->[1]};
144 } 199 }
145 200
146 close $fh 201 close $fh
147 or die "$path~: unable to create updated .mdif: $!"; 202 or die "$path~: unable to create updated .mdif: $!";
148 203
149 rename "$path~", $path; 204 rename "$path~", $path;
205
206 delete $self->{dirty};
150} 207}
151 208
152sub gendiff { 209sub gendiff {
153 my ($d1, $d2) = @_; 210 my ($d1, $d2) = @_;
154 211
172 } 229 }
173 230
174 \@d; 231 \@d;
175} 232}
176 233
177my $check_folder = new Coro::Semaphore;
178
179sub check_folder { 234sub check {
180 my ($path) = @_; 235 my $self = shift;
236 my $path = $self->{path};
237 my $conf = $self->conf_path;
181 my $guard = $check_folder->guard; 238 my $guard = $lockdisk->guard;
182 239
183 (my $conf = $path) =~ s%([^/]+$)%.$1.mdif%;
184
185 slog 1, "checking $path\n"; 240 slog 3, "checking $path\n";
186 241
187 if (stat $path) { 242 if (stat $path) {
188 my ($fsize, $mtime) = (stat _)[7, 9]; 243 my ($fsize, $mtime) = (stat _)[7, 9];
244
189 if (open my $fh, "<", $conf) { 245 if (open my $fh, "<", $conf) {
190 my %conf; 246 my %conf;
191 <$fh>; # skip initial comment 247 <$fh>; # skip initial comment
192 <$fh> eq "[SYNCMAIL]\n" 248 <$fh> eq "[SYNCMAIL]\n"
193 or die "$conf: format error"; 249 or die "$conf: format error";
194 while (<$fh> =~ /^([a-z]+)\s*=\s*(.*)$/) { 250 while (<$fh> =~ /^([a-z]+)\s*=\s*(.*)$/) {
195 $conf{$1} = $2; 251 $conf{$1} = $2;
196 } 252 }
197 return (1, \%conf) if $fsize == $conf{fsize} 253 return 1 if $fsize == $conf{fsize}
198 && $mtime == $conf{mtime}; 254 && $mtime == $conf{mtime};
255
256 $conf{mtime} <= $mtime
257 or die "$path: folder older than mdif";
199 } 258 }
200 259
201 slog 2, "updating $path\n"; 260 slog 2, "updating $path\n";
202 261
203 my @idx; 262 my @idx;
204 263
205 parse_folder $path, sub { 264 parse_mbox $path, sub {
206 my ($offs, $head, $body) = @_; 265 my ($offs, $head, $body) = @_;
207 my $mid; 266 my $mid;
208 if ($$head =~ /^Message-Id:\s*(<[^<\n]+>)\s*\n/im) { 267 if ($$head =~ /^Message-Id:\s*(<[^<\n]+>)\s*\n/im) {
209 $mid = $1; 268 $mid = $1;
210 } else { 269 } else {
211 $mid = MD5->hexhash("$$head\0$$body"); 270 $mid = MD5->hexhash("$$head\0$$body");
212 } 271 }
213 push @idx, [$offs, $mid]; 272 push @idx, [$offs, $mid];
214 } or return (); 273 } or return ();
215 274
216 my $mdif = read_mdif $conf; 275 $self->read_mdif;
217 276
218 if ($mdif->{version}) { 277 if ($self->{version}) {
219 my $d = gendiff $mdif->{idx}, \@idx; 278 my $d = gendiff $self->{idx}, \@idx;
220 push @{$mdif->{diff}}, [ 279 push @{$self->{diff}}, [
221 $mdif->{gen}++,
222 $mdif->{mtime}, 280 $self->{mtime},
223 $d, 281 $d,
224 ] if @$d; 282 ] if @$d;
225 } else { 283 } else {
226 slog 2, "$path: new folder\n"; 284 slog 2, "$path: previously unknown folder\n";
227 $mdif->{version} ||= VERSION; 285 $self->{version} ||= MDIFVERSION;
228 $mdif->{gen} = 1;
229 } 286 }
230 287
231 $mdif->{fsize} = $fsize; 288 $self->{fsize} = $fsize;
232 $mdif->{mtime} = $mtime; 289 $self->{mtime} = $mtime;
233 $mdif->{idx} = \@idx; 290 $self->{idx} = \@idx;
234 291
235 write_mdif $conf, $mdif; 292 $self->dirty;
236 293
237 return (2, $mdif); 294 return 2;
238 } else { 295 } else {
239 slog 2, "$path: no longer exists\n"; 296 slog 2, "$path: no longer exists\n";
240 unlink $conf; 297 unlink $conf;
241 298
242 return (); 299 return ();
243 } 300 }
301}
302
303package main;
304
305my $quit = new Coro::RWLock;
306
307sub rwlock::DESTROY {
308 $quit->unlock;
309}
310
311sub quit_guard {
312 $quit->rdlock;
313 bless [], rwlock;
314}
315
316sub find_folders {
317 my @folders;
318
319 opendir my $dh, $PREFIX
320 or die "$PREFIX: $!";
321
322 while (defined (my $folder = readdir $dh)) {
323 next if $folder =~ /^\./;
324 next unless -f "$PREFIX/$folder";
325 push @folders, $folder;
326 }
327
328 @folders;
244} 329}
245 330
246my $send = new Coro::Channel 10; 331my $send = new Coro::Channel 10;
247my $done = 0; 332my $done = 0;
248 333
249# request $command, $data 334# request $command, $data
250sub request { 335sub request {
251 my $res; 336 my $res;
252 my $signal = new Coro::Signal; 337 my $signal = new Coro::Signal;
253 my $cmd = defined $_[1] ? "000+".length($_[1])." $_[0]\n$_[1]" : "000 $_[0]\n"; 338 my $cmd = defined $_[1] ? "000+".length($_[1])." $_[0]\n$_[1]"
339 : "000 $_[0]\n";
254 $send->put([$signal, \$res, $cmd]); 340 $send->put([$signal, \$res, $cmd]);
255 $signal->wait; 341 $signal->wait;
256 $res; 342 $res;
257} 343}
258 344
259# reply $id, $code, $msg, $data 345# reply $id, $code, $msg, $data
260sub reply { 346sub reply {
261 my $cmd = defined $_[3] ? "$_[1]+".length($_[3])." $_[2]\n$_[3]" : "$_[1] $_[2]\n"; 347 my $cmd = defined $_[3] ? "$_[1]+".length($_[3])." $_[2]\n$_[3]"
348 : "$_[1] $_[2]\n";
262 $send->put([undef, undef, "$_[0] $cmd"]); 349 $send->put([undef, undef, "$_[0] $cmd"]);
263} 350}
264 351
265sub handle_commands { 352sub handle_commands {
266 my ($fh) = @_; 353 my ($fh) = @_;
267 my $id = "a"; 354
355 # periodically send nop as keepalive signal to avoid the
356 # rsync-on-slow-disk-timeout problem.
357 my $nopper = Event->timer(parked => 1, cb => sub { request "nop" });
358
359 my @folder;
360
268 async { 361 async {
269 $fh->print("- 000 hello $HOSTID\n"); 362 my $id = "a";
363 $fh->print("\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n");
270 while (my $r = $send->get) { 364 while (my $r = $send->get) {
271 if (defined $r->[1]) { 365 if (defined $r->[1]) {
272 my $id = ++$id; 366 my $id = ++$id;
273 $request{$id} = $r; 367 $request{$id} = $r;
274 print STDERR "<<< $SLAVE sendign request $id:$r->[2]";#d# 368 print STDERR "<<< $SLAVE sending request $id:$r->[2]";#d#
275 $fh->print("$id $r->[2]"); 369 $fh->print("$id $r->[2]");
276 } else { 370 } else {
277 print STDERR "<<< $SLAVE sendign reply $r->[2]";#d# 371 print STDERR "<<< $SLAVE sending reply $r->[2]";#d#
278 $fh->print($r->[2]); 372 $fh->print($r->[2]);
279 } 373 }
374 $nopper->at(time+$::IDLE); $nopper->start;
280 } 375 }
281 print STDERR "$SLAVE shutdown\n";#d# 376 $fh->print("- 000 quit\n");
282 shutdown $fh, 1;
283 }; 377 };
378
379 async {
380 eval {
381 $fh->timeout($::TIMEOUT);
382
383 $_ = <$fh>;
384 $_ eq "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n"
385 or die "garbled input or version mismatch: $_";
386
284 while (<$fh>) { 387 while (<$fh>) {
285 slog 0, ">>> $SLAVE received :$_"; 388 slog 0, ">>> $SLAVE received :$_";
286 /^(\S+) (\d\d\d)(?:\+(\d+))?\s*(.*)$/ 389 /^(\S+) (\d\d\d)(?:\+(\d+))?\s*(.*)$/
287 or die "protocol error, garbled command ($_)"; 390 or die "protocol error, garbled command ($_)";
288 391
289 my ($id, $code, $dlen, $msg) = ($1, $2, $3, $4); 392 my ($id, $code, $dlen, $msg) = ($1, $2, $3, $4);
290 my $data; 393 my $data;
291 394
292 $fh->sysread($data, $dlen) == $dlen 395 $fh->sysread($data, $dlen) == $dlen
293 or die "unexpected read error: $!"; 396 or die "unexpected read error: $!";
294 397
295 if ($code == 0) { 398 if ($code == 0) {
296 if ($msg eq "quit") { 399 if ($msg eq "quit") {
297 print $fh "$id 200 quit\n";
298 $send->put(undef); 400 $send->put(undef);
299 last; 401 last;
300 } elsif ($msg eq "nop") { 402 } elsif ($msg eq "nop") {
301 reply $id, 200, "nop"; 403 reply $id, 200, "nop";
302 } elsif ($msg =~ /^hello (.*)$/) { 404 } elsif ($msg eq "myname") {
303 $OTHERID = $1; 405 $OTHERNAME = $data;
304 slog 3, "otherid set to $OTHERID\n"; 406 slog 3, "otherid set to $OTHERNAME\n";
407 reply $id, 200, "myname", $MYNAME;
408 } elsif ($msg eq "timestamp") {
409 reply $id, 200, time;
410 } elsif ($msg eq "folders") {
411 reply $id, 200, "ok", join "\0", find_folders;
412 } elsif ($msg eq "open") {
413 async {
414 my $folder = new folder name => $data;
415 $folder->check;
416 $folder->read_mdif;
417 push @folder, $folder;
418 reply $id, 200, "$#folder $folder->{mtime}";
419 };
420 } elsif ($msg =~ /^update (\d+) (\d+)$/) {
421 if ($folder[$1]->{host}{$OTHERNAME} != $2) {
422 $folder[$1]->{host}{$OTHERNAME} = $2;
423 $folder[$1]->dirty;
424 }
425 reply $id, 200, "ok";
426 } elsif ($msg =~ /^close (\d+)$/) {
427 undef $folder[$1];
428 reply $id, 200, "ok";
305 } else { 429 } else {
430 chomp;
306 die "protocol error, unknown command ($_)\n"; 431 die "protocol error, unknown command ($_)\n";
307 } 432 }
308 } else { 433 } else {
309 my $r = delete $request{$id} 434 my $r = delete $request{$id}
310 or die "protocol error, invalid reply id ($_)\n"; 435 or die "protocol error, invalid reply id ($_)\n";
311 436
312 ${$r->[1]} = [$code, $msg, $data]; 437 ${$r->[1]} = [$code, $msg, $data];
313 $r->[0]->send; 438 $r->[0]->send;
314 }
315
316 if ($done && !%request) {
317 $done = 0;
318 async {
319 request("quit");
320 exit 0; 439 }
321 }; 440 }
322 } 441 };
442
443 slog 0, "ERROR: $@" if $@;
444 print STDERR "unlooping\n";#d#
445 unloop;
323 } 446 };
324 447
325 exit 0; 448 loop;
449 print STDERR "$SLAVE hiho\n";#d#
326} 450}
327 451
452my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel
453
328sub sync_dir { 454sub sync_folder {
329 my ($path) = @_; 455 my $name = $_[0];
330 456
331 opendir my $dh, $path 457 my $quit_guard = quit_guard;
332 or die "$path: $!"; 458 async {
459 my $guard = $sync_folder->guard;
333 460
334 while (defined (my $folder = readdir $dh)) { 461 ::give;
335 next if $folder =~ /^\./; 462
336 my $path = "$path/$folder"; 463 my $folder = new folder name => $name;
337 next unless -f $path; 464 my ($rfid, $rmtime) = split /\s+/, (request open => $name)->[1];
338 my ($status, $mdif) = check_folder $path; 465
339 print STDERR "$path: $status | $mdif\n";#d# 466 $folder->check;
467 $folder->read_mdif;
468 my $mtime = $folder->{host}{$OTHERNAME};
469
470 $folder->{host}{$OTHERNAME} = $folder->{mtime};
471
472 request "update $rfid $folder->{mtime}";
473 request "close $rfid";
474 undef $quit_guard;
475 }
476}
477
478sub main {
479 # time checking done symmetrically
480 {
481 my $time = time;
482 my $othertime = (request "timestamp")->[1];
483 abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF
484 or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
485 }
486
487 $OTHERNAME = (request "myname", $MYNAME)->[2];
340 if ($SLAVE) { 488 if ($SLAVE) {
341 request "fhave", $folder; 489 #
342 } else { 490 } else {
343 request "fwant", $folder; 491 for (split /\0/, (request "folders")->[2]) {
492 if (!-e "$PREFIX/$_") {
493 slog 2, "created new empty folder $_\n";
494 sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666;
344 } 495 }
345 } 496 }
346 497
347 $done = 1; 498 for my $folder (find_folders) {
348 my $x = request("nop"); 499 sync_folder $folder;
349 print STDERR "$SLAVE returned nop @$x\n";#d# 500 }
501
502 print "writelock\n";#d#
503 $quit->wrlock;
504 $send->put(undef);
505 }
350} 506}
351 507
352if ($SLAVE) { 508if ($SLAVE) {
353 $HOSTID = "slave"; 509 $HOSTID = "slave";
354 async \&sync_dir, "./dst"; 510 async \&main;
355 handle_commands unblock \*STDIN; 511 handle_commands unblock \*STDIN;
356} else { 512} else {
357 $HOSTID = "master"; 513 $HOSTID = "master";
358 { 514 {
359 use Socket; 515 use Socket;
360 socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC; 516 socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC;
361 if (fork == 0) { 517 if (fork == 0) {
518 close S1;
362 open STDIN, "<&S2" or die; 519 open STDIN, "<&S2" or die;
363 open STDOUT, ">&S2" or die; 520 open STDOUT, ">&S2" or die;
364 exec $0, "--slave"; 521 exec $0, "--slave";
365 exit 255; 522 exit 255;
366 } 523 }
367 async \&sync_dir, "./src"; 524 close S2;
525 async \&main;
368 handle_commands unblock \*S1; 526 handle_commands unblock \*S1;
369 } 527 }
370} 528}
371 529

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines