… | |
… | |
4 | use Coro::Handle; |
4 | use Coro::Handle; |
5 | use Coro::Event; |
5 | use Coro::Event; |
6 | use Coro::Semaphore; |
6 | use Coro::Semaphore; |
7 | use Coro::Channel; |
7 | use Coro::Channel; |
8 | use Coro::Signal; |
8 | use Coro::Signal; |
|
|
9 | use Coro::RWLock; |
9 | |
10 | |
|
|
11 | use Fcntl; |
10 | use MD5; |
12 | use MD5; |
11 | |
13 | |
12 | use constant VERSION => 1; |
14 | use constant PROTVERSION => 1; |
13 | |
15 | |
14 | # TODO: content-length support (HACK!) |
16 | $VERSION = 0.1; |
|
|
17 | |
|
|
18 | $TIMEOUT = 20; |
|
|
19 | $IDLE = $TIMEOUT < 30 ? $TIMEOUT / 2 : $TIMEOUT - 15; |
|
|
20 | $MAXTIMEDIFF = 10; |
|
|
21 | |
|
|
22 | # WARNING: |
|
|
23 | # Content-Length headers are deliberately being ignored. They |
|
|
24 | # are broken by design and will never be supported |
|
|
25 | |
15 | # TODO: real message-id parsing |
26 | # TODO: real message-id parsing |
16 | |
27 | |
17 | $|=1; |
28 | $|=1; |
18 | |
29 | |
19 | $v = 9; |
30 | $v = 9; |
20 | |
31 | |
21 | $SLAVE = 1*($ARGV[0] eq "--slave"); |
32 | $SLAVE = 1*($ARGV[0] eq "--slave"); |
|
|
33 | |
|
|
34 | $MYNAME = $SLAVE ? "slave" : "master"; |
|
|
35 | $PREFIX = $SLAVE ? "./dst" : "./src"; |
|
|
36 | |
|
|
37 | @OTHERNAMES = qw(third); |
22 | |
38 | |
23 | my $ecnt; |
39 | my $ecnt; |
24 | |
40 | |
25 | sub slog { |
41 | sub slog { |
26 | if ($_[0] <= $v) { |
42 | if ($_[0] <= $v) { |
27 | print STDERR $_[1]; |
43 | print STDERR "[$SLAVE] ", $_[1]; |
28 | } |
44 | } |
29 | } |
45 | } |
30 | |
46 | |
|
|
47 | my $lockdisk = new Coro::Semaphore; |
|
|
48 | |
|
|
49 | # give up a/the timeslice |
|
|
50 | sub give { |
|
|
51 | Coro::Event::do_timer(after => 0); |
|
|
52 | } |
|
|
53 | |
|
|
54 | package folder; |
|
|
55 | |
|
|
56 | BEGIN { *slog = \&::slog }; |
|
|
57 | |
|
|
58 | use constant MDIFVERSION => 1; |
|
|
59 | |
|
|
60 | sub new { |
|
|
61 | my $class = shift; |
|
|
62 | my %arg = @_; |
|
|
63 | bless { |
|
|
64 | path => "$::PREFIX/$arg{name}", |
|
|
65 | %arg, |
|
|
66 | }, $class; |
|
|
67 | } |
|
|
68 | |
|
|
69 | sub dirty { |
|
|
70 | $_[0]{dirty} = 1; |
|
|
71 | } |
|
|
72 | |
|
|
73 | sub DESTROY { |
|
|
74 | $_[0]->write_mdif; |
|
|
75 | } |
|
|
76 | |
31 | # parse_folder(mbox-file-path, callback) |
77 | # parse_mbox(mbox-file-path, callback) |
32 | # callback gets called with \$header and \$body, |
78 | # callback gets called with \$header and \$body, |
33 | # $header includes the mbox From_ line without |
79 | # $header includes the mbox From_ line without |
34 | # the leading From_ itself. |
80 | # the leading From_ itself. |
35 | sub parse_folder { |
81 | sub parse_mbox { |
36 | my ($path, $cb) = @_; |
82 | my ($path, $cb) = @_; |
37 | |
83 | |
38 | open my $fh, "<", $path |
84 | open my $fh, "<", $path |
39 | or die "$path: $!"; |
85 | or die "$path: $!"; |
40 | |
86 | |
… | |
… | |
62 | $body = <$fh>; |
108 | $body = <$fh>; |
63 | #} |
109 | #} |
64 | chomp $body; |
110 | chomp $body; |
65 | $cb->($offs, \$head, \$body); |
111 | $cb->($offs, \$head, \$body); |
66 | $offs = (tell $fh) - 5; |
112 | $offs = (tell $fh) - 5; |
67 | cede unless ++$ecnt & 1023; |
113 | ::give unless ++$ecnt & 1023; |
68 | } |
114 | } |
69 | |
115 | |
70 | 1; |
116 | 1; |
71 | } |
117 | } |
72 | |
118 | |
|
|
119 | sub conf_path { |
|
|
120 | (my $conf = $_[0]{path}) =~ s%([^/]+$)%.$1.mdif%; |
|
|
121 | $conf; |
|
|
122 | } |
|
|
123 | |
73 | sub read_mdif { |
124 | sub read_mdif { |
74 | my ($path) = @_; |
125 | my $self = shift; |
75 | my $fh; |
126 | my $path = $self->conf_path; |
76 | my $mdif; |
127 | |
|
|
128 | return if $self->{idx}; |
|
|
129 | |
77 | open my $fh, "<", $path |
130 | open my $fh, "<", $path |
78 | or return { }; |
131 | or return; |
79 | |
132 | |
80 | defined ($_ = <$fh>) |
133 | defined ($_ = <$fh>) |
81 | or die "$path: empty mdif file\n"; |
134 | or die "$path: empty mdif file\n"; |
82 | |
135 | |
83 | do { |
136 | do { |
84 | if ($_ eq "[SYNCMAIL]\n") { |
137 | if ($_ eq "[SYNCMAIL]\n") { |
85 | while (<$fh>) { |
138 | while (<$fh>) { |
86 | last unless /^([a-z]+)\s*=\s*(.*)\n$/; |
139 | last unless /^([a-z]+)\s*=\s*(.*)\n$/; |
87 | $mdif->{$1} = $2; |
140 | $self->{$1} = $2; |
88 | } |
141 | } |
89 | } elsif ($_ eq "[HOSTS]\n") { |
142 | } elsif ($_ eq "[HOSTS]\n") { |
90 | while (<$fh>) { |
143 | while (<$fh>) { |
91 | last unless /^([^[].*)=(.*)\n$/; |
144 | last unless /^([^[].*)=(.*)\n$/; |
92 | $mdif->{host}{$1} = $2; |
145 | $self->{host}{$1} = $2; |
93 | } |
146 | } |
94 | } elsif (/^\[DIFF(\d+)\.(\d+)\]\n$/) { |
147 | } elsif (/^\[DIFF (\d+)\]\n$/) { |
95 | my ($gen, $mtime) = ($1, $2); |
148 | my $mtime = $1; |
96 | my @dif; |
149 | my @dif; |
97 | while (<$fh>) { |
150 | while (<$fh>) { |
98 | last unless /^[+-]/; |
151 | last unless /^[+-]/; |
99 | push @dif, substr $_, 0, -1; |
152 | push @dif, substr $_, 0, -1; |
100 | } |
153 | } |
101 | unshift @{$mdif->{diff}}, [$gen, $mtime, \@dif]; |
154 | unshift @{$self->{diff}}, [$mtime, \@dif]; |
102 | } elsif ($_ eq "[INDEX]\n") { |
155 | } elsif ($_ eq "[INDEX]\n") { |
103 | my @idx; |
156 | my @idx; |
104 | while (<$fh>) { |
157 | while (<$fh>) { |
105 | last unless /^(\d+)=(.*)\n$/; |
158 | last unless /^(\d+)=(.*)\n$/; |
106 | push @idx, [$1, $2]; |
159 | push @idx, [$1, $2]; |
107 | } |
160 | } |
108 | $mdif->{idx} = \@idx; |
161 | $self->{idx} = \@idx; |
109 | } elsif (/^#/) { |
162 | } elsif (/^#/) { |
110 | $_ = <$fh>; |
163 | $_ = <$fh>; |
111 | # nop |
164 | # nop |
112 | } else { |
165 | } else { |
113 | die "$path: unparseable section '$_'\n"; |
166 | die "$path: unparseable section '$_'\n"; |
114 | } |
167 | } |
115 | } while defined $_; |
168 | } while defined $_; |
116 | |
169 | |
117 | $mdif->{version} <= VERSION |
170 | $self->{version} <= MDIFVERSION |
118 | or die "$path: version mismatch ($mdif->{version} found, <".VERSION." expected)\n"; |
171 | or die "$path: version mismatch ($self->{version} found, <".MDIFVERSION." expected)\n"; |
119 | |
|
|
120 | $mdif; |
|
|
121 | } |
172 | } |
122 | |
173 | |
123 | sub write_mdif { |
174 | sub write_mdif { |
124 | my ($path, $mdif) = @_; |
175 | my $self = shift; |
125 | my $fh; |
176 | my $path = $self->conf_path; |
|
|
177 | |
|
|
178 | return unless $self->{dirty}; |
126 | |
179 | |
127 | open my $fh, ">", "$path~" |
180 | open my $fh, ">", "$path~" |
128 | or die "$path~: $!"; |
181 | or die "$path~: $!"; |
129 | |
182 | |
130 | print $fh "# automatically generated, do NOT edit\n"; |
183 | print $fh "# automatically generated, do NOT edit\n"; |
131 | |
184 | |
132 | print $fh "[SYNCMAIL]\n"; |
185 | print $fh "[SYNCMAIL]\n"; |
133 | print $fh "$_=$mdif->{$_}\n" for (qw(fsize mtime gen version)); |
186 | print $fh "$_=$self->{$_}\n" for (qw(fsize mtime version)); |
134 | |
187 | |
135 | print $fh "[HOSTS]\n"; |
188 | print $fh "[HOSTS]\n"; |
136 | print $fh "$k=$v\n" while my ($k,$v) = each %{$mdif->{host}}; |
189 | while (my ($k,$v) = each %{$self->{host}}) { |
|
|
190 | print $fh "$k=$v\n"; |
|
|
191 | } |
137 | |
192 | |
138 | print $fh "[INDEX]\n"; |
193 | print $fh "[INDEX]\n"; |
139 | print $fh "$_->[0]=$_->[1]\n" for @{$mdif->{idx}}; |
194 | print $fh "$_->[0]=$_->[1]\n" for @{$self->{idx}}; |
140 | |
195 | |
141 | for (reverse @{$mdif->{diff}}) { |
196 | for (reverse @{$self->{diff}}) { |
142 | print $fh "[DIFF$_->[0].$_->[1]]\n"; |
197 | print $fh "[DIFF $_->[0]]\n"; |
143 | print $fh $_, "\n" for @{$_->[2]}; |
198 | print $fh $_, "\n" for @{$_->[1]}; |
144 | } |
199 | } |
145 | |
200 | |
146 | close $fh |
201 | close $fh |
147 | or die "$path~: unable to create updated .mdif: $!"; |
202 | or die "$path~: unable to create updated .mdif: $!"; |
148 | |
203 | |
149 | rename "$path~", $path; |
204 | rename "$path~", $path; |
|
|
205 | |
|
|
206 | delete $self->{dirty}; |
150 | } |
207 | } |
151 | |
208 | |
152 | sub gendiff { |
209 | sub gendiff { |
153 | my ($d1, $d2) = @_; |
210 | my ($d1, $d2) = @_; |
154 | |
211 | |
… | |
… | |
172 | } |
229 | } |
173 | |
230 | |
174 | \@d; |
231 | \@d; |
175 | } |
232 | } |
176 | |
233 | |
177 | my $check_folder = new Coro::Semaphore; |
|
|
178 | |
|
|
179 | sub check_folder { |
234 | sub check { |
180 | my ($path) = @_; |
235 | my $self = shift; |
|
|
236 | my $path = $self->{path}; |
|
|
237 | my $conf = $self->conf_path; |
181 | my $guard = $check_folder->guard; |
238 | my $guard = $lockdisk->guard; |
182 | |
239 | |
183 | (my $conf = $path) =~ s%([^/]+$)%.$1.mdif%; |
|
|
184 | |
|
|
185 | slog 1, "checking $path\n"; |
240 | slog 3, "checking $path\n"; |
186 | |
241 | |
187 | if (stat $path) { |
242 | if (stat $path) { |
188 | my ($fsize, $mtime) = (stat _)[7, 9]; |
243 | my ($fsize, $mtime) = (stat _)[7, 9]; |
|
|
244 | |
189 | if (open my $fh, "<", $conf) { |
245 | if (open my $fh, "<", $conf) { |
190 | my %conf; |
246 | my %conf; |
191 | <$fh>; # skip initial comment |
247 | <$fh>; # skip initial comment |
192 | <$fh> eq "[SYNCMAIL]\n" |
248 | <$fh> eq "[SYNCMAIL]\n" |
193 | or die "$conf: format error"; |
249 | or die "$conf: format error"; |
194 | while (<$fh> =~ /^([a-z]+)\s*=\s*(.*)$/) { |
250 | while (<$fh> =~ /^([a-z]+)\s*=\s*(.*)$/) { |
195 | $conf{$1} = $2; |
251 | $conf{$1} = $2; |
196 | } |
252 | } |
197 | return (1, \%conf) if $fsize == $conf{fsize} |
253 | return 1 if $fsize == $conf{fsize} |
198 | && $mtime == $conf{mtime}; |
254 | && $mtime == $conf{mtime}; |
|
|
255 | |
|
|
256 | $conf{mtime} <= $mtime |
|
|
257 | or die "$path: folder older than mdif"; |
199 | } |
258 | } |
200 | |
259 | |
201 | slog 2, "updating $path\n"; |
260 | slog 2, "updating $path\n"; |
202 | |
261 | |
203 | my @idx; |
262 | my @idx; |
204 | |
263 | |
205 | parse_folder $path, sub { |
264 | parse_mbox $path, sub { |
206 | my ($offs, $head, $body) = @_; |
265 | my ($offs, $head, $body) = @_; |
207 | my $mid; |
266 | my $mid; |
208 | if ($$head =~ /^Message-Id:\s*(<[^<\n]+>)\s*\n/im) { |
267 | if ($$head =~ /^Message-Id:\s*(<[^<\n]+>)\s*\n/im) { |
209 | $mid = $1; |
268 | $mid = $1; |
210 | } else { |
269 | } else { |
211 | $mid = MD5->hexhash("$$head\0$$body"); |
270 | $mid = MD5->hexhash("$$head\0$$body"); |
212 | } |
271 | } |
213 | push @idx, [$offs, $mid]; |
272 | push @idx, [$offs, $mid]; |
214 | } or return (); |
273 | } or return (); |
215 | |
274 | |
216 | my $mdif = read_mdif $conf; |
275 | $self->read_mdif; |
217 | |
276 | |
218 | if ($mdif->{version}) { |
277 | if ($self->{version}) { |
219 | my $d = gendiff $mdif->{idx}, \@idx; |
278 | my $d = gendiff $self->{idx}, \@idx; |
220 | push @{$mdif->{diff}}, [ |
279 | push @{$self->{diff}}, [ |
221 | $mdif->{gen}++, |
|
|
222 | $mdif->{mtime}, |
280 | $self->{mtime}, |
223 | $d, |
281 | $d, |
224 | ] if @$d; |
282 | ] if @$d; |
225 | } else { |
283 | } else { |
226 | slog 2, "$path: new folder\n"; |
284 | slog 2, "$path: previously unknown folder\n"; |
227 | $mdif->{version} ||= VERSION; |
285 | $self->{version} ||= MDIFVERSION; |
228 | $mdif->{gen} = 1; |
|
|
229 | } |
286 | } |
230 | |
287 | |
231 | $mdif->{fsize} = $fsize; |
288 | $self->{fsize} = $fsize; |
232 | $mdif->{mtime} = $mtime; |
289 | $self->{mtime} = $mtime; |
233 | $mdif->{idx} = \@idx; |
290 | $self->{idx} = \@idx; |
234 | |
291 | |
235 | write_mdif $conf, $mdif; |
292 | $self->dirty; |
236 | |
293 | |
237 | return (2, $mdif); |
294 | return 2; |
238 | } else { |
295 | } else { |
239 | slog 2, "$path: no longer exists\n"; |
296 | slog 2, "$path: no longer exists\n"; |
240 | unlink $conf; |
297 | unlink $conf; |
241 | |
298 | |
242 | return (); |
299 | return (); |
243 | } |
300 | } |
|
|
301 | } |
|
|
302 | |
|
|
303 | package main; |
|
|
304 | |
|
|
305 | my $quit = new Coro::RWLock; |
|
|
306 | |
|
|
307 | sub rwlock::DESTROY { |
|
|
308 | $quit->unlock; |
|
|
309 | } |
|
|
310 | |
|
|
311 | sub quit_guard { |
|
|
312 | $quit->rdlock; |
|
|
313 | bless [], rwlock; |
|
|
314 | } |
|
|
315 | |
|
|
316 | sub find_folders { |
|
|
317 | my @folders; |
|
|
318 | |
|
|
319 | opendir my $dh, $PREFIX |
|
|
320 | or die "$PREFIX: $!"; |
|
|
321 | |
|
|
322 | while (defined (my $folder = readdir $dh)) { |
|
|
323 | next if $folder =~ /^\./; |
|
|
324 | next unless -f "$PREFIX/$folder"; |
|
|
325 | push @folders, $folder; |
|
|
326 | } |
|
|
327 | |
|
|
328 | @folders; |
244 | } |
329 | } |
245 | |
330 | |
246 | my $send = new Coro::Channel 10; |
331 | my $send = new Coro::Channel 10; |
247 | my $done = 0; |
332 | my $done = 0; |
248 | |
333 | |
249 | # request $command, $data |
334 | # request $command, $data |
250 | sub request { |
335 | sub request { |
251 | my $res; |
336 | my $res; |
252 | my $signal = new Coro::Signal; |
337 | my $signal = new Coro::Signal; |
253 | my $cmd = defined $_[1] ? "000+".length($_[1])." $_[0]\n$_[1]" : "000 $_[0]\n"; |
338 | my $cmd = defined $_[1] ? "000+".length($_[1])." $_[0]\n$_[1]" |
|
|
339 | : "000 $_[0]\n"; |
254 | $send->put([$signal, \$res, $cmd]); |
340 | $send->put([$signal, \$res, $cmd]); |
255 | $signal->wait; |
341 | $signal->wait; |
256 | $res; |
342 | $res; |
257 | } |
343 | } |
258 | |
344 | |
259 | # reply $id, $code, $msg, $data |
345 | # reply $id, $code, $msg, $data |
260 | sub reply { |
346 | sub reply { |
261 | my $cmd = defined $_[3] ? "$_[1]+".length($_[3])." $_[2]\n$_[3]" : "$_[1] $_[2]\n"; |
347 | my $cmd = defined $_[3] ? "$_[1]+".length($_[3])." $_[2]\n$_[3]" |
|
|
348 | : "$_[1] $_[2]\n"; |
262 | $send->put([undef, undef, "$_[0] $cmd"]); |
349 | $send->put([undef, undef, "$_[0] $cmd"]); |
263 | } |
350 | } |
264 | |
351 | |
265 | sub handle_commands { |
352 | sub handle_commands { |
266 | my ($fh) = @_; |
353 | my ($fh) = @_; |
267 | my $id = "a"; |
354 | |
|
|
355 | # periodically send nop as keepalive signal to avoid the |
|
|
356 | # rsync-on-slow-disk-timeout problem. |
|
|
357 | my $nopper = Event->timer(parked => 1, cb => sub { request "nop" }); |
|
|
358 | |
|
|
359 | my @folder; |
|
|
360 | |
268 | async { |
361 | async { |
269 | $fh->print("- 000 hello $HOSTID\n"); |
362 | my $id = "a"; |
|
|
363 | $fh->print("\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n"); |
270 | while (my $r = $send->get) { |
364 | while (my $r = $send->get) { |
271 | if (defined $r->[1]) { |
365 | if (defined $r->[1]) { |
272 | my $id = ++$id; |
366 | my $id = ++$id; |
273 | $request{$id} = $r; |
367 | $request{$id} = $r; |
274 | print STDERR "<<< $SLAVE sendign request $id:$r->[2]";#d# |
368 | print STDERR "<<< $SLAVE sending request $id:$r->[2]";#d# |
275 | $fh->print("$id $r->[2]"); |
369 | $fh->print("$id $r->[2]"); |
276 | } else { |
370 | } else { |
277 | print STDERR "<<< $SLAVE sendign reply $r->[2]";#d# |
371 | print STDERR "<<< $SLAVE sending reply $r->[2]";#d# |
278 | $fh->print($r->[2]); |
372 | $fh->print($r->[2]); |
279 | } |
373 | } |
|
|
374 | $nopper->at(time+$::IDLE); $nopper->start; |
280 | } |
375 | } |
281 | print STDERR "$SLAVE shutdown\n";#d# |
376 | $fh->print("- 000 quit\n"); |
282 | shutdown $fh, 1; |
|
|
283 | }; |
377 | }; |
|
|
378 | |
|
|
379 | async { |
|
|
380 | eval { |
|
|
381 | $fh->timeout($::TIMEOUT); |
|
|
382 | |
|
|
383 | $_ = <$fh>; |
|
|
384 | $_ eq "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n" |
|
|
385 | or die "garbled input or version mismatch: $_"; |
|
|
386 | |
284 | while (<$fh>) { |
387 | while (<$fh>) { |
285 | slog 0, ">>> $SLAVE received :$_"; |
388 | slog 0, ">>> $SLAVE received :$_"; |
286 | /^(\S+) (\d\d\d)(?:\+(\d+))?\s*(.*)$/ |
389 | /^(\S+) (\d\d\d)(?:\+(\d+))?\s*(.*)$/ |
287 | or die "protocol error, garbled command ($_)"; |
390 | or die "protocol error, garbled command ($_)"; |
288 | |
391 | |
289 | my ($id, $code, $dlen, $msg) = ($1, $2, $3, $4); |
392 | my ($id, $code, $dlen, $msg) = ($1, $2, $3, $4); |
290 | my $data; |
393 | my $data; |
291 | |
394 | |
292 | $fh->sysread($data, $dlen) == $dlen |
395 | $fh->sysread($data, $dlen) == $dlen |
293 | or die "unexpected read error: $!"; |
396 | or die "unexpected read error: $!"; |
294 | |
397 | |
295 | if ($code == 0) { |
398 | if ($code == 0) { |
296 | if ($msg eq "quit") { |
399 | if ($msg eq "quit") { |
297 | print $fh "$id 200 quit\n"; |
|
|
298 | $send->put(undef); |
400 | $send->put(undef); |
299 | last; |
401 | last; |
300 | } elsif ($msg eq "nop") { |
402 | } elsif ($msg eq "nop") { |
301 | reply $id, 200, "nop"; |
403 | reply $id, 200, "nop"; |
302 | } elsif ($msg =~ /^hello (.*)$/) { |
404 | } elsif ($msg eq "myname") { |
303 | $OTHERID = $1; |
405 | $OTHERNAME = $data; |
304 | slog 3, "otherid set to $OTHERID\n"; |
406 | slog 3, "otherid set to $OTHERNAME\n"; |
|
|
407 | reply $id, 200, "myname", $MYNAME; |
|
|
408 | } elsif ($msg eq "timestamp") { |
|
|
409 | reply $id, 200, time; |
|
|
410 | } elsif ($msg eq "folders") { |
|
|
411 | reply $id, 200, "ok", join "\0", find_folders; |
|
|
412 | } elsif ($msg eq "open") { |
|
|
413 | async { |
|
|
414 | my $folder = new folder name => $data; |
|
|
415 | $folder->check; |
|
|
416 | $folder->read_mdif; |
|
|
417 | push @folder, $folder; |
|
|
418 | reply $id, 200, "$#folder $folder->{mtime}"; |
|
|
419 | }; |
|
|
420 | } elsif ($msg =~ /^update (\d+) (\d+)$/) { |
|
|
421 | if ($folder[$1]->{host}{$OTHERNAME} != $2) { |
|
|
422 | $folder[$1]->{host}{$OTHERNAME} = $2; |
|
|
423 | $folder[$1]->dirty; |
|
|
424 | } |
|
|
425 | reply $id, 200, "ok"; |
|
|
426 | } elsif ($msg =~ /^close (\d+)$/) { |
|
|
427 | undef $folder[$1]; |
|
|
428 | reply $id, 200, "ok"; |
305 | } else { |
429 | } else { |
|
|
430 | chomp; |
306 | die "protocol error, unknown command ($_)\n"; |
431 | die "protocol error, unknown command ($_)\n"; |
307 | } |
432 | } |
308 | } else { |
433 | } else { |
309 | my $r = delete $request{$id} |
434 | my $r = delete $request{$id} |
310 | or die "protocol error, invalid reply id ($_)\n"; |
435 | or die "protocol error, invalid reply id ($_)\n"; |
311 | |
436 | |
312 | ${$r->[1]} = [$code, $msg, $data]; |
437 | ${$r->[1]} = [$code, $msg, $data]; |
313 | $r->[0]->send; |
438 | $r->[0]->send; |
314 | } |
|
|
315 | |
|
|
316 | if ($done && !%request) { |
|
|
317 | $done = 0; |
|
|
318 | async { |
|
|
319 | request("quit"); |
|
|
320 | exit 0; |
439 | } |
321 | }; |
440 | } |
322 | } |
441 | }; |
|
|
442 | |
|
|
443 | slog 0, "ERROR: $@" if $@; |
|
|
444 | print STDERR "unlooping\n";#d# |
|
|
445 | unloop; |
323 | } |
446 | }; |
324 | |
447 | |
325 | exit 0; |
448 | loop; |
|
|
449 | print STDERR "$SLAVE hiho\n";#d# |
326 | } |
450 | } |
327 | |
451 | |
|
|
452 | my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel |
|
|
453 | |
328 | sub sync_dir { |
454 | sub sync_folder { |
329 | my ($path) = @_; |
455 | my $name = $_[0]; |
330 | |
456 | |
331 | opendir my $dh, $path |
457 | my $quit_guard = quit_guard; |
332 | or die "$path: $!"; |
458 | async { |
|
|
459 | my $guard = $sync_folder->guard; |
333 | |
460 | |
334 | while (defined (my $folder = readdir $dh)) { |
461 | ::give; |
335 | next if $folder =~ /^\./; |
462 | |
336 | my $path = "$path/$folder"; |
463 | my $folder = new folder name => $name; |
337 | next unless -f $path; |
464 | my ($rfid, $rmtime) = split /\s+/, (request open => $name)->[1]; |
338 | my ($status, $mdif) = check_folder $path; |
465 | |
339 | print STDERR "$path: $status | $mdif\n";#d# |
466 | $folder->check; |
|
|
467 | $folder->read_mdif; |
|
|
468 | my $mtime = $folder->{host}{$OTHERNAME}; |
|
|
469 | |
|
|
470 | $folder->{host}{$OTHERNAME} = $folder->{mtime}; |
|
|
471 | |
|
|
472 | request "update $rfid $folder->{mtime}"; |
|
|
473 | request "close $rfid"; |
|
|
474 | undef $quit_guard; |
|
|
475 | } |
|
|
476 | } |
|
|
477 | |
|
|
478 | sub main { |
|
|
479 | # time checking done symmetrically |
|
|
480 | { |
|
|
481 | my $time = time; |
|
|
482 | my $othertime = (request "timestamp")->[1]; |
|
|
483 | abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF |
|
|
484 | or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF"; |
|
|
485 | } |
|
|
486 | |
|
|
487 | $OTHERNAME = (request "myname", $MYNAME)->[2]; |
340 | if ($SLAVE) { |
488 | if ($SLAVE) { |
341 | request "fhave", $folder; |
489 | # |
342 | } else { |
490 | } else { |
343 | request "fwant", $folder; |
491 | for (split /\0/, (request "folders")->[2]) { |
|
|
492 | if (!-e "$PREFIX/$_") { |
|
|
493 | slog 2, "created new empty folder $_\n"; |
|
|
494 | sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666; |
344 | } |
495 | } |
345 | } |
496 | } |
346 | |
497 | |
347 | $done = 1; |
498 | for my $folder (find_folders) { |
348 | my $x = request("nop"); |
499 | sync_folder $folder; |
349 | print STDERR "$SLAVE returned nop @$x\n";#d# |
500 | } |
|
|
501 | |
|
|
502 | print "writelock\n";#d# |
|
|
503 | $quit->wrlock; |
|
|
504 | $send->put(undef); |
|
|
505 | } |
350 | } |
506 | } |
351 | |
507 | |
352 | if ($SLAVE) { |
508 | if ($SLAVE) { |
353 | $HOSTID = "slave"; |
509 | $HOSTID = "slave"; |
354 | async \&sync_dir, "./dst"; |
510 | async \&main; |
355 | handle_commands unblock \*STDIN; |
511 | handle_commands unblock \*STDIN; |
356 | } else { |
512 | } else { |
357 | $HOSTID = "master"; |
513 | $HOSTID = "master"; |
358 | { |
514 | { |
359 | use Socket; |
515 | use Socket; |
360 | socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC; |
516 | socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC; |
361 | if (fork == 0) { |
517 | if (fork == 0) { |
|
|
518 | close S1; |
362 | open STDIN, "<&S2" or die; |
519 | open STDIN, "<&S2" or die; |
363 | open STDOUT, ">&S2" or die; |
520 | open STDOUT, ">&S2" or die; |
364 | exec $0, "--slave"; |
521 | exec $0, "--slave"; |
365 | exit 255; |
522 | exit 255; |
366 | } |
523 | } |
367 | async \&sync_dir, "./src"; |
524 | close S2; |
|
|
525 | async \&main; |
368 | handle_commands unblock \*S1; |
526 | handle_commands unblock \*S1; |
369 | } |
527 | } |
370 | } |
528 | } |
371 | |
529 | |