ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
Revision: 1.6
Committed: Sun Oct 28 20:52:25 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
Changes since 1.5: +55 -23 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #!/usr/bin/perl
2
3 use Coro;
4 use Coro::Handle;
5 use Coro::Event;
6 use Coro::Semaphore;
7 use Coro::Channel;
8 use Coro::Signal;
9 use Coro::RWLock;
10
11 use Set::Scalar;
12
13 use Fcntl;
14
# Program version, and the wall-clock time at which this process started.
our ($VERSION, $NOW) = (0.1, time);
18
19 require "config.pl";
20
21 use folder;
22 use vc;
23
# Verbosity threshold consulted by slog(); copied from $VERBOSE
# (presumably set by config.pl -- it is not assigned in this file).
$v = $VERBOSE;

# WARNING:
# Content-Length headers are deliberately being ignored. They
# are broken by design and will never be supported.

# TODO: real message-id parsing

# Turn on autoflush for the currently selected output handle.
$| = 1;

my $ecnt;
35
# Write a log message to STDERR if its level does not exceed the global
# verbosity threshold $v.
#
#   $level  numeric level of the message (lower means more important)
#   $text   message text; printed verbatim, no newline is appended
#
# Every message is prefixed with "[$SLAVE] ".
sub slog {
    my ($level, $text) = @_;

    print STDERR "[$SLAVE] ", $text if $level <= $v;
}
41
# Package-global semaphore; not referenced again in this file
# (presumably used by the folder/vc modules to serialise disk access -- TODO confirm).
$lockdisk = Coro::Semaphore->new;
43
# Give up the current timeslice by waiting on a Coro::Event timer
# with a delay of zero.
sub give {
    return Coro::Event::do_timer(after => 0);
}
48
# Read/write lock coordinating shutdown: quit_guard() takes it for reading,
# main() takes it for writing before it sends the quit command.
my $quit = Coro::RWLock->new;
50
# Destructor of the guard objects handed out by quit_guard():
# releases the lock on $quit once a guard goes away.
sub rwlock::DESTROY {
    $quit->unlock();
}
54
# Take a read lock on $quit and return a guard object blessed into
# "rwlock"; the lock is released again by rwlock::DESTROY when the
# guard is destroyed.
sub quit_guard {
    $quit->rdlock();

    return bless [], 'rwlock';
}
59
# Return the names of all folders found directly inside $PREFIX.
#
# A directory entry counts as a folder when it is a plain file whose name
# neither starts with a dot nor ends in a tilde.  Names are returned in
# directory order, without the $PREFIX path component.
#
# Dies if $PREFIX cannot be opened as a directory.
sub find_folders {
    opendir my $dir, $PREFIX
        or die "$PREFIX: $!";

    my @names = grep {
        !/^\./ && !/~$/ && -f "$PREFIX/$_"
    } readdir $dir;

    @names;
}
75
# Limits how many folders are synchronised at the same time (max 3).
my $sync_folder = Coro::Semaphore->new(3);

# Start synchronising one folder with the peer in a new coroutine.
#
#   $name  name of the folder (relative to $PREFIX)
#
# A quit guard is taken before the coroutine is created and released at
# its end, so main() cannot obtain the write lock on the quit lock (and
# send "quit") while a folder is still being processed.
#
# Returns whatever async returns for the new coroutine.  Dies inside the
# coroutine when the local and remote inventories differ after the update.
sub sync_folder {
    my ($name) = @_;

    my $quit_guard = quit_guard();

    async {
        # held for the lifetime of this coroutine; limits parallelism
        my $guard  = $sync_folder->guard;
        my $vc     = vc->create;
        my $folder = folder->new(name => $name);

        $vc->snd("open", $name);
        $vc->snd("mtime");

        $folder->check;
        $folder->read_mdif;

        my $ctime = $folder->{host}{$OTHERNAME} || -1;
        # the reply to "mtime" has to be consumed even though it is unused
        my $rtime = $vc->rcv;

        $vc->snd("diff", $ctime);

        # message id => state
        #   00 - local del
        #   01 - local add
        #   10 - remote del
        #   11 - remote add
        my %diff;

        # local diffs newer than the last sync; each entry is
        # [stamp, \@added, \@deleted] (assumed ordered by stamp -- TODO confirm)
        my @local = grep { $_->[0] > $ctime } @{ $folder->{diff} };

        # stamp of the next remote diff, -1 once the peer has no more.
        # This must be lexical: several of these coroutines run at once.
        my $remote = $vc->rcv;

        # merge both diff streams, oldest stamp first
        while (1) {
            if ($remote >= 0 and (!@local or $remote <= $local[0][0])) {
                my @add = split /\0/, $vc->rcv;
                my @del = split /\0/, $vc->rcv;
                slog(0, "applying remote diff $remote (@add, @del)\n");
                $diff{$_} = 0b01 for @add;
                $diff{$_} = 0b00 for @del;

                $remote = $vc->rcv;
            } elsif (@local) {
                slog(0, "applying local diff $local[0][0]\n");
                $diff{$_} = 0b11 for @{ $local[0][1] };
                $diff{$_} = 0b10 for @{ $local[0][2] };

                shift @local;
            } else {
                slog(0, "no more diffing\n");
                last;
            }
        }

        # append or update, depending on whether there are messages to be deleted
        $vc->snd("begin");
        $folder->begin_update;

        # group the message ids by their final state (index = state bits)
        my @by_state;
        while (my ($id, $state) = each %diff) {
            push @{ $by_state[$state] }, $id;
            slog(0, "DIFF $id : $state\n");#d#
        }

        # NOTE(review): only deletions are propagated here; states 01/11
        # (additions) are collected but not transferred.  Also, the
        # serve_folder loop in this file has no handler for "delete" or
        # "ctime" -- confirm the protocol on the serving side.
        $vc->snd("delete", join "\0", @{ $by_state[0b10] || [] });
        $folder->delete(@{ $by_state[0b00] || [] });

        $vc->snd("end");
        $folder->end_update;

        $vc->snd("ctime", $folder->{ctime});
        $folder->{host}{$OTHERNAME} = $vc->rcv;

        # sanity check: exactly one reply is requested, so read it once
        $vc->snd("inventory");
        my $local_inventory  = $folder->inventory;
        my $remote_inventory = $vc->rcv;
        if ($local_inventory ne $remote_inventory) {
            slog(0, "FATAL: folder inventory mismatch after update\n");
            die;
        }
        slog(0, "LINV $local_inventory\n");
        slog(0, "RINV $remote_inventory\n");

        undef $quit_guard;
    };
}
157
# Client side of a session: performs the initial handshake with the peer
# and, on the master, drives the synchronisation of all folders.
#
# Takes no arguments.  Dies when the clocks of both hosts differ by more
# than $::MAXTIMEDIFF, when the peer announces a folder name containing
# a "/", or when a missing folder cannot be created locally.
sub main {
    my $vc = vc->create;

    # time checking done symmetrically: compare the peer's clock against
    # the midpoint of our clock before and after the round trip
    {
        my $time = time;
        $vc->snd("time");
        my $othertime = $vc->rcv;
        abs(($time + time) * 0.5 - $othertime) <= $::MAXTIMEDIFF
            or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
    }
    #Coro::Event::do_timer(after => 60);#d#

    $vc->snd("setname", $MYNAME);
    $OTHERNAME = $vc->rcv;

    # the slave initiates nothing beyond the handshake
    return if $SLAVE;

    # make sure every folder the peer knows about exists locally
    $vc->snd("list");
    for my $name (split /\0/, $vc->rcv) {
        # names come from the peer: never let them escape $PREFIX
        $name =~ m{/}
            and die "protocol error, invalid folder name ($name)\n";

        next if -e "$PREFIX/$name";

        sysopen my $fh, "$PREFIX/$name", O_RDWR|O_CREAT, 0666
            or die "$PREFIX/$name: $!";
        slog(2, "created new empty folder $name\n");
    }

    for my $folder (find_folders()) {
        sync_folder($folder);
    }

    # blocks until every quit guard taken by sync_folder has been released
    $quit->wrlock;

    vc::snd_quit();
}
193
# Serve the commands for one opened folder on a connection.
#
#   $vc  connection object; the folder name is read from it first
#
# Handles "mtime", "inventory", "diff", "begin", "end" and "setctime"
# until the connection yields a false message.  Dies with a protocol
# error on any other command.
#
# NOTE(review): sync_folder in this file also sends "delete" and "ctime",
# which have no handler here -- confirm the intended protocol.
sub serve_folder {
    my ($vc) = @_;
    my $name   = $vc->rcv;
    my $folder = folder->new(name => $name);

    slog(8, "serving folder $name\n");

    $folder->check;
    $folder->read_mdif;

    while (my $msg = $vc->rcv) {
        if ($msg eq "mtime") {
            $vc->snd($folder->{mtime});
        } elsif ($msg eq "inventory") {
            $vc->snd($folder->inventory);
        } elsif ($msg eq "diff") {
            # send every diff newer than the given time as
            # (stamp, added ids, deleted ids); -1 terminates the list
            my $time = $vc->rcv;
            for my $entry (@{ $folder->{diff} }) {
                next if $entry->[0] <= $time;
                $vc->snd($entry->[0],
                         (join "\0", @{ $entry->[1] }),
                         (join "\0", @{ $entry->[2] }),
                );
            }
            $vc->snd(-1);
        } elsif ($msg eq "begin") {
            $folder->begin_update;
        } elsif ($msg eq "end") {
            $folder->end_update;
        } elsif ($msg eq "setctime") {
            $folder->{host}{$OTHERNAME} = $vc->rcv;
        } else {
            die "protocol error, unknown folder command ($msg)\n";
        }
    }
}
232
# Serve the top-level commands of one connection.
#
#   $vc  connection object
#
# Reads commands until the connection yields a false message and
# dispatches each one to its handler.  Dies with a protocol error on an
# unknown command.
sub serve {
    my ($vc) = @_;

    slog(8, "new connection $vc->{port}\n");

    my %handler_for = (
        # exchange host names: send ours first, then read the peer's
        setname => sub { $vc->snd($MYNAME); $OTHERNAME = $vc->rcv },
        pri     => sub { $vc->{pri} = $vc->rcv },
        time    => sub { $vc->snd(time) },
        list    => sub { $vc->snd(join "\0", find_folders()) },
        open    => sub { serve_folder($vc) },
    );

    while (my $msg = $vc->rcv) {
        my $handler = $handler_for{$msg}
            or die "protocol error, unknown command ($msg)\n";
        $handler->();
    }
}
255
# Program entry: a slave talks to its master over STDIN; a master creates
# a socket pair, forks, re-executes this script with "--slave" on one end
# and talks to it over the other end.
if ($SLAVE) {
    $HOSTID = "slave";
    async \&main;
    vc::multiplex unblock \*STDIN, $SLAVE;
} else {
    $HOSTID = "master";
    {
        use Socket;
        socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC
            or die "socketpair: $!";

        # fork returns undef on failure; "undef == 0" would be true and
        # make the parent take the child branch, so check explicitly
        my $pid = fork;
        defined $pid
            or die "fork: $!";

        if ($pid == 0) {
            # child: wire S2 to stdin/stdout and become the slave
            close S1;
            open STDIN, "<&S2" or die "cannot dup socket to STDIN: $!";
            open STDOUT, ">&S2" or die "cannot dup socket to STDOUT: $!";
            exec $0, "--slave";
            exit 255;    # only reached when exec failed
        }
        close S2;
        async \&main;
        vc::multiplex unblock \*S1, $SLAVE;
    }
}
277