ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
Revision: 1.2
Committed: Sat Oct 27 03:40:29 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
Changes since 1.1: +277 -119 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #!/usr/bin/perl
2    
3     use Coro;
4     use Coro::Handle;
5     use Coro::Event;
6     use Coro::Semaphore;
7     use Coro::Channel;
8     use Coro::Signal;
9 root 1.2 use Coro::RWLock;
10 root 1.1
11 root 1.2 use Fcntl;
12 root 1.1 use MD5;
13    
# Protocol revision announced in the greeting line exchanged by both peers.
use constant PROTVERSION => 1;

$VERSION = 0.1;

# Timing parameters, in seconds.
$TIMEOUT     = 20;    # timeout set on the peer handle
$IDLE        = $TIMEOUT < 30 ? $TIMEOUT / 2 : $TIMEOUT - 15;    # delay before the keepalive timer fires
$MAXTIMEDIFF = 10;    # largest accepted clock difference between the hosts

# WARNING:
# Content-Length headers are deliberately being ignored. They
# are broken by design and will never be supported

# TODO: real message-id parsing

$| = 1;

# Verbosity: slog prints a message when its level is <= $v.
$v = 9;

# 1 when started with --slave as first argument, 0 otherwise.
$SLAVE = ($ARGV[0] eq "--slave") ? 1 : 0;

$MYNAME = $SLAVE ? "slave" : "master";
$PREFIX = $SLAVE ? "./dst" : "./src";

@OTHERNAMES = qw(third);

# Message counter; parse_mbox yields the timeslice every 1024 messages.
my $ecnt;
40    
# slog($level, $message)
# Print $message to STDERR, prefixed with the slave flag in brackets,
# when $level does not exceed the global verbosity $v.
sub slog {
    my ($level, $message) = @_;

    return unless $level <= $v;

    print STDERR "[$SLAVE] ", $message;
}
46    
# Semaphore whose guard is held by folder::check for the duration of
# the check.
my $lockdisk = Coro::Semaphore->new;

# give up a/the timeslice (done by waiting on a timer with zero delay)
sub give {
    my @zero_delay = (after => 0);
    Coro::Event::do_timer(@zero_delay);
}
53    
package folder;

# Make main's logger callable as a plain function inside this package.
BEGIN { *slog = \&main::slog }

# Version written into new .mdif files; read_mdif rejects anything newer.
use constant MDIFVERSION => 1;
59    
# folder->new(name => $name, ...)
# Create a folder object from the given key/value pairs. The path
# defaults to "$::PREFIX/$name"; any key passed by the caller, path
# included, takes precedence over that default.
sub new {
    my ($class, %arg) = @_;

    my %self = (
        path => "$::PREFIX/$arg{name}",
        %arg,
    );

    return bless \%self, $class;
}
68    
# Flag the folder as modified; write_mdif only writes flagged folders.
sub dirty {
    my ($self) = @_;
    $self->{dirty} = 1;
}
72    
# Destructor: save the index file if the folder was flagged as dirty.
sub DESTROY {
    my ($self) = @_;
    $self->write_mdif;
}
76    
# parse_mbox(mbox-file-path, callback)
# Split an mbox file into messages and call
#    $callback->($offset, \$header, \$body)
# for each of them. $offset is the byte position of the message's
# "From " line, $header is the header block including the mbox From_
# line (without the leading "From " itself) and the blank line that
# ends it, and $body is everything up to the next "\nFrom " separator.
#
# Returns true on success. A zero-length file counts as a valid folder
# without messages, so the callback is simply never called for it.
# Returns nothing when the file does not start with "From ".
# Dies when the file cannot be opened or read, or when a From_ line
# does not carry the expected date format.
sub parse_mbox {
    my ($path, $cb) = @_;

    open my $fh, "<", $path
        or die "$path: $!";

    local $/ = "\n\n";    # a header block ends at the first empty line

    my ($head, $body);

    my $got = read $fh, $head, 5;
    defined $got
        or die "$path: $!";

    # An empty file is an empty folder, not a malformed one.
    return 1 if $got == 0;

    $got == 5 && $head eq "From "
        or return;

    my $offs = 0;
    while (defined ($head = <$fh>)) {
        # NOTE(review): the final (?:\d\d) group is not optional, so only
        # four-digit years match -- confirm that this is intended.
        $head =~ /^.*? [A-Z][a-z][a-z] [A-Z][a-z][a-z] [ 0-9][0-9] \d\d:\d\d:\d\d(?: [+-]\d\d\d\d)? \d\d(?:\d\d)\n/
            or die "$path: not standard mbox format header:\n$head\n";

        # Within this iteration a record ends where the next message starts.
        local $/ = "\nFrom ";
        # NEVER enable this. content-length simply is broken by design
        #if ($head =~ /^Content-Length:\s+(\d+)$/im) {
        #   $1 <= read $fh, $body, $1 + 5
        #      or die "$path: partial message in mbox";
        #} else {
        $body = <$fh>;
        #}
        chomp $body;    # strips the "\nFrom " separator, if present
        $cb->($offs, \$head, \$body);

        # The handle now sits just behind the next "From ".
        $offs = (tell $fh) - 5;

        # Let other coroutines run once every 1024 messages.
        ::give() unless ++$ecnt & 1023;
    }

    1;
}
118    
# Return the path of the folder's index file: the last component of
# the folder path gets a leading "." and a trailing ".mdif".
sub conf_path {
    my ($self) = @_;

    my $conf = $self->{path};
    $conf =~ s%([^/]+$)%.$1.mdif%;

    return $conf;
}
123    
# $folder->read_mdif
# Load the folder's .mdif file into the object. Does nothing when an
# index is already present in $self->{idx} or when the file cannot be
# opened.
#
# Sections understood:
#    [SYNCMAIL]    key=value lines, stored directly on the object
#    [HOSTS]       name=value lines, stored under $self->{host}
#    [DIFF mtime]  lines starting with + or -, stored as [mtime, \@lines]
#    [INDEX]       offset=message-id lines, stored as $self->{idx}
# A line starting with "#" in section position is skipped.
#
# Dies on an empty file, on an unknown section, and when the stored
# version is greater than MDIFVERSION.
sub read_mdif {
    my $self = shift;
    my $path = $self->conf_path;

    return if $self->{idx};

    open my $fh, "<", $path
        or return;

    # A lexical line buffer is used so that the caller's $_ stays intact.
    my $line = <$fh>;
    defined $line
        or die "$path: empty mdif file\n";

    # Each branch reads on until it meets a line it does not understand;
    # that line is then treated as the next section header.
    do {
        if ($line eq "[SYNCMAIL]\n") {
            while (defined ($line = <$fh>)) {
                last unless $line =~ /^([a-z]+)\s*=\s*(.*)\n$/;
                $self->{$1} = $2;
            }
        } elsif ($line eq "[HOSTS]\n") {
            while (defined ($line = <$fh>)) {
                last unless $line =~ /^([^[].*)=(.*)\n$/;
                $self->{host}{$1} = $2;
            }
        } elsif ($line =~ /^\[DIFF (\d+)\]\n$/) {
            my $mtime = $1;
            my @dif;
            while (defined ($line = <$fh>)) {
                last unless $line =~ /^[+-]/;
                push @dif, substr $line, 0, -1;
            }
            # write_mdif emits the diffs in reverse order, so unshift
            # restores the in-memory order.
            unshift @{$self->{diff}}, [$mtime, \@dif];
        } elsif ($line eq "[INDEX]\n") {
            my @idx;
            while (defined ($line = <$fh>)) {
                last unless $line =~ /^(\d+)=(.*)\n$/;
                push @idx, [$1, $2];
            }
            $self->{idx} = \@idx;
        } elsif ($line =~ /^#/) {
            $line = <$fh>;
            # nop
        } else {
            die "$path: unparseable section '$line'\n";
        }
    } while defined $line;

    $self->{version} <= MDIFVERSION
        or die "$path: version mismatch ($self->{version} found, <=".MDIFVERSION." expected)\n";
}
173    
# $folder->write_mdif
# Save the folder's state to its .mdif file, but only when the folder
# is flagged dirty. The data is written to "<file>~" first and then
# renamed over the real file. The dirty flag is cleared only after the
# rename succeeded.
#
# The first two lines (comment line, then "[SYNCMAIL]") are relied upon
# by the quick comparison in check.
#
# Dies when the temporary file cannot be created, written or renamed.
sub write_mdif {
    my $self = shift;
    my $path = $self->conf_path;

    return unless $self->{dirty};

    open my $fh, ">", "$path~"
        or die "$path~: $!";

    print $fh "# automatically generated, do NOT edit\n";

    print $fh "[SYNCMAIL]\n";
    print $fh "$_=$self->{$_}\n" for (qw(fsize mtime version));

    print $fh "[HOSTS]\n";
    while (my ($host, $stamp) = each %{$self->{host}}) {
        print $fh "$host=$stamp\n";
    }

    print $fh "[INDEX]\n";
    print $fh "$_->[0]=$_->[1]\n" for @{$self->{idx}};

    # Written in reverse; read_mdif unshifts them back into order.
    for my $diff (reverse @{$self->{diff}}) {
        print $fh "[DIFF $diff->[0]]\n";
        print $fh $_, "\n" for @{$diff->[1]};
    }

    # Buffered write errors surface here.
    close $fh
        or die "$path~: unable to create updated .mdif: $!";

    # Without this check a failed replace would still clear the dirty
    # flag and the update would be lost unnoticed.
    rename "$path~", $path
        or die "$path: unable to replace .mdif: $!";

    delete $self->{dirty};
}
208    
# gendiff(\@old, \@new)
# Compare two index lists whose entries are [offset, message-id] pairs.
# Returns a reference to a list of strings: first "-id" for every entry
# of @old whose id is missing from @new (in @old order), then "+id" for
# every entry of @new whose id is missing from @old (in @new order).
sub gendiff {
    my ($d1, $d2) = @_;

    my @old = @{ $d1 || [] };
    my @new = @{ $d2 || [] };

    my (%in_old, %in_new);
    @in_new{ map { $_->[1] } @new } = ();
    @in_old{ map { $_->[1] } @old } = ();

    # delete msgs in d1 but not in d2
    my @removed = map { "-$_->[1]" } grep { !exists $in_new{$_->[1]} } @old;

    # add msgs in d2 but not in d1
    my @added = map { "+$_->[1]" } grep { !exists $in_old{$_->[1]} } @new;

    return [@removed, @added];
}
233    
# $folder->check
# Bring the folder's index up to date with the mbox file on disk. The
# $lockdisk guard is held for the whole call.
#
# Returns
#    1   when size and mtime equal the values stored in the .mdif file,
#    2   when the folder was re-indexed (the object is then marked dirty),
#    ()  when the folder file is gone (its .mdif file is removed) or
#        parse_mbox returned false.
# Dies when the .mdif file is malformed or newer than the folder.
sub check {
    my ($self) = @_;
    my $path  = $self->{path};
    my $conf  = $self->conf_path;
    my $guard = $lockdisk->guard;

    slog(3, "checking $path\n");

    unless (stat $path) {
        slog(2, "$path: no longer exists\n");
        unlink $conf;

        return ();
    }

    my ($fsize, $mtime) = (stat _)[7, 9];

    # Quick test: read only the [SYNCMAIL] section of the .mdif file.
    if (open my $fh, "<", $conf) {
        my %conf;
        my $comment = <$fh>;    # skip initial comment
        my $section = <$fh>;
        $section eq "[SYNCMAIL]\n"
            or die "$conf: format error";
        while (defined (my $line = <$fh>)) {
            last unless $line =~ /^([a-z]+)\s*=\s*(.*)$/;
            $conf{$1} = $2;
        }

        return 1 if $fsize == $conf{fsize}
                 && $mtime == $conf{mtime};

        $conf{mtime} <= $mtime
            or die "$path: folder older than mdif";
    }

    slog(2, "updating $path\n");

    # Build a fresh index: one [offset, id] pair per message. The id is
    # the Message-Id header when present, else an MD5 over the message.
    my @idx;
    my $indexer = sub {
        my ($offs, $head, $body) = @_;
        my $mid = $$head =~ /^Message-Id:\s*(<[^<\n]+>)\s*\n/im
                ? $1
                : MD5->hexhash("$$head\0$$body");
        push @idx, [$offs, $mid];
    };
    parse_mbox($path, $indexer)
        or return ();

    $self->read_mdif;

    if ($self->{version}) {
        # Known folder: record what changed since the stored index.
        my $d = gendiff($self->{idx}, \@idx);
        push @{$self->{diff}}, [$self->{mtime}, $d] if @$d;
    } else {
        slog(2, "$path: previously unknown folder\n");
        $self->{version} ||= MDIFVERSION;
    }

    @$self{qw(fsize mtime idx)} = ($fsize, $mtime, \@idx);

    $self->dirty;

    return 2;
}
302    
package main;

# Read/write lock used for shutdown: every guard holds a read lock, and
# main takes the write lock before it queues the end-of-session marker.
my $quit = Coro::RWLock->new;

# Destroying a guard object releases its hold on $quit.
sub rwlock::DESTROY {
    $quit->unlock;
}

# Take a read lock on $quit and return a guard object; the lock is
# released when that object is destroyed.
sub quit_guard {
    $quit->rdlock;
    return bless [], 'rwlock';
}
315    
# Return the names of all plain files in $PREFIX whose name does not
# start with a dot. Dies when the directory cannot be opened.
sub find_folders {
    opendir my $dh, $PREFIX
        or die "$PREFIX: $!";

    return grep { !/^\./ && -f "$PREFIX/$_" } readdir $dh;
}
330    
# Queue of outgoing messages, consumed by the sender coroutine in
# handle_commands. Entries are [$signal, \$result, $text].
my $send = Coro::Channel->new(10);
my $done = 0;

# request $command, $data
# Queue a command (code 000) for the peer, with an optional data block
# whose length is announced as "+LEN", and wait until the reply has
# arrived. Returns the reply stored by the receiver: [$code, $msg, $data].
sub request {
    my ($command, $data) = @_;

    my $res;
    my $signal = Coro::Signal->new;

    my $cmd;
    if (defined $data) {
        $cmd = "000+" . length($data) . " $command\n$data";
    } else {
        $cmd = "000 $command\n";
    }

    $send->put([$signal, \$res, $cmd]);
    $signal->wait;

    return $res;
}
344    
# reply $id, $code, $msg, $data
# Queue the answer to the peer's request $id. The line sent is
# "$id $code $msg", or "$id $code+LEN $msg" followed by $data when a
# data block is given. Nothing waits for this message.
sub reply {
    my ($id, $code, $msg, $data) = @_;

    my $cmd = "$id $code";
    $cmd .= "+" . length($data) if defined $data;
    $cmd .= " $msg\n";
    $cmd .= $data if defined $data;

    $send->put([undef, undef, $cmd]);
}
351    
# handle_commands($fh)
# Run the protocol on the peer handle $fh until the session ends.
#
# Wire format: both sides first send a greeting line carrying $VERSION
# and PROTVERSION. Every later message is a line
#    ID CODE[+LEN] MESSAGE
# optionally followed by LEN bytes of data. Code 000 is a command for
# this side; any other code is the reply to the request stored under ID
# in %request.
#
# Two coroutines are started: a sender that drains the $send channel,
# and a receiver that executes commands and hands replies to the
# waiting request. The function then enters the event loop and returns
# once the receiver has called unloop.
#
# Values sent by the peer are validated before use: a folder name must
# not be empty, start with a dot, or contain "/" or NUL (it becomes part
# of a file path), and a folder id must refer to an open folder. A
# violation ends the session like any other protocol error.
sub handle_commands {
    my ($fh) = @_;

    # periodically send nop as keepalive signal to avoid the
    # rsync-on-slow-disk-timeout problem.
    my $nopper = Event->timer(parked => 1, cb => sub { request("nop") });

    # Folders opened by the peer; the array index is the folder id.
    my @folder;

    # Sender.
    async {
        my $id = "a";
        $fh->print("\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n");
        while (my $r = $send->get) {
            if (defined $r->[1]) {
                # A request: give it a fresh id and remember it so the
                # receiver can route the reply.
                my $rid = ++$id;
                $request{$rid} = $r;
                print STDERR "<<< $SLAVE sending request $rid:$r->[2]";#d#
                $fh->print("$rid $r->[2]");
            } else {
                print STDERR "<<< $SLAVE sending reply $r->[2]";#d#
                $fh->print($r->[2]);
            }
            # Something was sent, so push the keepalive back.
            $nopper->at(time+$::IDLE); $nopper->start;
        }
        # An undef entry in the channel ends the session.
        $fh->print("- 000 quit\n");
    };

    # Receiver.
    async {
        eval {
            $fh->timeout($::TIMEOUT);

            $_ = <$fh>;
            $_ eq "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n"
                or die "garbled input or version mismatch: $_";

            while (<$fh>) {
                slog(0, ">>> $SLAVE received :$_");
                /^(\S+) (\d\d\d)(?:\+(\d+))?\s*(.*)$/
                    or die "protocol error, garbled command ($_)";

                my ($id, $code, $dlen, $msg) = ($1, $2, $3, $4);
                my $data;

                $fh->sysread($data, $dlen) == $dlen
                    or die "unexpected read error: $!";

                if ($code == 0) {
                    if ($msg eq "quit") {
                        $send->put(undef);
                        last;
                    } elsif ($msg eq "nop") {
                        reply($id, 200, "nop");
                    } elsif ($msg eq "myname") {
                        $OTHERNAME = $data;
                        slog(3, "otherid set to $OTHERNAME\n");
                        reply($id, 200, "myname", $MYNAME);
                    } elsif ($msg eq "timestamp") {
                        reply($id, 200, time);
                    } elsif ($msg eq "folders") {
                        reply($id, 200, "ok", join "\0", find_folders());
                    } elsif ($msg eq "open") {
                        # The name is turned into a file path; refuse
                        # anything that could leave $PREFIX.
                        defined $data && length $data && $data !~ m{^\.|/|\0}
                            or die "protocol error, invalid folder name\n";

                        # Checking may take long, so it runs in its own
                        # coroutine and replies when done.
                        async {
                            my $folder = folder->new(name => $data);
                            $folder->check;
                            $folder->read_mdif;
                            push @folder, $folder;
                            reply($id, 200, "$#folder $folder->{mtime}");
                        };
                    } elsif ($msg =~ /^update (\d+) (\d+)$/) {
                        my ($fid, $stamp) = ($1, $2);
                        my $folder = $folder[$fid]
                            or die "protocol error, unknown folder id ($fid)\n";
                        if ($folder->{host}{$OTHERNAME} != $stamp) {
                            $folder->{host}{$OTHERNAME} = $stamp;
                            $folder->dirty;
                        }
                        reply($id, 200, "ok");
                    } elsif ($msg =~ /^close (\d+)$/) {
                        my $fid = $1;
                        $fid <= $#folder
                            or die "protocol error, unknown folder id ($fid)\n";
                        # Dropping the reference triggers the folder's
                        # destructor.
                        undef $folder[$fid];
                        reply($id, 200, "ok");
                    } else {
                        chomp;
                        die "protocol error, unknown command ($_)\n";
                    }
                } else {
                    my $r = delete $request{$id}
                        or die "protocol error, invalid reply id ($_)\n";

                    # Store the result and wake up the waiting request.
                    ${$r->[1]} = [$code, $msg, $data];
                    $r->[0]->send;
                }
            }
        };

        slog(0, "ERROR: $@") if $@;
        print STDERR "unlooping\n";#d#
        unloop();
    };

    loop();
    print STDERR "$SLAVE hiho\n";#d#
}
451    
my $sync_folder = Coro::Semaphore->new(3); # max 3 folders in parallel

# sync_folder($name)
# Start a coroutine that synchronises one folder with the peer: open it
# remotely, check it locally, record the local mtime as the stamp for
# this peer, and tell the peer about it with "update" before closing.
#
# A quit guard is taken before the coroutine starts and released at its
# end.
#
# The folder is marked dirty when the stamp for the peer changes, as
# the remote "update" handler does, so the new stamp is saved even when
# the folder itself is unchanged. No "update" is sent when the local
# folder has no mtime, because the peer only accepts numeric stamps.
sub sync_folder {
    my $name = $_[0];

    my $quit_guard = quit_guard();
    async {
        my $guard = $sync_folder->guard;

        ::give();

        my $folder = folder->new(name => $name);

        # The reply message is "<remote folder id> <remote mtime>"; only
        # the id is needed here.
        my ($rfid) = split /\s+/, request(open => $name)->[1];

        $folder->check;
        $folder->read_mdif;

        if (defined $folder->{mtime}) {
            if ($folder->{host}{$OTHERNAME} != $folder->{mtime}) {
                $folder->{host}{$OTHERNAME} = $folder->{mtime};
                $folder->dirty;
            }

            request("update $rfid $folder->{mtime}");
        }

        request("close $rfid");
        undef $quit_guard;
    };
}
477    
# main
# Session driver, run as a coroutine on both sides.
#
# It first compares clocks with the peer and dies when they differ by
# more than $::MAXTIMEDIFF seconds, then exchanges host names. The
# slave stops there. The master creates an empty local file for every
# remote folder it does not have, starts sync_folder for every local
# folder, waits for the write lock on $quit and then queues the
# end-of-session marker.
#
# Folder names come from the peer and are used in file paths, so names
# that are empty, start with a dot or contain "/" are ignored.
sub main {
    # time checking done symmetrically
    {
        my $time = time;
        my $othertime = request("timestamp")->[1];
        # The mean of the times before and after the request is compared.
        abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF
            or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
    }

    $OTHERNAME = request("myname", $MYNAME)->[2];
    if ($SLAVE) {
        #
    } else {
        for my $name (split /\0/, request("folders")->[2]) {
            if ($name eq "" || $name =~ m{^\.|/}) {
                slog(0, "ignoring invalid remote folder name\n");
                next;
            }

            next if -e "$PREFIX/$name";

            if (sysopen my $fh, "$PREFIX/$name", O_RDWR|O_CREAT, 0666) {
                slog(2, "created new empty folder $name\n");
            } else {
                slog(0, "unable to create folder $name: $!\n");
            }
        }

        for my $folder (find_folders()) {
            sync_folder($folder);
        }

        print "writelock\n";#d#
        $quit->wrlock;
        $send->put(undef);
    }
}
507    
# Program start.
#
# The slave talks to its peer over STDIN. The master creates a socket
# pair, forks a child that connects its STDIN and STDOUT to one end and
# re-executes this program with --slave, and talks to it over the other
# end.
if ($SLAVE) {
    $HOSTID = "slave";
    async \&main;
    handle_commands(unblock(\*STDIN));
} else {
    $HOSTID = "master";
    {
        use Socket;
        socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC
            or die "socketpair: $!";

        # fork returns undef on failure, which must not be mistaken for
        # the child's 0.
        my $pid = fork;
        defined $pid
            or die "fork: $!";

        if ($pid == 0) {
            close S1;
            open STDIN, "<&S2" or die "unable to redirect STDIN: $!";
            open STDOUT, ">&S2" or die "unable to redirect STDOUT: $!";
            exec $0, "--slave";
            exit 255;    # only reached when exec failed
        }
        close S2;
        async \&main;
        handle_commands(unblock(\*S1));
    }
}
529