ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
(Generate patch)

Comparing syncmail/syncmail (file contents):
Revision 1.1 by root, Tue Oct 23 23:52:40 2001 UTC vs.
Revision 1.8 by root, Mon Oct 29 02:36:32 2001 UTC

4use Coro::Handle; 4use Coro::Handle;
5use Coro::Event; 5use Coro::Event;
6use Coro::Semaphore; 6use Coro::Semaphore;
7use Coro::Channel; 7use Coro::Channel;
8use Coro::Signal; 8use Coro::Signal;
9use Coro::RWLock;
9 10
10use MD5; 11use Set::Scalar;
11 12
12use constant VERSION => 1; 13use Fcntl;
13 14
14# TODO: content-length support (HACK!) 15our $VERSION = 0.1;
16
# Top-level setup. Lines carrying two line numbers appear in both columns of
# the diff; lines with one number belong to only one revision.
# $NOW holds the time at which this statement ran.
17our $NOW = time;
18
19require "config.pl";
20
21use folder;
22use vc;
23
# Verbosity threshold compared against the level passed to slog. $VERBOSE is
# not assigned anywhere in this listing (presumably config.pl sets it --
# TODO confirm).
24$v = $VERBOSE;
25
26# WARNING:
27# Content-Length headers are deliberately being ignored. They
28# are broken by design and will never be supported
29
15# TODO: real message-id parsing 30# TODO: real message-id parsing
16 31
# Turn on autoflush for the currently selected output handle.
17$|=1; 32$|=1;
18
# This column hard-codes the verbosity instead of reading $VERBOSE.
19$v = 9;
20
# 1 when the first command-line argument is exactly "--slave", otherwise 0.
21$SLAVE = 1*($ARGV[0] eq "--slave");
22 33
# Counter incremented by parse_folder, which cedes whenever its low ten bits
# are all zero.
23my $ecnt; 34my $ecnt;
24 35
# slog LEVEL, MESSAGE
# Prints MESSAGE to STDERR when LEVEL is less than or equal to the global
# verbosity $v; otherwise does nothing. One column prints the message as-is,
# the other prefixes it with "[$SLAVE] ".
25sub slog { 36sub slog {
26 if ($_[0] <= $v) { 37 if ($_[0] <= $v) {
27 print STDERR $_[1]; 38 print STDERR "[$SLAVE] ", $_[1];
28 }
29}
30
31# parse_folder(mbox-file-path, callback)
32# callback gets called with \$header and \$body,
33# $header includes the mbox From_ line without
34# the leading From_ itself.
# Note: the call below actually passes three arguments,
# ($offs, \$head, \$body), where $offs is the offset computed for the message.
# Returns 1 after the whole file has been read, returns nothing (bare return)
# when the file does not start with "From ", and dies when the file cannot be
# opened or a header block does not match the expected From_ line format.
35sub parse_folder {
36 my ($path, $cb) = @_;
37
38 open my $fh, "<", $path
39 or die "$path: $!";
40
# Header blocks end at the first blank line, so read up to "\n\n".
41 local $/ = "\n\n";
42
43 my ($head, $body, $offs);
44
# The file must begin with the five bytes "From "; they are consumed here, so
# the first $head starts right after them.
45 5 == read $fh, $head, 5
46 or return;
47
48 $head eq "From "
49 or return;
50
51 $offs = 0;
52 while (defined ($head = <$fh>)) {
# Validate the remainder of the From_ line: weekday, month, day, time,
# optional numeric zone, year.
# NOTE(review): the trailing (?:\d\d) group is not optional, so only
# four-digit years match -- confirm whether "(?:\d\d)?" was intended.
53 $head =~ /^.*? [A-Z][a-z][a-z] [A-Z][a-z][a-z] [ 0-9][0-9] \d\d:\d\d:\d\d(?: [+-]\d\d\d\d)? \d\d(?:\d\d)\n/
54 or die "$path: not standard mbox format header:\n$head\n";
55
# The body runs up to the next "\nFrom " separator; the read consumes the
# separator and chomp then strips it from $body.
56 local $/ = "\nFrom ";
57 # NEVER enable this. content-length simply is broken by design
58 #if ($head =~ /^Content-Length:\s+(\d+)$/im) {
59 # $1 <= read $fh, $body, $1 + 5
60 # or die "$path: partial message in mbox";
61 #} else {
62 $body = <$fh>;
63 #}
64 chomp $body;
65 $cb->($offs, \$head, \$body);
# Offset for the next message: current position minus 5, i.e. backed up over
# the "From " that the body read just consumed.
66 $offs = (tell $fh) - 5;
# Cede to other coroutines once every 1024 messages.
67 cede unless ++$ecnt & 1023;
68 }
69
70 1;
71}
72
# read_mdif(path)
# Parses an .mdif file into a hash reference and returns it.
#   [SYNCMAIL]      key=value lines  -> $mdif->{key}
#   [HOSTS]         name=value lines -> $mdif->{host}{name}
#   [DIFFgen.mtime] lines starting with + or - -> entry [gen, mtime, \@lines]
#   [INDEX]         number=value lines -> $mdif->{idx} as [number, value] pairs
# Returns an empty hash reference when the file cannot be opened. Dies when
# the file is empty, when a section header is not recognised, or when the
# stored version is greater than VERSION.
73sub read_mdif {
74 my ($path) = @_;
# NOTE(review): $fh is declared here and again by "open my $fh" below; the
# first declaration is never used.
75 my $fh;
76 my $mdif;
77 open my $fh, "<", $path
78 or return { };
79
80 defined ($_ = <$fh>)
81 or die "$path: empty mdif file\n";
82
# Each inner while loop stops on the first line that does not belong to its
# section and leaves that line in $_, so the outer loop dispatches on it next.
# At end of file $_ is undef and the outer loop ends.
83 do {
84 if ($_ eq "[SYNCMAIL]\n") {
85 while (<$fh>) {
86 last unless /^([a-z]+)\s*=\s*(.*)\n$/;
87 $mdif->{$1} = $2;
88 }
89 } elsif ($_ eq "[HOSTS]\n") {
90 while (<$fh>) {
# A host line is any line not starting with "[" that contains "=".
91 last unless /^([^[].*)=(.*)\n$/;
92 $mdif->{host}{$1} = $2;
93 }
94 } elsif (/^\[DIFF(\d+)\.(\d+)\]\n$/) {
95 my ($gen, $mtime) = ($1, $2);
96 my @dif;
97 while (<$fh>) {
98 last unless /^[+-]/;
# Store the line without its trailing newline.
99 push @dif, substr $_, 0, -1;
100 }
# unshift puts each section in front of the earlier ones, so the in-memory
# list is in the reverse of file order (write_mdif reverses it again).
101 unshift @{$mdif->{diff}}, [$gen, $mtime, \@dif];
102 } elsif ($_ eq "[INDEX]\n") {
103 my @idx;
104 while (<$fh>) {
105 last unless /^(\d+)=(.*)\n$/;
106 push @idx, [$1, $2];
107 }
108 $mdif->{idx} = \@idx;
109 } elsif (/^#/) {
# Comment line: skip it by reading the next line.
110 $_ = <$fh>;
111 # nop
112 } else {
113 die "$path: unparseable section '$_'\n";
114 }
115 } while defined $_;
116
117 $mdif->{version} <= VERSION
118 or die "$path: version mismatch ($mdif->{version} found, <".VERSION." expected)\n";
119
120 $mdif;
121}
122
# write_mdif(path, mdif)
# Writes the structure $mdif to "$path~" in the sectioned text format that
# read_mdif parses, then renames "$path~" over $path. Dies when the temporary
# file cannot be opened or closed.
123sub write_mdif {
124 my ($path, $mdif) = @_;
# NOTE(review): $fh is declared here and again by "open my $fh" below; the
# first declaration is never used.
125 my $fh;
126
127 open my $fh, ">", "$path~"
128 or die "$path~: $!";
129
130 print $fh "# automatically generated, do NOT edit\n";
131
132 print $fh "[SYNCMAIL]\n";
133 print $fh "$_=$mdif->{$_}\n" for (qw(fsize mtime gen version));
134
135 print $fh "[HOSTS]\n";
# NOTE(review): the "my ($k,$v)" is in the statement modifier, which is parsed
# after the print expression. The $k and $v inside the string therefore look
# like they bind to the package variables (including the global verbosity $v)
# rather than to the each() results -- verify the [HOSTS] output.
136 print $fh "$k=$v\n" while my ($k,$v) = each %{$mdif->{host}};
137
138 print $fh "[INDEX]\n";
139 print $fh "$_->[0]=$_->[1]\n" for @{$mdif->{idx}};
140
# Diff entries are written in the reverse of their in-memory order; read_mdif
# restores the in-memory order by unshifting each section as it is read.
141 for (reverse @{$mdif->{diff}}) {
142 print $fh "[DIFF$_->[0].$_->[1]]\n";
143 print $fh $_, "\n" for @{$_->[2]};
144 }
145
146 close $fh
147 or die "$path~: unable to create updated .mdif: $!";
148
# NOTE(review): the result of rename is not checked.
149 rename "$path~", $path;
150}
151
# gendiff(d1, d2)
# $d1 and $d2 are array references whose elements are array references; only
# element [1] of each entry is used, as the comparison key.
# Returns a reference to a list of strings: first "-KEY" for every entry of
# $d1 whose key is absent from $d2 (in $d1 order), then "+KEY" for every entry
# of $d2 whose key is absent from $d1 (in $d2 order).
152sub gendiff {
153 my ($d1, $d2) = @_;
154
155 my @d;
156 my (%d1, %d2);
157
# Build the set of keys present in d2 (values are undef; only existence matters).
158 for (@$d2) {
159 undef $d2{$_->[1]};
160 }
161
162 # delete msgs in d1 but not in d2
163 for (@$d1) {
# Record the d1 key in the same pass, for the second loop below.
164 undef $d1{$_->[1]};
165 push @d, "-$_->[1]" unless exists $d2{$_->[1]};
166 }
167 %d2 = (); # conserve memory
168
169 # add msgs in d2 but not in d1
170 for (@$d2) {
171 push @d, "+$_->[1]" unless exists $d1{$_->[1]};
172 }
173
174 \@d;
175}
176
# Semaphore held (via a guard object) for the duration of each check_folder
# call. Created with no explicit count (presumably 1, i.e. one call at a
# time -- TODO confirm against Coro::Semaphore).
177my $check_folder = new Coro::Semaphore;
178
# check_folder(path)
# Brings the .mdif file belonging to the mbox at $path up to date.
# Returns:
#   (1, \%conf)  when the stored fsize and mtime both match the file; %conf
#                holds only the key=value lines read from the [SYNCMAIL] section
#   (2, $mdif)   after the folder was re-indexed and the .mdif rewritten
#   ()           when parse_folder returned false, or when $path cannot be
#                stat'ed (in which case the .mdif is also unlinked)
179sub check_folder {
180 my ($path) = @_;
181 my $guard = $check_folder->guard;
182
# The .mdif lives next to the folder: last path component NAME -> .NAME.mdif
183 (my $conf = $path) =~ s%([^/]+$)%.$1.mdif%;
184
185 slog 1, "checking $path\n";
186
187 if (stat $path) {
# Fields 7 and 9 of stat are size and mtime; "_" reuses the stat just done.
188 my ($fsize, $mtime) = (stat _)[7, 9];
# Fast path: read only the head of the .mdif and compare size and mtime.
189 if (open my $fh, "<", $conf) {
190 my %conf;
191 <$fh>; # skip initial comment
192 <$fh> eq "[SYNCMAIL]\n"
193 or die "$conf: format error";
194 while (<$fh> =~ /^([a-z]+)\s*=\s*(.*)$/) {
195 $conf{$1} = $2;
196 }
197 return (1, \%conf) if $fsize == $conf{fsize}
198 && $mtime == $conf{mtime};
199 }
200
201 slog 2, "updating $path\n";
202
# New index: one [offset, id] pair per message.
203 my @idx;
204
205 parse_folder $path, sub {
206 my ($offs, $head, $body) = @_;
207 my $mid;
# Use the Message-Id header when one matches; otherwise fall back to an MD5
# hex hash over header and body joined by a NUL byte.
208 if ($$head =~ /^Message-Id:\s*(<[^<\n]+>)\s*\n/im) {
209 $mid = $1;
210 } else {
211 $mid = MD5->hexhash("$$head\0$$body");
212 }
213 push @idx, [$offs, $mid];
214 } or return ();
215
216 my $mdif = read_mdif $conf;
217
218 if ($mdif->{version}) {
# Existing .mdif: record what changed relative to the previous index. The
# entry stores the generation before the increment and the previous mtime,
# and is only added when the diff is non-empty.
219 my $d = gendiff $mdif->{idx}, \@idx;
220 push @{$mdif->{diff}}, [
221 $mdif->{gen}++,
222 $mdif->{mtime},
223 $d,
224 ] if @$d;
225 } else {
226 slog 2, "$path: new folder\n";
227 $mdif->{version} ||= VERSION;
228 $mdif->{gen} = 1;
229 }
230
231 $mdif->{fsize} = $fsize;
232 $mdif->{mtime} = $mtime;
233 $mdif->{idx} = \@idx;
234
235 write_mdif $conf, $mdif;
236
237 return (2, $mdif);
238 } else {
239 slog 2, "$path: no longer exists\n";
240 unlink $conf;
241
242 return ();
243 }
244}
245
# Outgoing queue consumed by the writer coroutine in handle_commands. The 10
# is the argument to the Coro::Channel constructor (presumably its maximum
# size -- TODO confirm).
246my $send = new Coro::Channel 10;
# Flag checked by handle_commands: once it is true and no requests are
# outstanding, a "quit" request is sent and the process exits.
247my $done = 0;
248
249# request $command, $data
# Queues a command and blocks until its reply arrives.
# Wire form: "000+LEN COMMAND\nDATA" when $data is defined, else "000 COMMAND\n"
# (handle_commands prepends the request id).
# Returns whatever handle_commands stores through the \$res reference, which
# is an array reference [code, msg, data].
250sub request {
251 my $res;
252 my $signal = new Coro::Signal;
253 my $cmd = defined $_[1] ? "000+".length($_[1])." $_[0]\n$_[1]" : "000 $_[0]\n";
# Queue entry layout: [signal to fire on reply, where to store the reply, text].
254 $send->put([$signal, \$res, $cmd]);
255 $signal->wait;
256 $res;
257}
258
259# reply $id, $code, $msg, $data
# Queues a reply line "ID CODE+LEN MSG\nDATA" (when $data is defined) or
# "ID CODE MSG\n" on $send. The first two queue fields are undef, which is how
# the writer in handle_commands tells a reply from a request. Does not wait.
260sub reply {
261 my $cmd = defined $_[3] ? "$_[1]+".length($_[3])." $_[2]\n$_[3]" : "$_[1] $_[2]\n";
262 $send->put([undef, undef, "$_[0] $cmd"]);
263}
264
# handle_commands(fh)
# Runs the line protocol over $fh. A writer coroutine drains the $send queue
# to $fh, while the calling coroutine reads lines from $fh and dispatches
# them. Calls exit 0 when the read loop ends; dies on malformed input.
# (The lines below that carry a second number and a lone closing brace are
# fragments of the other diff column and are not part of this sub.)
265sub handle_commands {
266 my ($fh) = @_;
# Request ids are produced by string increment of this value ("a" -> "b" ...).
267 my $id = "a";
268 async {
269 $fh->print("- 000 hello $HOSTID\n");
# The loop ends when a false value is taken from the queue (the "quit"
# handler below puts undef).
270 while (my $r = $send->get) {
271 if (defined $r->[1]) {
# Request: allocate an id and remember the entry so the reader can deliver
# the reply. The ++ acts on the outer $id; the inner "my $id" is only
# visible from the next statement on.
272 my $id = ++$id;
273 $request{$id} = $r;
274 print STDERR "<<< $SLAVE sendign request $id:$r->[2]";#d#
275 $fh->print("$id $r->[2]");
276 } else {
# Reply: the text already carries the id it answers.
277 print STDERR "<<< $SLAVE sendign reply $r->[2]";#d#
278 $fh->print($r->[2]);
279 }
280 }
281 print STDERR "$SLAVE shutdown\n";#d#
# Close the sending half of the connection only.
282 shutdown $fh, 1;
283 }; 39 }
284 while (<$fh>) {
285 slog 0, ">>> $SLAVE received :$_";
# Line format: ID CODE[+LEN] MESSAGE, CODE being exactly three digits.
286 /^(\S+) (\d\d\d)(?:\+(\d+))?\s*(.*)$/
287 or die "protocol error, garbled command ($_)";
288
289 my ($id, $code, $dlen, $msg) = ($1, $2, $3, $4);
290 my $data;
291
# Read the LEN payload bytes that follow the line ($dlen is undef when the
# line had no "+LEN" part).
292 $fh->sysread($data, $dlen) == $dlen
293 or die "unexpected read error: $!";
294
# Code 000 marks a command from the peer; any other code is a reply to one of
# our own requests.
295 if ($code == 0) {
296 if ($msg eq "quit") {
# Answer directly on the handle, then tell the writer coroutine to stop.
297 print $fh "$id 200 quit\n";
298 $send->put(undef);
299 last;
300 } elsif ($msg eq "nop") {
301 reply $id, 200, "nop";
302 } elsif ($msg =~ /^hello (.*)$/) {
303 $OTHERID = $1;
304 slog 3, "otherid set to $OTHERID\n";
305 } else {
306 die "protocol error, unknown command ($_)\n";
307 }
308 } else {
309 my $r = delete $request{$id}
310 or die "protocol error, invalid reply id ($_)\n";
311
# Store the result where request() will read it and wake the waiting caller.
312 ${$r->[1]} = [$code, $msg, $data];
313 $r->[0]->send;
314 }
315
# Once $done is set and nothing is outstanding, send "quit" from a separate
# coroutine and exit when its reply arrives.
316 if ($done && !%request) {
317 $done = 0;
318 async {
319 request("quit");
320 exit 0;
321 };
322 }
323 }
324
325 exit 0;
326} 40}
327 41
# This stretch mixes two columns: the opening lines of sub sync_dir from one
# revision (its body continues, interleaved, further down the listing) and
# the $lockdisk semaphore plus sub give from the other. $lockdisk is only
# created here; nothing else in this listing uses it.
328sub sync_dir { 42$lockdisk = new Coro::Semaphore;
329 my ($path) = @_;
330 43
44# give up a/the timeslice
# Implemented as a call to Coro::Event::do_timer with a zero delay.
45sub give {
46 Coro::Event::do_timer(after => 0);
47}
48
# Lock coordinating shutdown: quit_guard takes it for reading, and main takes
# it for writing before calling vc::snd_quit.
49my $quit = new Coro::RWLock;
50
# Destructor for the objects returned by quit_guard: unlocks $quit when such
# an object is destroyed.
51sub rwlock::DESTROY {
52 $quit->unlock;
53}
54
# quit_guard()
# Takes a read lock on $quit and returns an empty array reference blessed into
# package rwlock. rwlock::DESTROY unlocks $quit when that object is destroyed,
# so the lock is held for as long as the caller keeps the returned value.
55sub quit_guard {
56 $quit->rdlock;
57 bless [], rwlock;
58}
59
# find_folders()
# Returns the names (not full paths) of the plain files directly inside
# $PREFIX, skipping names that start with "." or end with "~". Dies when
# $PREFIX cannot be opened as a directory.
# (Where a line carries two numbers, the first half is the body of sub
# sync_dir from the other column: it loops over $path instead, calls
# check_folder on every plain file and prints the result to STDERR.)
60sub find_folders {
61 my @folders;
62
331 opendir my $dh, $path 63 opendir my $dh, $PREFIX
332 or die "$path: $!"; 64 or die "$PREFIX: $!";
333 65
334 while (defined (my $folder = readdir $dh)) { 66 while (defined (my $folder = readdir $dh)) {
335 next if $folder =~ /^\./; 67 next if $folder =~ /^\./;
336 my $path = "$path/$folder"; 68 next if $folder =~ /~$/;
337 next unless -f $path; 69 next unless -f "$PREFIX/$folder";
338 my ($status, $mdif) = check_folder $path; 70 push @folders, $folder;
339 print STDERR "$path: $status | $mdif\n";#d# 71 }
72
73 @folders;
74}
75
# Semaphore taken (via guard) inside the coroutine started by sync_folder.
76my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel
77
# sync_offer(folder, avc, diff)
# Offers the messages named in @$diff to the peer over a newly created channel.
#   $folder - object providing fetch(id)
#   $avc    - existing channel, used to send "offer" plus the new channel's port
#   $diff   - array reference of message ids
# Returns the coroutine started by async (it is the last expression
# evaluated); sync_folder calls ->join on it.
78sub sync_offer {
79 my ($folder, $avc, $diff) = @_;
80
81 my $vc = create vc pri => 1;
82 $avc->snd("offer", $vc->{port});
83
84 async {
85 $vc->rcv; # need to synchronize, argl. should open on other side
# Send the full id list; the peer answers with the ids it already has.
86 $vc->snd(join "\0", @$diff);
87 my %dup; @dup{split /\0/, $vc->rcv} = ();
88
# Send, in @$diff order, only the messages the peer did not report as
# duplicates.
89 for (@$diff) {
90 $vc->snd($folder->fetch($_)) unless exists $dup{$_};
91 }
92
# Anything other than an undefined read at this point is a protocol error.
93 defined $vc->rcv and die "protocol error, expected close";
94 };
95}
96
# sync_folder(name)
# Starts a coroutine that synchronises the folder $name with the peer: it
# merges the change lists of both sides, deletes on both sides, offers our new
# messages, requests theirs, compares inventories and records the sync times.
# A quit guard is taken before the coroutine starts and released as its last
# step, so main's write lock on $quit waits for the sync to finish.
97sub sync_folder {
98 my $name = $_[0];
99
100 my $quit_guard = quit_guard;
101 async {
102 my $guard = $sync_folder->guard;
103 my $vc = create vc;
104
105 my $folder = new folder name => $name;
106
107 $vc->snd("open", $name);
108 $folder->read_mdif;
109
# Value stored for this peer under {host}, or -1 when none (or a false value)
# is stored. Only changes newer than this are considered.
110 my $ctime = $folder->{host}{$OTHERNAME} || -1;
111
112 $vc->snd("diff", $ctime);
113
# %diff maps message id -> two-bit action code (table below). The low bit is
# add (1) / delete (0); the high bit says on which side the action is applied.
114 my %diff; #
115 # 00 - local del
116 # 01 - local add
117 # 10 - remote del
118 # 11 - remote add
119
# Our own change entries newer than $ctime. As used below, each entry is
# [time, \@first_list, \@second_list].
120 my @diff = grep { $_->[0] > $ctime } @{$folder->{diff}};
121
# NOTE(review): $diff (scalar) is not declared with "my", so it is a package
# variable shared by every sync_folder coroutine -- confirm this is safe with
# up to three folders in progress.
# $diff holds the time field of the peer's next entry, or a negative value
# once the peer has none left (serve_folder sends -1 as the terminator).
122 $diff = $vc->rcv;
# Merge both change streams by time: whichever side has the smaller time
# field is applied next, so later changes overwrite earlier ones per id
# (assumes both streams arrive in ascending order -- TODO confirm).
123 while () {
124 if ($diff >= 0 and (!@diff or $diff < $diff[0][0])) {
125 $diff{$_} = 0b01 for split /\0/, $vc->rcv; # add
126 $diff{$_} = 0b00 for split /\0/, $vc->rcv; # del
127
128 $diff = $vc->rcv;
129 } elsif (@diff) {
130 $diff{$_} = 0b11 for @{$diff[0][1]};
131 $diff{$_} = 0b10 for @{$diff[0][2]};
132
133 shift @diff;
134 } else {
135 last;
136 }
137 }
138
139 # append or update, depending on whether there are messages to be deleted
140 $vc->snd("begin");
141 $folder->begin_update;
142
# The loop above only exits once @diff is empty, so the array is reused here
# as four buckets indexed by action code: $diff[0] .. $diff[3].
143 while (my ($k,$v) = each %diff) {
144 push @{$diff[$v]}, $k;
145 slog 0, "DIFF $k : $v\n";#d#
146 }
147
# Bucket 2 (remote del) is sent to the peer; bucket 0 (local del) is applied
# to our folder.
148 $vc->snd("delete", join "\0", @{$diff[2]});
149 $folder->delete(@{$diff[0]});
150
151 # offer ours
152 my $offer_coro = sync_offer($folder, $vc, $diff[3]);
153
154 # request theirs
155 {
# Ask only for messages from bucket 1 (local add) that we do not already have.
156 my @send = grep { !$folder->exists($_) } @{$diff[1]};
157 $vc->snd("send", join "\0", @send);
158 $folder->append($_, $vc->rcv) for @send;
159 $vc->snd("-"); # sync
160 }
161
162 slog 0, "waiting...\n";#d#
163 $offer_coro->join;
164
165 # sanity check
166 $vc->snd("inventory");
# On mismatch the mdif is written and a FATAL line is logged, but execution
# continues.
167 if ($folder->inventory ne $vc->rcv) {
168 $folder->write_mdif;
169 slog 0, "FATAL: folder inventory mismatch after update\n";
170 }
171
172 $vc->snd("end");
173 $folder->end_update;
174
# Exchange timestamps: the peer stores our {ctime}, and we store the peer's
# mtime under its host name.
175 $vc->snd("setctime", $folder->{ctime});
176
177 $vc->snd("mtime");
178 $folder->{host}{$OTHERNAME} = $vc->rcv;
179
180 $vc->snd("close");
181 $folder->close;
182
# Drop the guard so rwlock::DESTROY unlocks $quit.
183 undef $quit_guard;
184 }
185}
186
# main()
# Client-side driver, started as a coroutine on both master and slave. It
# checks the clock difference against the peer and exchanges host names.
# The master (when $SLAVE is false) then creates any folders that exist only
# on the peer, starts sync_folder for every local folder, waits for them via
# the write lock on $quit, and sends quit.
# (Two lines below also carry fragments of sub sync_dir from the other column.)
187sub main {
188 my $vc = create vc;
189
190 # time checking done symmetrically
191 {
192 my $time = time;
193 $vc->snd("time");
194 my $othertime = $vc->rcv;
# Compare the peer's clock with the midpoint of our clock readings taken
# before the request and after the reply.
195 abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF
196 or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
197 }
198
199 $vc->snd("setname", $MYNAME); $OTHERNAME = $vc->rcv;
200
340 if ($SLAVE) { 201 if ($SLAVE) {
341 request "fhave", $folder; 202 #
203 } else {
204 $vc->snd("list");
205 for (split /\0/, $vc->rcv) {
206 if (!-e "$PREFIX/$_") {
207 slog 2, "created new empty folder $_\n";
# NOTE(review): the result of sysopen is not checked, so the log line above
# is printed even when creating the file fails.
208 sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666;
209 }
210 }
211
212 for my $folder (find_folders) {
213 sync_folder $folder;
214 }
215
# Each sync_folder holds a read lock on $quit until it finishes; taking the
# write lock here presumably blocks until all of them are released (TODO
# confirm against Coro::RWLock).
216 $quit->wrlock;
217
218 vc::snd_quit;
219 }
220}
221
# serve_folder(vc)
# Server side of one folder session. Reads the folder name from $vc, loads
# its mdif, then handles folder commands until rcv returns a false value.
# Dies on an unknown command.
# (The last few lines also carry fragments of sub sync_dir from the other
# column.)
222sub serve_folder {
223 my $vc = shift;
224 my $name = $vc->rcv;
225 my $folder = new folder name => $name;
226
227 slog 8, "serving folder $name\n";
228
229 $folder->read_mdif;
230
231 while (my $msg = $vc->rcv) {
232 if ($msg eq "mtime") {
233 $vc->snd($folder->{mtime});
234 } elsif ($msg eq "inventory") {
235 $vc->snd($folder->inventory);
236 } elsif ($msg eq "diff") {
# For every change entry whose first field is greater than the time received,
# send three values: that field and the entry's two lists, each joined with
# NUL. A final -1 marks the end.
237 my $time = $vc->rcv;
238 for (@{$folder->{diff}}) {
239 next if $_->[0] <= $time;
240 $vc->snd($_->[0],
241 (join "\0", @{$_->[1]}),
242 (join "\0", @{$_->[2]}),
243 );
244 }
245 $vc->snd(-1);
246 } elsif ($msg eq "begin") {
247 $folder->begin_update;
248 } elsif ($msg eq "delete") {
249 $folder->delete(split /\0/, $vc->rcv);
250 } elsif ($msg eq "offer") {
# The peer announces a port; attach to it and handle the offer in a separate
# coroutine so this command loop keeps running.
251 my $ovc = catch vc port => $vc->rcv;
252 async {
253 my @offer;
254 {
255 my @dup;
256
257 $ovc->snd("-"); # synchronize
# Split the offered ids into those we already have (@dup, reported back)
# and those we want (@offer).
258 for (split /\0/, $ovc->rcv) {
259 if ($folder->exists($_)) {
260 push @dup, $_;
261 } else {
262 push @offer, $_;
263 }
264 }
265
266 $ovc->snd(join "\0", @dup);
267 }
268
269 # now we'll get everything in @offer, in order
270 $folder->append($_, $ovc->rcv) for @offer;
271 $ovc->close;
272 };
273 } elsif ($msg eq "send") {
# Send the requested messages with the channel priority set to 1, wait for
# the peer's sync message, then set the priority back to 0.
274 $vc->pri(1);
275 $vc->snd($folder->fetch($_)) for split /\0/, $vc->rcv;
276 $vc->rcv; # sync
277 $vc->pri(0);
278 } elsif ($msg eq "end") {
279 $folder->end_update;
# NOTE(review): this branch is unreachable -- "mtime" is already handled by
# the first branch of the chain.
280 } elsif ($msg eq "mtime") {
281 $vc->snd($folder->{mtime});
282 } elsif ($msg eq "setctime") {
283 $folder->{host}{$OTHERNAME} = $vc->rcv;
284 } elsif ($msg eq "close") {
# Closes the folder but does not leave the loop; the loop only ends when rcv
# returns a false value.
285 $folder->close;
342 } else { 286 } else {
343 request "fwant", $folder; 287 die "protocol error, unknown folder command ($msg)\n";
344 } 288 }
345 } 289 }
290}
346 291
# serve(vc)
# Server side of one connection. Handles top-level commands until rcv returns
# a false value:
#   setname - send $MYNAME, then store the received name in $OTHERNAME
#   pri     - store the received value in $vc->{pri}
#   time    - send the current time
#   list    - send the find_folders result joined with NUL
#   open    - hand the channel to serve_folder
# Dies on any other command.
# (The first three lines and the last also carry the tail of sub sync_dir
# from the other column: it sets $done, issues a "nop" request and prints the
# reply.)
347 $done = 1; 292sub serve {
348 my $x = request("nop"); 293 my $vc = shift;
349 print STDERR "$SLAVE returned nop @$x\n";#d# 294
295 slog 8, "new connection $vc->{port}\n";
296
297 while (my $msg = $vc->rcv) {
298 if ($msg eq "setname") {
299 $vc->snd($MYNAME);
300 $OTHERNAME = $vc->rcv;
301 } elsif ($msg eq "pri") {
302 $vc->{pri} = $vc->rcv;
303 } elsif ($msg eq "time") {
304 $vc->snd(time);
305 } elsif ($msg eq "list") {
306 $vc->snd(join "\0", find_folders);
307 } elsif ($msg eq "open") {
308 serve_folder($vc);
309 } else {
310 die "protocol error, unknown command ($msg)\n";
311 }
312 }
350} 313}
351 314
# Program entry. The master creates a socket pair and forks; the child
# connects its STDIN and STDOUT to one end and re-executes this script with
# "--slave". Each side then starts its worker coroutine (sync_dir in one
# column, main in the other) and runs the protocol loop over its end of the
# connection (handle_commands in one column, vc::multiplex in the other).
352if ($SLAVE) { 315if ($SLAVE) {
353 $HOSTID = "slave"; 316 $HOSTID = "slave";
354 async \&sync_dir, "./dst"; 317 async \&main;
355 handle_commands unblock \*STDIN; 318 vc::multiplex unblock \*STDIN, $SLAVE;
356} else { 319} else {
357 $HOSTID = "master"; 320 $HOSTID = "master";
358 { 321 {
359 use Socket; 322 use Socket;
# NOTE(review): the results of socketpair and fork are not checked. A failed
# fork returns undef, which compares equal to 0 numerically, so the parent
# would take the child branch -- confirm.
360 socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC; 323 socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC;
361 if (fork == 0) { 324 if (fork == 0) {
325 close S1;
362 open STDIN, "<&S2" or die; 326 open STDIN, "<&S2" or die;
363 open STDOUT, ">&S2" or die; 327 open STDOUT, ">&S2" or die;
364 exec $0, "--slave"; 328 exec $0, "--slave";
# Only reached when exec fails.
365 exit 255; 329 exit 255;
366 } 330 }
367 async \&sync_dir, "./src"; 331 close S2;
368 handle_commands unblock \*S1; 332 async \&main;
333 vc::multiplex unblock \*S1, $SLAVE;
369 } 334 }
370} 335}
371 336

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines