ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
Revision: 1.4
Committed: Sun Oct 28 03:51:24 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
Changes since 1.3: +48 -14 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 #!/usr/bin/perl
2
3 use Coro;
4 use Coro::Handle;
5 use Coro::Event;
6 use Coro::Semaphore;
7 use Coro::Channel;
8 use Coro::Signal;
9 use Coro::RWLock;
10
11 use Set::Scalar;
12
13 use Fcntl;
14
15 use constant PROTVERSION => 1;
16
17 $VERSION = 0.1;
18
19 require "config.pl";
20
21 use folder;
22 use vc;
23
24 $v = $VERBOSE;
25
26 # WARNING:
27 # Content-Length headers are deliberately being ignored. They
28 # are broken by design and will never be supported
29
30 # TODO: real message-id parsing
31
32 $|=1;
33
34 my $ecnt;
35
36 sub slog {
37 if ($_[0] <= $v) {
38 print STDERR "[$SLAVE] ", $_[1];
39 }
40 }
41
42 $lockdisk = new Coro::Semaphore;
43
44 # give up a/the timeslice
45 sub give {
46 Coro::Event::do_timer(after => 0);
47 }
48
49 my $quit = new Coro::RWLock;
50
51 sub rwlock::DESTROY {
52 $quit->unlock;
53 }
54
55 sub quit_guard {
56 $quit->rdlock;
57 bless [], rwlock;
58 }
59
60 sub find_folders {
61 my @folders;
62
63 opendir my $dh, $PREFIX
64 or die "$PREFIX: $!";
65
66 while (defined (my $folder = readdir $dh)) {
67 next if $folder =~ /^\./;
68 next unless -f "$PREFIX/$folder";
69 push @folders, $folder;
70 }
71
72 @folders;
73 }
74
75 my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel
76
77 sub sync_folder {
78 my $name = $_[0];
79
80 my $quit_guard = quit_guard;
81 async {
82 my $guard = $sync_folder->guard;
83 my $vc = new vc;
84
85 my $folder = new folder name => $name;
86
87 $vc->snd("open", $name);
88 $vc->snd("mtime");
89
90 $folder->check;
91 $folder->read_mdif;
92
93 my $ctime = $folder->{host}{$OTHERNAME} || -1;
94 my $rtime = $vc->rcv;
95
96 $vc->snd("diff", $ctime);
97
98 my (%ladd, %ldel, %radd, %rdel);
99
100 my @diff = grep { $_->[0] > $ctime } @{$folder->{diff}};
101 $diff = $vc->rcv;
102
103 while () {
104 if ($diff >= 0 and (!@diff or $diff <= $diff[0][0])) {
105 my @add = split /\0/, $vc->rcv;
106 my @del = split /\0/, $vc->rcv;
107 $diff = $vc->rcv;
108 slog 0, "applying remote diff $diff\n";
109 for (@del) { undef $rdel{$_}; delete $radd{$_}; delete $ladd{$_}; }
110 for (@add) { undef $radd{$_}; delete $rdel{$_}; delete $ldel{$_}; }
111 } elsif (@diff) {
112 slog 0, "applying local diff $diff[0][0]\n";
113 for (@{$diff[0][2]}) { undef $rdel{$_}; delete $radd{$_}; delete $ladd{$_}; }
114 for (@{$diff[0][1]}) { undef $radd{$_}; delete $rdel{$_}; delete $ldel{$_}; }
115 shift @diff;
116 } else {
117 slog 0, "no more diffing\n";
118 last;
119 }
120 }
121
122 slog 0, "LADD ".join(" ", keys %ladd)."\n";
123 slog 0, "LDEL ".join(" ", keys %ldel)."\n";
124 slog 0, "RADD ".join(" ", keys %radd)."\n";
125 slog 0, "RDEL ".join(" ", keys %rdel)."\n";
126
127 #$folder->{host}{$OTHERNAME} = time;
128 #$vc->snd("setctime", time);
129
130 undef $quit_guard;
131 }
132 }
133
134 sub main {
135 my $vc = new vc;
136
137 # time checking done symmetrically
138 {
139 my $time = time;
140 $vc->snd("time");
141 my $othertime = $vc->rcv;
142 abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF
143 or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
144 }
145 #Coro::Event::do_timer(after => 60);#d#
146
147 $vc->snd("name");
148 $OTHERNAME = $vc->rcv;
149
150 if ($SLAVE) {
151 #
152 } else {
153 $vc->snd("list");
154 for (split /\0/, $vc->rcv) {
155 if (!-e "$PREFIX/$_") {
156 slog 2, "created new empty folder $_\n";
157 sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666;
158 }
159 }
160
161 for my $folder (find_folders) {
162 sync_folder $folder;
163 }
164
165 $quit->wrlock;
166
167 vc::snd_quit;
168 }
169 }
170
171 sub serve_folder {
172 my $vc = shift;
173 my $name = $vc->rcv;
174 my $folder = new folder name => $name;
175
176 slog 8, "serving folder $name\n";
177
178 $folder->check;
179 $folder->read_mdif;
180
181 while (my $msg = $vc->rcv) {
182 if ($msg eq "mtime") {
183 $vc->snd($folder->{mtime});
184 } elsif ($msg eq "diff") {
185 my $time = $vc->rcv;
186 for (@{$folder->{diff}}) {
187 next if $_->[0] <= $time;
188 $vc->snd($_->[0],
189 (join "\0", @{$_->[1]}),
190 (join "\0", @{$_->[2]}),
191 );
192 }
193 $vc->snd(-1);
194 } elsif ($msg eq "setctime") {
195 $folder->{host}{$OTHERNAME} = $vc->rcv;
196 } else {
197 die "protocol error, unknown folder command ($msg)\n";
198 }
199 }
200 }
201
202 sub serve {
203 my $vc = shift;
204
205 slog 8, "new connection $vc->{port}\n";
206
207 while (my $msg = $vc->rcv) {
208 if ($msg eq "name") {
209 $vc->snd($::MYNAME);
210 } elsif ($msg eq "pri") {
211 $self->{pri} = $vc->rcv;
212 } elsif ($msg eq "time") {
213 $vc->snd(time);
214 } elsif ($msg eq "list") {
215 $vc->snd(join "\0", find_folders);
216 } elsif ($msg eq "open") {
217 serve_folder($vc);
218 } else {
219 die "protocol error, unknown command ($msg)\n";
220 }
221 }
222 }
223
224 if ($SLAVE) {
225 $HOSTID = "slave";
226 async \&main;
227 vc::multiplex unblock \*STDIN, $SLAVE;
228 } else {
229 $HOSTID = "master";
230 {
231 use Socket;
232 socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC;
233 if (fork == 0) {
234 close S1;
235 open STDIN, "<&S2" or die;
236 open STDOUT, ">&S2" or die;
237 exec $0, "--slave";
238 exit 255;
239 }
240 close S2;
241 async \&main;
242 vc::multiplex unblock \*S1, $SLAVE;
243 }
244 }
245