ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
(Generate patch)

Comparing syncmail/syncmail (file contents):
Revision 1.2 by root, Sat Oct 27 03:40:29 2001 UTC vs.
Revision 1.3 by root, Sat Oct 27 23:53:49 2001 UTC

13 13
14use constant PROTVERSION => 1; 14use constant PROTVERSION => 1;
15 15
16$VERSION = 0.1; 16$VERSION = 0.1;
17 17
18$TIMEOUT = 20; 18require "config.pl";
19$IDLE = $TIMEOUT < 30 ? $TIMEOUT / 2 : $TIMEOUT - 15; 19
20$MAXTIMEDIFF = 10; 20use folder;
21use vc;
22
23$v = $VERBOSE;
21 24
22# WARNING: 25# WARNING:
23# Content-Length headers are deliberately being ignored. They 26# Content-Length headers are deliberately being ignored. They
24# are broken by design and will never be supported 27# are broken by design and will never be supported
25 28
26# TODO: real message-id parsing 29# TODO: real message-id parsing
27 30
28$|=1; 31$|=1;
29 32
30$v = 9;
31
32$SLAVE = 1*($ARGV[0] eq "--slave");
33
34$MYNAME = $SLAVE ? "slave" : "master";
35$PREFIX = $SLAVE ? "./dst" : "./src";
36
37@OTHERNAMES = qw(third);
38
39my $ecnt; 33my $ecnt;
40 34
41sub slog { 35sub slog {
42 if ($_[0] <= $v) { 36 if ($_[0] <= $v) {
43 print STDERR "[$SLAVE] ", $_[1]; 37 print STDERR "[$SLAVE] ", $_[1];
44 } 38 }
45} 39}
46 40
47my $lockdisk = new Coro::Semaphore; 41$lockdisk = new Coro::Semaphore;
48 42
49# give up a/the timeslice 43# give up a/the timeslice
50sub give { 44sub give {
51 Coro::Event::do_timer(after => 0); 45 Coro::Event::do_timer(after => 0);
52} 46}
53
54package folder;
55
56BEGIN { *slog = \&::slog };
57
58use constant MDIFVERSION => 1;
59
60sub new {
61 my $class = shift;
62 my %arg = @_;
63 bless {
64 path => "$::PREFIX/$arg{name}",
65 %arg,
66 }, $class;
67}
68
69sub dirty {
70 $_[0]{dirty} = 1;
71}
72
73sub DESTROY {
74 $_[0]->write_mdif;
75}
76
77# parse_mbox(mbox-file-path, callback)
78# callback gets called with \$header and \$body,
79# $header includes the mbox From_ line without
80# the leading From_ itself.
81sub parse_mbox {
82 my ($path, $cb) = @_;
83
84 open my $fh, "<", $path
85 or die "$path: $!";
86
87 local $/ = "\n\n";
88
89 my ($head, $body, $offs);
90
91 5 == read $fh, $head, 5
92 or return;
93
94 $head eq "From "
95 or return;
96
97 $offs = 0;
98 while (defined ($head = <$fh>)) {
99 $head =~ /^.*? [A-Z][a-z][a-z] [A-Z][a-z][a-z] [ 0-9][0-9] \d\d:\d\d:\d\d(?: [+-]\d\d\d\d)? \d\d(?:\d\d)\n/
100 or die "$path: not standard mbox format header:\n$head\n";
101
102 local $/ = "\nFrom ";
103 # NEVER enable this. content-length simply is broken by design
104 #if ($head =~ /^Content-Length:\s+(\d+)$/im) {
105 # $1 <= read $fh, $body, $1 + 5
106 # or die "$path: partial message in mbox";
107 #} else {
108 $body = <$fh>;
109 #}
110 chomp $body;
111 $cb->($offs, \$head, \$body);
112 $offs = (tell $fh) - 5;
113 ::give unless ++$ecnt & 1023;
114 }
115
116 1;
117}
118
119sub conf_path {
120 (my $conf = $_[0]{path}) =~ s%([^/]+$)%.$1.mdif%;
121 $conf;
122}
123
124sub read_mdif {
125 my $self = shift;
126 my $path = $self->conf_path;
127
128 return if $self->{idx};
129
130 open my $fh, "<", $path
131 or return;
132
133 defined ($_ = <$fh>)
134 or die "$path: empty mdif file\n";
135
136 do {
137 if ($_ eq "[SYNCMAIL]\n") {
138 while (<$fh>) {
139 last unless /^([a-z]+)\s*=\s*(.*)\n$/;
140 $self->{$1} = $2;
141 }
142 } elsif ($_ eq "[HOSTS]\n") {
143 while (<$fh>) {
144 last unless /^([^[].*)=(.*)\n$/;
145 $self->{host}{$1} = $2;
146 }
147 } elsif (/^\[DIFF (\d+)\]\n$/) {
148 my $mtime = $1;
149 my @dif;
150 while (<$fh>) {
151 last unless /^[+-]/;
152 push @dif, substr $_, 0, -1;
153 }
154 unshift @{$self->{diff}}, [$mtime, \@dif];
155 } elsif ($_ eq "[INDEX]\n") {
156 my @idx;
157 while (<$fh>) {
158 last unless /^(\d+)=(.*)\n$/;
159 push @idx, [$1, $2];
160 }
161 $self->{idx} = \@idx;
162 } elsif (/^#/) {
163 $_ = <$fh>;
164 # nop
165 } else {
166 die "$path: unparseable section '$_'\n";
167 }
168 } while defined $_;
169
170 $self->{version} <= MDIFVERSION
171 or die "$path: version mismatch ($self->{version} found, <".MDIFVERSION." expected)\n";
172}
173
174sub write_mdif {
175 my $self = shift;
176 my $path = $self->conf_path;
177
178 return unless $self->{dirty};
179
180 open my $fh, ">", "$path~"
181 or die "$path~: $!";
182
183 print $fh "# automatically generated, do NOT edit\n";
184
185 print $fh "[SYNCMAIL]\n";
186 print $fh "$_=$self->{$_}\n" for (qw(fsize mtime version));
187
188 print $fh "[HOSTS]\n";
189 while (my ($k,$v) = each %{$self->{host}}) {
190 print $fh "$k=$v\n";
191 }
192
193 print $fh "[INDEX]\n";
194 print $fh "$_->[0]=$_->[1]\n" for @{$self->{idx}};
195
196 for (reverse @{$self->{diff}}) {
197 print $fh "[DIFF $_->[0]]\n";
198 print $fh $_, "\n" for @{$_->[1]};
199 }
200
201 close $fh
202 or die "$path~: unable to create updated .mdif: $!";
203
204 rename "$path~", $path;
205
206 delete $self->{dirty};
207}
208
209sub gendiff {
210 my ($d1, $d2) = @_;
211
212 my @d;
213 my (%d1, %d2);
214
215 for (@$d2) {
216 undef $d2{$_->[1]};
217 }
218
219 # delete msgs in d1 but not in d2
220 for (@$d1) {
221 undef $d1{$_->[1]};
222 push @d, "-$_->[1]" unless exists $d2{$_->[1]};
223 }
224 %d2 = (); # conserve memory
225
226 # add msgs in d2 but not in d1
227 for (@$d2) {
228 push @d, "+$_->[1]" unless exists $d1{$_->[1]};
229 }
230
231 \@d;
232}
233
234sub check {
235 my $self = shift;
236 my $path = $self->{path};
237 my $conf = $self->conf_path;
238 my $guard = $lockdisk->guard;
239
240 slog 3, "checking $path\n";
241
242 if (stat $path) {
243 my ($fsize, $mtime) = (stat _)[7, 9];
244
245 if (open my $fh, "<", $conf) {
246 my %conf;
247 <$fh>; # skip initial comment
248 <$fh> eq "[SYNCMAIL]\n"
249 or die "$conf: format error";
250 while (<$fh> =~ /^([a-z]+)\s*=\s*(.*)$/) {
251 $conf{$1} = $2;
252 }
253 return 1 if $fsize == $conf{fsize}
254 && $mtime == $conf{mtime};
255
256 $conf{mtime} <= $mtime
257 or die "$path: folder older than mdif";
258 }
259
260 slog 2, "updating $path\n";
261
262 my @idx;
263
264 parse_mbox $path, sub {
265 my ($offs, $head, $body) = @_;
266 my $mid;
267 if ($$head =~ /^Message-Id:\s*(<[^<\n]+>)\s*\n/im) {
268 $mid = $1;
269 } else {
270 $mid = MD5->hexhash("$$head\0$$body");
271 }
272 push @idx, [$offs, $mid];
273 } or return ();
274
275 $self->read_mdif;
276
277 if ($self->{version}) {
278 my $d = gendiff $self->{idx}, \@idx;
279 push @{$self->{diff}}, [
280 $self->{mtime},
281 $d,
282 ] if @$d;
283 } else {
284 slog 2, "$path: previously unknown folder\n";
285 $self->{version} ||= MDIFVERSION;
286 }
287
288 $self->{fsize} = $fsize;
289 $self->{mtime} = $mtime;
290 $self->{idx} = \@idx;
291
292 $self->dirty;
293
294 return 2;
295 } else {
296 slog 2, "$path: no longer exists\n";
297 unlink $conf;
298
299 return ();
300 }
301}
302
303package main;
304 47
305my $quit = new Coro::RWLock; 48my $quit = new Coro::RWLock;
306 49
307sub rwlock::DESTROY { 50sub rwlock::DESTROY {
308 $quit->unlock; 51 $quit->unlock;
326 } 69 }
327 70
328 @folders; 71 @folders;
329} 72}
330 73
331my $send = new Coro::Channel 10;
332my $done = 0;
333
334# request $command, $data
335sub request {
336 my $res;
337 my $signal = new Coro::Signal;
338 my $cmd = defined $_[1] ? "000+".length($_[1])." $_[0]\n$_[1]"
339 : "000 $_[0]\n";
340 $send->put([$signal, \$res, $cmd]);
341 $signal->wait;
342 $res;
343}
344
345# reply $id, $code, $msg, $data
346sub reply {
347 my $cmd = defined $_[3] ? "$_[1]+".length($_[3])." $_[2]\n$_[3]"
348 : "$_[1] $_[2]\n";
349 $send->put([undef, undef, "$_[0] $cmd"]);
350}
351
352sub handle_commands {
353 my ($fh) = @_;
354
355 # periodically send nop as keepalive signal to avoid the
356 # rsync-on-slow-disk-timeout problem.
357 my $nopper = Event->timer(parked => 1, cb => sub { request "nop" });
358
359 my @folder;
360
361 async {
362 my $id = "a";
363 $fh->print("\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n");
364 while (my $r = $send->get) {
365 if (defined $r->[1]) {
366 my $id = ++$id;
367 $request{$id} = $r;
368 print STDERR "<<< $SLAVE sending request $id:$r->[2]";#d#
369 $fh->print("$id $r->[2]");
370 } else {
371 print STDERR "<<< $SLAVE sending reply $r->[2]";#d#
372 $fh->print($r->[2]);
373 }
374 $nopper->at(time+$::IDLE); $nopper->start;
375 }
376 $fh->print("- 000 quit\n");
377 };
378
379 async {
380 eval {
381 $fh->timeout($::TIMEOUT);
382
383 $_ = <$fh>;
384 $_ eq "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n"
385 or die "garbled input or version mismatch: $_";
386
387 while (<$fh>) {
388 slog 0, ">>> $SLAVE received :$_";
389 /^(\S+) (\d\d\d)(?:\+(\d+))?\s*(.*)$/
390 or die "protocol error, garbled command ($_)";
391
392 my ($id, $code, $dlen, $msg) = ($1, $2, $3, $4);
393 my $data;
394
395 $fh->sysread($data, $dlen) == $dlen
396 or die "unexpected read error: $!";
397
398 if ($code == 0) {
399 if ($msg eq "quit") {
400 $send->put(undef);
401 last;
402 } elsif ($msg eq "nop") {
403 reply $id, 200, "nop";
404 } elsif ($msg eq "myname") {
405 $OTHERNAME = $data;
406 slog 3, "otherid set to $OTHERNAME\n";
407 reply $id, 200, "myname", $MYNAME;
408 } elsif ($msg eq "timestamp") {
409 reply $id, 200, time;
410 } elsif ($msg eq "folders") {
411 reply $id, 200, "ok", join "\0", find_folders;
412 } elsif ($msg eq "open") {
413 async {
414 my $folder = new folder name => $data;
415 $folder->check;
416 $folder->read_mdif;
417 push @folder, $folder;
418 reply $id, 200, "$#folder $folder->{mtime}";
419 };
420 } elsif ($msg =~ /^update (\d+) (\d+)$/) {
421 if ($folder[$1]->{host}{$OTHERNAME} != $2) {
422 $folder[$1]->{host}{$OTHERNAME} = $2;
423 $folder[$1]->dirty;
424 }
425 reply $id, 200, "ok";
426 } elsif ($msg =~ /^close (\d+)$/) {
427 undef $folder[$1];
428 reply $id, 200, "ok";
429 } else {
430 chomp;
431 die "protocol error, unknown command ($_)\n";
432 }
433 } else {
434 my $r = delete $request{$id}
435 or die "protocol error, invalid reply id ($_)\n";
436
437 ${$r->[1]} = [$code, $msg, $data];
438 $r->[0]->send;
439 }
440 }
441 };
442
443 slog 0, "ERROR: $@" if $@;
444 print STDERR "unlooping\n";#d#
445 unloop;
446 };
447
448 loop;
449 print STDERR "$SLAVE hiho\n";#d#
450}
451
452my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel 74my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel
453 75
454sub sync_folder { 76sub sync_folder {
455 my $name = $_[0]; 77 my $name = $_[0];
456 78
457 my $quit_guard = quit_guard; 79 my $quit_guard = quit_guard;
458 async { 80 async {
459 my $guard = $sync_folder->guard; 81 my $guard = $sync_folder->guard;
82 my $vc = new vc;
460 83
84 my $folder = new folder name => $name;
85 #my ($rfid, $rmtime) = split /\s+/, (request open => $name)->[1];
461 ::give; 86 ::give;
462 87
463 my $folder = new folder name => $name; 88 $vc->snd("open", $name);
464 my ($rfid, $rmtime) = split /\s+/, (request open => $name)->[1]; 89 $vc->snd("mtime");
465 90
466 $folder->check; 91 $folder->check;
467 $folder->read_mdif; 92 $folder->read_mdif;
93
468 my $mtime = $folder->{host}{$OTHERNAME}; 94 my $mtime = $folder->{host}{$OTHERNAME};
95 my $rtime = $vc->rcv;
469 96
97 $vc->snd("diff", $rtime);
470 $folder->{host}{$OTHERNAME} = $folder->{mtime}; 98 $folder->{host}{$OTHERNAME} = $folder->{mtime};
471 99
472 request "update $rfid $folder->{mtime}"; 100 #request "update $rfid $folder->{mtime}";
473 request "close $rfid"; 101 #request "close $rfid";
474 undef $quit_guard; 102 undef $quit_guard;
475 } 103 }
476} 104}
477 105
478sub main { 106sub main {
107 my $vc = new vc;
108
479 # time checking done symmetrically 109 # time checking done symmetrically
480 { 110 {
481 my $time = time; 111 my $time = time;
482 my $othertime = (request "timestamp")->[1]; 112 $vc->snd("time");
113 my $othertime = $vc->rcv;
483 abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF 114 abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF
484 or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF"; 115 or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
485 } 116 }
117#Coro::Event::do_timer(after => 60);#d#
486 118
487 $OTHERNAME = (request "myname", $MYNAME)->[2]; 119 $vc->snd("name");
120 $OTHERNAME = $vc->rcv;
121
488 if ($SLAVE) { 122 if ($SLAVE) {
489 # 123 #
490 } else { 124 } else {
491 for (split /\0/, (request "folders")->[2]) { 125 $vc->snd("list");
126 for (split /\0/, $vc->rcv) {
492 if (!-e "$PREFIX/$_") { 127 if (!-e "$PREFIX/$_") {
493 slog 2, "created new empty folder $_\n"; 128 slog 2, "created new empty folder $_\n";
494 sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666; 129 sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666;
495 } 130 }
496 } 131 }
497 132
498 for my $folder (find_folders) { 133 for my $folder (find_folders) {
499 sync_folder $folder; 134 sync_folder $folder;
500 } 135 }
501 136
502 print "writelock\n";#d#
503 $quit->wrlock; 137 $quit->wrlock;
504 $send->put(undef); 138
139 vc::snd_quit;
140 }
141}
142
143sub serve_folder {
144 my $vc = shift;
145 my $name = $vc->rcv;
146 my $folder = new folder name => $name;
147
148 slog 8, "serving folder $name\n";
149
150 $folder->check;
151 $folder->read_mdif;
152
153 while (my $msg = $vc->rcv) {
154 if ($msg eq "mtime") {
155 $vc->snd($folder->{mtime});
156 } elsif ($msg =~ /^update (\d+) (\d+)$/) {
157 #if ($folder[$1]->{host}{$OTHERNAME} != $2) {
158 # $folder[$1]->{host}{$OTHERNAME} = $2;
159 # $folder[$1]->dirty;
160 #}
161 #reply $id, 200, "ok";
162 } else {
163 die "protocol error, unknown folder command ($msg)\n";
164 }
165 }
166}
167
168sub serve {
169 my $vc = shift;
170
171 slog 8, "new connection $vc->{port}\n";
172
173 while (my $msg = $vc->rcv) {
174 if ($msg eq "name") {
175 $vc->snd($::MYNAME);
176 } elsif ($msg eq "pri") {
177 $self->{pri} = $vc->rcv;
178 } elsif ($msg eq "time") {
179 $vc->snd(time);
180 } elsif ($msg eq "list") {
181 $vc->snd(join "\0", find_folders);
182 } elsif ($msg eq "open") {
183 serve_folder($vc);
184 } else {
185 die "protocol error, unknown command ($msg)\n";
186 }
505 } 187 }
506} 188}
507 189
508if ($SLAVE) { 190if ($SLAVE) {
509 $HOSTID = "slave"; 191 $HOSTID = "slave";
510 async \&main; 192 async \&main;
511 handle_commands unblock \*STDIN; 193 vc::multiplex unblock \*STDIN, $SLAVE;
512} else { 194} else {
513 $HOSTID = "master"; 195 $HOSTID = "master";
514 { 196 {
515 use Socket; 197 use Socket;
516 socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC; 198 socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC;
521 exec $0, "--slave"; 203 exec $0, "--slave";
522 exit 255; 204 exit 255;
523 } 205 }
524 close S2; 206 close S2;
525 async \&main; 207 async \&main;
526 handle_commands unblock \*S1; 208 vc::multiplex unblock \*S1, $SLAVE;
527 } 209 }
528} 210}
529 211

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines