ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
Revision: 1.6
Committed: Sun Oct 28 20:52:25 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
Changes since 1.5: +55 -23 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 #!/usr/bin/perl
2    
3     use Coro;
4     use Coro::Handle;
5     use Coro::Event;
6     use Coro::Semaphore;
7     use Coro::Channel;
8     use Coro::Signal;
9 root 1.2 use Coro::RWLock;
10 root 1.1
11 root 1.4 use Set::Scalar;
12    
13 root 1.2 use Fcntl;
14 root 1.1
15 root 1.6 our $VERSION = 0.1;
16 root 1.2
17 root 1.6 our $NOW = time;
18 root 1.2
19 root 1.3 require "config.pl";
20    
21     use folder;
22     use vc;
23    
24     $v = $VERBOSE;
25 root 1.2
26     # WARNING:
27     # Content-Length headers are deliberately being ignored. They
28     # are broken by design and will never be supported
29 root 1.1
30     # TODO: real message-id parsing
31    
32     $|=1;
33    
34     my $ecnt;
35    
36     sub slog {
37     if ($_[0] <= $v) {
38 root 1.2 print STDERR "[$SLAVE] ", $_[1];
39 root 1.1 }
40     }
41    
42 root 1.3 $lockdisk = new Coro::Semaphore;
43 root 1.2
44     # give up a/the timeslice
45     sub give {
46     Coro::Event::do_timer(after => 0);
47     }
48    
49     my $quit = new Coro::RWLock;
50    
51     sub rwlock::DESTROY {
52     $quit->unlock;
53     }
54    
55     sub quit_guard {
56     $quit->rdlock;
57     bless [], rwlock;
58     }
59    
60     sub find_folders {
61     my @folders;
62    
63     opendir my $dh, $PREFIX
64     or die "$PREFIX: $!";
65    
66     while (defined (my $folder = readdir $dh)) {
67     next if $folder =~ /^\./;
68 root 1.6 next if $folder =~ /~$/;
69 root 1.2 next unless -f "$PREFIX/$folder";
70     push @folders, $folder;
71     }
72    
73     @folders;
74     }
75    
76     my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel
77    
78     sub sync_folder {
79     my $name = $_[0];
80    
81     my $quit_guard = quit_guard;
82     async {
83     my $guard = $sync_folder->guard;
84 root 1.6 my $vc = create vc;
85 root 1.2
86 root 1.3 my $folder = new folder name => $name;
87 root 1.2
88 root 1.3 $vc->snd("open", $name);
89     $vc->snd("mtime");
90 root 1.2
91     $folder->check;
92     $folder->read_mdif;
93 root 1.3
94 root 1.4 my $ctime = $folder->{host}{$OTHERNAME} || -1;
95 root 1.3 my $rtime = $vc->rcv;
96 root 1.1
97 root 1.4 $vc->snd("diff", $ctime);
98    
99 root 1.6 my %diff; #
100     # 00 - local del
101     # 01 - local add
102     # 10 - remote del
103     # 11 - remote add
104 root 1.4
105     my @diff = grep { $_->[0] > $ctime } @{$folder->{diff}};
106     $diff = $vc->rcv;
107    
108     while () {
109     if ($diff >= 0 and (!@diff or $diff <= $diff[0][0])) {
110 root 1.6 slog 0, "applying remote diff $diff (@add, @del)\n";
111     $diff{$_} = 0b01 for split /\0/, $vc->rcv; # add
112     $diff{$_} = 0b00 for split /\0/, $vc->rcv; # del
113    
114 root 1.4 $diff = $vc->rcv;
115     } elsif (@diff) {
116     slog 0, "applying local diff $diff[0][0]\n";
117 root 1.6 $diff{$_} = 0b11 for @{$diff[0][1]};
118     $diff{$_} = 0b10 for @{$diff[0][2]};
119    
120 root 1.4 shift @diff;
121     } else {
122     slog 0, "no more diffing\n";
123     last;
124     }
125     }
126    
127 root 1.6 # append or update, depending on wether there are messages to be deleted
128     $vc->snd("begin");
129     $folder->begin_update;
130    
131     while (my ($k,$v) = each %diff) {
132     push @{$diff[$v]}, $k;
133     slog 0, "DIFF $k : $v\n";#d#
134     }
135    
136     $vc->snd("delete", join "\0", @{$diff[2]});
137     $folder->delete(@{$diff[0]});
138    
139     $vc->snd("end");
140     $folder->end_update;
141 root 1.4
142 root 1.6 $vc->snd("ctime", $folder->{ctime});
143     $folder->{host}{$OTHERNAME} = $vc->rcv;
144    
145     # sanity check
146     $vc->snd("inventory");
147     if ($folder->inventory ne $vc->rcv) {
148     slog 0, "FATAL: folder inventory mismatch after update";
149     die;
150     }
151     slog 0, "LINV ".$folder->inventory."\n";
152     slog 0, "RINV ".$vc->rcv."\n";
153 root 1.1
154 root 1.2 undef $quit_guard;
155 root 1.1 }
156     }
157    
158 root 1.2 sub main {
159 root 1.6 my $vc = create vc;
160 root 1.3
161 root 1.2 # time checking done symmetrically
162     {
163     my $time = time;
164 root 1.3 $vc->snd("time");
165     my $othertime = $vc->rcv;
166 root 1.2 abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF
167     or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
168     }
169 root 1.3 #Coro::Event::do_timer(after => 60);#d#
170    
171 root 1.6 $vc->snd("setname", $MYNAME); $OTHERNAME = $vc->rcv;
172 root 1.1
173 root 1.2 if ($SLAVE) {
174     #
175     } else {
176 root 1.3 $vc->snd("list");
177     for (split /\0/, $vc->rcv) {
178 root 1.2 if (!-e "$PREFIX/$_") {
179     slog 2, "created new empty folder $_\n";
180     sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666;
181     }
182     }
183 root 1.1
184 root 1.2 for my $folder (find_folders) {
185     sync_folder $folder;
186 root 1.1 }
187 root 1.2
188     $quit->wrlock;
189 root 1.3
190     vc::snd_quit;
191     }
192     }
193    
194     sub serve_folder {
195     my $vc = shift;
196     my $name = $vc->rcv;
197     my $folder = new folder name => $name;
198    
199     slog 8, "serving folder $name\n";
200    
201     $folder->check;
202     $folder->read_mdif;
203    
204     while (my $msg = $vc->rcv) {
205     if ($msg eq "mtime") {
206     $vc->snd($folder->{mtime});
207 root 1.6 } elsif ($msg eq "inventory") {
208     $vc->snd($folder->inventory);
209 root 1.4 } elsif ($msg eq "diff") {
210     my $time = $vc->rcv;
211     for (@{$folder->{diff}}) {
212     next if $_->[0] <= $time;
213     $vc->snd($_->[0],
214     (join "\0", @{$_->[1]}),
215     (join "\0", @{$_->[2]}),
216     );
217     }
218     $vc->snd(-1);
219 root 1.6 } elsif ($msg eq "begin") {
220     $folder->begin_update;
221     } elsif ($msg eq "end") {
222     $folder->end_update;
223     } elsif ($msg eq "mtime") {
224     $vc->snd($folder->{mtime});
225 root 1.4 } elsif ($msg eq "setctime") {
226     $folder->{host}{$OTHERNAME} = $vc->rcv;
227 root 1.3 } else {
228     die "protocol error, unknown folder command ($msg)\n";
229     }
230     }
231     }
232    
233     sub serve {
234     my $vc = shift;
235    
236     slog 8, "new connection $vc->{port}\n";
237    
238     while (my $msg = $vc->rcv) {
239 root 1.6 if ($msg eq "setname") {
240     $vc->snd($MYNAME);
241     $OTHERNAME = $vc->rcv;
242 root 1.3 } elsif ($msg eq "pri") {
243 root 1.6 $vc->{pri} = $vc->rcv;
244 root 1.3 } elsif ($msg eq "time") {
245     $vc->snd(time);
246     } elsif ($msg eq "list") {
247     $vc->snd(join "\0", find_folders);
248     } elsif ($msg eq "open") {
249     serve_folder($vc);
250     } else {
251     die "protocol error, unknown command ($msg)\n";
252     }
253 root 1.1 }
254     }
255    
256     if ($SLAVE) {
257     $HOSTID = "slave";
258 root 1.2 async \&main;
259 root 1.3 vc::multiplex unblock \*STDIN, $SLAVE;
260 root 1.1 } else {
261     $HOSTID = "master";
262     {
263     use Socket;
264     socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC;
265     if (fork == 0) {
266 root 1.2 close S1;
267 root 1.1 open STDIN, "<&S2" or die;
268     open STDOUT, ">&S2" or die;
269     exec $0, "--slave";
270     exit 255;
271     }
272 root 1.2 close S2;
273     async \&main;
274 root 1.3 vc::multiplex unblock \*S1, $SLAVE;
275 root 1.1 }
276     }
277