--- syncmail/syncmail 2001/10/28 20:52:25 1.6 +++ syncmail/syncmail 2001/10/29 00:37:41 1.7 @@ -75,6 +75,25 @@ my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel +sub sync_offer { + my ($folder, $avc, $diff) = @_; + + my $vc = create vc pri => 1; + $avc->snd("offer", $vc->{port}); + + async { + $vc->rcv; # need to synchronize, argl. should open on other side + $vc->snd(join "\0", @$diff); + my %dup; @dup{split /\0/, $vc->rcv} = (); + + for (@$diff) { + $vc->snd($folder->fetch($_)) unless exists $dup{$_}; + } + + defined $vc->rcv and die "protocol error, expected close"; + }; +} + sub sync_folder { my $name = $_[0]; @@ -88,7 +107,6 @@ $vc->snd("open", $name); $vc->snd("mtime"); - $folder->check; $folder->read_mdif; my $ctime = $folder->{host}{$OTHERNAME} || -1; @@ -103,23 +121,20 @@ # 11 - remote add my @diff = grep { $_->[0] > $ctime } @{$folder->{diff}}; - $diff = $vc->rcv; + $diff = $vc->rcv; while () { - if ($diff >= 0 and (!@diff or $diff <= $diff[0][0])) { - slog 0, "applying remote diff $diff (@add, @del)\n"; + if ($diff >= 0 and (!@diff or $diff < $diff[0][0])) { $diff{$_} = 0b01 for split /\0/, $vc->rcv; # add $diff{$_} = 0b00 for split /\0/, $vc->rcv; # del $diff = $vc->rcv; } elsif (@diff) { - slog 0, "applying local diff $diff[0][0]\n"; $diff{$_} = 0b11 for @{$diff[0][1]}; $diff{$_} = 0b10 for @{$diff[0][2]}; shift @diff; } else { - slog 0, "no more diffing\n"; last; } } @@ -136,20 +151,37 @@ $vc->snd("delete", join "\0", @{$diff[2]}); $folder->delete(@{$diff[0]}); - $vc->snd("end"); - $folder->end_update; - - $vc->snd("ctime", $folder->{ctime}); - $folder->{host}{$OTHERNAME} = $vc->rcv; + # offer ours + my $offer_coro = sync_offer($folder, $vc, $diff[3]); + + # request theirs + { + my @send = grep { !$folder->exists($_) } @{$diff[1]}; + $vc->snd("send", join "\0", @send); + $folder->append($_, $vc->rcv) for @send; + $vc->snd("-"); # sync + } + + slog 0, "waiting...\n";#d# + $offer_coro->join; # sanity check $vc->snd("inventory"); if ($folder->inventory ne $vc->rcv) { - slog 0, "FATAL: folder inventory mismatch after update"; - die; + $folder->write_mdif; + slog 0, "FATAL: folder inventory mismatch after update\n"; } - slog 0, "LINV ".$folder->inventory."\n"; - slog 0, "RINV ".$vc->rcv."\n"; + + $vc->snd("end"); + $folder->end_update; + + $vc->snd("setctime", $folder->{ctime}); + + $vc->snd("mtime"); + $folder->{host}{$OTHERNAME} = $vc->rcv; + + $vc->snd("close"); + $folder->close; undef $quit_guard; } @@ -166,7 +198,6 @@ abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF"; } -#Coro::Event::do_timer(after => 60);#d# $vc->snd("setname", $MYNAME); $OTHERNAME = $vc->rcv; @@ -198,7 +229,6 @@ slog 8, "serving folder $name\n"; - $folder->check; $folder->read_mdif; while (my $msg = $vc->rcv) { @@ -218,12 +248,44 @@ $vc->snd(-1); } elsif ($msg eq "begin") { $folder->begin_update; + } elsif ($msg eq "delete") { + $folder->delete(split /\0/, $vc->rcv); + } elsif ($msg eq "offer") { + my $ovc = catch vc port => $vc->rcv; + async { + my @offer; + { + my @dup; + + $ovc->snd("-"); # synchronize + for (split /\0/, $ovc->rcv) { + if ($folder->exists($_)) { + push @dup, $_; + } else { + push @offer, $_; + } + } + + $ovc->snd(join "\0", @dup); + } + + # now we'll get everything in @offer, in order + $folder->append($_, $ovc->rcv) for @offer; + $ovc->close; + }; + } elsif ($msg eq "send") { + $vc->pri(1); + $vc->snd($folder->fetch($_)) for split /\0/, $vc->rcv; + $vc->rcv; # sync + $vc->pri(0); } elsif ($msg eq "end") { $folder->end_update; } elsif ($msg eq "mtime") { $vc->snd($folder->{mtime}); } elsif ($msg eq "setctime") { $folder->{host}{$OTHERNAME} = $vc->rcv; + } elsif ($msg eq "close") { + $folder->close; } else { die "protocol error, unknown folder command ($msg)\n"; }