ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/syncmail
(Generate patch)

Comparing syncmail/syncmail (file contents):
Revision 1.3 by root, Sat Oct 27 23:53:49 2001 UTC vs.
Revision 1.7 by root, Mon Oct 29 00:37:41 2001 UTC

6use Coro::Semaphore; 6use Coro::Semaphore;
7use Coro::Channel; 7use Coro::Channel;
8use Coro::Signal; 8use Coro::Signal;
9use Coro::RWLock; 9use Coro::RWLock;
10 10
11use Set::Scalar;
12
11use Fcntl; 13use Fcntl;
12use MD5;
13 14
14use constant PROTVERSION => 1;
15
16$VERSION = 0.1; 15our $VERSION = 0.1;
16
17our $NOW = time;
17 18
18require "config.pl"; 19require "config.pl";
19 20
20use folder; 21use folder;
21use vc; 22use vc;
62 opendir my $dh, $PREFIX 63 opendir my $dh, $PREFIX
63 or die "$PREFIX: $!"; 64 or die "$PREFIX: $!";
64 65
65 while (defined (my $folder = readdir $dh)) { 66 while (defined (my $folder = readdir $dh)) {
66 next if $folder =~ /^\./; 67 next if $folder =~ /^\./;
68 next if $folder =~ /~$/;
67 next unless -f "$PREFIX/$folder"; 69 next unless -f "$PREFIX/$folder";
68 push @folders, $folder; 70 push @folders, $folder;
69 } 71 }
70 72
71 @folders; 73 @folders;
72} 74}
73 75
74my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel 76my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel
77
78sub sync_offer {
79 my ($folder, $avc, $diff) = @_;
80
81 my $vc = create vc pri => 1;
82 $avc->snd("offer", $vc->{port});
83
84 async {
85 $vc->rcv; # need to synchronize, argl. should open on other side
86 $vc->snd(join "\0", @$diff);
87 my %dup; @dup{split /\0/, $vc->rcv} = ();
88
89 for (@$diff) {
90 $vc->snd($folder->fetch($_)) unless exists $dup{$_};
91 }
92
93 defined $vc->rcv and die "protocol error, expected close";
94 };
95}
75 96
76sub sync_folder { 97sub sync_folder {
77 my $name = $_[0]; 98 my $name = $_[0];
78 99
79 my $quit_guard = quit_guard; 100 my $quit_guard = quit_guard;
80 async { 101 async {
81 my $guard = $sync_folder->guard; 102 my $guard = $sync_folder->guard;
82 my $vc = new vc; 103 my $vc = create vc;
83 104
84 my $folder = new folder name => $name; 105 my $folder = new folder name => $name;
85 #my ($rfid, $rmtime) = split /\s+/, (request open => $name)->[1];
86 ::give;
87 106
88 $vc->snd("open", $name); 107 $vc->snd("open", $name);
89 $vc->snd("mtime"); 108 $vc->snd("mtime");
90 109
91 $folder->check;
92 $folder->read_mdif; 110 $folder->read_mdif;
93 111
94 my $mtime = $folder->{host}{$OTHERNAME}; 112 my $ctime = $folder->{host}{$OTHERNAME} || -1;
95 my $rtime = $vc->rcv; 113 my $rtime = $vc->rcv;
96 114
97 $vc->snd("diff", $rtime); 115 $vc->snd("diff", $ctime);
116
117 my %diff; #
118 # 00 - local del
119 # 01 - local add
120 # 10 - remote del
121 # 11 - remote add
122
123 my @diff = grep { $_->[0] > $ctime } @{$folder->{diff}};
124
125 $diff = $vc->rcv;
126 while () {
127 if ($diff >= 0 and (!@diff or $diff < $diff[0][0])) {
128 $diff{$_} = 0b01 for split /\0/, $vc->rcv; # add
129 $diff{$_} = 0b00 for split /\0/, $vc->rcv; # del
130
131 $diff = $vc->rcv;
132 } elsif (@diff) {
133 $diff{$_} = 0b11 for @{$diff[0][1]};
134 $diff{$_} = 0b10 for @{$diff[0][2]};
135
136 shift @diff;
137 } else {
138 last;
139 }
140 }
141
142 # append or update, depending on wether there are messages to be deleted
143 $vc->snd("begin");
144 $folder->begin_update;
145
146 while (my ($k,$v) = each %diff) {
147 push @{$diff[$v]}, $k;
148 slog 0, "DIFF $k : $v\n";#d#
149 }
150
151 $vc->snd("delete", join "\0", @{$diff[2]});
152 $folder->delete(@{$diff[0]});
153
154 # offer ours
155 my $offer_coro = sync_offer($folder, $vc, $diff[3]);
156
157 # request theirs
158 {
159 my @send = grep { !$folder->exists($_) } @{$diff[1]};
160 $vc->snd("send", join "\0", @send);
161 $folder->append($_, $vc->rcv) for @send;
162 $vc->snd("-"); # sync
163 }
164
165 slog 0, "waiting...\n";#d#
166 $offer_coro->join;
167
168 # sanity check
169 $vc->snd("inventory");
170 if ($folder->inventory ne $vc->rcv) {
171 $folder->write_mdif;
172 slog 0, "FATAL: folder inventory mismatch after update\n";
173 }
174
175 $vc->snd("end");
176 $folder->end_update;
177
178 $vc->snd("setctime", $folder->{ctime});
179
180 $vc->snd("mtime");
98 $folder->{host}{$OTHERNAME} = $folder->{mtime}; 181 $folder->{host}{$OTHERNAME} = $vc->rcv;
99 182
100 #request "update $rfid $folder->{mtime}"; 183 $vc->snd("close");
101 #request "close $rfid"; 184 $folder->close;
185
102 undef $quit_guard; 186 undef $quit_guard;
103 } 187 }
104} 188}
105 189
106sub main { 190sub main {
107 my $vc = new vc; 191 my $vc = create vc;
108 192
109 # time checking done symmetrically 193 # time checking done symmetrically
110 { 194 {
111 my $time = time; 195 my $time = time;
112 $vc->snd("time"); 196 $vc->snd("time");
113 my $othertime = $vc->rcv; 197 my $othertime = $vc->rcv;
114 abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF 198 abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF
115 or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF"; 199 or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
116 } 200 }
117#Coro::Event::do_timer(after => 60);#d#
118 201
119 $vc->snd("name"); 202 $vc->snd("setname", $MYNAME); $OTHERNAME = $vc->rcv;
120 $OTHERNAME = $vc->rcv;
121 203
122 if ($SLAVE) { 204 if ($SLAVE) {
123 # 205 #
124 } else { 206 } else {
125 $vc->snd("list"); 207 $vc->snd("list");
145 my $name = $vc->rcv; 227 my $name = $vc->rcv;
146 my $folder = new folder name => $name; 228 my $folder = new folder name => $name;
147 229
148 slog 8, "serving folder $name\n"; 230 slog 8, "serving folder $name\n";
149 231
150 $folder->check;
151 $folder->read_mdif; 232 $folder->read_mdif;
152 233
153 while (my $msg = $vc->rcv) { 234 while (my $msg = $vc->rcv) {
154 if ($msg eq "mtime") { 235 if ($msg eq "mtime") {
155 $vc->snd($folder->{mtime}); 236 $vc->snd($folder->{mtime});
156 } elsif ($msg =~ /^update (\d+) (\d+)$/) { 237 } elsif ($msg eq "inventory") {
157 #if ($folder[$1]->{host}{$OTHERNAME} != $2) { 238 $vc->snd($folder->inventory);
158 # $folder[$1]->{host}{$OTHERNAME} = $2; 239 } elsif ($msg eq "diff") {
159 # $folder[$1]->dirty; 240 my $time = $vc->rcv;
241 for (@{$folder->{diff}}) {
242 next if $_->[0] <= $time;
243 $vc->snd($_->[0],
244 (join "\0", @{$_->[1]}),
245 (join "\0", @{$_->[2]}),
246 );
160 #} 247 }
161 #reply $id, 200, "ok"; 248 $vc->snd(-1);
249 } elsif ($msg eq "begin") {
250 $folder->begin_update;
251 } elsif ($msg eq "delete") {
252 $folder->delete(split /\0/, $vc->rcv);
253 } elsif ($msg eq "offer") {
254 my $ovc = catch vc port => $vc->rcv;
255 async {
256 my @offer;
257 {
258 my @dup;
259
260 $ovc->snd("-"); # synchronize
261 for (split /\0/, $ovc->rcv) {
262 if ($folder->exists($_)) {
263 push @dup, $_;
264 } else {
265 push @offer, $_;
266 }
267 }
268
269 $ovc->snd(join "\0", @dup);
270 }
271
272 # now we'll get everything in @offer, in order
273 $folder->append($_, $ovc->rcv) for @offer;
274 $ovc->close;
275 };
276 } elsif ($msg eq "send") {
277 $vc->pri(1);
278 $vc->snd($folder->fetch($_)) for split /\0/, $vc->rcv;
279 $vc->rcv; # sync
280 $vc->pri(0);
281 } elsif ($msg eq "end") {
282 $folder->end_update;
283 } elsif ($msg eq "mtime") {
284 $vc->snd($folder->{mtime});
285 } elsif ($msg eq "setctime") {
286 $folder->{host}{$OTHERNAME} = $vc->rcv;
287 } elsif ($msg eq "close") {
288 $folder->close;
162 } else { 289 } else {
163 die "protocol error, unknown folder command ($msg)\n"; 290 die "protocol error, unknown folder command ($msg)\n";
164 } 291 }
165 } 292 }
166} 293}
169 my $vc = shift; 296 my $vc = shift;
170 297
171 slog 8, "new connection $vc->{port}\n"; 298 slog 8, "new connection $vc->{port}\n";
172 299
173 while (my $msg = $vc->rcv) { 300 while (my $msg = $vc->rcv) {
174 if ($msg eq "name") { 301 if ($msg eq "setname") {
175 $vc->snd($::MYNAME); 302 $vc->snd($MYNAME);
303 $OTHERNAME = $vc->rcv;
176 } elsif ($msg eq "pri") { 304 } elsif ($msg eq "pri") {
177 $self->{pri} = $vc->rcv; 305 $vc->{pri} = $vc->rcv;
178 } elsif ($msg eq "time") { 306 } elsif ($msg eq "time") {
179 $vc->snd(time); 307 $vc->snd(time);
180 } elsif ($msg eq "list") { 308 } elsif ($msg eq "list") {
181 $vc->snd(join "\0", find_folders); 309 $vc->snd(join "\0", find_folders);
182 } elsif ($msg eq "open") { 310 } elsif ($msg eq "open") {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines