#!/usr/bin/perl

# Two-host folder synchroniser built on Coro coroutines.
#
# The master process forks a copy of itself as the slave (connected through
# a socketpair); both sides then talk over multiplexed virtual connections
# provided by the project-local "vc" module.

use Coro;
use Coro::Handle;
use Coro::Event;
use Coro::Semaphore;
use Coro::Channel;
use Coro::Signal;
use Coro::RWLock;

use Set::Scalar;
use Fcntl;

our $VERSION = 0.1;
our $NOW     = time;

# NOTE(review): config.pl presumably sets $PREFIX, $VERBOSE, $SLAVE, $MYNAME
# and $MAXTIMEDIFF, which are used below but never assigned here -- confirm.
require "config.pl";

use folder;
use vc;

$v = $VERBOSE;

# WARNING:
# Content-Length headers are deliberately being ignored. They
# are broken by design and will never be supported

# TODO: real message-id parsing

$| = 1;

my $ecnt;

# slog LEVEL, MESSAGE
# Print MESSAGE to STDERR, prefixed with "[$SLAVE] ", when LEVEL does not
# exceed the configured verbosity $v.
sub slog {
    if ($_[0] <= $v) {
        print STDERR "[$SLAVE] ", $_[1];
    }
}

# Global semaphore; not used in this file.
# NOTE(review): presumably used by folder.pm to serialise disk access -- confirm.
$lockdisk = Coro::Semaphore->new;

# give up a/the timeslice
sub give {
    Coro::Event::do_timer(after => 0);
}

# Shutdown lock: every running sync holds a read lock (through a guard
# object), and main() takes the write lock to wait until all of them finished.
my $quit = Coro::RWLock->new;

# Releases the lock taken in quit_guard() when the guard object goes away.
sub rwlock::DESTROY {
    $quit->unlock;
}

# Take a read lock on $quit and return a guard object whose destruction
# releases it again.
sub quit_guard {
    $quit->rdlock;
    bless [], 'rwlock';
}

# Return the names of all plain files directly below $PREFIX, skipping
# names starting with "." and names ending in "~". Dies if $PREFIX cannot
# be opened.
sub find_folders {
    my @folders;

    opendir my $dh, $PREFIX
        or die "$PREFIX: $!";

    while (defined (my $folder = readdir $dh)) {
        next if $folder =~ /^\./;
        next if $folder =~ /~$/;
        next unless -f "$PREFIX/$folder";
        push @folders, $folder;
    }

    @folders;
}

my $sync_folder = Coro::Semaphore->new(3); # max 3 folders in parallel

# sync_folder NAME
# Start a coroutine that synchronises the local folder NAME with the peer
# over its own virtual connection. Returns immediately; the quit guard taken
# here is released when the coroutine is done.
sub sync_folder {
    my $name       = $_[0];
    my $quit_guard = quit_guard();

    async {
        my $guard  = $sync_folder->guard;
        my $vc     = vc->create;
        my $folder = folder->new(name => $name);

        $vc->snd("open", $name);
        $vc->snd("mtime");
        $folder->check;
        $folder->read_mdif;

        my $ctime = $folder->{host}{$OTHERNAME} || -1;
        my $rtime = $vc->rcv; # reply to "mtime"; must be consumed, currently unused
        $vc->snd("diff", $ctime);

        # Planned action per message id. Judging by how the buckets are
        # used further down, bit 1 selects the side that has to change
        # (0 = local folder, 1 = remote folder):
        #
        # 00 - local del
        # 01 - local add
        # 10 - remote del
        # 11 - remote add
        my %diff;

        # Local diff records newer than the last sync with this peer; each
        # record is [time, \@added, \@deleted].
        my @local = grep { $_->[0] > $ctime } @{ $folder->{diff} };

        # Timestamp of the next remote diff record, negative when the peer
        # has no more. This must be lexical: several sync coroutines run
        # concurrently and yield inside rcv, so a shared global would be
        # overwritten by the other coroutines.
        my $remote = $vc->rcv;

        # Merge both diff streams in timestamp order so that later records
        # override earlier ones for the same message id.
        while (1) {
            if ($remote >= 0 and (!@local or $remote <= $local[0][0])) {
                my @add = split /\0/, $vc->rcv;
                my @del = split /\0/, $vc->rcv;
                slog 0, "applying remote diff $remote (@add, @del)\n";
                $diff{$_} = 0b01 for @add; # add
                $diff{$_} = 0b00 for @del; # del
                $remote = $vc->rcv;
            } elsif (@local) {
                slog 0, "applying local diff $local[0][0]\n";
                $diff{$_} = 0b11 for @{ $local[0][1] };
                $diff{$_} = 0b10 for @{ $local[0][2] };
                shift @local;
            } else {
                slog 0, "no more diffing\n";
                last;
            }
        }

        # append or update, depending on whether there are messages to be deleted
        $vc->snd("begin");
        $folder->begin_update;

        # Group the message ids by action code (index = code from the table above).
        my @bucket;
        while (my ($id, $action) = each %diff) {
            push @{ $bucket[$action] }, $id;
            slog 0, "DIFF $id : $action\n";#d#
        }

        # NOTE(review): serve_folder in this file has no "delete" handler,
        # and buckets 1 and 3 (the adds) are never applied -- the protocol
        # looks unfinished.
        $vc->snd("delete", join "\0", @{ $bucket[2] || [] });
        $folder->delete(@{ $bucket[0] || [] });

        $vc->snd("end");
        $folder->end_update;

        # NOTE(review): serve_folder only knows "setctime", not "ctime", and
        # never answers it -- confirm the intended exchange.
        $vc->snd("ctime", $folder->{ctime});
        $folder->{host}{$OTHERNAME} = $vc->rcv;

        # sanity check
        # Exactly one reply arrives for the single "inventory" request, so
        # it is received once and reused for both the comparison and the log.
        $vc->snd("inventory");
        my $local_inventory  = $folder->inventory;
        my $remote_inventory = $vc->rcv;
        if ($local_inventory ne $remote_inventory) {
            slog 0, "FATAL: folder inventory mismatch after update";
            die;
        }
        slog 0, "LINV " . $local_inventory . "\n";
        slog 0, "RINV " . $remote_inventory . "\n";

        undef $quit_guard;
    }
}

# Client-side entry point, run as a coroutine on both hosts: checks the
# clocks, exchanges host names and -- on the master only -- creates missing
# folders, syncs every local folder and finally asks the peer to quit.
sub main {
    my $vc = vc->create;

    # time checking done symmetrically
    {
        my $time = time;
        $vc->snd("time");
        my $othertime = $vc->rcv;
        # Compare the peer's clock against the midpoint of the round trip.
        abs(($time + time) * 0.5 - $othertime) <= $::MAXTIMEDIFF
            or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF";
    }

    #Coro::Event::do_timer(after => 60);#d#

    $vc->snd("setname", $MYNAME);
    $OTHERNAME = $vc->rcv;

    if ($SLAVE) {
        #
    } else {
        # Create an empty local file for every folder that only the peer has.
        $vc->snd("list");
        for (split /\0/, $vc->rcv) {
            if (!-e "$PREFIX/$_") {
                sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666
                    or die "$PREFIX/$_: $!";
                slog 2, "created new empty folder $_\n";
            }
        }

        for my $folder (find_folders) {
            sync_folder($folder);
        }

        # Blocks until every sync coroutine has dropped its quit guard.
        $quit->wrlock;

        vc::snd_quit();
    }
}

# serve_folder VC
# Server side of a folder session: reads the folder name from VC, then
# answers folder commands until rcv returns a false value. Dies on an
# unknown command.
sub serve_folder {
    my $vc = shift;

    my $name   = $vc->rcv;
    my $folder = folder->new(name => $name);

    slog 8, "serving folder $name\n";

    $folder->check;
    $folder->read_mdif;

    while (my $msg = $vc->rcv) {
        if ($msg eq "mtime") {
            $vc->snd($folder->{mtime});
        } elsif ($msg eq "inventory") {
            $vc->snd($folder->inventory);
        } elsif ($msg eq "diff") {
            # Send every diff record newer than the given time as
            # (time, added ids, deleted ids), followed by -1 as end marker.
            my $time = $vc->rcv;
            for (@{ $folder->{diff} }) {
                next if $_->[0] <= $time;
                $vc->snd($_->[0],
                         (join "\0", @{ $_->[1] }),
                         (join "\0", @{ $_->[2] }),
                        );
            }
            $vc->snd(-1);
        } elsif ($msg eq "begin") {
            $folder->begin_update;
        } elsif ($msg eq "end") {
            $folder->end_update;
        } elsif ($msg eq "setctime") {
            $folder->{host}{$OTHERNAME} = $vc->rcv;
        } else {
            # NOTE(review): sync_folder sends "delete" and "ctime", which
            # end up here -- client and server disagree on the protocol.
            die "protocol error, unknown folder command ($msg)\n";
        }
    }
}

# serve VC
# Server side of a connection: answers top-level commands until rcv returns
# a false value, handing over to serve_folder on "open". Dies on an unknown
# command.
# NOTE(review): not called in this file; presumably invoked by vc for each
# incoming virtual connection -- confirm.
sub serve {
    my $vc = shift;

    slog 8, "new connection $vc->{port}\n";

    while (my $msg = $vc->rcv) {
        if ($msg eq "setname") {
            $vc->snd($MYNAME);
            $OTHERNAME = $vc->rcv;
        } elsif ($msg eq "pri") {
            $vc->{pri} = $vc->rcv;
        } elsif ($msg eq "time") {
            $vc->snd(time);
        } elsif ($msg eq "list") {
            $vc->snd(join "\0", find_folders);
        } elsif ($msg eq "open") {
            serve_folder($vc);
        } else {
            die "protocol error, unknown command ($msg)\n";
        }
    }
}

if ($SLAVE) {
    $HOSTID = "slave";
    async \&main;
    vc::multiplex unblock \*STDIN, $SLAVE;
} else {
    $HOSTID = "master";

    {
        use Socket;

        socketpair(S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
            or die "socketpair: $!";

        # fork returns undef on failure; comparing that with 0 would make
        # the parent believe it is the child and exec itself as the slave.
        my $pid = fork;
        defined $pid
            or die "fork: $!";

        if ($pid == 0) {
            # Child: talk to the master over S2 via stdin/stdout and
            # re-execute this script in slave mode.
            close S1;
            open STDIN, "<&S2" or die;
            open STDOUT, ">&S2" or die;
            exec $0, "--slave";
            exit 255;
        }

        close S2;
        async \&main;
        vc::multiplex unblock \*S1, $SLAVE;
    }
}