#!/usr/bin/perl

# Two-host folder synchroniser built on Coro coroutines.
#
# The master process forks a copy of itself with "--slave", connects to it
# over a socketpair, and both sides multiplex virtual channels ("vc") over
# that connection.  The master drives the synchronisation (sync_folder);
# the peer answers requests (serve / serve_folder).
#
# NOTE(review): $PREFIX, $SLAVE, $VERBOSE, $MYNAME and $MAXTIMEDIFF are not
# assigned anywhere in this file -- presumably config.pl provides them.
# The script deliberately keeps using package globals because they appear
# to be shared with config.pl and the folder/vc modules; do not add
# "use strict" without auditing those files first.

use Coro;
use Coro::Handle;
use Coro::Event;
use Coro::Semaphore;
use Coro::Channel;
use Coro::Signal;
use Coro::RWLock;

use Set::Scalar;
use Fcntl;

our $VERSION = 0.1;
our $NOW = time;

require "config.pl";

use folder;
use vc;

$v = $VERBOSE;

# WARNING:
# Content-Length headers are deliberately being ignored. They
# are broken by design and will never be supported

# TODO: real message-id parsing

$|=1;

my $ecnt;

# Print $message to STDERR, prefixed with "[$SLAVE] ", when $level does not
# exceed the verbosity threshold $v.
sub slog {
    my ($level, $message) = @_;

    if ($level <= $v) {
        print STDERR "[$SLAVE] ", $message;
    }
}

# NOTE(review): not used in this file -- presumably consumed by the folder
# module to serialise disk access; confirm before removing.
$lockdisk = Coro::Semaphore->new;

# give up a/the timeslice
sub give {
    Coro::Event::do_timer(after => 0);
}

# Read/write lock used to delay shutdown: every running sync_folder holds a
# read lock (via quit_guard); main takes the write lock before sending quit.
my $quit = Coro::RWLock->new;

# Releases the read lock taken by quit_guard when the guard object dies.
sub rwlock::DESTROY {
    $quit->unlock;
}

# Take a read lock on $quit and return a guard object whose destruction
# releases it again.
sub quit_guard {
    $quit->rdlock;
    bless [], 'rwlock';
}

# Return the names of all plain files directly inside $PREFIX, skipping
# names that start with "." or end with "~".  Dies if $PREFIX cannot be
# opened as a directory.
sub find_folders {
    my @folders;

    opendir my $dh, $PREFIX
        or die "$PREFIX: $!";

    while (defined (my $folder = readdir $dh)) {
        next if $folder =~ /^\./;
        next if $folder =~ /~$/;
        next unless -f "$PREFIX/$folder";
        push @folders, $folder;
    }

    @folders;
}

my $sync_folder = Coro::Semaphore->new(3); # max 3 folders in parallel

# Offer the messages listed in $diff (array ref of ids, may be undef) to the
# peer over a freshly created priority-1 channel whose port is announced on
# $avc.  The transfer itself runs in a new coroutine, which is returned so
# the caller can join it.
#
# Inside the coroutine: wait for the peer's sync token, send the id list,
# receive the list of ids the peer already has, send every remaining message
# in order, then expect the channel to be closed.
sub sync_offer {
    my ($folder, $avc, $diff) = @_;

    # Tolerate a missing list (nothing to offer) instead of dereferencing undef.
    my @ids = @{ $diff || [] };

    my $vc = vc->create(pri => 1);
    $avc->snd("offer", $vc->{port});

    return async {
        $vc->rcv; # need to synchronize, argl. should open on other side
        $vc->snd(join "\0", @ids);

        my %dup;
        @dup{split /\0/, $vc->rcv} = ();

        for (@ids) {
            $vc->snd($folder->fetch($_)) unless exists $dup{$_};
        }

        defined $vc->rcv and die "protocol error, expected close";
    };
}

# Synchronise the folder called $name with the peer.  The work happens in a
# new coroutine; at most three run concurrently (see $sync_folder).  A quit
# guard is held until the coroutine has finished so that main does not send
# "quit" while a folder is still being synchronised.
sub sync_folder {
    my $name = $_[0];

    my $quit_guard = quit_guard();

    async {
        my $guard = $sync_folder->guard;

        my $vc = vc->create;
        my $folder = folder->new(name => $name);

        $vc->snd("open", $name);
        $vc->snd("mtime");
        $folder->read_mdif;

        my $ctime = $folder->{host}{$OTHERNAME} || -1;
        my $rtime = $vc->rcv; # reply to "mtime"; unused, but must be consumed

        $vc->snd("diff", $ctime);

        # Final action per message id, later journal entries overriding
        # earlier ones:
        #
        # 00 - local del
        # 01 - local add
        # 10 - remote del
        # 11 - remote add
        my %diff;

        # Local journal entries newer than the last sync, oldest first.
        my @local = grep { $_->[0] > $ctime } @{$folder->{diff}};

        # Timestamp of the next remote journal entry; the peer sends -1 after
        # its last entry.  This has to be a lexical: several sync_folder
        # coroutines run interleaved and each yields inside rcv.
        my $remote_time = $vc->rcv;

        # Merge both journals in timestamp order.
        while (1) {
            defined $remote_time
                or die "protocol error, unexpected close while reading diff\n";

            if ($remote_time >= 0 and (!@local or $remote_time < $local[0][0])) {
                $diff{$_} = 0b01 for split /\0/, $vc->rcv; # add
                $diff{$_} = 0b00 for split /\0/, $vc->rcv; # del
                $remote_time = $vc->rcv;
            } elsif (@local) {
                $diff{$_} = 0b11 for @{$local[0][1]};
                $diff{$_} = 0b10 for @{$local[0][2]};
                shift @local;
            } else {
                last;
            }
        }

        # append or update, depending on whether there are messages to be deleted
        $vc->snd("begin");
        $folder->begin_update;

        # Group the ids by action code; every bucket exists even when empty.
        my @bucket = ([], [], [], []);
        while (my ($id, $code) = each %diff) {
            push @{$bucket[$code]}, $id;
            slog(0, "DIFF $id : $code\n");#d#
        }

        $vc->snd("delete", join "\0", @{$bucket[0b10]});
        $folder->delete(@{$bucket[0b00]});

        # offer ours
        my $offer_coro = sync_offer($folder, $vc, $bucket[0b11]);

        # request theirs
        {
            my @send = grep { !$folder->exists($_) } @{$bucket[0b01]};
            $vc->snd("send", join "\0", @send);
            $folder->append($_, $vc->rcv) for @send;
            $vc->snd("-"); # sync
        }

        slog(0, "waiting...\n");#d#
        $offer_coro->join;

        # sanity check
        $vc->snd("inventory");
        if ($folder->inventory ne $vc->rcv) {
            $folder->write_mdif;
            slog(0, "FATAL: folder inventory mismatch after update\n");
        }

        $vc->snd("end");
        $folder->end_update;

        $vc->snd("setctime", $folder->{ctime});
        $vc->snd("mtime");
        $folder->{host}{$OTHERNAME} = $vc->rcv;

        $vc->snd("close");
        $folder->close;

        undef $quit_guard;
    }
}

# Control coroutine, started on both sides.  Checks that the clocks of both
# hosts agree to within $::MAXTIMEDIFF and exchanges host names.  On the
# master it additionally creates empty local files for folders that exist
# only on the peer, starts a sync for every local folder, waits for all of
# them to finish and then sends quit.
sub main {
    my $vc = vc->create;

    # time checking done symmetrically
    {
        my $time = time;
        $vc->snd("time");
        my $othertime = $vc->rcv;
        abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF
            or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
    }

    $vc->snd("setname", $MYNAME);
    $OTHERNAME = $vc->rcv;

    if ($SLAVE) {
        #
    } else {
        $vc->snd("list");
        for my $name (split /\0/, $vc->rcv) {
            next if -e "$PREFIX/$name";

            # Report the outcome instead of claiming success unconditionally.
            if (sysopen(my $fh, "$PREFIX/$name", O_RDWR|O_CREAT, 0666)) {
                slog(2, "created new empty folder $name\n");
            } else {
                slog(0, "unable to create folder $name: $!\n");
            }
        }

        for my $folder (find_folders()) {
            sync_folder($folder);
        }

        # Blocks until every sync_folder coroutine has dropped its quit guard.
        $quit->wrlock;

        vc::snd_quit;
    }
}

# Serve per-folder requests on $vc after an "open" command.  The folder name
# is read from the channel first; commands are then handled until rcv
# returns a false value.  Dies on an unknown command.
sub serve_folder {
    my $vc = shift;

    my $name = $vc->rcv;
    my $folder = folder->new(name => $name);

    slog(8, "serving folder $name\n");

    $folder->read_mdif;

    while (my $msg = $vc->rcv) {
        if ($msg eq "mtime") {
            $vc->snd($folder->{mtime});
        } elsif ($msg eq "inventory") {
            $vc->snd($folder->inventory);
        } elsif ($msg eq "diff") {
            # Send every journal entry newer than $time as
            # (timestamp, added ids, deleted ids), terminated by -1.
            my $time = $vc->rcv;
            for (@{$folder->{diff}}) {
                next if $_->[0] <= $time;
                $vc->snd($_->[0],
                         (join "\0", @{$_->[1]}),
                         (join "\0", @{$_->[2]}),
                );
            }
            $vc->snd(-1);
        } elsif ($msg eq "begin") {
            $folder->begin_update;
        } elsif ($msg eq "delete") {
            $folder->delete(split /\0/, $vc->rcv);
        } elsif ($msg eq "offer") {
            # Counterpart of sync_offer: runs on its own channel and in its
            # own coroutine so this command loop stays responsive.
            my $ovc = vc->catch(port => $vc->rcv);

            async {
                my @offer;
                {
                    my @dup;
                    $ovc->snd("-"); # synchronize
                    for (split /\0/, $ovc->rcv) {
                        if ($folder->exists($_)) {
                            push @dup, $_;
                        } else {
                            push @offer, $_;
                        }
                    }
                    $ovc->snd(join "\0", @dup);
                }

                # now we'll get everything in @offer, in order
                $folder->append($_, $ovc->rcv) for @offer;
                $ovc->close;
            };
        } elsif ($msg eq "send") {
            $vc->pri(1);
            $vc->snd($folder->fetch($_)) for split /\0/, $vc->rcv;
            $vc->rcv; # sync
            $vc->pri(0);
        } elsif ($msg eq "end") {
            $folder->end_update;
        } elsif ($msg eq "setctime") {
            $folder->{host}{$OTHERNAME} = $vc->rcv;
        } elsif ($msg eq "close") {
            # NOTE(review): the loop keeps reading after "close"; presumably
            # the peer drops the channel next so rcv returns false -- confirm.
            $folder->close;
        } else {
            die "protocol error, unknown folder command ($msg)\n";
        }
    }
}

# Serve top-level requests on a newly opened channel until rcv returns a
# false value.  Dies on an unknown command.
# NOTE(review): never called in this file -- presumably invoked by the vc
# module for each incoming channel; confirm.
sub serve {
    my $vc = shift;

    slog(8, "new connection $vc->{port}\n");

    while (my $msg = $vc->rcv) {
        if ($msg eq "setname") {
            $vc->snd($MYNAME);
            $OTHERNAME = $vc->rcv;
        } elsif ($msg eq "pri") {
            $vc->{pri} = $vc->rcv;
        } elsif ($msg eq "time") {
            $vc->snd(time);
        } elsif ($msg eq "list") {
            $vc->snd(join "\0", find_folders());
        } elsif ($msg eq "open") {
            serve_folder($vc);
        } else {
            die "protocol error, unknown command ($msg)\n";
        }
    }
}

if ($SLAVE) {
    $HOSTID = "slave";
    async \&main;
    vc::multiplex unblock \*STDIN, $SLAVE;
} else {
    $HOSTID = "master";

    {
        use Socket;

        socketpair(S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
            or die "socketpair: $!";

        # fork returns undef on failure, and undef == 0 is true, so the
        # result has to be checked for definedness before comparing; without
        # this the parent would take the child branch and exec itself.
        my $pid = fork;
        defined $pid
            or die "fork: $!";

        if ($pid == 0) {
            close S1;
            open STDIN, "<&S2" or die;
            open STDOUT, ">&S2" or die;
            exec $0, "--slave";
            exit 255;
        }

        close S2;
        async \&main;
        vc::multiplex unblock \*S1, $SLAVE;
    }
}