ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
(Generate patch)

Comparing syncmail/syncmail (file contents):
Revision 1.4 by root, Sun Oct 28 03:51:24 2001 UTC vs.
Revision 1.8 by root, Mon Oct 29 02:36:32 2001 UTC

10 10
11use Set::Scalar; 11use Set::Scalar;
12 12
13use Fcntl; 13use Fcntl;
14 14
15use constant PROTVERSION => 1;
16
17$VERSION = 0.1; 15our $VERSION = 0.1;
16
17our $NOW = time;
18 18
19require "config.pl"; 19require "config.pl";
20 20
21use folder; 21use folder;
22use vc; 22use vc;
63 opendir my $dh, $PREFIX 63 opendir my $dh, $PREFIX
64 or die "$PREFIX: $!"; 64 or die "$PREFIX: $!";
65 65
66 while (defined (my $folder = readdir $dh)) { 66 while (defined (my $folder = readdir $dh)) {
67 next if $folder =~ /^\./; 67 next if $folder =~ /^\./;
68 next if $folder =~ /~$/;
68 next unless -f "$PREFIX/$folder"; 69 next unless -f "$PREFIX/$folder";
69 push @folders, $folder; 70 push @folders, $folder;
70 } 71 }
71 72
72 @folders; 73 @folders;
73} 74}
74 75
75my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel 76my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel
77
78sub sync_offer {
79 my ($folder, $avc, $diff) = @_;
80
81 my $vc = create vc pri => 1;
82 $avc->snd("offer", $vc->{port});
83
84 async {
85 $vc->rcv; # need to synchronize, argl. should open on other side
86 $vc->snd(join "\0", @$diff);
87 my %dup; @dup{split /\0/, $vc->rcv} = ();
88
89 for (@$diff) {
90 $vc->snd($folder->fetch($_)) unless exists $dup{$_};
91 }
92
93 defined $vc->rcv and die "protocol error, expected close";
94 };
95}
76 96
77sub sync_folder { 97sub sync_folder {
78 my $name = $_[0]; 98 my $name = $_[0];
79 99
80 my $quit_guard = quit_guard; 100 my $quit_guard = quit_guard;
81 async { 101 async {
82 my $guard = $sync_folder->guard; 102 my $guard = $sync_folder->guard;
83 my $vc = new vc; 103 my $vc = create vc;
84 104
85 my $folder = new folder name => $name; 105 my $folder = new folder name => $name;
86 106
87 $vc->snd("open", $name); 107 $vc->snd("open", $name);
88 $vc->snd("mtime");
89
90 $folder->check;
91 $folder->read_mdif; 108 $folder->read_mdif;
92 109
93 my $ctime = $folder->{host}{$OTHERNAME} || -1; 110 my $ctime = $folder->{host}{$OTHERNAME} || -1;
94 my $rtime = $vc->rcv;
95 111
96 $vc->snd("diff", $ctime); 112 $vc->snd("diff", $ctime);
97 113
98 my (%ladd, %ldel, %radd, %rdel); 114 my %diff; #
115 # 00 - local del
116 # 01 - local add
117 # 10 - remote del
118 # 11 - remote add
99 119
100 my @diff = grep { $_->[0] > $ctime } @{$folder->{diff}}; 120 my @diff = grep { $_->[0] > $ctime } @{$folder->{diff}};
121
101 $diff = $vc->rcv; 122 $diff = $vc->rcv;
102
103 while () { 123 while () {
104 if ($diff >= 0 and (!@diff or $diff <= $diff[0][0])) { 124 if ($diff >= 0 and (!@diff or $diff < $diff[0][0])) {
105 my @add = split /\0/, $vc->rcv; 125 $diff{$_} = 0b01 for split /\0/, $vc->rcv; # add
106 my @del = split /\0/, $vc->rcv; 126 $diff{$_} = 0b00 for split /\0/, $vc->rcv; # del
127
107 $diff = $vc->rcv; 128 $diff = $vc->rcv;
108 slog 0, "applying remote diff $diff\n";
109 for (@del) { undef $rdel{$_}; delete $radd{$_}; delete $ladd{$_}; }
110 for (@add) { undef $radd{$_}; delete $rdel{$_}; delete $ldel{$_}; }
111 } elsif (@diff) { 129 } elsif (@diff) {
112 slog 0, "applying local diff $diff[0][0]\n"; 130 $diff{$_} = 0b11 for @{$diff[0][1]};
113 for (@{$diff[0][2]}) { undef $rdel{$_}; delete $radd{$_}; delete $ladd{$_}; } 131 $diff{$_} = 0b10 for @{$diff[0][2]};
114 for (@{$diff[0][1]}) { undef $radd{$_}; delete $rdel{$_}; delete $ldel{$_}; } 132
115 shift @diff; 133 shift @diff;
116 } else { 134 } else {
117 slog 0, "no more diffing\n";
118 last; 135 last;
119 } 136 }
120 } 137 }
121 138
122 slog 0, "LADD ".join(" ", keys %ladd)."\n"; 139 # append or update, depending on wether there are messages to be deleted
123 slog 0, "LDEL ".join(" ", keys %ldel)."\n"; 140 $vc->snd("begin");
124 slog 0, "RADD ".join(" ", keys %radd)."\n"; 141 $folder->begin_update;
125 slog 0, "RDEL ".join(" ", keys %rdel)."\n"; 142
143 while (my ($k,$v) = each %diff) {
144 push @{$diff[$v]}, $k;
145 slog 0, "DIFF $k : $v\n";#d#
146 }
147
148 $vc->snd("delete", join "\0", @{$diff[2]});
149 $folder->delete(@{$diff[0]});
150
151 # offer ours
152 my $offer_coro = sync_offer($folder, $vc, $diff[3]);
153
154 # request theirs
155 {
156 my @send = grep { !$folder->exists($_) } @{$diff[1]};
157 $vc->snd("send", join "\0", @send);
158 $folder->append($_, $vc->rcv) for @send;
159 $vc->snd("-"); # sync
160 }
161
162 slog 0, "waiting...\n";#d#
163 $offer_coro->join;
164
165 # sanity check
166 $vc->snd("inventory");
167 if ($folder->inventory ne $vc->rcv) {
168 $folder->write_mdif;
169 slog 0, "FATAL: folder inventory mismatch after update\n";
170 }
171
172 $vc->snd("end");
173 $folder->end_update;
126 174
175 $vc->snd("setctime", $folder->{ctime});
176
177 $vc->snd("mtime");
127 #$folder->{host}{$OTHERNAME} = time; 178 $folder->{host}{$OTHERNAME} = $vc->rcv;
128 #$vc->snd("setctime", time); 179
180 $vc->snd("close");
181 $folder->close;
129 182
130 undef $quit_guard; 183 undef $quit_guard;
131 } 184 }
132} 185}
133 186
134sub main { 187sub main {
135 my $vc = new vc; 188 my $vc = create vc;
136 189
137 # time checking done symmetrically 190 # time checking done symmetrically
138 { 191 {
139 my $time = time; 192 my $time = time;
140 $vc->snd("time"); 193 $vc->snd("time");
141 my $othertime = $vc->rcv; 194 my $othertime = $vc->rcv;
142 abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF 195 abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF
143 or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF"; 196 or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
144 } 197 }
145#Coro::Event::do_timer(after => 60);#d#
146 198
147 $vc->snd("name"); 199 $vc->snd("setname", $MYNAME); $OTHERNAME = $vc->rcv;
148 $OTHERNAME = $vc->rcv;
149 200
150 if ($SLAVE) { 201 if ($SLAVE) {
151 # 202 #
152 } else { 203 } else {
153 $vc->snd("list"); 204 $vc->snd("list");
173 my $name = $vc->rcv; 224 my $name = $vc->rcv;
174 my $folder = new folder name => $name; 225 my $folder = new folder name => $name;
175 226
176 slog 8, "serving folder $name\n"; 227 slog 8, "serving folder $name\n";
177 228
178 $folder->check;
179 $folder->read_mdif; 229 $folder->read_mdif;
180 230
181 while (my $msg = $vc->rcv) { 231 while (my $msg = $vc->rcv) {
182 if ($msg eq "mtime") { 232 if ($msg eq "mtime") {
183 $vc->snd($folder->{mtime}); 233 $vc->snd($folder->{mtime});
234 } elsif ($msg eq "inventory") {
235 $vc->snd($folder->inventory);
184 } elsif ($msg eq "diff") { 236 } elsif ($msg eq "diff") {
185 my $time = $vc->rcv; 237 my $time = $vc->rcv;
186 for (@{$folder->{diff}}) { 238 for (@{$folder->{diff}}) {
187 next if $_->[0] <= $time; 239 next if $_->[0] <= $time;
188 $vc->snd($_->[0], 240 $vc->snd($_->[0],
189 (join "\0", @{$_->[1]}), 241 (join "\0", @{$_->[1]}),
190 (join "\0", @{$_->[2]}), 242 (join "\0", @{$_->[2]}),
191 ); 243 );
192 } 244 }
193 $vc->snd(-1); 245 $vc->snd(-1);
246 } elsif ($msg eq "begin") {
247 $folder->begin_update;
248 } elsif ($msg eq "delete") {
249 $folder->delete(split /\0/, $vc->rcv);
250 } elsif ($msg eq "offer") {
251 my $ovc = catch vc port => $vc->rcv;
252 async {
253 my @offer;
254 {
255 my @dup;
256
257 $ovc->snd("-"); # synchronize
258 for (split /\0/, $ovc->rcv) {
259 if ($folder->exists($_)) {
260 push @dup, $_;
261 } else {
262 push @offer, $_;
263 }
264 }
265
266 $ovc->snd(join "\0", @dup);
267 }
268
269 # now we'll get everything in @offer, in order
270 $folder->append($_, $ovc->rcv) for @offer;
271 $ovc->close;
272 };
273 } elsif ($msg eq "send") {
274 $vc->pri(1);
275 $vc->snd($folder->fetch($_)) for split /\0/, $vc->rcv;
276 $vc->rcv; # sync
277 $vc->pri(0);
278 } elsif ($msg eq "end") {
279 $folder->end_update;
280 } elsif ($msg eq "mtime") {
281 $vc->snd($folder->{mtime});
194 } elsif ($msg eq "setctime") { 282 } elsif ($msg eq "setctime") {
195 $folder->{host}{$OTHERNAME} = $vc->rcv; 283 $folder->{host}{$OTHERNAME} = $vc->rcv;
284 } elsif ($msg eq "close") {
285 $folder->close;
196 } else { 286 } else {
197 die "protocol error, unknown folder command ($msg)\n"; 287 die "protocol error, unknown folder command ($msg)\n";
198 } 288 }
199 } 289 }
200} 290}
203 my $vc = shift; 293 my $vc = shift;
204 294
205 slog 8, "new connection $vc->{port}\n"; 295 slog 8, "new connection $vc->{port}\n";
206 296
207 while (my $msg = $vc->rcv) { 297 while (my $msg = $vc->rcv) {
208 if ($msg eq "name") { 298 if ($msg eq "setname") {
209 $vc->snd($::MYNAME); 299 $vc->snd($MYNAME);
300 $OTHERNAME = $vc->rcv;
210 } elsif ($msg eq "pri") { 301 } elsif ($msg eq "pri") {
211 $self->{pri} = $vc->rcv; 302 $vc->{pri} = $vc->rcv;
212 } elsif ($msg eq "time") { 303 } elsif ($msg eq "time") {
213 $vc->snd(time); 304 $vc->snd(time);
214 } elsif ($msg eq "list") { 305 } elsif ($msg eq "list") {
215 $vc->snd(join "\0", find_folders); 306 $vc->snd(join "\0", find_folders);
216 } elsif ($msg eq "open") { 307 } elsif ($msg eq "open") {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines