package vc;

# Virtual circuits ("vc") multiplexed over a single filehandle.
#
# Wire format, as produced and consumed by the code below:
#   handshake : 16-bit big-endian length + ident string (sent once per side)
#   data frame: 16-bit port, 16-bit length, <length> payload bytes
#   close     : 16-bit port, length 65535 (receiver treats any length
#               above 65000 as "close this port"), no payload
#   control   : port 0; length 0 = quit, length 1 = nop/keepalive
#
# Messages that are empty or longer than MAXMSG are sent as a sequence
# "" chunk... "" so that rcv can reassemble them.

use Convert::Scalar 'weaken';
use Coro;
use Coro::Event;
use Coro::Channel;

# Logging is delegated to the main program's slog(level, message).
BEGIN { *slog = \&::slog }

use constant MAXMSG      => 1024;  # largest payload sent as a single frame
use constant MAXQ        => 10;    # snd blocks while @send exceeds this
use constant PROTVERSION => 1;     # advertised in the handshake ident

# Outgoing frames: $send[$pri] is an array of already-packed frames.
# The feeder scans the slots in index order, so lower indices go first.
my @send;
my $send_full  = Coro::Signal->new;   # signalled when something was queued
my $send_empty = Coro::Signal->new;   # signalled when the feeder ran dry

my @port;   # port number => vc object (weak reference)
my $port;   # last locally allocated port number (see create/multiplex)

# dumpstr($string) -> printable, quoted, length-limited copy for logging.
# Characters outside \x20-\x7f and \xa0-\xff are rendered as \x{..};
# results longer than 60 characters are cut and terminated with '"...'.
sub dumpstr {
    local $_ = $_[0];
    s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge;
    $_ = "\"$_\"";
    (substr $_, 60) = "\"..." if length($_) > 60;
    $_;
}

# catch (accept) an existing vc
#
# vc->catch(port => $n, ...) builds the object around a receive channel
# and registers it in @port. Only a weak reference is stored there, so
# the registry does not keep the vc alive.
sub catch {
    my $class = shift;

    my $self = bless {
        rcv => Coro::Channel->new(1e9),
        @_,
    }, $class;

    weaken($port[$self->{port}] = $self);

    $self;
}

# create (connect) a new vc
#
# Allocates the next local port (the counter advances by 2; multiplex
# seeds it with 0 or 1 depending on the slave flag) and, when a "pri"
# argument was given, announces that priority to the peer.
sub create {
    my $class = shift;

    my $self = $class->catch(port => ($port += 2), @_);
    $self->snd("pri", $self->{pri}) if $self->{pri};

    $self;
}

sub DESTROY {
    # "&close" forwards @_ to vc::close; a plain close(...) call would
    # resolve to the builtin instead.
    &close;
}

# Queue a close frame for this vc (once) and forget the port number.
# NOTE(review): this does not signal $send_full, so the frame appears to
# wait until the feeder is next woken by other traffic -- confirm intent.
sub close {
    my $self = shift;

    if ($self->{port}) {
        push @{$send[0]}, pack "nn", $self->{port}, 65535;
        delete $self->{port};
    }
}

# Set the priority used for subsequent frames and tell the peer about it.
sub pri {
    my $self = shift;

    $self->{pri} = $_[0];
    $self->snd("pri", $_[0]);
}

# Queue one raw frame (payload $_[0]) on this vc's priority queue.
sub _snd {
    my $self = shift;

    slog 8, ">>> :$self->{port} ".dumpstr($_[0])."\n"; #d#
    push @{$send[$self->{pri}]}, pack "nna*", $self->{port}, length $_[0], $_[0];
}

# Fetch one raw frame payload; undef once the vc has no port (closed).
sub _rcv {
    $_[0]{port} ? $_[0]{rcv}->get : undef;
}

# snd(@messages): queue each message, then wake the feeder.
#
# A non-empty message of at most MAXMSG bytes is sent as one frame.
# Anything else (empty or oversized) is bracketed by empty frames and
# split into MAXMSG-sized chunks.
#
# NOTE(review): "@send > MAXQ" compares the number of priority slots,
# not the number of queued frames -- confirm this is the intended
# throttle.
sub snd {
    my $self = shift;

    $send_empty->wait while @send > MAXQ;

    for my $msg (@_) {
        if (length($msg) && length($msg) <= MAXMSG) {
            $self->_snd($msg);
        } else {
            $self->_snd("");
            for (my $ofs = 0; $ofs < length($msg); $ofs += MAXMSG) {
                $send_empty->wait while @send > MAXQ;
                $self->_snd(substr $msg, $ofs, MAXMSG);
            }
            $self->_snd("");
        }
    }

    $send_full->send;
}

# rcv: return the next complete message, reassembling chunked ones.
# An empty first frame marks a chunked message: frames are appended
# until the next empty (or undef, i.e. closed) frame.
sub rcv {
    my $self = shift;

    my $data = $self->_rcv;

    unless (length $data) {
        my $blk;
        $data .= $blk while length ($blk = $self->_rcv);
    }

    $data;
}

# Hand an incoming frame payload (or undef on close) to the reader.
sub feed {
    $_[0]{rcv}->put($_[1]);
}

# Ask the feeder to terminate: an undef entry in queue 0 is its stop marker.
sub snd_quit {
    push @{$send[0]}, undef;
    $send_full->send;
}

# multiplex($fh, $slave): run the protocol on $fh until the peer quits
# or an error occurs. Starts a feeder coroutine (writes queued frames)
# and a receiver coroutine (dispatches incoming frames), then enters the
# event loop; the receiver calls unloop when it finishes.
sub multiplex {
    my ($fh, $slave) = @_;

    # With create's "+= 2" the slave hands out odd ports (1, 3, ...) and
    # the other side even ones (2, 4, ...), so the two never collide.
    $port = $slave ? 1 : 0;

    $fh->timeout($::TIMEOUT);

    # periodically send nop as keepalive signal to avoid the
    # rsync-on-slow-disk-timeout problem.
    my $nopper = Event->timer(parked => 1, cb => sub {
        push @{$send[0]}, pack "nn", 0, 1;
        $send_full->send;
    });

    # NOTE(review): $VERSION here is $vc::VERSION, which this file never
    # sets; the other globals used here live in main:: -- confirm whether
    # $::VERSION was meant. Left untouched because it is part of the
    # handshake both peers must agree on.
    my $ident = "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n";

    # the feeder
    async {
        $fh->print(pack "na*", length($ident), $ident);

        command:
        while (1) {
            for my $queue (@send) {
                # Unused priority slots are undef; skip them explicitly.
                if ($queue && @$queue) {
                    my $frame = shift @$queue;

                    if (defined $frame) {
                        $fh->print($frame);
                    } else {
                        # stop marker queued by snd_quit
                        last command;
                    }

                    # Re-arm the keepalive relative to this write, then
                    # rescan from the first queue.
                    $nopper->at(time + $::IDLE);
                    $nopper->start;
                    next command;
                }
            }

            # Nothing left to send: release throttled senders and sleep
            # until somebody queues more.
            $send_empty->send;
            $send_full->wait;
        }

        # Tell the peer we are done (control code 0 = quit).
        $fh->print(pack "nn", 0, 0);
    };

    # the receiver
    async {
        eval {
            my ($hlen, $data);

            $fh->read($hlen, 2) == 2
                or die "unexpected eof: $!";
            $fh->read($data, unpack("n", $hlen))
                or die "unexpected eof: $!";
            # Report what was actually received (escaped, since it may
            # be binary garbage).
            $data eq $ident
                or die "garbled input or version mismatch: " . dumpstr($data);

            my $header;
            while ($fh->read($header, 4) == 4) {
                my ($id, $size) = unpack "nn", $header;

                if ($id) {
                    if ($size > 65000) {
                        slog 8, "||| :$id closing\n";#d#
                        if ($port[$id]) {
                            # Mark the vc closed and wake its reader
                            # with undef.
                            delete $port[$id]{port};
                            $port[$id]->feed(undef);
                            undef $port[$id];
                        }
                    } else {
                        # NOTE(review): the header is read with read()
                        # but the payload with sysread() -- confirm the
                        # handle class lets the two be mixed safely.
                        $fh->sysread($data, $size) == $size
                            or die "unexpected read error: $!";
                        slog 8, "<<< :$id ".dumpstr($data)."\n"; #d#

                        # Unknown port: the peer opened a new vc, so
                        # accept it and start a server coroutine for it.
                        ($port[$id] ||= do {
                            my $vc = vc->catch(port => $id);
                            async \&::serve, $vc;
                            $vc;
                        })->feed($data);
                    }
                } else {
                    slog 8, "<<< :$id <$size>\n";
                    if ($size == 0) {
                        # peer quit: stop our feeder too
                        snd_quit;
                        last;
                    } elsif ($size == 1) {
                        # nop
                    } else {
                        die "illegal control code received";
                    }
                }
            }
        };
        slog 0, "ERROR: $@" if $@;
        slog 9, "unlooping\n";#d#
        unloop;
    };

    loop;
}

1;