#!/usr/bin/perl use Coro; use Coro::Handle; use Coro::Event; use Coro::Semaphore; use Coro::Channel; use Coro::Signal; use Coro::RWLock; use Set::Scalar; use Fcntl; use constant PROTVERSION => 1; $VERSION = 0.1; require "config.pl"; use folder; use vc; $v = $VERBOSE; # WARNING: # Content-Length headers are deliberately being ignored. They # are broken by design and will never be supported # TODO: real message-id parsing $|=1; my $ecnt; sub slog { if ($_[0] <= $v) { print STDERR "[$SLAVE] ", $_[1]; } } $lockdisk = new Coro::Semaphore; # give up a/the timeslice sub give { Coro::Event::do_timer(after => 0); } my $quit = new Coro::RWLock; sub rwlock::DESTROY { $quit->unlock; } sub quit_guard { $quit->rdlock; bless [], rwlock; } sub find_folders { my @folders; opendir my $dh, $PREFIX or die "$PREFIX: $!"; while (defined (my $folder = readdir $dh)) { next if $folder =~ /^\./; next unless -f "$PREFIX/$folder"; push @folders, $folder; } @folders; } my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel sub sync_folder { my $name = $_[0]; my $quit_guard = quit_guard; async { my $guard = $sync_folder->guard; my $vc = new vc; my $folder = new folder name => $name; $vc->snd("open", $name); $vc->snd("mtime"); $folder->check; $folder->read_mdif; my $ctime = $folder->{host}{$OTHERNAME} || -1; my $rtime = $vc->rcv; $vc->snd("diff", $ctime); my (%ladd, %ldel, %radd, %rdel); my @diff = grep { $_->[0] > $ctime } @{$folder->{diff}}; $diff = $vc->rcv; while () { if ($diff >= 0 and (!@diff or $diff <= $diff[0][0])) { my @add = split /\0/, $vc->rcv; my @del = split /\0/, $vc->rcv; $diff = $vc->rcv; slog 0, "applying remote diff $diff\n"; for (@del) { undef $rdel{$_}; delete $radd{$_}; delete $ladd{$_}; } for (@add) { undef $radd{$_}; delete $rdel{$_}; delete $ldel{$_}; } } elsif (@diff) { slog 0, "applying local diff $diff[0][0]\n"; for (@{$diff[0][2]}) { undef $rdel{$_}; delete $radd{$_}; delete $ladd{$_}; } for (@{$diff[0][1]}) { undef $radd{$_}; delete $rdel{$_}; delete $ldel{$_}; } shift @diff; } else { slog 0, "no more diffing\n"; last; } } slog 0, "LADD ".join(" ", keys %ladd)."\n"; slog 0, "LDEL ".join(" ", keys %ldel)."\n"; slog 0, "RADD ".join(" ", keys %radd)."\n"; slog 0, "RDEL ".join(" ", keys %rdel)."\n"; #$folder->{host}{$OTHERNAME} = time; #$vc->snd("setctime", time); undef $quit_guard; } } sub main { my $vc = new vc; # time checking done symmetrically { my $time = time; $vc->snd("time"); my $othertime = $vc->rcv; abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF"; } #Coro::Event::do_timer(after => 60);#d# $vc->snd("name"); $OTHERNAME = $vc->rcv; if ($SLAVE) { # } else { $vc->snd("list"); for (split /\0/, $vc->rcv) { if (!-e "$PREFIX/$_") { slog 2, "created new empty folder $_\n"; sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666; } } for my $folder (find_folders) { sync_folder $folder; } $quit->wrlock; vc::snd_quit; } } sub serve_folder { my $vc = shift; my $name = $vc->rcv; my $folder = new folder name => $name; slog 8, "serving folder $name\n"; $folder->check; $folder->read_mdif; while (my $msg = $vc->rcv) { if ($msg eq "mtime") { $vc->snd($folder->{mtime}); } elsif ($msg eq "diff") { my $time = $vc->rcv; for (@{$folder->{diff}}) { next if $_->[0] <= $time; $vc->snd($_->[0], (join "\0", @{$_->[1]}), (join "\0", @{$_->[2]}), ); } $vc->snd(-1); } elsif ($msg eq "setctime") { $folder->{host}{$OTHERNAME} = $vc->rcv; } else { die "protocol error, unknown folder command ($msg)\n"; } } } sub serve { my $vc = shift; slog 8, "new connection $vc->{port}\n"; while (my $msg = $vc->rcv) { if ($msg eq "name") { $vc->snd($::MYNAME); } elsif ($msg eq "pri") { $self->{pri} = $vc->rcv; } elsif ($msg eq "time") { $vc->snd(time); } elsif ($msg eq "list") { $vc->snd(join "\0", find_folders); } elsif ($msg eq "open") { serve_folder($vc); } else { die "protocol error, unknown command ($msg)\n"; } } } if ($SLAVE) { $HOSTID = "slave"; async \&main; vc::multiplex unblock \*STDIN, $SLAVE; } else { $HOSTID = "master"; { use Socket; socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC; if (fork == 0) { close S1; open STDIN, "<&S2" or die; open STDOUT, ">&S2" or die; exec $0, "--slave"; exit 255; } close S2; async \&main; vc::multiplex unblock \*S1, $SLAVE; } }