ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
Revision: 1.1
Committed: Tue Oct 23 23:52:40 2001 UTC (22 years, 7 months ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #!/usr/bin/perl
2    
3     use Coro;
4     use Coro::Handle;
5     use Coro::Event;
6     use Coro::Semaphore;
7     use Coro::Channel;
8     use Coro::Signal;
9    
10     use MD5;
11    
12     use constant VERSION => 1;
13    
14     # TODO: content-length support (HACK!)
15     # TODO: real message-id parsing
16    
17     $|=1;
18    
19     $v = 9;
20    
21     $SLAVE = 1*($ARGV[0] eq "--slave");
22    
23     my $ecnt;
24    
25     sub slog {
26     if ($_[0] <= $v) {
27     print STDERR $_[1];
28     }
29     }
30    
31     # parse_folder(mbox-file-path, callback)
32     # callback gets called with \$header and \$body,
33     # $header includes the mbox From_ line without
34     # the leading From_ itself.
35     sub parse_folder {
36     my ($path, $cb) = @_;
37    
38     open my $fh, "<", $path
39     or die "$path: $!";
40    
41     local $/ = "\n\n";
42    
43     my ($head, $body, $offs);
44    
45     5 == read $fh, $head, 5
46     or return;
47    
48     $head eq "From "
49     or return;
50    
51     $offs = 0;
52     while (defined ($head = <$fh>)) {
53     $head =~ /^.*? [A-Z][a-z][a-z] [A-Z][a-z][a-z] [ 0-9][0-9] \d\d:\d\d:\d\d(?: [+-]\d\d\d\d)? \d\d(?:\d\d)\n/
54     or die "$path: not standard mbox format header:\n$head\n";
55    
56     local $/ = "\nFrom ";
57     # NEVER enable this. content-length simply is broken by design
58     #if ($head =~ /^Content-Length:\s+(\d+)$/im) {
59     # $1 <= read $fh, $body, $1 + 5
60     # or die "$path: partial message in mbox";
61     #} else {
62     $body = <$fh>;
63     #}
64     chomp $body;
65     $cb->($offs, \$head, \$body);
66     $offs = (tell $fh) - 5;
67     cede unless ++$ecnt & 1023;
68     }
69    
70     1;
71     }
72    
73     sub read_mdif {
74     my ($path) = @_;
75     my $fh;
76     my $mdif;
77     open my $fh, "<", $path
78     or return { };
79    
80     defined ($_ = <$fh>)
81     or die "$path: empty mdif file\n";
82    
83     do {
84     if ($_ eq "[SYNCMAIL]\n") {
85     while (<$fh>) {
86     last unless /^([a-z]+)\s*=\s*(.*)\n$/;
87     $mdif->{$1} = $2;
88     }
89     } elsif ($_ eq "[HOSTS]\n") {
90     while (<$fh>) {
91     last unless /^([^[].*)=(.*)\n$/;
92     $mdif->{host}{$1} = $2;
93     }
94     } elsif (/^\[DIFF(\d+)\.(\d+)\]\n$/) {
95     my ($gen, $mtime) = ($1, $2);
96     my @dif;
97     while (<$fh>) {
98     last unless /^[+-]/;
99     push @dif, substr $_, 0, -1;
100     }
101     unshift @{$mdif->{diff}}, [$gen, $mtime, \@dif];
102     } elsif ($_ eq "[INDEX]\n") {
103     my @idx;
104     while (<$fh>) {
105     last unless /^(\d+)=(.*)\n$/;
106     push @idx, [$1, $2];
107     }
108     $mdif->{idx} = \@idx;
109     } elsif (/^#/) {
110     $_ = <$fh>;
111     # nop
112     } else {
113     die "$path: unparseable section '$_'\n";
114     }
115     } while defined $_;
116    
117     $mdif->{version} <= VERSION
118     or die "$path: version mismatch ($mdif->{version} found, <".VERSION." expected)\n";
119    
120     $mdif;
121     }
122    
123     sub write_mdif {
124     my ($path, $mdif) = @_;
125     my $fh;
126    
127     open my $fh, ">", "$path~"
128     or die "$path~: $!";
129    
130     print $fh "# automatically generated, do NOT edit\n";
131    
132     print $fh "[SYNCMAIL]\n";
133     print $fh "$_=$mdif->{$_}\n" for (qw(fsize mtime gen version));
134    
135     print $fh "[HOSTS]\n";
136     print $fh "$k=$v\n" while my ($k,$v) = each %{$mdif->{host}};
137    
138     print $fh "[INDEX]\n";
139     print $fh "$_->[0]=$_->[1]\n" for @{$mdif->{idx}};
140    
141     for (reverse @{$mdif->{diff}}) {
142     print $fh "[DIFF$_->[0].$_->[1]]\n";
143     print $fh $_, "\n" for @{$_->[2]};
144     }
145    
146     close $fh
147     or die "$path~: unable to create updated .mdif: $!";
148    
149     rename "$path~", $path;
150     }
151    
152     sub gendiff {
153     my ($d1, $d2) = @_;
154    
155     my @d;
156     my (%d1, %d2);
157    
158     for (@$d2) {
159     undef $d2{$_->[1]};
160     }
161    
162     # delete msgs in d1 but not in d2
163     for (@$d1) {
164     undef $d1{$_->[1]};
165     push @d, "-$_->[1]" unless exists $d2{$_->[1]};
166     }
167     %d2 = (); # conserve memory
168    
169     # add msgs in d2 but not in d1
170     for (@$d2) {
171     push @d, "+$_->[1]" unless exists $d1{$_->[1]};
172     }
173    
174     \@d;
175     }
176    
177     my $check_folder = new Coro::Semaphore;
178    
179     sub check_folder {
180     my ($path) = @_;
181     my $guard = $check_folder->guard;
182    
183     (my $conf = $path) =~ s%([^/]+$)%.$1.mdif%;
184    
185     slog 1, "checking $path\n";
186    
187     if (stat $path) {
188     my ($fsize, $mtime) = (stat _)[7, 9];
189     if (open my $fh, "<", $conf) {
190     my %conf;
191     <$fh>; # skip initial comment
192     <$fh> eq "[SYNCMAIL]\n"
193     or die "$conf: format error";
194     while (<$fh> =~ /^([a-z]+)\s*=\s*(.*)$/) {
195     $conf{$1} = $2;
196     }
197     return (1, \%conf) if $fsize == $conf{fsize}
198     && $mtime == $conf{mtime};
199     }
200    
201     slog 2, "updating $path\n";
202    
203     my @idx;
204    
205     parse_folder $path, sub {
206     my ($offs, $head, $body) = @_;
207     my $mid;
208     if ($$head =~ /^Message-Id:\s*(<[^<\n]+>)\s*\n/im) {
209     $mid = $1;
210     } else {
211     $mid = MD5->hexhash("$$head\0$$body");
212     }
213     push @idx, [$offs, $mid];
214     } or return ();
215    
216     my $mdif = read_mdif $conf;
217    
218     if ($mdif->{version}) {
219     my $d = gendiff $mdif->{idx}, \@idx;
220     push @{$mdif->{diff}}, [
221     $mdif->{gen}++,
222     $mdif->{mtime},
223     $d,
224     ] if @$d;
225     } else {
226     slog 2, "$path: new folder\n";
227     $mdif->{version} ||= VERSION;
228     $mdif->{gen} = 1;
229     }
230    
231     $mdif->{fsize} = $fsize;
232     $mdif->{mtime} = $mtime;
233     $mdif->{idx} = \@idx;
234    
235     write_mdif $conf, $mdif;
236    
237     return (2, $mdif);
238     } else {
239     slog 2, "$path: no longer exists\n";
240     unlink $conf;
241    
242     return ();
243     }
244     }
245    
246     my $send = new Coro::Channel 10;
247     my $done = 0;
248    
249     # request $command, $data
250     sub request {
251     my $res;
252     my $signal = new Coro::Signal;
253     my $cmd = defined $_[1] ? "000+".length($_[1])." $_[0]\n$_[1]" : "000 $_[0]\n";
254     $send->put([$signal, \$res, $cmd]);
255     $signal->wait;
256     $res;
257     }
258    
259     # reply $id, $code, $msg, $data
260     sub reply {
261     my $cmd = defined $_[3] ? "$_[1]+".length($_[3])." $_[2]\n$_[3]" : "$_[1] $_[2]\n";
262     $send->put([undef, undef, "$_[0] $cmd"]);
263     }
264    
265     sub handle_commands {
266     my ($fh) = @_;
267     my $id = "a";
268     async {
269     $fh->print("- 000 hello $HOSTID\n");
270     while (my $r = $send->get) {
271     if (defined $r->[1]) {
272     my $id = ++$id;
273     $request{$id} = $r;
274     print STDERR "<<< $SLAVE sendign request $id:$r->[2]";#d#
275     $fh->print("$id $r->[2]");
276     } else {
277     print STDERR "<<< $SLAVE sendign reply $r->[2]";#d#
278     $fh->print($r->[2]);
279     }
280     }
281     print STDERR "$SLAVE shutdown\n";#d#
282     shutdown $fh, 1;
283     };
284     while (<$fh>) {
285     slog 0, ">>> $SLAVE received :$_";
286     /^(\S+) (\d\d\d)(?:\+(\d+))?\s*(.*)$/
287     or die "protocol error, garbled command ($_)";
288    
289     my ($id, $code, $dlen, $msg) = ($1, $2, $3, $4);
290     my $data;
291    
292     $fh->sysread($data, $dlen) == $dlen
293     or die "unexpected read error: $!";
294    
295     if ($code == 0) {
296     if ($msg eq "quit") {
297     print $fh "$id 200 quit\n";
298     $send->put(undef);
299     last;
300     } elsif ($msg eq "nop") {
301     reply $id, 200, "nop";
302     } elsif ($msg =~ /^hello (.*)$/) {
303     $OTHERID = $1;
304     slog 3, "otherid set to $OTHERID\n";
305     } else {
306     die "protocol error, unknown command ($_)\n";
307     }
308     } else {
309     my $r = delete $request{$id}
310     or die "protocol error, invalid reply id ($_)\n";
311    
312     ${$r->[1]} = [$code, $msg, $data];
313     $r->[0]->send;
314     }
315    
316     if ($done && !%request) {
317     $done = 0;
318     async {
319     request("quit");
320     exit 0;
321     };
322     }
323     }
324    
325     exit 0;
326     }
327    
328     sub sync_dir {
329     my ($path) = @_;
330    
331     opendir my $dh, $path
332     or die "$path: $!";
333    
334     while (defined (my $folder = readdir $dh)) {
335     next if $folder =~ /^\./;
336     my $path = "$path/$folder";
337     next unless -f $path;
338     my ($status, $mdif) = check_folder $path;
339     print STDERR "$path: $status | $mdif\n";#d#
340     if ($SLAVE) {
341     request "fhave", $folder;
342     } else {
343     request "fwant", $folder;
344     }
345     }
346    
347     $done = 1;
348     my $x = request("nop");
349     print STDERR "$SLAVE returned nop @$x\n";#d#
350     }
351    
352     if ($SLAVE) {
353     $HOSTID = "slave";
354     async \&sync_dir, "./dst";
355     handle_commands unblock \*STDIN;
356     } else {
357     $HOSTID = "master";
358     {
359     use Socket;
360     socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC;
361     if (fork == 0) {
362     open STDIN, "<&S2" or die;
363     open STDOUT, ">&S2" or die;
364     exec $0, "--slave";
365     exit 255;
366     }
367     async \&sync_dir, "./src";
368     handle_commands unblock \*S1;
369     }
370     }
371