… | |
… | |
4 | use Coro::Handle; |
4 | use Coro::Handle; |
5 | use Coro::Event; |
5 | use Coro::Event; |
6 | use Coro::Semaphore; |
6 | use Coro::Semaphore; |
7 | use Coro::Channel; |
7 | use Coro::Channel; |
8 | use Coro::Signal; |
8 | use Coro::Signal; |
|
|
9 | use Coro::RWLock; |
9 | |
10 | |
10 | use MD5; |
11 | use Set::Scalar; |
11 | |
12 | |
12 | use constant VERSION => 1; |
13 | use Fcntl; |
13 | |
14 | |
14 | # TODO: content-length support (HACK!) |
15 | our $VERSION = 0.1; |
|
|
16 | |
|
|
17 | our $NOW = time; |
|
|
18 | |
|
|
19 | require "config.pl"; |
|
|
20 | |
|
|
21 | use folder; |
|
|
22 | use vc; |
|
|
23 | |
|
|
24 | $v = $VERBOSE; |
|
|
25 | |
|
|
26 | # WARNING: |
|
|
27 | # Content-Length headers are deliberately being ignored. They |
|
|
28 | # are broken by design and will never be supported |
|
|
29 | |
15 | # TODO: real message-id parsing |
30 | # TODO: real message-id parsing |
16 | |
31 | |
17 | $|=1; |
32 | $|=1; |
18 | |
|
|
19 | $v = 9; |
|
|
20 | |
|
|
21 | $SLAVE = 1*($ARGV[0] eq "--slave"); |
|
|
22 | |
33 | |
23 | my $ecnt; |
34 | my $ecnt; |
24 | |
35 | |
25 | sub slog { |
36 | sub slog { |
26 | if ($_[0] <= $v) { |
37 | if ($_[0] <= $v) { |
27 | print STDERR $_[1]; |
38 | print STDERR "[$SLAVE] ", $_[1]; |
28 | } |
|
|
29 | } |
|
|
30 | |
|
|
31 | # parse_folder(mbox-file-path, callback) |
|
|
32 | # callback gets called with \$header and \$body, |
|
|
33 | # $header includes the mbox From_ line without |
|
|
34 | # the leading From_ itself. |
|
|
35 | sub parse_folder { |
|
|
36 | my ($path, $cb) = @_; |
|
|
37 | |
|
|
38 | open my $fh, "<", $path |
|
|
39 | or die "$path: $!"; |
|
|
40 | |
|
|
41 | local $/ = "\n\n"; |
|
|
42 | |
|
|
43 | my ($head, $body, $offs); |
|
|
44 | |
|
|
45 | 5 == read $fh, $head, 5 |
|
|
46 | or return; |
|
|
47 | |
|
|
48 | $head eq "From " |
|
|
49 | or return; |
|
|
50 | |
|
|
51 | $offs = 0; |
|
|
52 | while (defined ($head = <$fh>)) { |
|
|
53 | $head =~ /^.*? [A-Z][a-z][a-z] [A-Z][a-z][a-z] [ 0-9][0-9] \d\d:\d\d:\d\d(?: [+-]\d\d\d\d)? \d\d(?:\d\d)\n/ |
|
|
54 | or die "$path: not standard mbox format header:\n$head\n"; |
|
|
55 | |
|
|
56 | local $/ = "\nFrom "; |
|
|
57 | # NEVER enable this. content-length simply is broken by design |
|
|
58 | #if ($head =~ /^Content-Length:\s+(\d+)$/im) { |
|
|
59 | # $1 <= read $fh, $body, $1 + 5 |
|
|
60 | # or die "$path: partial message in mbox"; |
|
|
61 | #} else { |
|
|
62 | $body = <$fh>; |
|
|
63 | #} |
|
|
64 | chomp $body; |
|
|
65 | $cb->($offs, \$head, \$body); |
|
|
66 | $offs = (tell $fh) - 5; |
|
|
67 | cede unless ++$ecnt & 1023; |
|
|
68 | } |
|
|
69 | |
|
|
70 | 1; |
|
|
71 | } |
|
|
72 | |
|
|
73 | sub read_mdif { |
|
|
74 | my ($path) = @_; |
|
|
75 | my $fh; |
|
|
76 | my $mdif; |
|
|
77 | open my $fh, "<", $path |
|
|
78 | or return { }; |
|
|
79 | |
|
|
80 | defined ($_ = <$fh>) |
|
|
81 | or die "$path: empty mdif file\n"; |
|
|
82 | |
|
|
83 | do { |
|
|
84 | if ($_ eq "[SYNCMAIL]\n") { |
|
|
85 | while (<$fh>) { |
|
|
86 | last unless /^([a-z]+)\s*=\s*(.*)\n$/; |
|
|
87 | $mdif->{$1} = $2; |
|
|
88 | } |
|
|
89 | } elsif ($_ eq "[HOSTS]\n") { |
|
|
90 | while (<$fh>) { |
|
|
91 | last unless /^([^[].*)=(.*)\n$/; |
|
|
92 | $mdif->{host}{$1} = $2; |
|
|
93 | } |
|
|
94 | } elsif (/^\[DIFF(\d+)\.(\d+)\]\n$/) { |
|
|
95 | my ($gen, $mtime) = ($1, $2); |
|
|
96 | my @dif; |
|
|
97 | while (<$fh>) { |
|
|
98 | last unless /^[+-]/; |
|
|
99 | push @dif, substr $_, 0, -1; |
|
|
100 | } |
|
|
101 | unshift @{$mdif->{diff}}, [$gen, $mtime, \@dif]; |
|
|
102 | } elsif ($_ eq "[INDEX]\n") { |
|
|
103 | my @idx; |
|
|
104 | while (<$fh>) { |
|
|
105 | last unless /^(\d+)=(.*)\n$/; |
|
|
106 | push @idx, [$1, $2]; |
|
|
107 | } |
|
|
108 | $mdif->{idx} = \@idx; |
|
|
109 | } elsif (/^#/) { |
|
|
110 | $_ = <$fh>; |
|
|
111 | # nop |
|
|
112 | } else { |
|
|
113 | die "$path: unparseable section '$_'\n"; |
|
|
114 | } |
|
|
115 | } while defined $_; |
|
|
116 | |
|
|
117 | $mdif->{version} <= VERSION |
|
|
118 | or die "$path: version mismatch ($mdif->{version} found, <".VERSION." expected)\n"; |
|
|
119 | |
|
|
120 | $mdif; |
|
|
121 | } |
|
|
122 | |
|
|
123 | sub write_mdif { |
|
|
124 | my ($path, $mdif) = @_; |
|
|
125 | my $fh; |
|
|
126 | |
|
|
127 | open my $fh, ">", "$path~" |
|
|
128 | or die "$path~: $!"; |
|
|
129 | |
|
|
130 | print $fh "# automatically generated, do NOT edit\n"; |
|
|
131 | |
|
|
132 | print $fh "[SYNCMAIL]\n"; |
|
|
133 | print $fh "$_=$mdif->{$_}\n" for (qw(fsize mtime gen version)); |
|
|
134 | |
|
|
135 | print $fh "[HOSTS]\n"; |
|
|
136 | print $fh "$k=$v\n" while my ($k,$v) = each %{$mdif->{host}}; |
|
|
137 | |
|
|
138 | print $fh "[INDEX]\n"; |
|
|
139 | print $fh "$_->[0]=$_->[1]\n" for @{$mdif->{idx}}; |
|
|
140 | |
|
|
141 | for (reverse @{$mdif->{diff}}) { |
|
|
142 | print $fh "[DIFF$_->[0].$_->[1]]\n"; |
|
|
143 | print $fh $_, "\n" for @{$_->[2]}; |
|
|
144 | } |
|
|
145 | |
|
|
146 | close $fh |
|
|
147 | or die "$path~: unable to create updated .mdif: $!"; |
|
|
148 | |
|
|
149 | rename "$path~", $path; |
|
|
150 | } |
|
|
151 | |
|
|
152 | sub gendiff { |
|
|
153 | my ($d1, $d2) = @_; |
|
|
154 | |
|
|
155 | my @d; |
|
|
156 | my (%d1, %d2); |
|
|
157 | |
|
|
158 | for (@$d2) { |
|
|
159 | undef $d2{$_->[1]}; |
|
|
160 | } |
|
|
161 | |
|
|
162 | # delete msgs in d1 but not in d2 |
|
|
163 | for (@$d1) { |
|
|
164 | undef $d1{$_->[1]}; |
|
|
165 | push @d, "-$_->[1]" unless exists $d2{$_->[1]}; |
|
|
166 | } |
|
|
167 | %d2 = (); # conserve memory |
|
|
168 | |
|
|
169 | # add msgs in d2 but not in d1 |
|
|
170 | for (@$d2) { |
|
|
171 | push @d, "+$_->[1]" unless exists $d1{$_->[1]}; |
|
|
172 | } |
|
|
173 | |
|
|
174 | \@d; |
|
|
175 | } |
|
|
176 | |
|
|
177 | my $check_folder = new Coro::Semaphore; |
|
|
178 | |
|
|
179 | sub check_folder { |
|
|
180 | my ($path) = @_; |
|
|
181 | my $guard = $check_folder->guard; |
|
|
182 | |
|
|
183 | (my $conf = $path) =~ s%([^/]+$)%.$1.mdif%; |
|
|
184 | |
|
|
185 | slog 1, "checking $path\n"; |
|
|
186 | |
|
|
187 | if (stat $path) { |
|
|
188 | my ($fsize, $mtime) = (stat _)[7, 9]; |
|
|
189 | if (open my $fh, "<", $conf) { |
|
|
190 | my %conf; |
|
|
191 | <$fh>; # skip initial comment |
|
|
192 | <$fh> eq "[SYNCMAIL]\n" |
|
|
193 | or die "$conf: format error"; |
|
|
194 | while (<$fh> =~ /^([a-z]+)\s*=\s*(.*)$/) { |
|
|
195 | $conf{$1} = $2; |
|
|
196 | } |
|
|
197 | return (1, \%conf) if $fsize == $conf{fsize} |
|
|
198 | && $mtime == $conf{mtime}; |
|
|
199 | } |
|
|
200 | |
|
|
201 | slog 2, "updating $path\n"; |
|
|
202 | |
|
|
203 | my @idx; |
|
|
204 | |
|
|
205 | parse_folder $path, sub { |
|
|
206 | my ($offs, $head, $body) = @_; |
|
|
207 | my $mid; |
|
|
208 | if ($$head =~ /^Message-Id:\s*(<[^<\n]+>)\s*\n/im) { |
|
|
209 | $mid = $1; |
|
|
210 | } else { |
|
|
211 | $mid = MD5->hexhash("$$head\0$$body"); |
|
|
212 | } |
|
|
213 | push @idx, [$offs, $mid]; |
|
|
214 | } or return (); |
|
|
215 | |
|
|
216 | my $mdif = read_mdif $conf; |
|
|
217 | |
|
|
218 | if ($mdif->{version}) { |
|
|
219 | my $d = gendiff $mdif->{idx}, \@idx; |
|
|
220 | push @{$mdif->{diff}}, [ |
|
|
221 | $mdif->{gen}++, |
|
|
222 | $mdif->{mtime}, |
|
|
223 | $d, |
|
|
224 | ] if @$d; |
|
|
225 | } else { |
|
|
226 | slog 2, "$path: new folder\n"; |
|
|
227 | $mdif->{version} ||= VERSION; |
|
|
228 | $mdif->{gen} = 1; |
|
|
229 | } |
|
|
230 | |
|
|
231 | $mdif->{fsize} = $fsize; |
|
|
232 | $mdif->{mtime} = $mtime; |
|
|
233 | $mdif->{idx} = \@idx; |
|
|
234 | |
|
|
235 | write_mdif $conf, $mdif; |
|
|
236 | |
|
|
237 | return (2, $mdif); |
|
|
238 | } else { |
|
|
239 | slog 2, "$path: no longer exists\n"; |
|
|
240 | unlink $conf; |
|
|
241 | |
|
|
242 | return (); |
|
|
243 | } |
|
|
244 | } |
|
|
245 | |
|
|
246 | my $send = new Coro::Channel 10; |
|
|
247 | my $done = 0; |
|
|
248 | |
|
|
249 | # request $command, $data |
|
|
250 | sub request { |
|
|
251 | my $res; |
|
|
252 | my $signal = new Coro::Signal; |
|
|
253 | my $cmd = defined $_[1] ? "000+".length($_[1])." $_[0]\n$_[1]" : "000 $_[0]\n"; |
|
|
254 | $send->put([$signal, \$res, $cmd]); |
|
|
255 | $signal->wait; |
|
|
256 | $res; |
|
|
257 | } |
|
|
258 | |
|
|
259 | # reply $id, $code, $msg, $data |
|
|
260 | sub reply { |
|
|
261 | my $cmd = defined $_[3] ? "$_[1]+".length($_[3])." $_[2]\n$_[3]" : "$_[1] $_[2]\n"; |
|
|
262 | $send->put([undef, undef, "$_[0] $cmd"]); |
|
|
263 | } |
|
|
264 | |
|
|
265 | sub handle_commands { |
|
|
266 | my ($fh) = @_; |
|
|
267 | my $id = "a"; |
|
|
268 | async { |
|
|
269 | $fh->print("- 000 hello $HOSTID\n"); |
|
|
270 | while (my $r = $send->get) { |
|
|
271 | if (defined $r->[1]) { |
|
|
272 | my $id = ++$id; |
|
|
273 | $request{$id} = $r; |
|
|
274 | print STDERR "<<< $SLAVE sendign request $id:$r->[2]";#d# |
|
|
275 | $fh->print("$id $r->[2]"); |
|
|
276 | } else { |
|
|
277 | print STDERR "<<< $SLAVE sendign reply $r->[2]";#d# |
|
|
278 | $fh->print($r->[2]); |
|
|
279 | } |
|
|
280 | } |
|
|
281 | print STDERR "$SLAVE shutdown\n";#d# |
|
|
282 | shutdown $fh, 1; |
|
|
283 | }; |
39 | } |
284 | while (<$fh>) { |
|
|
285 | slog 0, ">>> $SLAVE received :$_"; |
|
|
286 | /^(\S+) (\d\d\d)(?:\+(\d+))?\s*(.*)$/ |
|
|
287 | or die "protocol error, garbled command ($_)"; |
|
|
288 | |
|
|
289 | my ($id, $code, $dlen, $msg) = ($1, $2, $3, $4); |
|
|
290 | my $data; |
|
|
291 | |
|
|
292 | $fh->sysread($data, $dlen) == $dlen |
|
|
293 | or die "unexpected read error: $!"; |
|
|
294 | |
|
|
295 | if ($code == 0) { |
|
|
296 | if ($msg eq "quit") { |
|
|
297 | print $fh "$id 200 quit\n"; |
|
|
298 | $send->put(undef); |
|
|
299 | last; |
|
|
300 | } elsif ($msg eq "nop") { |
|
|
301 | reply $id, 200, "nop"; |
|
|
302 | } elsif ($msg =~ /^hello (.*)$/) { |
|
|
303 | $OTHERID = $1; |
|
|
304 | slog 3, "otherid set to $OTHERID\n"; |
|
|
305 | } else { |
|
|
306 | die "protocol error, unknown command ($_)\n"; |
|
|
307 | } |
|
|
308 | } else { |
|
|
309 | my $r = delete $request{$id} |
|
|
310 | or die "protocol error, invalid reply id ($_)\n"; |
|
|
311 | |
|
|
312 | ${$r->[1]} = [$code, $msg, $data]; |
|
|
313 | $r->[0]->send; |
|
|
314 | } |
|
|
315 | |
|
|
316 | if ($done && !%request) { |
|
|
317 | $done = 0; |
|
|
318 | async { |
|
|
319 | request("quit"); |
|
|
320 | exit 0; |
|
|
321 | }; |
|
|
322 | } |
|
|
323 | } |
|
|
324 | |
|
|
325 | exit 0; |
|
|
326 | } |
40 | } |
327 | |
41 | |
328 | sub sync_dir { |
42 | $lockdisk = new Coro::Semaphore; |
329 | my ($path) = @_; |
|
|
330 | |
43 | |
|
|
44 | # give up a/the timeslice |
|
|
45 | sub give { |
|
|
46 | Coro::Event::do_timer(after => 0); |
|
|
47 | } |
|
|
48 | |
|
|
49 | my $quit = new Coro::RWLock; |
|
|
50 | |
|
|
51 | sub rwlock::DESTROY { |
|
|
52 | $quit->unlock; |
|
|
53 | } |
|
|
54 | |
|
|
55 | sub quit_guard { |
|
|
56 | $quit->rdlock; |
|
|
57 | bless [], rwlock; |
|
|
58 | } |
|
|
59 | |
|
|
60 | sub find_folders { |
|
|
61 | my @folders; |
|
|
62 | |
331 | opendir my $dh, $path |
63 | opendir my $dh, $PREFIX |
332 | or die "$path: $!"; |
64 | or die "$PREFIX: $!"; |
333 | |
65 | |
334 | while (defined (my $folder = readdir $dh)) { |
66 | while (defined (my $folder = readdir $dh)) { |
335 | next if $folder =~ /^\./; |
67 | next if $folder =~ /^\./; |
336 | my $path = "$path/$folder"; |
68 | next if $folder =~ /~$/; |
337 | next unless -f $path; |
69 | next unless -f "$PREFIX/$folder"; |
338 | my ($status, $mdif) = check_folder $path; |
70 | push @folders, $folder; |
339 | print STDERR "$path: $status | $mdif\n";#d# |
71 | } |
|
|
72 | |
|
|
73 | @folders; |
|
|
74 | } |
|
|
75 | |
|
|
76 | my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel |
|
|
77 | |
|
|
78 | sub sync_offer { |
|
|
79 | my ($folder, $avc, $diff) = @_; |
|
|
80 | |
|
|
81 | my $vc = create vc pri => 1; |
|
|
82 | $avc->snd("offer", $vc->{port}); |
|
|
83 | |
|
|
84 | async { |
|
|
85 | $vc->rcv; # need to synchronize, argl. should open on other side |
|
|
86 | $vc->snd(join "\0", @$diff); |
|
|
87 | my %dup; @dup{split /\0/, $vc->rcv} = (); |
|
|
88 | |
|
|
89 | for (@$diff) { |
|
|
90 | $vc->snd($folder->fetch($_)) unless exists $dup{$_}; |
|
|
91 | } |
|
|
92 | |
|
|
93 | defined $vc->rcv and die "protocol error, expected close"; |
|
|
94 | }; |
|
|
95 | } |
|
|
96 | |
|
|
97 | sub sync_folder { |
|
|
98 | my $name = $_[0]; |
|
|
99 | |
|
|
100 | my $quit_guard = quit_guard; |
|
|
101 | async { |
|
|
102 | my $guard = $sync_folder->guard; |
|
|
103 | my $vc = create vc; |
|
|
104 | |
|
|
105 | my $folder = new folder name => $name; |
|
|
106 | |
|
|
107 | $vc->snd("open", $name); |
|
|
108 | $vc->snd("mtime"); |
|
|
109 | |
|
|
110 | $folder->read_mdif; |
|
|
111 | |
|
|
112 | my $ctime = $folder->{host}{$OTHERNAME} || -1; |
|
|
113 | my $rtime = $vc->rcv; |
|
|
114 | |
|
|
115 | $vc->snd("diff", $ctime); |
|
|
116 | |
|
|
117 | my %diff; # |
|
|
118 | # 00 - local del |
|
|
119 | # 01 - local add |
|
|
120 | # 10 - remote del |
|
|
121 | # 11 - remote add |
|
|
122 | |
|
|
123 | my @diff = grep { $_->[0] > $ctime } @{$folder->{diff}}; |
|
|
124 | |
|
|
125 | $diff = $vc->rcv; |
|
|
126 | while () { |
|
|
127 | if ($diff >= 0 and (!@diff or $diff < $diff[0][0])) { |
|
|
128 | $diff{$_} = 0b01 for split /\0/, $vc->rcv; # add |
|
|
129 | $diff{$_} = 0b00 for split /\0/, $vc->rcv; # del |
|
|
130 | |
|
|
131 | $diff = $vc->rcv; |
|
|
132 | } elsif (@diff) { |
|
|
133 | $diff{$_} = 0b11 for @{$diff[0][1]}; |
|
|
134 | $diff{$_} = 0b10 for @{$diff[0][2]}; |
|
|
135 | |
|
|
136 | shift @diff; |
|
|
137 | } else { |
|
|
138 | last; |
|
|
139 | } |
|
|
140 | } |
|
|
141 | |
|
|
142 | # append or update, depending on wether there are messages to be deleted |
|
|
143 | $vc->snd("begin"); |
|
|
144 | $folder->begin_update; |
|
|
145 | |
|
|
146 | while (my ($k,$v) = each %diff) { |
|
|
147 | push @{$diff[$v]}, $k; |
|
|
148 | slog 0, "DIFF $k : $v\n";#d# |
|
|
149 | } |
|
|
150 | |
|
|
151 | $vc->snd("delete", join "\0", @{$diff[2]}); |
|
|
152 | $folder->delete(@{$diff[0]}); |
|
|
153 | |
|
|
154 | # offer ours |
|
|
155 | my $offer_coro = sync_offer($folder, $vc, $diff[3]); |
|
|
156 | |
|
|
157 | # request theirs |
|
|
158 | { |
|
|
159 | my @send = grep { !$folder->exists($_) } @{$diff[1]}; |
|
|
160 | $vc->snd("send", join "\0", @send); |
|
|
161 | $folder->append($_, $vc->rcv) for @send; |
|
|
162 | $vc->snd("-"); # sync |
|
|
163 | } |
|
|
164 | |
|
|
165 | slog 0, "waiting...\n";#d# |
|
|
166 | $offer_coro->join; |
|
|
167 | |
|
|
168 | # sanity check |
|
|
169 | $vc->snd("inventory"); |
|
|
170 | if ($folder->inventory ne $vc->rcv) { |
|
|
171 | $folder->write_mdif; |
|
|
172 | slog 0, "FATAL: folder inventory mismatch after update\n"; |
|
|
173 | } |
|
|
174 | |
|
|
175 | $vc->snd("end"); |
|
|
176 | $folder->end_update; |
|
|
177 | |
|
|
178 | $vc->snd("setctime", $folder->{ctime}); |
|
|
179 | |
|
|
180 | $vc->snd("mtime"); |
|
|
181 | $folder->{host}{$OTHERNAME} = $vc->rcv; |
|
|
182 | |
|
|
183 | $vc->snd("close"); |
|
|
184 | $folder->close; |
|
|
185 | |
|
|
186 | undef $quit_guard; |
|
|
187 | } |
|
|
188 | } |
|
|
189 | |
|
|
190 | sub main { |
|
|
191 | my $vc = create vc; |
|
|
192 | |
|
|
193 | # time checking done symmetrically |
|
|
194 | { |
|
|
195 | my $time = time; |
|
|
196 | $vc->snd("time"); |
|
|
197 | my $othertime = $vc->rcv; |
|
|
198 | abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF |
|
|
199 | or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF"; |
|
|
200 | } |
|
|
201 | |
|
|
202 | $vc->snd("setname", $MYNAME); $OTHERNAME = $vc->rcv; |
|
|
203 | |
340 | if ($SLAVE) { |
204 | if ($SLAVE) { |
341 | request "fhave", $folder; |
205 | # |
|
|
206 | } else { |
|
|
207 | $vc->snd("list"); |
|
|
208 | for (split /\0/, $vc->rcv) { |
|
|
209 | if (!-e "$PREFIX/$_") { |
|
|
210 | slog 2, "created new empty folder $_\n"; |
|
|
211 | sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666; |
|
|
212 | } |
|
|
213 | } |
|
|
214 | |
|
|
215 | for my $folder (find_folders) { |
|
|
216 | sync_folder $folder; |
|
|
217 | } |
|
|
218 | |
|
|
219 | $quit->wrlock; |
|
|
220 | |
|
|
221 | vc::snd_quit; |
|
|
222 | } |
|
|
223 | } |
|
|
224 | |
|
|
225 | sub serve_folder { |
|
|
226 | my $vc = shift; |
|
|
227 | my $name = $vc->rcv; |
|
|
228 | my $folder = new folder name => $name; |
|
|
229 | |
|
|
230 | slog 8, "serving folder $name\n"; |
|
|
231 | |
|
|
232 | $folder->read_mdif; |
|
|
233 | |
|
|
234 | while (my $msg = $vc->rcv) { |
|
|
235 | if ($msg eq "mtime") { |
|
|
236 | $vc->snd($folder->{mtime}); |
|
|
237 | } elsif ($msg eq "inventory") { |
|
|
238 | $vc->snd($folder->inventory); |
|
|
239 | } elsif ($msg eq "diff") { |
|
|
240 | my $time = $vc->rcv; |
|
|
241 | for (@{$folder->{diff}}) { |
|
|
242 | next if $_->[0] <= $time; |
|
|
243 | $vc->snd($_->[0], |
|
|
244 | (join "\0", @{$_->[1]}), |
|
|
245 | (join "\0", @{$_->[2]}), |
|
|
246 | ); |
|
|
247 | } |
|
|
248 | $vc->snd(-1); |
|
|
249 | } elsif ($msg eq "begin") { |
|
|
250 | $folder->begin_update; |
|
|
251 | } elsif ($msg eq "delete") { |
|
|
252 | $folder->delete(split /\0/, $vc->rcv); |
|
|
253 | } elsif ($msg eq "offer") { |
|
|
254 | my $ovc = catch vc port => $vc->rcv; |
|
|
255 | async { |
|
|
256 | my @offer; |
|
|
257 | { |
|
|
258 | my @dup; |
|
|
259 | |
|
|
260 | $ovc->snd("-"); # synchronize |
|
|
261 | for (split /\0/, $ovc->rcv) { |
|
|
262 | if ($folder->exists($_)) { |
|
|
263 | push @dup, $_; |
|
|
264 | } else { |
|
|
265 | push @offer, $_; |
|
|
266 | } |
|
|
267 | } |
|
|
268 | |
|
|
269 | $ovc->snd(join "\0", @dup); |
|
|
270 | } |
|
|
271 | |
|
|
272 | # now we'll get everything in @offer, in order |
|
|
273 | $folder->append($_, $ovc->rcv) for @offer; |
|
|
274 | $ovc->close; |
|
|
275 | }; |
|
|
276 | } elsif ($msg eq "send") { |
|
|
277 | $vc->pri(1); |
|
|
278 | $vc->snd($folder->fetch($_)) for split /\0/, $vc->rcv; |
|
|
279 | $vc->rcv; # sync |
|
|
280 | $vc->pri(0); |
|
|
281 | } elsif ($msg eq "end") { |
|
|
282 | $folder->end_update; |
|
|
283 | } elsif ($msg eq "mtime") { |
|
|
284 | $vc->snd($folder->{mtime}); |
|
|
285 | } elsif ($msg eq "setctime") { |
|
|
286 | $folder->{host}{$OTHERNAME} = $vc->rcv; |
|
|
287 | } elsif ($msg eq "close") { |
|
|
288 | $folder->close; |
342 | } else { |
289 | } else { |
343 | request "fwant", $folder; |
290 | die "protocol error, unknown folder command ($msg)\n"; |
344 | } |
291 | } |
345 | } |
292 | } |
|
|
293 | } |
346 | |
294 | |
347 | $done = 1; |
295 | sub serve { |
348 | my $x = request("nop"); |
296 | my $vc = shift; |
349 | print STDERR "$SLAVE returned nop @$x\n";#d# |
297 | |
|
|
298 | slog 8, "new connection $vc->{port}\n"; |
|
|
299 | |
|
|
300 | while (my $msg = $vc->rcv) { |
|
|
301 | if ($msg eq "setname") { |
|
|
302 | $vc->snd($MYNAME); |
|
|
303 | $OTHERNAME = $vc->rcv; |
|
|
304 | } elsif ($msg eq "pri") { |
|
|
305 | $vc->{pri} = $vc->rcv; |
|
|
306 | } elsif ($msg eq "time") { |
|
|
307 | $vc->snd(time); |
|
|
308 | } elsif ($msg eq "list") { |
|
|
309 | $vc->snd(join "\0", find_folders); |
|
|
310 | } elsif ($msg eq "open") { |
|
|
311 | serve_folder($vc); |
|
|
312 | } else { |
|
|
313 | die "protocol error, unknown command ($msg)\n"; |
|
|
314 | } |
|
|
315 | } |
350 | } |
316 | } |
351 | |
317 | |
352 | if ($SLAVE) { |
318 | if ($SLAVE) { |
353 | $HOSTID = "slave"; |
319 | $HOSTID = "slave"; |
354 | async \&sync_dir, "./dst"; |
320 | async \&main; |
355 | handle_commands unblock \*STDIN; |
321 | vc::multiplex unblock \*STDIN, $SLAVE; |
356 | } else { |
322 | } else { |
357 | $HOSTID = "master"; |
323 | $HOSTID = "master"; |
358 | { |
324 | { |
359 | use Socket; |
325 | use Socket; |
360 | socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC; |
326 | socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC; |
361 | if (fork == 0) { |
327 | if (fork == 0) { |
|
|
328 | close S1; |
362 | open STDIN, "<&S2" or die; |
329 | open STDIN, "<&S2" or die; |
363 | open STDOUT, ">&S2" or die; |
330 | open STDOUT, ">&S2" or die; |
364 | exec $0, "--slave"; |
331 | exec $0, "--slave"; |
365 | exit 255; |
332 | exit 255; |
366 | } |
333 | } |
367 | async \&sync_dir, "./src"; |
334 | close S2; |
368 | handle_commands unblock \*S1; |
335 | async \&main; |
|
|
336 | vc::multiplex unblock \*S1, $SLAVE; |
369 | } |
337 | } |
370 | } |
338 | } |
371 | |
339 | |