package vc; use Convert::Scalar 'weaken'; use Coro; use Coro::Event; use Coro::Channel; BEGIN { *slog = \&::slog } use constant MAXMSG => 1024; use constant MAXQ => 10; my @send; my $send_full = new Coro::Signal; my $send_empty = new Coro::Signal; my @port; my $port; sub dumpstr { local $_ = shift; s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge; $_ = "\"$_\""; (substr $_, 60) = "\"..." if length($_) > 60; $_; } sub new_passive { my $class = shift; my $self = bless { rcv => (new Coro::Channel 1e9), @_, }, $class; weaken ($port[$self->{port}] = $self); $self; } sub new { my $class = shift; my $self = $class->new_passive(port => ($port += 2), @_); $self->snd("pri", $self->{pri}) if self->{pri}; $self; } sub DESTROY { &close; } sub close { my $self = shift; if ($self->{port}) { push @{$send[0]}, pack "nn", $self->{port}, 65535; delete $self->{port}; } } sub pri { my $self = shift; $self->{pri} = $_[0]; $self->snd("pri", $_[0]); } sub snd { my $self = shift; $send_empty->wait while @send > MAXQ; for (@_) { slog 8, ">>> :$self->{port} ".dumpstr($_)."\n"; #d# push @{$send[$self->{pri}]}, pack "nna*", $self->{port}, length $_, $_; } $send_full->send; } sub rcv { $_[0]{port} ? $_[0]{rcv}->get : undef; } sub feed { $_[0]{rcv}->put($_[1]); } sub snd_quit { push @{$send[0]}, undef; $send_full->send; } sub multiplex { my ($fh, $slave) = @_; $port = $slave ? 1 : 0; $fh->timeout($::TIMEOUT); # periodically send nop as keepalive signal to avoid the # rsync-on-slow-disk-timeout problem. my $nopper = Event->timer(parked => 1, cb => sub { push @{$send[0]}, pack "nn", 0, 1; $send_full->send; }); my $ident = "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n"; # the feeder async { my $id = "a"; $fh->print(pack "na*", length $ident, $ident); command: while() { for (@send) { if (@$_) { my $vc = shift @$_; if (defined $vc) { $fh->print($vc); } else { last command; } $nopper->at(time+$::IDLE); $nopper->start; next command; } } $send_empty->send; $send_full->wait; } $fh->print(pack "nn", 0, 0); }; async { eval { my ($port, $len, $data); $fh->read($len, 2) == 2 or die "unexpected eof: $!"; $fh->read($data, unpack "n", $len) or die "unexpected eof: $!"; $data eq $ident or die "garbled input or version mismatch: $_"; while ($fh->read($port, 4) == 4) { my ($port, $len) = unpack "nn", $port; if ($port) { if ($len > 65000) { slog 8, "||| :$port closing\n";#d# if ($port[$port]) { delete $port[$port]{port}; $port[$port]->feed(undef); undef $port[$port]; } } else { $fh->sysread($data, $len) == $len or die "unexpected read error: $!"; slog 8, "<<< :$port ".dumpstr($data)."\n"; #d# ($port[$port] ||= do { my $vc = new_passive vc port => $port; async \&::serve, $vc; $vc; })->feed($data); } } else { slog 8, "<<< :$port <$len>\n"; if ($len == 0) { snd_quit; last; } elsif ($len == 1) { # nop } else { die "illegal control code received"; } } } }; slog 0, "ERROR: $@" if $@; slog 9, "unlooping\n";#d# unloop; }; loop; } 1;