ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
(Generate patch)

Comparing syncmail/syncmail (file contents):
Revision 1.2 by root, Sat Oct 27 03:40:29 2001 UTC vs.
Revision 1.7 by root, Mon Oct 29 00:37:41 2001 UTC

6use Coro::Semaphore; 6use Coro::Semaphore;
7use Coro::Channel; 7use Coro::Channel;
8use Coro::Signal; 8use Coro::Signal;
9use Coro::RWLock; 9use Coro::RWLock;
10 10
11use Set::Scalar;
12
11use Fcntl; 13use Fcntl;
12use MD5;
13 14
14use constant PROTVERSION => 1;
15
16$VERSION = 0.1; 15our $VERSION = 0.1;
17 16
18$TIMEOUT = 20; 17our $NOW = time;
19$IDLE = $TIMEOUT < 30 ? $TIMEOUT / 2 : $TIMEOUT - 15; 18
20$MAXTIMEDIFF = 10; 19require "config.pl";
20
21use folder;
22use vc;
23
24$v = $VERBOSE;
21 25
22# WARNING: 26# WARNING:
23# Content-Length headers are deliberately being ignored. They 27# Content-Length headers are deliberately being ignored. They
24# are broken by design and will never be supported 28# are broken by design and will never be supported
25 29
26# TODO: real message-id parsing 30# TODO: real message-id parsing
27 31
28$|=1; 32$|=1;
29 33
30$v = 9;
31
32$SLAVE = 1*($ARGV[0] eq "--slave");
33
34$MYNAME = $SLAVE ? "slave" : "master";
35$PREFIX = $SLAVE ? "./dst" : "./src";
36
37@OTHERNAMES = qw(third);
38
39my $ecnt; 34my $ecnt;
40 35
41sub slog { 36sub slog {
42 if ($_[0] <= $v) { 37 if ($_[0] <= $v) {
43 print STDERR "[$SLAVE] ", $_[1]; 38 print STDERR "[$SLAVE] ", $_[1];
44 } 39 }
45} 40}
46 41
47my $lockdisk = new Coro::Semaphore; 42$lockdisk = new Coro::Semaphore;
48 43
49# give up a/the timeslice 44# give up a/the timeslice
50sub give { 45sub give {
51 Coro::Event::do_timer(after => 0); 46 Coro::Event::do_timer(after => 0);
52} 47}
53 48
54package folder;
55
56BEGIN { *slog = \&::slog };
57
58use constant MDIFVERSION => 1;
59
60sub new {
61 my $class = shift;
62 my %arg = @_;
63 bless {
64 path => "$::PREFIX/$arg{name}",
65 %arg,
66 }, $class;
67}
68
69sub dirty {
70 $_[0]{dirty} = 1;
71}
72
73sub DESTROY {
74 $_[0]->write_mdif;
75}
76
77# parse_mbox(mbox-file-path, callback)
78# callback gets called with \$header and \$body,
79# $header includes the mbox From_ line without
80# the leading From_ itself.
81sub parse_mbox {
82 my ($path, $cb) = @_;
83
84 open my $fh, "<", $path
85 or die "$path: $!";
86
87 local $/ = "\n\n";
88
89 my ($head, $body, $offs);
90
91 5 == read $fh, $head, 5
92 or return;
93
94 $head eq "From "
95 or return;
96
97 $offs = 0;
98 while (defined ($head = <$fh>)) {
99 $head =~ /^.*? [A-Z][a-z][a-z] [A-Z][a-z][a-z] [ 0-9][0-9] \d\d:\d\d:\d\d(?: [+-]\d\d\d\d)? \d\d(?:\d\d)\n/
100 or die "$path: not standard mbox format header:\n$head\n";
101
102 local $/ = "\nFrom ";
103 # NEVER enable this. content-length simply is broken by design
104 #if ($head =~ /^Content-Length:\s+(\d+)$/im) {
105 # $1 <= read $fh, $body, $1 + 5
106 # or die "$path: partial message in mbox";
107 #} else {
108 $body = <$fh>;
109 #}
110 chomp $body;
111 $cb->($offs, \$head, \$body);
112 $offs = (tell $fh) - 5;
113 ::give unless ++$ecnt & 1023;
114 }
115
116 1;
117}
118
119sub conf_path {
120 (my $conf = $_[0]{path}) =~ s%([^/]+$)%.$1.mdif%;
121 $conf;
122}
123
124sub read_mdif {
125 my $self = shift;
126 my $path = $self->conf_path;
127
128 return if $self->{idx};
129
130 open my $fh, "<", $path
131 or return;
132
133 defined ($_ = <$fh>)
134 or die "$path: empty mdif file\n";
135
136 do {
137 if ($_ eq "[SYNCMAIL]\n") {
138 while (<$fh>) {
139 last unless /^([a-z]+)\s*=\s*(.*)\n$/;
140 $self->{$1} = $2;
141 }
142 } elsif ($_ eq "[HOSTS]\n") {
143 while (<$fh>) {
144 last unless /^([^[].*)=(.*)\n$/;
145 $self->{host}{$1} = $2;
146 }
147 } elsif (/^\[DIFF (\d+)\]\n$/) {
148 my $mtime = $1;
149 my @dif;
150 while (<$fh>) {
151 last unless /^[+-]/;
152 push @dif, substr $_, 0, -1;
153 }
154 unshift @{$self->{diff}}, [$mtime, \@dif];
155 } elsif ($_ eq "[INDEX]\n") {
156 my @idx;
157 while (<$fh>) {
158 last unless /^(\d+)=(.*)\n$/;
159 push @idx, [$1, $2];
160 }
161 $self->{idx} = \@idx;
162 } elsif (/^#/) {
163 $_ = <$fh>;
164 # nop
165 } else {
166 die "$path: unparseable section '$_'\n";
167 }
168 } while defined $_;
169
170 $self->{version} <= MDIFVERSION
171 or die "$path: version mismatch ($self->{version} found, <".MDIFVERSION." expected)\n";
172}
173
174sub write_mdif {
175 my $self = shift;
176 my $path = $self->conf_path;
177
178 return unless $self->{dirty};
179
180 open my $fh, ">", "$path~"
181 or die "$path~: $!";
182
183 print $fh "# automatically generated, do NOT edit\n";
184
185 print $fh "[SYNCMAIL]\n";
186 print $fh "$_=$self->{$_}\n" for (qw(fsize mtime version));
187
188 print $fh "[HOSTS]\n";
189 while (my ($k,$v) = each %{$self->{host}}) {
190 print $fh "$k=$v\n";
191 }
192
193 print $fh "[INDEX]\n";
194 print $fh "$_->[0]=$_->[1]\n" for @{$self->{idx}};
195
196 for (reverse @{$self->{diff}}) {
197 print $fh "[DIFF $_->[0]]\n";
198 print $fh $_, "\n" for @{$_->[1]};
199 }
200
201 close $fh
202 or die "$path~: unable to create updated .mdif: $!";
203
204 rename "$path~", $path;
205
206 delete $self->{dirty};
207}
208
209sub gendiff {
210 my ($d1, $d2) = @_;
211
212 my @d;
213 my (%d1, %d2);
214
215 for (@$d2) {
216 undef $d2{$_->[1]};
217 }
218
219 # delete msgs in d1 but not in d2
220 for (@$d1) {
221 undef $d1{$_->[1]};
222 push @d, "-$_->[1]" unless exists $d2{$_->[1]};
223 }
224 %d2 = (); # conserve memory
225
226 # add msgs in d2 but not in d1
227 for (@$d2) {
228 push @d, "+$_->[1]" unless exists $d1{$_->[1]};
229 }
230
231 \@d;
232}
233
234sub check {
235 my $self = shift;
236 my $path = $self->{path};
237 my $conf = $self->conf_path;
238 my $guard = $lockdisk->guard;
239
240 slog 3, "checking $path\n";
241
242 if (stat $path) {
243 my ($fsize, $mtime) = (stat _)[7, 9];
244
245 if (open my $fh, "<", $conf) {
246 my %conf;
247 <$fh>; # skip initial comment
248 <$fh> eq "[SYNCMAIL]\n"
249 or die "$conf: format error";
250 while (<$fh> =~ /^([a-z]+)\s*=\s*(.*)$/) {
251 $conf{$1} = $2;
252 }
253 return 1 if $fsize == $conf{fsize}
254 && $mtime == $conf{mtime};
255
256 $conf{mtime} <= $mtime
257 or die "$path: folder older than mdif";
258 }
259
260 slog 2, "updating $path\n";
261
262 my @idx;
263
264 parse_mbox $path, sub {
265 my ($offs, $head, $body) = @_;
266 my $mid;
267 if ($$head =~ /^Message-Id:\s*(<[^<\n]+>)\s*\n/im) {
268 $mid = $1;
269 } else {
270 $mid = MD5->hexhash("$$head\0$$body");
271 }
272 push @idx, [$offs, $mid];
273 } or return ();
274
275 $self->read_mdif;
276
277 if ($self->{version}) {
278 my $d = gendiff $self->{idx}, \@idx;
279 push @{$self->{diff}}, [
280 $self->{mtime},
281 $d,
282 ] if @$d;
283 } else {
284 slog 2, "$path: previously unknown folder\n";
285 $self->{version} ||= MDIFVERSION;
286 }
287
288 $self->{fsize} = $fsize;
289 $self->{mtime} = $mtime;
290 $self->{idx} = \@idx;
291
292 $self->dirty;
293
294 return 2;
295 } else {
296 slog 2, "$path: no longer exists\n";
297 unlink $conf;
298
299 return ();
300 }
301}
302
303package main;
304
305my $quit = new Coro::RWLock; 49my $quit = new Coro::RWLock;
306 50
307sub rwlock::DESTROY { 51sub rwlock::DESTROY {
308 $quit->unlock; 52 $quit->unlock;
309} 53}
319 opendir my $dh, $PREFIX 63 opendir my $dh, $PREFIX
320 or die "$PREFIX: $!"; 64 or die "$PREFIX: $!";
321 65
322 while (defined (my $folder = readdir $dh)) { 66 while (defined (my $folder = readdir $dh)) {
323 next if $folder =~ /^\./; 67 next if $folder =~ /^\./;
68 next if $folder =~ /~$/;
324 next unless -f "$PREFIX/$folder"; 69 next unless -f "$PREFIX/$folder";
325 push @folders, $folder; 70 push @folders, $folder;
326 } 71 }
327 72
328 @folders; 73 @folders;
329} 74}
330 75
331my $send = new Coro::Channel 10; 76my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel
332my $done = 0;
333 77
334# request $command, $data 78sub sync_offer {
335sub request { 79 my ($folder, $avc, $diff) = @_;
336 my $res;
337 my $signal = new Coro::Signal;
338 my $cmd = defined $_[1] ? "000+".length($_[1])." $_[0]\n$_[1]"
339 : "000 $_[0]\n";
340 $send->put([$signal, \$res, $cmd]);
341 $signal->wait;
342 $res;
343}
344 80
345# reply $id, $code, $msg, $data 81 my $vc = create vc pri => 1;
346sub reply { 82 $avc->snd("offer", $vc->{port});
347 my $cmd = defined $_[3] ? "$_[1]+".length($_[3])." $_[2]\n$_[3]"
348 : "$_[1] $_[2]\n";
349 $send->put([undef, undef, "$_[0] $cmd"]);
350}
351
352sub handle_commands {
353 my ($fh) = @_;
354
355 # periodically send nop as keepalive signal to avoid the
356 # rsync-on-slow-disk-timeout problem.
357 my $nopper = Event->timer(parked => 1, cb => sub { request "nop" });
358
359 my @folder;
360 83
361 async { 84 async {
362 my $id = "a"; 85 $vc->rcv; # need to synchronize, argl. should open on other side
363 $fh->print("\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n"); 86 $vc->snd(join "\0", @$diff);
364 while (my $r = $send->get) { 87 my %dup; @dup{split /\0/, $vc->rcv} = ();
365 if (defined $r->[1]) { 88
366 my $id = ++$id; 89 for (@$diff) {
367 $request{$id} = $r; 90 $vc->snd($folder->fetch($_)) unless exists $dup{$_};
368 print STDERR "<<< $SLAVE sending request $id:$r->[2]";#d#
369 $fh->print("$id $r->[2]");
370 } else {
371 print STDERR "<<< $SLAVE sending reply $r->[2]";#d#
372 $fh->print($r->[2]);
373 } 91 }
374 $nopper->at(time+$::IDLE); $nopper->start; 92
375 } 93 defined $vc->rcv and die "protocol error, expected close";
376 $fh->print("- 000 quit\n");
377 }; 94 };
378
379 async {
380 eval {
381 $fh->timeout($::TIMEOUT);
382
383 $_ = <$fh>;
384 $_ eq "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n"
385 or die "garbled input or version mismatch: $_";
386
387 while (<$fh>) {
388 slog 0, ">>> $SLAVE received :$_";
389 /^(\S+) (\d\d\d)(?:\+(\d+))?\s*(.*)$/
390 or die "protocol error, garbled command ($_)";
391
392 my ($id, $code, $dlen, $msg) = ($1, $2, $3, $4);
393 my $data;
394
395 $fh->sysread($data, $dlen) == $dlen
396 or die "unexpected read error: $!";
397
398 if ($code == 0) {
399 if ($msg eq "quit") {
400 $send->put(undef);
401 last;
402 } elsif ($msg eq "nop") {
403 reply $id, 200, "nop";
404 } elsif ($msg eq "myname") {
405 $OTHERNAME = $data;
406 slog 3, "otherid set to $OTHERNAME\n";
407 reply $id, 200, "myname", $MYNAME;
408 } elsif ($msg eq "timestamp") {
409 reply $id, 200, time;
410 } elsif ($msg eq "folders") {
411 reply $id, 200, "ok", join "\0", find_folders;
412 } elsif ($msg eq "open") {
413 async {
414 my $folder = new folder name => $data;
415 $folder->check;
416 $folder->read_mdif;
417 push @folder, $folder;
418 reply $id, 200, "$#folder $folder->{mtime}";
419 };
420 } elsif ($msg =~ /^update (\d+) (\d+)$/) {
421 if ($folder[$1]->{host}{$OTHERNAME} != $2) {
422 $folder[$1]->{host}{$OTHERNAME} = $2;
423 $folder[$1]->dirty;
424 }
425 reply $id, 200, "ok";
426 } elsif ($msg =~ /^close (\d+)$/) {
427 undef $folder[$1];
428 reply $id, 200, "ok";
429 } else {
430 chomp;
431 die "protocol error, unknown command ($_)\n";
432 }
433 } else {
434 my $r = delete $request{$id}
435 or die "protocol error, invalid reply id ($_)\n";
436
437 ${$r->[1]} = [$code, $msg, $data];
438 $r->[0]->send;
439 }
440 }
441 };
442
443 slog 0, "ERROR: $@" if $@;
444 print STDERR "unlooping\n";#d#
445 unloop;
446 };
447
448 loop;
449 print STDERR "$SLAVE hiho\n";#d#
450} 95}
451
452my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel
453 96
454sub sync_folder { 97sub sync_folder {
455 my $name = $_[0]; 98 my $name = $_[0];
456 99
457 my $quit_guard = quit_guard; 100 my $quit_guard = quit_guard;
458 async { 101 async {
459 my $guard = $sync_folder->guard; 102 my $guard = $sync_folder->guard;
460 103 my $vc = create vc;
461 ::give;
462 104
463 my $folder = new folder name => $name; 105 my $folder = new folder name => $name;
464 my ($rfid, $rmtime) = split /\s+/, (request open => $name)->[1];
465 106
466 $folder->check; 107 $vc->snd("open", $name);
108 $vc->snd("mtime");
109
467 $folder->read_mdif; 110 $folder->read_mdif;
111
468 my $mtime = $folder->{host}{$OTHERNAME}; 112 my $ctime = $folder->{host}{$OTHERNAME} || -1;
113 my $rtime = $vc->rcv;
469 114
115 $vc->snd("diff", $ctime);
116
117 my %diff; #
118 # 00 - local del
119 # 01 - local add
120 # 10 - remote del
121 # 11 - remote add
122
123 my @diff = grep { $_->[0] > $ctime } @{$folder->{diff}};
124
125 $diff = $vc->rcv;
126 while () {
127 if ($diff >= 0 and (!@diff or $diff < $diff[0][0])) {
128 $diff{$_} = 0b01 for split /\0/, $vc->rcv; # add
129 $diff{$_} = 0b00 for split /\0/, $vc->rcv; # del
130
131 $diff = $vc->rcv;
132 } elsif (@diff) {
133 $diff{$_} = 0b11 for @{$diff[0][1]};
134 $diff{$_} = 0b10 for @{$diff[0][2]};
135
136 shift @diff;
137 } else {
138 last;
139 }
140 }
141
142 # append or update, depending on wether there are messages to be deleted
143 $vc->snd("begin");
144 $folder->begin_update;
145
146 while (my ($k,$v) = each %diff) {
147 push @{$diff[$v]}, $k;
148 slog 0, "DIFF $k : $v\n";#d#
149 }
150
151 $vc->snd("delete", join "\0", @{$diff[2]});
152 $folder->delete(@{$diff[0]});
153
154 # offer ours
155 my $offer_coro = sync_offer($folder, $vc, $diff[3]);
156
157 # request theirs
158 {
159 my @send = grep { !$folder->exists($_) } @{$diff[1]};
160 $vc->snd("send", join "\0", @send);
161 $folder->append($_, $vc->rcv) for @send;
162 $vc->snd("-"); # sync
163 }
164
165 slog 0, "waiting...\n";#d#
166 $offer_coro->join;
167
168 # sanity check
169 $vc->snd("inventory");
170 if ($folder->inventory ne $vc->rcv) {
171 $folder->write_mdif;
172 slog 0, "FATAL: folder inventory mismatch after update\n";
173 }
174
175 $vc->snd("end");
176 $folder->end_update;
177
178 $vc->snd("setctime", $folder->{ctime});
179
180 $vc->snd("mtime");
470 $folder->{host}{$OTHERNAME} = $folder->{mtime}; 181 $folder->{host}{$OTHERNAME} = $vc->rcv;
471 182
472 request "update $rfid $folder->{mtime}"; 183 $vc->snd("close");
473 request "close $rfid"; 184 $folder->close;
185
474 undef $quit_guard; 186 undef $quit_guard;
475 } 187 }
476} 188}
477 189
478sub main { 190sub main {
191 my $vc = create vc;
192
479 # time checking done symmetrically 193 # time checking done symmetrically
480 { 194 {
481 my $time = time; 195 my $time = time;
482 my $othertime = (request "timestamp")->[1]; 196 $vc->snd("time");
197 my $othertime = $vc->rcv;
483 abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF 198 abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF
484 or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF"; 199 or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
485 } 200 }
486 201
487 $OTHERNAME = (request "myname", $MYNAME)->[2]; 202 $vc->snd("setname", $MYNAME); $OTHERNAME = $vc->rcv;
203
488 if ($SLAVE) { 204 if ($SLAVE) {
489 # 205 #
490 } else { 206 } else {
491 for (split /\0/, (request "folders")->[2]) { 207 $vc->snd("list");
208 for (split /\0/, $vc->rcv) {
492 if (!-e "$PREFIX/$_") { 209 if (!-e "$PREFIX/$_") {
493 slog 2, "created new empty folder $_\n"; 210 slog 2, "created new empty folder $_\n";
494 sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666; 211 sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666;
495 } 212 }
496 } 213 }
497 214
498 for my $folder (find_folders) { 215 for my $folder (find_folders) {
499 sync_folder $folder; 216 sync_folder $folder;
500 } 217 }
501 218
502 print "writelock\n";#d#
503 $quit->wrlock; 219 $quit->wrlock;
504 $send->put(undef); 220
221 vc::snd_quit;
222 }
223}
224
225sub serve_folder {
226 my $vc = shift;
227 my $name = $vc->rcv;
228 my $folder = new folder name => $name;
229
230 slog 8, "serving folder $name\n";
231
232 $folder->read_mdif;
233
234 while (my $msg = $vc->rcv) {
235 if ($msg eq "mtime") {
236 $vc->snd($folder->{mtime});
237 } elsif ($msg eq "inventory") {
238 $vc->snd($folder->inventory);
239 } elsif ($msg eq "diff") {
240 my $time = $vc->rcv;
241 for (@{$folder->{diff}}) {
242 next if $_->[0] <= $time;
243 $vc->snd($_->[0],
244 (join "\0", @{$_->[1]}),
245 (join "\0", @{$_->[2]}),
246 );
247 }
248 $vc->snd(-1);
249 } elsif ($msg eq "begin") {
250 $folder->begin_update;
251 } elsif ($msg eq "delete") {
252 $folder->delete(split /\0/, $vc->rcv);
253 } elsif ($msg eq "offer") {
254 my $ovc = catch vc port => $vc->rcv;
255 async {
256 my @offer;
257 {
258 my @dup;
259
260 $ovc->snd("-"); # synchronize
261 for (split /\0/, $ovc->rcv) {
262 if ($folder->exists($_)) {
263 push @dup, $_;
264 } else {
265 push @offer, $_;
266 }
267 }
268
269 $ovc->snd(join "\0", @dup);
270 }
271
272 # now we'll get everything in @offer, in order
273 $folder->append($_, $ovc->rcv) for @offer;
274 $ovc->close;
275 };
276 } elsif ($msg eq "send") {
277 $vc->pri(1);
278 $vc->snd($folder->fetch($_)) for split /\0/, $vc->rcv;
279 $vc->rcv; # sync
280 $vc->pri(0);
281 } elsif ($msg eq "end") {
282 $folder->end_update;
283 } elsif ($msg eq "mtime") {
284 $vc->snd($folder->{mtime});
285 } elsif ($msg eq "setctime") {
286 $folder->{host}{$OTHERNAME} = $vc->rcv;
287 } elsif ($msg eq "close") {
288 $folder->close;
289 } else {
290 die "protocol error, unknown folder command ($msg)\n";
291 }
292 }
293}
294
295sub serve {
296 my $vc = shift;
297
298 slog 8, "new connection $vc->{port}\n";
299
300 while (my $msg = $vc->rcv) {
301 if ($msg eq "setname") {
302 $vc->snd($MYNAME);
303 $OTHERNAME = $vc->rcv;
304 } elsif ($msg eq "pri") {
305 $vc->{pri} = $vc->rcv;
306 } elsif ($msg eq "time") {
307 $vc->snd(time);
308 } elsif ($msg eq "list") {
309 $vc->snd(join "\0", find_folders);
310 } elsif ($msg eq "open") {
311 serve_folder($vc);
312 } else {
313 die "protocol error, unknown command ($msg)\n";
314 }
505 } 315 }
506} 316}
507 317
508if ($SLAVE) { 318if ($SLAVE) {
509 $HOSTID = "slave"; 319 $HOSTID = "slave";
510 async \&main; 320 async \&main;
511 handle_commands unblock \*STDIN; 321 vc::multiplex unblock \*STDIN, $SLAVE;
512} else { 322} else {
513 $HOSTID = "master"; 323 $HOSTID = "master";
514 { 324 {
515 use Socket; 325 use Socket;
516 socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC; 326 socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC;
521 exec $0, "--slave"; 331 exec $0, "--slave";
522 exit 255; 332 exit 255;
523 } 333 }
524 close S2; 334 close S2;
525 async \&main; 335 async \&main;
526 handle_commands unblock \*S1; 336 vc::multiplex unblock \*S1, $SLAVE;
527 } 337 }
528} 338}
529 339

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines