--- syncmail/vc.pm 2001/10/28 03:51:24 1.2 +++ syncmail/vc.pm 2001/10/28 20:52:25 1.3 @@ -11,6 +11,8 @@ use constant MAXMSG => 1024; use constant MAXQ => 10; +use constant PROTVERSION => 1; + my @send; my $send_full = new Coro::Signal; my $send_empty = new Coro::Signal; @@ -19,14 +21,15 @@ my $port; sub dumpstr { - local $_ = shift; + local $_ = $_[0]; s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge; $_ = "\"$_\""; (substr $_, 60) = "\"..." if length($_) > 60; $_; } -sub new_passive { +# catch (accept) an existing vc +sub catch { my $class = shift; my $self = bless { rcv => (new Coro::Channel 1e9), @@ -37,11 +40,12 @@ $self; } -sub new { +# create (connect) a new vc +sub create { my $class = shift; - my $self = $class->new_passive(port => ($port += 2), @_); + my $self = $class->catch(port => ($port += 2), @_); - $self->snd("pri", $self->{pri}) if self->{pri}; + $self->snd("pri", $self->{pri}) if $self->{pri}; $self; } @@ -65,22 +69,49 @@ $self->snd("pri", $_[0]); } +sub _snd { + my $self = shift; + + slog 8, ">>> :$self->{port} ".dumpstr($_[0])."\n"; #d# + push @{$send[$self->{pri}]}, + pack "nna*", $self->{port}, length $_[0], $_[0]; + +} + +sub _rcv { + $_[0]{port} ? $_[0]{rcv}->get : undef; +} + sub snd { my $self = shift; + my ($length, $ofs); $send_empty->wait while @send > MAXQ; - for (@_) { - slog 8, ">>> :$self->{port} ".dumpstr($_)."\n"; #d# - push @{$send[$self->{pri}]}, - pack "nna*", $self->{port}, length $_, $_; + if (length && MAXMSG >= length) { + $self->_snd($_); + } else { + $self->_snd(""); + for (my $ofs = 0; $ofs < length; $ofs += MAXMSG) { + $send_empty->wait while @send > MAXQ; + $self->_snd(substr $_, $ofs, MAXMSG); + } + $self->_snd(""); + } } - $send_full->send; } sub rcv { - $_[0]{port} ? $_[0]{rcv}->get : undef; + my $self = shift; + my $data = $self->_rcv; + + unless (length $data) { + my $blk; + $data .= $blk while length ($blk = $self->_rcv); + } + + $data; } sub feed { @@ -160,7 +191,7 @@ slog 8, "<<< :$port ".dumpstr($data)."\n"; #d# ($port[$port] ||= do { - my $vc = new_passive vc port => $port; + my $vc = catch vc port => $port; async \&::serve, $vc; $vc; })->feed($data);