ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
Revision: 1.7
Committed: Mon Oct 29 00:37:41 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
Changes since 1.6: +79 -17 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #!/usr/bin/perl
2    
3     use Coro;
4     use Coro::Handle;
5     use Coro::Event;
6     use Coro::Semaphore;
7     use Coro::Channel;
8     use Coro::Signal;
9 root 1.2 use Coro::RWLock;
10 root 1.1
11 root 1.4 use Set::Scalar;
12    
13 root 1.2 use Fcntl;
14 root 1.1
15 root 1.6 our $VERSION = 0.1;
16 root 1.2
17 root 1.6 our $NOW = time;
18 root 1.2
19 root 1.3 require "config.pl";
20    
21     use folder;
22     use vc;
23    
24     $v = $VERBOSE;
25 root 1.2
26     # WARNING:
27     # Content-Length headers are deliberately being ignored. They
28     # are broken by design and will never be supported
29 root 1.1
30     # TODO: real message-id parsing
31    
32     $|=1;
33    
34     my $ecnt;
35    
36     sub slog {
37     if ($_[0] <= $v) {
38 root 1.2 print STDERR "[$SLAVE] ", $_[1];
39 root 1.1 }
40     }
41    
42 root 1.3 $lockdisk = new Coro::Semaphore;
43 root 1.2
44     # give up a/the timeslice
45     sub give {
46     Coro::Event::do_timer(after => 0);
47     }
48    
49     my $quit = new Coro::RWLock;
50    
51     sub rwlock::DESTROY {
52     $quit->unlock;
53     }
54    
55     sub quit_guard {
56     $quit->rdlock;
57     bless [], rwlock;
58     }
59    
60     sub find_folders {
61     my @folders;
62    
63     opendir my $dh, $PREFIX
64     or die "$PREFIX: $!";
65    
66     while (defined (my $folder = readdir $dh)) {
67     next if $folder =~ /^\./;
68 root 1.6 next if $folder =~ /~$/;
69 root 1.2 next unless -f "$PREFIX/$folder";
70     push @folders, $folder;
71     }
72    
73     @folders;
74     }
75    
76     my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel
77    
78 root 1.7 sub sync_offer {
79     my ($folder, $avc, $diff) = @_;
80    
81     my $vc = create vc pri => 1;
82     $avc->snd("offer", $vc->{port});
83    
84     async {
85     $vc->rcv; # need to synchronize, argl. should open on other side
86     $vc->snd(join "\0", @$diff);
87     my %dup; @dup{split /\0/, $vc->rcv} = ();
88    
89     for (@$diff) {
90     $vc->snd($folder->fetch($_)) unless exists $dup{$_};
91     }
92    
93     defined $vc->rcv and die "protocol error, expected close";
94     };
95     }
96    
97 root 1.2 sub sync_folder {
98     my $name = $_[0];
99    
100     my $quit_guard = quit_guard;
101     async {
102     my $guard = $sync_folder->guard;
103 root 1.6 my $vc = create vc;
104 root 1.2
105 root 1.3 my $folder = new folder name => $name;
106 root 1.2
107 root 1.3 $vc->snd("open", $name);
108     $vc->snd("mtime");
109 root 1.2
110     $folder->read_mdif;
111 root 1.3
112 root 1.4 my $ctime = $folder->{host}{$OTHERNAME} || -1;
113 root 1.3 my $rtime = $vc->rcv;
114 root 1.1
115 root 1.4 $vc->snd("diff", $ctime);
116    
117 root 1.6 my %diff; #
118     # 00 - local del
119     # 01 - local add
120     # 10 - remote del
121     # 11 - remote add
122 root 1.4
123     my @diff = grep { $_->[0] > $ctime } @{$folder->{diff}};
124 root 1.7
125 root 1.4 $diff = $vc->rcv;
126     while () {
127 root 1.7 if ($diff >= 0 and (!@diff or $diff < $diff[0][0])) {
128 root 1.6 $diff{$_} = 0b01 for split /\0/, $vc->rcv; # add
129     $diff{$_} = 0b00 for split /\0/, $vc->rcv; # del
130    
131 root 1.4 $diff = $vc->rcv;
132     } elsif (@diff) {
133 root 1.6 $diff{$_} = 0b11 for @{$diff[0][1]};
134     $diff{$_} = 0b10 for @{$diff[0][2]};
135    
136 root 1.4 shift @diff;
137     } else {
138     last;
139     }
140     }
141    
142 root 1.6 # append or update, depending on wether there are messages to be deleted
143     $vc->snd("begin");
144     $folder->begin_update;
145    
146     while (my ($k,$v) = each %diff) {
147     push @{$diff[$v]}, $k;
148     slog 0, "DIFF $k : $v\n";#d#
149     }
150    
151     $vc->snd("delete", join "\0", @{$diff[2]});
152     $folder->delete(@{$diff[0]});
153    
154 root 1.7 # offer ours
155     my $offer_coro = sync_offer($folder, $vc, $diff[3]);
156    
157     # request theirs
158     {
159     my @send = grep { !$folder->exists($_) } @{$diff[1]};
160     $vc->snd("send", join "\0", @send);
161     $folder->append($_, $vc->rcv) for @send;
162     $vc->snd("-"); # sync
163     }
164    
165     slog 0, "waiting...\n";#d#
166     $offer_coro->join;
167    
168     # sanity check
169     $vc->snd("inventory");
170     if ($folder->inventory ne $vc->rcv) {
171     $folder->write_mdif;
172     slog 0, "FATAL: folder inventory mismatch after update\n";
173     }
174    
175 root 1.6 $vc->snd("end");
176     $folder->end_update;
177 root 1.4
178 root 1.7 $vc->snd("setctime", $folder->{ctime});
179    
180     $vc->snd("mtime");
181 root 1.6 $folder->{host}{$OTHERNAME} = $vc->rcv;
182    
183 root 1.7 $vc->snd("close");
184     $folder->close;
185 root 1.1
186 root 1.2 undef $quit_guard;
187 root 1.1 }
188     }
189    
190 root 1.2 sub main {
191 root 1.6 my $vc = create vc;
192 root 1.3
193 root 1.2 # time checking done symmetrically
194     {
195     my $time = time;
196 root 1.3 $vc->snd("time");
197     my $othertime = $vc->rcv;
198 root 1.2 abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF
199     or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
200     }
201 root 1.3
202 root 1.6 $vc->snd("setname", $MYNAME); $OTHERNAME = $vc->rcv;
203 root 1.1
204 root 1.2 if ($SLAVE) {
205     #
206     } else {
207 root 1.3 $vc->snd("list");
208     for (split /\0/, $vc->rcv) {
209 root 1.2 if (!-e "$PREFIX/$_") {
210     slog 2, "created new empty folder $_\n";
211     sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666;
212     }
213     }
214 root 1.1
215 root 1.2 for my $folder (find_folders) {
216     sync_folder $folder;
217 root 1.1 }
218 root 1.2
219     $quit->wrlock;
220 root 1.3
221     vc::snd_quit;
222     }
223     }
224    
225     sub serve_folder {
226     my $vc = shift;
227     my $name = $vc->rcv;
228     my $folder = new folder name => $name;
229    
230     slog 8, "serving folder $name\n";
231    
232     $folder->read_mdif;
233    
234     while (my $msg = $vc->rcv) {
235     if ($msg eq "mtime") {
236     $vc->snd($folder->{mtime});
237 root 1.6 } elsif ($msg eq "inventory") {
238     $vc->snd($folder->inventory);
239 root 1.4 } elsif ($msg eq "diff") {
240     my $time = $vc->rcv;
241     for (@{$folder->{diff}}) {
242     next if $_->[0] <= $time;
243     $vc->snd($_->[0],
244     (join "\0", @{$_->[1]}),
245     (join "\0", @{$_->[2]}),
246     );
247     }
248     $vc->snd(-1);
249 root 1.6 } elsif ($msg eq "begin") {
250     $folder->begin_update;
251 root 1.7 } elsif ($msg eq "delete") {
252     $folder->delete(split /\0/, $vc->rcv);
253     } elsif ($msg eq "offer") {
254     my $ovc = catch vc port => $vc->rcv;
255     async {
256     my @offer;
257     {
258     my @dup;
259    
260     $ovc->snd("-"); # synchronize
261     for (split /\0/, $ovc->rcv) {
262     if ($folder->exists($_)) {
263     push @dup, $_;
264     } else {
265     push @offer, $_;
266     }
267     }
268    
269     $ovc->snd(join "\0", @dup);
270     }
271    
272     # now we'll get everything in @offer, in order
273     $folder->append($_, $ovc->rcv) for @offer;
274     $ovc->close;
275     };
276     } elsif ($msg eq "send") {
277     $vc->pri(1);
278     $vc->snd($folder->fetch($_)) for split /\0/, $vc->rcv;
279     $vc->rcv; # sync
280     $vc->pri(0);
281 root 1.6 } elsif ($msg eq "end") {
282     $folder->end_update;
283     } elsif ($msg eq "mtime") {
284     $vc->snd($folder->{mtime});
285 root 1.4 } elsif ($msg eq "setctime") {
286     $folder->{host}{$OTHERNAME} = $vc->rcv;
287 root 1.7 } elsif ($msg eq "close") {
288     $folder->close;
289 root 1.3 } else {
290     die "protocol error, unknown folder command ($msg)\n";
291     }
292     }
293     }
294    
295     sub serve {
296     my $vc = shift;
297    
298     slog 8, "new connection $vc->{port}\n";
299    
300     while (my $msg = $vc->rcv) {
301 root 1.6 if ($msg eq "setname") {
302     $vc->snd($MYNAME);
303     $OTHERNAME = $vc->rcv;
304 root 1.3 } elsif ($msg eq "pri") {
305 root 1.6 $vc->{pri} = $vc->rcv;
306 root 1.3 } elsif ($msg eq "time") {
307     $vc->snd(time);
308     } elsif ($msg eq "list") {
309     $vc->snd(join "\0", find_folders);
310     } elsif ($msg eq "open") {
311     serve_folder($vc);
312     } else {
313     die "protocol error, unknown command ($msg)\n";
314     }
315 root 1.1 }
316     }
317    
318     if ($SLAVE) {
319     $HOSTID = "slave";
320 root 1.2 async \&main;
321 root 1.3 vc::multiplex unblock \*STDIN, $SLAVE;
322 root 1.1 } else {
323     $HOSTID = "master";
324     {
325     use Socket;
326     socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC;
327     if (fork == 0) {
328 root 1.2 close S1;
329 root 1.1 open STDIN, "<&S2" or die;
330     open STDOUT, ">&S2" or die;
331     exec $0, "--slave";
332     exit 255;
333     }
334 root 1.2 close S2;
335     async \&main;
336 root 1.3 vc::multiplex unblock \*S1, $SLAVE;
337 root 1.1 }
338     }
339