--- syncmail/syncmail 2001/10/23 23:52:40 1.1 +++ syncmail/syncmail 2001/10/27 03:40:29 1.2 @@ -6,12 +6,23 @@ use Coro::Semaphore; use Coro::Channel; use Coro::Signal; +use Coro::RWLock; +use Fcntl; use MD5; -use constant VERSION => 1; +use constant PROTVERSION => 1; + +$VERSION = 0.1; + +$TIMEOUT = 20; +$IDLE = $TIMEOUT < 30 ? $TIMEOUT / 2 : $TIMEOUT - 15; +$MAXTIMEDIFF = 10; + +# WARNING: +# Content-Length headers are deliberately being ignored. They +# are broken by design and will never be supported -# TODO: content-length support (HACK!) # TODO: real message-id parsing $|=1; @@ -20,19 +31,54 @@ $SLAVE = 1*($ARGV[0] eq "--slave"); +$MYNAME = $SLAVE ? "slave" : "master"; +$PREFIX = $SLAVE ? "./dst" : "./src"; + +@OTHERNAMES = qw(third); + my $ecnt; sub slog { if ($_[0] <= $v) { - print STDERR $_[1]; + print STDERR "[$SLAVE] ", $_[1]; } } -# parse_folder(mbox-file-path, callback) +my $lockdisk = new Coro::Semaphore; + +# give up a/the timeslice +sub give { + Coro::Event::do_timer(after => 0); +} + +package folder; + +BEGIN { *slog = \&::slog }; + +use constant MDIFVERSION => 1; + +sub new { + my $class = shift; + my %arg = @_; + bless { + path => "$::PREFIX/$arg{name}", + %arg, + }, $class; +} + +sub dirty { + $_[0]{dirty} = 1; +} + +sub DESTROY { + $_[0]->write_mdif; +} + +# parse_mbox(mbox-file-path, callback) # callback gets called with \$header and \$body, # $header includes the mbox From_ line without # the leading From_ itself. -sub parse_folder { +sub parse_mbox { my ($path, $cb) = @_; open my $fh, "<", $path @@ -64,18 +110,25 @@ chomp $body; $cb->($offs, \$head, \$body); $offs = (tell $fh) - 5; - cede unless ++$ecnt & 1023; + ::give unless ++$ecnt & 1023; } 1; } +sub conf_path { + (my $conf = $_[0]{path}) =~ s%([^/]+$)%.$1.mdif%; + $conf; +} + sub read_mdif { - my ($path) = @_; - my $fh; - my $mdif; + my $self = shift; + my $path = $self->conf_path; + + return if $self->{idx}; + open my $fh, "<", $path - or return { }; + or return; defined ($_ = <$fh>) or die "$path: empty mdif file\n"; @@ -84,28 +137,28 @@ if ($_ eq "[SYNCMAIL]\n") { while (<$fh>) { last unless /^([a-z]+)\s*=\s*(.*)\n$/; - $mdif->{$1} = $2; + $self->{$1} = $2; } } elsif ($_ eq "[HOSTS]\n") { while (<$fh>) { last unless /^([^[].*)=(.*)\n$/; - $mdif->{host}{$1} = $2; + $self->{host}{$1} = $2; } - } elsif (/^\[DIFF(\d+)\.(\d+)\]\n$/) { - my ($gen, $mtime) = ($1, $2); + } elsif (/^\[DIFF (\d+)\]\n$/) { + my $mtime = $1; my @dif; while (<$fh>) { last unless /^[+-]/; push @dif, substr $_, 0, -1; } - unshift @{$mdif->{diff}}, [$gen, $mtime, \@dif]; + unshift @{$self->{diff}}, [$mtime, \@dif]; } elsif ($_ eq "[INDEX]\n") { my @idx; while (<$fh>) { last unless /^(\d+)=(.*)\n$/; push @idx, [$1, $2]; } - $mdif->{idx} = \@idx; + $self->{idx} = \@idx; } elsif (/^#/) { $_ = <$fh>; # nop @@ -114,15 +167,15 @@ } } while defined $_; - $mdif->{version} <= VERSION - or die "$path: version mismatch ($mdif->{version} found, <".VERSION." expected)\n"; - - $mdif; + $self->{version} <= MDIFVERSION + or die "$path: version mismatch ($self->{version} found, <".MDIFVERSION." expected)\n"; } sub write_mdif { - my ($path, $mdif) = @_; - my $fh; + my $self = shift; + my $path = $self->conf_path; + + return unless $self->{dirty}; open my $fh, ">", "$path~" or die "$path~: $!"; @@ -130,23 +183,27 @@ print $fh "# automatically generated, do NOT edit\n"; print $fh "[SYNCMAIL]\n"; - print $fh "$_=$mdif->{$_}\n" for (qw(fsize mtime gen version)); + print $fh "$_=$self->{$_}\n" for (qw(fsize mtime version)); print $fh "[HOSTS]\n"; - print $fh "$k=$v\n" while my ($k,$v) = each %{$mdif->{host}}; + while (my ($k,$v) = each %{$self->{host}}) { + print $fh "$k=$v\n"; + } print $fh "[INDEX]\n"; - print $fh "$_->[0]=$_->[1]\n" for @{$mdif->{idx}}; + print $fh "$_->[0]=$_->[1]\n" for @{$self->{idx}}; - for (reverse @{$mdif->{diff}}) { - print $fh "[DIFF$_->[0].$_->[1]]\n"; - print $fh $_, "\n" for @{$_->[2]}; + for (reverse @{$self->{diff}}) { + print $fh "[DIFF $_->[0]]\n"; + print $fh $_, "\n" for @{$_->[1]}; } close $fh or die "$path~: unable to create updated .mdif: $!"; rename "$path~", $path; + + delete $self->{dirty}; } sub gendiff { @@ -174,18 +231,17 @@ \@d; } -my $check_folder = new Coro::Semaphore; +sub check { + my $self = shift; + my $path = $self->{path}; + my $conf = $self->conf_path; + my $guard = $lockdisk->guard; -sub check_folder { - my ($path) = @_; - my $guard = $check_folder->guard; - - (my $conf = $path) =~ s%([^/]+$)%.$1.mdif%; - - slog 1, "checking $path\n"; + slog 3, "checking $path\n"; if (stat $path) { my ($fsize, $mtime) = (stat _)[7, 9]; + if (open my $fh, "<", $conf) { my %conf; <$fh>; # skip initial comment @@ -194,15 +250,18 @@ while (<$fh> =~ /^([a-z]+)\s*=\s*(.*)$/) { $conf{$1} = $2; } - return (1, \%conf) if $fsize == $conf{fsize} - && $mtime == $conf{mtime}; + return 1 if $fsize == $conf{fsize} + && $mtime == $conf{mtime}; + + $conf{mtime} <= $mtime + or die "$path: folder older than mdif"; } slog 2, "updating $path\n"; my @idx; - parse_folder $path, sub { + parse_mbox $path, sub { my ($offs, $head, $body) = @_; my $mid; if ($$head =~ /^Message-Id:\s*(<[^<\n]+>)\s*\n/im) { @@ -213,28 +272,26 @@ push @idx, [$offs, $mid]; } or return (); - my $mdif = read_mdif $conf; + $self->read_mdif; - if ($mdif->{version}) { - my $d = gendiff $mdif->{idx}, \@idx; - push @{$mdif->{diff}}, [ - $mdif->{gen}++, - $mdif->{mtime}, + if ($self->{version}) { + my $d = gendiff $self->{idx}, \@idx; + push @{$self->{diff}}, [ + $self->{mtime}, $d, ] if @$d; } else { - slog 2, "$path: new folder\n"; - $mdif->{version} ||= VERSION; - $mdif->{gen} = 1; + slog 2, "$path: previously unknown folder\n"; + $self->{version} ||= MDIFVERSION; } - $mdif->{fsize} = $fsize; - $mdif->{mtime} = $mtime; - $mdif->{idx} = \@idx; + $self->{fsize} = $fsize; + $self->{mtime} = $mtime; + $self->{idx} = \@idx; - write_mdif $conf, $mdif; + $self->dirty; - return (2, $mdif); + return 2; } else { slog 2, "$path: no longer exists\n"; unlink $conf; @@ -243,6 +300,34 @@ } } +package main; + +my $quit = new Coro::RWLock; + +sub rwlock::DESTROY { + $quit->unlock; +} + +sub quit_guard { + $quit->rdlock; + bless [], rwlock; +} + +sub find_folders { + my @folders; + + opendir my $dh, $PREFIX + or die "$PREFIX: $!"; + + while (defined (my $folder = readdir $dh)) { + next if $folder =~ /^\./; + next unless -f "$PREFIX/$folder"; + push @folders, $folder; + } + + @folders; +} + my $send = new Coro::Channel 10; my $done = 0; @@ -250,7 +335,8 @@ sub request { my $res; my $signal = new Coro::Signal; - my $cmd = defined $_[1] ? "000+".length($_[1])." $_[0]\n$_[1]" : "000 $_[0]\n"; + my $cmd = defined $_[1] ? "000+".length($_[1])." $_[0]\n$_[1]" + : "000 $_[0]\n"; $send->put([$signal, \$res, $cmd]); $signal->wait; $res; @@ -258,100 +344,170 @@ # reply $id, $code, $msg, $data sub reply { - my $cmd = defined $_[3] ? "$_[1]+".length($_[3])." $_[2]\n$_[3]" : "$_[1] $_[2]\n"; + my $cmd = defined $_[3] ? "$_[1]+".length($_[3])." $_[2]\n$_[3]" + : "$_[1] $_[2]\n"; $send->put([undef, undef, "$_[0] $cmd"]); } sub handle_commands { my ($fh) = @_; - my $id = "a"; + + # periodically send nop as keepalive signal to avoid the + # rsync-on-slow-disk-timeout problem. + my $nopper = Event->timer(parked => 1, cb => sub { request "nop" }); + + my @folder; + async { - $fh->print("- 000 hello $HOSTID\n"); + my $id = "a"; + $fh->print("\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n"); while (my $r = $send->get) { if (defined $r->[1]) { my $id = ++$id; $request{$id} = $r; - print STDERR "<<< $SLAVE sendign request $id:$r->[2]";#d# + print STDERR "<<< $SLAVE sending request $id:$r->[2]";#d# $fh->print("$id $r->[2]"); } else { - print STDERR "<<< $SLAVE sendign reply $r->[2]";#d# + print STDERR "<<< $SLAVE sending reply $r->[2]";#d# $fh->print($r->[2]); } + $nopper->at(time+$::IDLE); $nopper->start; } - print STDERR "$SLAVE shutdown\n";#d# - shutdown $fh, 1; + $fh->print("- 000 quit\n"); }; - while (<$fh>) { - slog 0, ">>> $SLAVE received :$_"; - /^(\S+) (\d\d\d)(?:\+(\d+))?\s*(.*)$/ - or die "protocol error, garbled command ($_)"; - - my ($id, $code, $dlen, $msg) = ($1, $2, $3, $4); - my $data; - - $fh->sysread($data, $dlen) == $dlen - or die "unexpected read error: $!"; - - if ($code == 0) { - if ($msg eq "quit") { - print $fh "$id 200 quit\n"; - $send->put(undef); - last; - } elsif ($msg eq "nop") { - reply $id, 200, "nop"; - } elsif ($msg =~ /^hello (.*)$/) { - $OTHERID = $1; - slog 3, "otherid set to $OTHERID\n"; - } else { - die "protocol error, unknown command ($_)\n"; + + async { + eval { + $fh->timeout($::TIMEOUT); + + $_ = <$fh>; + $_ eq "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n" + or die "garbled input or version mismatch: $_"; + + while (<$fh>) { + slog 0, ">>> $SLAVE received :$_"; + /^(\S+) (\d\d\d)(?:\+(\d+))?\s*(.*)$/ + or die "protocol error, garbled command ($_)"; + + my ($id, $code, $dlen, $msg) = ($1, $2, $3, $4); + my $data; + + $fh->sysread($data, $dlen) == $dlen + or die "unexpected read error: $!"; + + if ($code == 0) { + if ($msg eq "quit") { + $send->put(undef); + last; + } elsif ($msg eq "nop") { + reply $id, 200, "nop"; + } elsif ($msg eq "myname") { + $OTHERNAME = $data; + slog 3, "otherid set to $OTHERNAME\n"; + reply $id, 200, "myname", $MYNAME; + } elsif ($msg eq "timestamp") { + reply $id, 200, time; + } elsif ($msg eq "folders") { + reply $id, 200, "ok", join "\0", find_folders; + } elsif ($msg eq "open") { + async { + my $folder = new folder name => $data; + $folder->check; + $folder->read_mdif; + push @folder, $folder; + reply $id, 200, "$#folder $folder->{mtime}"; + }; + } elsif ($msg =~ /^update (\d+) (\d+)$/) { + if ($folder[$1]->{host}{$OTHERNAME} != $2) { + $folder[$1]->{host}{$OTHERNAME} = $2; + $folder[$1]->dirty; + } + reply $id, 200, "ok"; + } elsif ($msg =~ /^close (\d+)$/) { + undef $folder[$1]; + reply $id, 200, "ok"; + } else { + chomp; + die "protocol error, unknown command ($_)\n"; + } + } else { + my $r = delete $request{$id} + or die "protocol error, invalid reply id ($_)\n"; + + ${$r->[1]} = [$code, $msg, $data]; + $r->[0]->send; + } } - } else { - my $r = delete $request{$id} - or die "protocol error, invalid reply id ($_)\n"; + }; - ${$r->[1]} = [$code, $msg, $data]; - $r->[0]->send; - } + slog 0, "ERROR: $@" if $@; + print STDERR "unlooping\n";#d# + unloop; + }; - if ($done && !%request) { - $done = 0; - async { - request("quit"); - exit 0; - }; - } - } + loop; + print STDERR "$SLAVE hiho\n";#d# +} - exit 0; +my $sync_folder = new Coro::Semaphore 3; # max 3 folders in parallel + +sub sync_folder { + my $name = $_[0]; + + my $quit_guard = quit_guard; + async { + my $guard = $sync_folder->guard; + + ::give; + + my $folder = new folder name => $name; + my ($rfid, $rmtime) = split /\s+/, (request open => $name)->[1]; + + $folder->check; + $folder->read_mdif; + my $mtime = $folder->{host}{$OTHERNAME}; + + $folder->{host}{$OTHERNAME} = $folder->{mtime}; + + request "update $rfid $folder->{mtime}"; + request "close $rfid"; + undef $quit_guard; + } } -sub sync_dir { - my ($path) = @_; +sub main { + # time checking done symmetrically + { + my $time = time; + my $othertime = (request "timestamp")->[1]; + abs (($time + time)*0.5 - $othertime) <= $::MAXTIMEDIFF + or die "ERROR: time difference between hosts larger than $::MAXTIMEDIFF"; + } - opendir my $dh, $path - or die "$path: $!"; + $OTHERNAME = (request "myname", $MYNAME)->[2]; + if ($SLAVE) { + # + } else { + for (split /\0/, (request "folders")->[2]) { + if (!-e "$PREFIX/$_") { + slog 2, "created new empty folder $_\n"; + sysopen my $fh, "$PREFIX/$_", O_RDWR|O_CREAT, 0666; + } + } - while (defined (my $folder = readdir $dh)) { - next if $folder =~ /^\./; - my $path = "$path/$folder"; - next unless -f $path; - my ($status, $mdif) = check_folder $path; - print STDERR "$path: $status | $mdif\n";#d# - if ($SLAVE) { - request "fhave", $folder; - } else { - request "fwant", $folder; + for my $folder (find_folders) { + sync_folder $folder; } - } - $done = 1; - my $x = request("nop"); - print STDERR "$SLAVE returned nop @$x\n";#d# + print "writelock\n";#d# + $quit->wrlock; + $send->put(undef); + } } if ($SLAVE) { $HOSTID = "slave"; - async \&sync_dir, "./dst"; + async \&main; handle_commands unblock \*STDIN; } else { $HOSTID = "master"; @@ -359,12 +515,14 @@ use Socket; socketpair S1, S2, AF_UNIX, SOCK_STREAM, PF_UNSPEC; if (fork == 0) { + close S1; open STDIN, "<&S2" or die; open STDOUT, ">&S2" or die; exec $0, "--slave"; exit 255; } - async \&sync_dir, "./src"; + close S2; + async \&main; handle_commands unblock \*S1; } }