ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
Revision: 1.2
Committed: Sat Oct 27 03:40:29 2001 UTC (22 years, 7 months ago) by root
Branch: MAIN
Changes since 1.1: +277 -119 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #!/usr/bin/perl
2
3 use Coro;
4 use Coro::Handle;
5 use Coro::Event;
6 use Coro::Semaphore;
7 use Coro::Channel;
8 use Coro::Signal;
9 use Coro::RWLock;
10
11 use Fcntl;
12 use MD5;
13
# Wire-protocol version, announced in the "@SYNCMAIL" greeting banner.
use constant PROTVERSION => 1;

$VERSION = 0.1;

# Read timeout (seconds) on the peer connection, and the idle interval
# after which a keepalive "nop" is sent: half the timeout when it is
# short, otherwise 15 seconds before it would expire.
$TIMEOUT = 20;
if ($TIMEOUT < 30) {
    $IDLE = $TIMEOUT / 2;
} else {
    $IDLE = $TIMEOUT - 15;
}

# Largest clock difference (seconds) tolerated between the two hosts.
$MAXTIMEDIFF = 10;

# WARNING:
# Content-Length headers are deliberately being ignored. They
# are broken by design and will never be supported

# TODO: real message-id parsing

# Autoflush STDOUT.
$| = 1;

# Verbosity threshold: slog() prints messages whose level is <= $v.
$v = 9;

# "--slave" as first argument selects the slave role (1), else master (0).
$SLAVE = ($ARGV[0] eq "--slave") ? 1 : 0;

if ($SLAVE) {
    $MYNAME = "slave";
    $PREFIX = "./dst";
} else {
    $MYNAME = "master";
    $PREFIX = "./src";
}

@OTHERNAMES = qw(third);

# Running message counter; folder::parse_mbox uses it to yield periodically.
my $ecnt;
40
# slog($level, $message)
# Writes $message to STDERR, prefixed with "[<role flag>] ", when $level
# does not exceed the global verbosity $v. Lower levels are more important.
sub slog {
    my ($level, $message) = @_;
    print STDERR "[$SLAVE] ", $message if $level <= $v;
}
46
# Semaphore that folder::check holds while it scans a folder on disk.
my $lockdisk = Coro::Semaphore->new;

# give()
# Gives up the current timeslice: waits on a zero-delay timer so that
# other ready coroutines get a chance to run first.
sub give {
    return Coro::Event::do_timer(after => 0);
}
53
package folder;

# Make main's logger callable here as plain slog().
BEGIN { *slog = \&::slog };

# .mdif format version written by this code; read_mdif refuses newer files.
use constant MDIFVERSION => 1;

# folder->new(name => $name, %extra)
# Builds a folder object for the mbox file "$::PREFIX/$name". Any other
# named arguments are copied into the object verbatim; an explicit
# "path" argument overrides the computed one.
sub new {
    my $class = shift;
    my %arg = @_;
    bless {
        path => "$::PREFIX/$arg{name}",
        %arg,
    }, $class;
}

# Flags the in-memory state as modified so write_mdif will save it.
sub dirty {
    $_[0]{dirty} = 1;
}

# Persist any unsaved changes when the object is released.
sub DESTROY {
    $_[0]->write_mdif;
}

# parse_mbox(mbox-file-path, callback)
# Calls $cb->($offset, \$header, \$body) once per message, where
#   $offset  is the byte offset of the message's "From " line,
#   $header  includes the mbox From_ line without the leading "From "
#            itself, up to and including the empty line ending the header,
#   $body    is the rest of the message, without the "\nFrom " that
#            starts the next one.
# Returns 1 after the last message (a zero-length file counts as a valid
# mbox without messages) and false if the file does not begin with
# "From ". Dies if the file cannot be opened or a From_ line is malformed.
sub parse_mbox {
    my ($path, $cb) = @_;

    open my $fh, "<", $path
        or die "$path: $!";

    # Headers end at the first empty line.
    local $/ = "\n\n";

    my ($head, $body, $offs);

    my $got = read $fh, $head, 5;
    defined $got
        or return;
    # An empty folder simply has no messages. Reporting success lets the
    # caller index it and record the deletion of everything indexed before.
    return 1 if $got == 0;
    $got == 5
        or return;

    $head eq "From "
        or return;

    $offs = 0;
    while (defined ($head = <$fh>)) {
        # sender, weekday, month, day, hh:mm:ss, optional numeric zone and
        # a 2- or 4-digit year
        $head =~ /^.*? [A-Z][a-z][a-z] [A-Z][a-z][a-z] [ 0-9][0-9] \d\d:\d\d:\d\d(?: [+-]\d\d\d\d)? \d\d(?:\d\d)?\n/
            or die "$path: not standard mbox format header:\n$head\n";

        # The body runs up to the next "\nFrom ". This local is restored at
        # the end of every iteration, so the header read keeps using "\n\n".
        local $/ = "\nFrom ";
        # NEVER enable this. content-length simply is broken by design
        #if ($head =~ /^Content-Length:\s+(\d+)$/im) {
        #   $1 <= read $fh, $body, $1 + 5
        #      or die "$path: partial message in mbox";
        #} else {
            $body = <$fh>;
        #}
        chomp $body;
        $cb->($offs, \$head, \$body);
        # The "From " of the next message has already been consumed.
        $offs = (tell $fh) - 5;
        # Yield to other coroutines once every 1024 messages (the counter
        # is shared by all calls).
        ::give unless ++$ecnt & 1023;
    }

    1;
}

# Path of the folder's .mdif index file: the folder's file name with a
# leading "." and a trailing ".mdif", in the same directory
# (e.g. "./src/inbox" -> "./src/.inbox.mdif").
sub conf_path {
    (my $conf = $_[0]{path}) =~ s%([^/]+$)%.$1.mdif%;
    $conf;
}

# read_mdif()
# Loads the folder's .mdif file into the object unless an index is
# already loaded. A missing .mdif file is silently ignored. Sections:
#   [SYNCMAIL]  key=value settings (written: fsize, mtime, version)
#   [HOSTS]     host=value pairs, stored in {host}
#   [INDEX]     offset=message-id pairs, stored in {idx}
#   [DIFF n]    "+id"/"-id" lines, stored in {diff} as [n, \@lines]
#   #...        comment line, skipped
# Dies on an empty or unparseable file and on a format version newer
# than MDIFVERSION.
sub read_mdif {
    my $self = shift;
    my $path = $self->conf_path;

    return if $self->{idx};

    open my $fh, "<", $path
        or return;

    # The parser below works on $_; do not clobber the caller's copy.
    local $_;

    defined ($_ = <$fh>)
        or die "$path: empty mdif file\n";

    # Every branch consumes its section and leaves the first line it did
    # not consume (the next section header, or undef at EOF) in $_.
    do {
        if ($_ eq "[SYNCMAIL]\n") {
            while (<$fh>) {
                last unless /^([a-z]+)\s*=\s*(.*)\n$/;
                $self->{$1} = $2;
            }
        } elsif ($_ eq "[HOSTS]\n") {
            while (<$fh>) {
                last unless /^([^[].*)=(.*)\n$/;
                $self->{host}{$1} = $2;
            }
        } elsif (/^\[DIFF (\d+)\]\n$/) {
            my $mtime = $1;
            my @dif;
            while (<$fh>) {
                last unless /^[+-]/;
                push @dif, substr $_, 0, -1;
            }
            # write_mdif emits diffs in reverse, so unshift restores order.
            unshift @{$self->{diff}}, [$mtime, \@dif];
        } elsif ($_ eq "[INDEX]\n") {
            my @idx;
            while (<$fh>) {
                last unless /^(\d+)=(.*)\n$/;
                push @idx, [$1, $2];
            }
            $self->{idx} = \@idx;
        } elsif (/^#/) {
            $_ = <$fh>;
            # nop
        } else {
            die "$path: unparseable section '$_'\n";
        }
    } while defined $_;

    $self->{version} <= MDIFVERSION
        or die "$path: version mismatch ($self->{version} found, <=".MDIFVERSION." expected)\n";
}

# write_mdif()
# Saves the object to its .mdif file if it is dirty, then clears the
# dirty flag. The data goes to "<mdif>~" first and is renamed into place
# afterwards, so an interrupted write leaves the previous .mdif intact.
# Dies if the file cannot be written or renamed.
sub write_mdif {
    my $self = shift;
    my $path = $self->conf_path;

    return unless $self->{dirty};

    open my $fh, ">", "$path~"
        or die "$path~: $!";

    print $fh "# automatically generated, do NOT edit\n";

    print $fh "[SYNCMAIL]\n";
    print $fh "$_=$self->{$_}\n" for (qw(fsize mtime version));

    print $fh "[HOSTS]\n";
    while (my ($host, $value) = each %{$self->{host}}) {
        print $fh "$host=$value\n";
    }

    print $fh "[INDEX]\n";
    print $fh "$_->[0]=$_->[1]\n" for @{$self->{idx}};

    for (reverse @{$self->{diff}}) {
        print $fh "[DIFF $_->[0]]\n";
        print $fh $_, "\n" for @{$_->[1]};
    }

    close $fh
        or die "$path~: unable to create updated .mdif: $!";

    # Without this check a failed rename would silently keep the stale
    # index while the dirty flag gets cleared.
    rename "$path~", $path
        or die "$path: unable to replace with $path~: $!";

    delete $self->{dirty};
}

# gendiff(\@old_idx, \@new_idx) -> \@diff
# Compares two index lists of [offset, message-id] pairs by message-id.
# Returns "-id" for each entry of the old index whose id is absent from
# the new one, followed by "+id" for each entry of the new index whose id
# is absent from the old one, each group in index order.
sub gendiff {
    my ($d1, $d2) = @_;

    my @d;
    my (%d1, %d2);

    for (@$d2) {
        undef $d2{$_->[1]};
    }

    # delete msgs in d1 but not in d2
    for (@$d1) {
        undef $d1{$_->[1]};
        push @d, "-$_->[1]" unless exists $d2{$_->[1]};
    }
    %d2 = (); # conserve memory

    # add msgs in d2 but not in d1
    for (@$d2) {
        push @d, "+$_->[1]" unless exists $d1{$_->[1]};
    }

    \@d;
}

# check()
# Brings the folder's index up to date with its mbox file, holding the
# $lockdisk guard for the whole operation. Returns
#   1   if size and mtime match the values recorded in the .mdif,
#   2   if the folder was re-indexed (the object is then dirty and the
#       message-level changes, if any, were appended to {diff}),
#   ()  if the mbox no longer exists (its .mdif is removed) or does not
#       start like an mbox.
# Dies on a malformed .mdif or when the mbox is older than its .mdif.
sub check {
    my $self = shift;
    my $path = $self->{path};
    my $conf = $self->conf_path;
    my $guard = $lockdisk->guard;

    slog 3, "checking $path\n";

    if (stat $path) {
        my ($fsize, $mtime) = (stat _)[7, 9];

        # Fast path: compare with the [SYNCMAIL] values of the .mdif
        # without loading the whole index.
        if (open my $fh, "<", $conf) {
            my %conf;
            <$fh>; # skip initial comment
            <$fh> eq "[SYNCMAIL]\n"
                or die "$conf: format error";
            while (<$fh> =~ /^([a-z]+)\s*=\s*(.*)$/) {
                $conf{$1} = $2;
            }
            return 1 if $fsize == $conf{fsize}
                     && $mtime == $conf{mtime};

            $conf{mtime} <= $mtime
                or die "$path: folder older than mdif";
        }

        slog 2, "updating $path\n";

        # Fresh [offset, message-id] index; messages without a Message-Id
        # header are identified by an MD5 over header and body.
        my @idx;

        parse_mbox $path, sub {
            my ($offs, $head, $body) = @_;
            my $mid;
            if ($$head =~ /^Message-Id:\s*(<[^<\n]+>)\s*\n/im) {
                $mid = $1;
            } else {
                $mid = MD5->hexhash("$$head\0$$body");
            }
            push @idx, [$offs, $mid];
        } or return ();

        $self->read_mdif;

        if ($self->{version}) {
            # Known folder: record what changed since the previous index,
            # keyed by the previous mtime.
            my $d = gendiff $self->{idx}, \@idx;
            push @{$self->{diff}}, [
                $self->{mtime},
                $d,
            ] if @$d;
        } else {
            slog 2, "$path: previously unknown folder\n";
            $self->{version} ||= MDIFVERSION;
        }

        $self->{fsize} = $fsize;
        $self->{mtime} = $mtime;
        $self->{idx} = \@idx;

        $self->dirty;

        return 2;
    } else {
        slog 2, "$path: no longer exists\n";
        unlink $conf;

        return ();
    }
}
302
package main;

# Shutdown gate: each folder sync holds a read lock on $quit while it
# runs, and main later takes the write lock, which has to wait until all
# of those read locks have been released.
my $quit = Coro::RWLock->new;

# Destructor of the guard objects returned by quit_guard: freeing a
# guard releases its read lock.
sub rwlock::DESTROY {
    $quit->unlock;
}

# quit_guard()
# Takes a read lock on $quit and returns a guard object that gives the
# lock back when it is destroyed.
sub quit_guard {
    $quit->rdlock;
    return bless [], 'rwlock';
}
315
# find_folders()
# Returns the names of the plain files directly inside $PREFIX, skipping
# anything whose name starts with "." (this includes the .mdif index
# files). Dies if $PREFIX cannot be opened.
sub find_folders {
    opendir my $dir, $PREFIX
        or die "$PREFIX: $!";

    return grep { !/^\./ and -f "$PREFIX/$_" } readdir $dir;
}
330
# Outgoing message queue drained by the writer coroutine in
# handle_commands. Entries are [$signal, \$result, $wire_text]; replies
# put undef in the first two slots.
my $send = Coro::Channel->new(10);
my $done = 0;

# request($command, $data)
# Queues a request (code 000) for the peer, optionally carrying $data as
# a length-prefixed payload, blocks the calling coroutine until the
# matching reply has arrived and returns it as [$code, $msg, $data].
sub request {
    my ($command, $payload) = @_;
    my $answer;
    my $arrived = Coro::Signal->new;

    my $wire;
    if (defined $payload) {
        $wire = "000+" . length($payload) . " $command\n$payload";
    } else {
        $wire = "000 $command\n";
    }

    $send->put([$arrived, \$answer, $wire]);
    $arrived->wait;
    return $answer;
}

# reply($id, $code, $msg, $data)
# Queues the answer to the peer's request $id; $data, when defined, is
# sent as a length-prefixed payload. No acknowledgement is awaited.
sub reply {
    my ($id, $code, $msg, $data) = @_;
    my $wire = defined $data
        ? "$code+" . length($data) . " $msg\n$data"
        : "$code $msg\n";
    $send->put([undef, undef, "$id $wire"]);
}
351
# handle_commands($fh)
# Speaks the syncmail line protocol over the connection $fh until a
# "quit" is exchanged or an error occurs, then returns.
#
# Each message is "<id> <code>[+<len>] <msg>\n" followed by <len> bytes
# of data. Code 000 marks a request from the peer (dispatched below); any
# other code is the peer's reply to our own request <id>.
#
# Two coroutines are started: a writer that drains $send (numbering our
# requests and remembering them in %request) and a reader that handles
# incoming traffic. The event loop runs until the reader calls unloop.
sub handle_commands {
    my ($fh) = @_;

    # periodically send nop as keepalive signal to avoid the
    # rsync-on-slow-disk-timeout problem.
    my $nopper = Event->timer(parked => 1, cb => sub { request "nop" });

    # Folders opened on behalf of the peer; the array index is the folder
    # id used by the "update" and "close" commands.
    my @folder;

    # writer
    async {
        my $id = "a";
        $fh->print("\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n");
        while (my $r = $send->get) {
            if (defined $r->[1]) {
                # A request awaiting a reply: take the next id (magic
                # string increment "b", "c", ...) and remember the request
                # so the reader can hand the reply back to it.
                my $rid = ++$id;
                $request{$rid} = $r;
                print STDERR "<<< $SLAVE sending request $rid:$r->[2]";#d#
                $fh->print("$rid $r->[2]");
            } else {
                print STDERR "<<< $SLAVE sending reply $r->[2]";#d#
                $fh->print($r->[2]);
            }
            # re-arm the keepalive after any outgoing traffic
            $nopper->at(time+$::IDLE); $nopper->start;
        }
        # undef from $send means shut down: tell the peer
        $fh->print("- 000 quit\n");
    };

    # reader
    async {
        eval {
            $fh->timeout($::TIMEOUT);

            $_ = <$fh>;
            $_ eq "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n"
                or die "garbled input or version mismatch: $_";

            while (<$fh>) {
                slog 0, ">>> $SLAVE received :$_";
                /^(\S+) (\d\d\d)(?:\+(\d+))?\s*(.*)$/
                    or die "protocol error, garbled command ($_)";

                my ($id, $code, $dlen, $msg) = ($1, $2, $3, $4);
                my $data;

                $fh->sysread($data, $dlen) == $dlen
                    or die "unexpected read error: $!";

                if ($code == 0) {
                    if ($msg eq "quit") {
                        $send->put(undef);
                        last;
                    } elsif ($msg eq "nop") {
                        reply $id, 200, "nop";
                    } elsif ($msg eq "myname") {
                        $OTHERNAME = $data;
                        slog 3, "otherid set to $OTHERNAME\n";
                        reply $id, 200, "myname", $MYNAME;
                    } elsif ($msg eq "timestamp") {
                        reply $id, 200, time;
                    } elsif ($msg eq "folders") {
                        reply $id, 200, "ok", join "\0", find_folders;
                    } elsif ($msg eq "open") {
                        # The name comes from the peer and becomes a path
                        # below $PREFIX: accept only what find_folders can
                        # list (no "/" anywhere, no leading ".").
                        defined $data && $data =~ m{\A[^./][^/]*\z}
                            or die "protocol error, invalid folder name ($data)\n";
                        async {
                            my $folder = folder->new(name => $data);
                            $folder->check;
                            $folder->read_mdif;
                            push @folder, $folder;
                            reply $id, 200, "$#folder $folder->{mtime}";
                        };
                    } elsif ($msg =~ /^update (\d+) (\d+)$/) {
                        my ($fid, $mtime) = ($1, $2);
                        # Unknown ids would otherwise autovivify a plain
                        # hash in @folder.
                        my $folder = $folder[$fid]
                            or die "protocol error, unknown folder id ($fid)\n";
                        if ($folder->{host}{$OTHERNAME} != $mtime) {
                            $folder->{host}{$OTHERNAME} = $mtime;
                            $folder->dirty;
                        }
                        reply $id, 200, "ok";
                    } elsif ($msg =~ /^close (\d+)$/) {
                        # dropping the object writes its .mdif via DESTROY
                        undef $folder[$1];
                        reply $id, 200, "ok";
                    } else {
                        chomp;
                        die "protocol error, unknown command ($_)\n";
                    }
                } else {
                    # reply to one of our own requests: wake its requester
                    my $r = delete $request{$id}
                        or die "protocol error, invalid reply id ($_)\n";

                    ${$r->[1]} = [$code, $msg, $data];
                    $r->[0]->send;
                }
            }
        };

        slog 0, "ERROR: $@" if $@;
        print STDERR "unlooping\n";#d#
        unloop;
    };

    loop;
    print STDERR "$SLAVE hiho\n";#d#
}
451
my $sync_folder = Coro::Semaphore->new(3); # max 3 folders in parallel

# sync_folder($name)
# Starts a coroutine that refreshes the local index of folder $name,
# opens the same folder on the peer, tells the peer which local mtime it
# has now seen, and closes it again. A quit guard is held until the
# coroutine finishes so that main's write lock on $quit waits for it.
# Returns the coroutine.
sub sync_folder {
    my $name = $_[0];

    my $quit_guard = quit_guard;
    async {
        my $guard = $sync_folder->guard;

        ::give;

        my $folder = folder->new(name => $name);
        my ($rfid, $rmtime) = split /\s+/, (request open => $name)->[1];

        $folder->check;
        $folder->read_mdif;

        # Record the mtime the peer has been told about. Mark the folder
        # dirty when this changes (as the peer-side "update" handler
        # does); otherwise write_mdif skips the folder and the new value
        # is lost when the object is destroyed.
        if ($folder->{host}{$OTHERNAME} != $folder->{mtime}) {
            $folder->{host}{$OTHERNAME} = $folder->{mtime};
            $folder->dirty;
        }

        request "update $rfid $folder->{mtime}";
        request "close $rfid";
        undef $quit_guard;
    }
}
477
# main()
# Per-connection session logic, run as a coroutine on both sides:
# verifies the clocks agree, exchanges host names and, on the master,
# creates empty local copies of folders only the peer has, then syncs
# every local folder and signals quit once all syncs have finished.
sub main {
    # time checking done symmetrically: compare the peer's clock with the
    # midpoint of our request round trip
    {
        my $time = time;
        my $othertime = (request "timestamp")->[1];
        abs(($time + time) * 0.5 - $othertime) <= $::MAXTIMEDIFF
            or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
    }

    $OTHERNAME = (request "myname", $MYNAME)->[2];
    if ($SLAVE) {
        #
    } else {
        for (split /\0/, (request "folders")->[2]) {
            # Names come from the peer and are used as paths below
            # $PREFIX: accept only plain names as find_folders lists them.
            m{\A[^./][^/]*\z}
                or die "ERROR: peer sent invalid folder name ($_)\n";
            if (!-e "$PREFIX/$_") {
                slog 2, "created new empty folder $_\n";
                sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666
                    or die "$PREFIX/$_: $!";
            }
        }

        for my $folder (find_folders) {
            sync_folder $folder;
        }

        # granted only after every sync_folder has dropped its quit guard
        print "writelock\n";#d#
        $quit->wrlock;
        $send->put(undef);
    }
}
507
# Entry point. The slave talks to the master over its STDIN/STDOUT. The
# master creates a connected socket pair, forks and execs itself as the
# slave on one end, and runs the protocol on the other end.
if ($SLAVE) {
    $HOSTID = "slave";
    async \&main;
    handle_commands unblock \*STDIN;
} else {
    $HOSTID = "master";
    {
        use Socket;
        socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC
            or die "socketpair: $!";

        # fork returns undef on failure; treating that as 0 would make
        # this (parent) process exec the slave itself.
        my $pid = fork;
        defined $pid
            or die "fork: $!";

        if ($pid == 0) {
            # child: become the slave, with S2 as STDIN and STDOUT
            close S1;
            open STDIN, "<&S2" or die;
            open STDOUT, ">&S2" or die;
            exec $0, "--slave";
            print STDERR "$0: cannot exec slave: $!\n";
            exit 255;
        }
        close S2;
        async \&main;
        handle_commands unblock \*S1;
    }
}
529