ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
Revision: 1.8
Committed: Mon Oct 29 02:36:32 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
CVS Tags: HEAD
Changes since 1.7: +0 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #!/usr/bin/perl
2
3 use Coro;
4 use Coro::Handle;
5 use Coro::Event;
6 use Coro::Semaphore;
7 use Coro::Channel;
8 use Coro::Signal;
9 use Coro::RWLock;
10
11 use Set::Scalar;
12
13 use Fcntl;
14
15 our $VERSION = 0.1;
16
17 our $NOW = time;
18
19 require "config.pl";
20
21 use folder;
22 use vc;
23
24 $v = $VERBOSE;
25
26 # WARNING:
27 # Content-Length headers are deliberately being ignored. They
28 # are broken by design and will never be supported
29
30 # TODO: real message-id parsing
31
32 $|=1;
33
34 my $ecnt;
35
# Write a log message to STDERR when its level is within the configured
# verbosity.
#   $level   - numeric level; the message is printed only if $level <= $v
#   $message - text to print (callers supply their own trailing newline)
# Every line is prefixed with the current value of $SLAVE in brackets.
sub slog {
    my ($level, $message) = @_;

    print STDERR "[$SLAVE] ", $message
        if $level <= $v;
}
41
# Package-global semaphore created with the default count. It is not used
# anywhere else in this file; presumably the folder/vc modules use it to
# serialize disk access -- TODO confirm.
$lockdisk = Coro::Semaphore->new;
43
44 # give up a/the timeslice
# Give up the current timeslice by waiting on a timer event that fires
# after zero seconds.
sub give {
    my %timer = (after => 0);
    Coro::Event::do_timer(%timer);
}
48
# Read/write lock coordinating shutdown: quit_guard takes a read lock for
# each folder being synchronized, and main takes the write lock before
# sending the quit message.
my $quit = Coro::RWLock->new;
50
# Destructor for the guard objects returned by quit_guard: when a guard is
# destroyed, the lock it took on $quit is released again.
sub rwlock::DESTROY {
    $quit->unlock();
}
54
# Take a read lock on $quit and return a guard object blessed into the
# "rwlock" package; the lock is dropped when the guard is destroyed
# (see rwlock::DESTROY).
sub quit_guard {
    $quit->rdlock();

    my $guard = bless [], 'rwlock';
    return $guard;
}
59
# Return the names of all folders found directly inside $PREFIX.
#
# An entry counts as a folder when it is a plain file whose name neither
# starts with a dot nor ends with a tilde. Names are returned in the order
# readdir delivers them. Dies if $PREFIX cannot be opened as a directory.
sub find_folders {
    opendir my $dir, $PREFIX
        or die "$PREFIX: $!";

    return grep {
        $_ !~ /^\./ && $_ !~ /~$/ && -f "$PREFIX/$_"
    } readdir $dir;
}
75
# Limits how many folders are synchronized concurrently: each sync_folder
# coroutine holds a guard on this semaphore, which starts with a count of 3.
my $sync_folder = Coro::Semaphore->new(3);
77
# Offer a set of locally held messages to the peer over a dedicated channel.
#   $folder - folder object; fetch() is called for every message really sent
#   $avc    - existing channel on which the "offer" command and the port of
#             the newly created channel are announced
#   $diff   - array reference with the ids of the messages to offer
# Returns the coroutine that performs the transfer, so the caller can join it.
sub sync_offer {
    my ($folder, $avc, $diff) = @_;

    # separate channel for the bulk transfer, created with pri => 1
    my $ovc = vc->create(pri => 1);
    $avc->snd("offer", $ovc->{port});

    my $transfer = async {
        # wait for the peer's synchronization message before sending anything
        $ovc->rcv;

        # announce every id, then learn which ones the peer already has
        $ovc->snd(join "\0", @$diff);
        my %already_there = map { $_ => undef } split /\0/, $ovc->rcv;

        # send the remaining messages in the order they were announced
        for my $id (@$diff) {
            next if exists $already_there{$id};
            $ovc->snd($folder->fetch($id));
        }

        # anything defined arriving now is a protocol violation
        defined $ovc->rcv and die "protocol error, expected close";
    };

    return $transfer;
}
96
# Synchronize one folder with the peer, in a coroutine of its own.
#   $_[0] - name of the folder
# A quit guard is taken before the coroutine is started and released when the
# synchronization has finished, so the wrlock in main waits for all folders.
# The coroutine holds a guard on $sync_folder, which caps the number of
# folders synchronized at the same time. Returns the coroutine.
sub sync_folder {
    my $name = $_[0];

    my $quit_guard = quit_guard();
    async {
        my $guard = $sync_folder->guard;
        my $vc = vc->create;

        my $folder = folder->new(name => $name);

        $vc->snd("open", $name);
        $folder->read_mdif;

        # timestamp stored for the peer in this folder; -1 if there is none
        my $ctime = $folder->{host}{$OTHERNAME} || -1;

        $vc->snd("diff", $ctime);

        # message id => action code
        #   0b00 - delete locally     (the peer deleted it)
        #   0b01 - request from peer  (the peer added it)
        #   0b10 - delete on the peer (we deleted it)
        #   0b11 - offer to the peer  (we added it)
        my %action;

        # local change records newer than $ctime: [time, \@added, \@deleted]
        my @local_diff = grep { $_->[0] > $ctime } @{$folder->{diff}};

        # Timestamp of the peer's next change record; a negative value marks
        # the end of the peer's list. This must be a lexical: several of these
        # coroutines run concurrently and each blocks in rcv, so a shared
        # global would be overwritten by the other folders' synchronizations.
        my $remote_time = $vc->rcv;

        # Merge both change lists in timestamp order, so that for every
        # message the most recent change decides the action.
        while (1) {
            if ($remote_time >= 0 and (!@local_diff or $remote_time < $local_diff[0][0])) {
                $action{$_} = 0b01 for split /\0/, $vc->rcv; # add
                $action{$_} = 0b00 for split /\0/, $vc->rcv; # del

                $remote_time = $vc->rcv;
            } elsif (@local_diff) {
                $action{$_} = 0b11 for @{$local_diff[0][1]};
                $action{$_} = 0b10 for @{$local_diff[0][2]};

                shift @local_diff;
            } else {
                last;
            }
        }

        # append or update, depending on whether there are messages to be deleted
        $vc->snd("begin");
        $folder->begin_update;

        # group the message ids by action code; every bucket exists even if empty
        my @bucket = ([], [], [], []);
        while (my ($id, $code) = each %action) {
            push @{$bucket[$code]}, $id;
            slog 0, "DIFF $id : $code\n";#d#
        }

        $vc->snd("delete", join "\0", @{$bucket[0b10]});
        $folder->delete(@{$bucket[0b00]});

        # offer ours
        my $offer_coro = sync_offer($folder, $vc, $bucket[0b11]);

        # request theirs
        {
            my @send = grep { !$folder->exists($_) } @{$bucket[0b01]};
            $vc->snd("send", join "\0", @send);
            $folder->append($_, $vc->rcv) for @send;
            $vc->snd("-"); # sync
        }

        slog 0, "waiting...\n";#d#
        $offer_coro->join;

        # sanity check
        # NOTE(review): write_mdif is only called when the inventories differ,
        # and processing continues afterwards -- confirm this is intended.
        $vc->snd("inventory");
        if ($folder->inventory ne $vc->rcv) {
            $folder->write_mdif;
            slog 0, "FATAL: folder inventory mismatch after update\n";
        }

        $vc->snd("end");
        $folder->end_update;

        # exchange timestamps so both sides know where the next diff starts
        $vc->snd("setctime", $folder->{ctime});

        $vc->snd("mtime");
        $folder->{host}{$OTHERNAME} = $vc->rcv;

        $vc->snd("close");
        $folder->close;

        undef $quit_guard;
    }
}
186
# Client side of a connection; started in a coroutine on both hosts.
#
# Checks that the clocks of both hosts agree to within $MAXTIMEDIFF and
# exchanges host names. On the master (when $SLAVE is false) it then creates
# empty files for folders that only the peer has, starts the synchronization
# of every local folder, waits until all of them have finished and finally
# sends the quit message. Dies when the clock difference is too large.
sub main {
    my $vc = vc->create;

    # time checking done symmetrically
    {
        my $time = time;
        $vc->snd("time");
        my $othertime = $vc->rcv;
        # compare the peer's clock with the midpoint of our request/reply times
        abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF
            or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
    }

    $vc->snd("setname", $MYNAME); $OTHERNAME = $vc->rcv;

    # the slave only answers requests, it never initiates a synchronization
    return if $SLAVE;

    $vc->snd("list");
    for (split /\0/, $vc->rcv) {
        next if -e "$PREFIX/$_";

        # Create an empty file so that find_folders picks the folder up below.
        # A failure is reported instead of being ignored silently; the folder
        # then simply does not take part in this run.
        if (sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666) {
            slog 2, "created new empty folder $_\n";
        } else {
            slog 0, "ERROR: unable to create folder $_: $!\n";
        }
    }

    for my $folder (find_folders()) {
        sync_folder($folder);
    }

    # blocks until every sync_folder coroutine has released its quit guard
    $quit->wrlock;

    vc::snd_quit();
}
221
# Server side of a folder synchronization; entered after an "open" command.
#   $vc - channel the command arrived on; the folder name is read from it
# Reads commands from the channel until rcv returns a false value and
# executes them on the named folder. Dies on an unknown command.
sub serve_folder {
    my $vc = shift;
    my $name = $vc->rcv;
    my $folder = folder->new(name => $name);

    slog 8, "serving folder $name\n";

    $folder->read_mdif;

    while (my $msg = $vc->rcv) {
        if ($msg eq "mtime") {
            $vc->snd($folder->{mtime});
        } elsif ($msg eq "inventory") {
            $vc->snd($folder->inventory);
        } elsif ($msg eq "diff") {
            # send every change record newer than the given time as three
            # values (time, added ids, deleted ids), terminated by -1
            my $time = $vc->rcv;
            for my $change (@{$folder->{diff}}) {
                next if $change->[0] <= $time;
                $vc->snd($change->[0],
                         (join "\0", @{$change->[1]}),
                         (join "\0", @{$change->[2]}),
                );
            }
            $vc->snd(-1);
        } elsif ($msg eq "begin") {
            $folder->begin_update;
        } elsif ($msg eq "delete") {
            $folder->delete(split /\0/, $vc->rcv);
        } elsif ($msg eq "offer") {
            # the peer offers messages on a separate channel; handle that in
            # a coroutine of its own so this loop keeps serving commands
            my $ovc = vc->catch(port => $vc->rcv);
            async {
                my @offer;
                {
                    my @dup;

                    $ovc->snd("-"); # synchronize
                    for (split /\0/, $ovc->rcv) {
                        if ($folder->exists($_)) {
                            push @dup, $_;
                        } else {
                            push @offer, $_;
                        }
                    }

                    # tell the peer which messages we already have
                    $ovc->snd(join "\0", @dup);
                }

                # now we'll get everything in @offer, in order
                $folder->append($_, $ovc->rcv) for @offer;
                $ovc->close;
            };
        } elsif ($msg eq "send") {
            # channel priority is raised to 1 while message bodies are sent
            $vc->pri(1);
            $vc->snd($folder->fetch($_)) for split /\0/, $vc->rcv;
            $vc->rcv; # sync
            $vc->pri(0);
        } elsif ($msg eq "end") {
            $folder->end_update;
        } elsif ($msg eq "setctime") {
            $folder->{host}{$OTHERNAME} = $vc->rcv;
        } elsif ($msg eq "close") {
            $folder->close;
        } else {
            die "protocol error, unknown folder command ($msg)\n";
        }
    }
}
291
# Command loop for a newly accepted connection.
#   $vc - the channel to serve
# Reads commands until rcv returns a false value and runs the matching
# handler. Dies on an unknown command.
sub serve {
    my $vc = shift;

    slog(8, "new connection $vc->{port}\n");

    # command name => handler; every handler works on $vc
    my %handler_for = (
        setname => sub {
            $vc->snd($MYNAME);
            $OTHERNAME = $vc->rcv;
        },
        pri => sub {
            $vc->{pri} = $vc->rcv;
        },
        time => sub {
            $vc->snd(time);
        },
        list => sub {
            $vc->snd(join "\0", find_folders());
        },
        open => sub {
            serve_folder($vc);
        },
    );

    while (my $command = $vc->rcv) {
        my $handler = $handler_for{$command}
            or die "protocol error, unknown command ($command)\n";
        $handler->();
    }
}
314
# Program entry point.
#
# As slave, the protocol is spoken over STDIN. As master, a socket pair is
# created and a child process is forked that re-executes this program with
# "--slave", its STDIN and STDOUT connected to one end of the pair; the
# master speaks the protocol over the other end. In both cases main runs in
# a coroutine while vc::multiplex handles the connection.
if ($SLAVE) {
    $HOSTID = "slave";
    async \&main;
    vc::multiplex unblock \*STDIN, $SLAVE;
} else {
    $HOSTID = "master";
    {
        use Socket;
        socketpair(S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
            or die "socketpair: $!";

        # fork returns undef on failure, and undef == 0 is true: without this
        # check the parent itself would take the child branch below
        my $pid = fork;
        defined $pid
            or die "fork: $!";

        if ($pid == 0) {
            close S1;
            open STDIN, "<&S2" or die;
            open STDOUT, ">&S2" or die;
            exec $0, "--slave";
            exit 255; # only reached when exec failed
        }
        close S2;
        async \&main;
        vc::multiplex unblock \*S1, $SLAVE;
    }
}
336