ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/vc.pm
(Generate patch)

Comparing syncmail/vc.pm (file contents):
Revision 1.1 by root, Sat Oct 27 23:53:49 2001 UTC vs.
Revision 1.3 by root, Sun Oct 28 20:52:25 2001 UTC

6use Coro::Event; 6use Coro::Event;
7use Coro::Channel; 7use Coro::Channel;
8 8
9BEGIN { *slog = \&::slog } 9BEGIN { *slog = \&::slog }
10 10
11$MAXMSG = 1024;
12
13use constant MAXMSG => 1024; 11use constant MAXMSG => 1024;
14use constant MAXQ => 10; 12use constant MAXQ => 10;
13
14use constant PROTVERSION => 1;
15 15
16my @send; 16my @send;
17my $send_full = new Coro::Signal; 17my $send_full = new Coro::Signal;
18my $send_empty = new Coro::Signal; 18my $send_empty = new Coro::Signal;
19 19
20my @port; 20my @port;
21my $port; 21my $port;
22 22
23sub dumpstr { 23sub dumpstr {
24 local $_ = shift; 24 local $_ = $_[0];
25 s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge; 25 s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge;
26 $_ = "\"$_\""; 26 $_ = "\"$_\"";
27 (substr $_, 60) = "\"..." if length($_) > 60; 27 (substr $_, 60) = "\"..." if length($_) > 60;
28 $_; 28 $_;
29} 29}
30 30
31sub new_passive { 31# catch (accept) an existing vc
32sub catch {
32 my $class = shift; 33 my $class = shift;
33 my $self = bless { 34 my $self = bless {
34 rcv => (new Coro::Channel 100), 35 rcv => (new Coro::Channel 1e9),
35 @_, 36 @_,
36 }, $class; 37 }, $class;
37 38
38 weaken ($port[$self->{port}] = $self); 39 weaken ($port[$self->{port}] = $self);
39 $self; 40 $self;
40} 41}
41 42
42sub new { 43# create (connect) a new vc
44sub create {
43 my $class = shift; 45 my $class = shift;
44 my $self = $class->new_passive(port => ($port += 2), @_); 46 my $self = $class->catch(port => ($port += 2), @_);
45 47
46 $self->snd("pri", $self->{pri}) if self->{pri}; 48 $self->snd("pri", $self->{pri}) if $self->{pri};
47 49
48 $self; 50 $self;
49} 51}
50 52
51sub DESTROY { 53sub DESTROY {
65 my $self = shift; 67 my $self = shift;
66 $self->{pri} = $_[0]; 68 $self->{pri} = $_[0];
67 $self->snd("pri", $_[0]); 69 $self->snd("pri", $_[0]);
68} 70}
69 71
72sub _snd {
73 my $self = shift;
74
75 slog 8, ">>> :$self->{port} ".dumpstr($_[0])."\n"; #d#
76 push @{$send[$self->{pri}]},
77 pack "nna*", $self->{port}, length $_[0], $_[0];
78
79}
80
81sub _rcv {
82 $_[0]{port} ? $_[0]{rcv}->get : undef;
83}
84
70sub snd { 85sub snd {
71 my $self = shift; 86 my $self = shift;
87 my ($length, $ofs);
72 88
73 $send_empty->wait while @send > MAXQ; 89 $send_empty->wait while @send > MAXQ;
74
75 for (@_) { 90 for (@_) {
76 slog 8, ">>> :$vc->{port} ".dumpstr($_)."\n"; #d# 91 if (length && MAXMSG >= length) {
77 push @{$send[$self->{pri}]}, 92 $self->_snd($_);
78 pack "nna*", $self->{port}, length $_, $_; 93 } else {
94 $self->_snd("");
95 for (my $ofs = 0; $ofs < length; $ofs += MAXMSG) {
96 $send_empty->wait while @send > MAXQ;
97 $self->_snd(substr $_, $ofs, MAXMSG);
98 }
99 $self->_snd("");
100 }
79 } 101 }
80
81 $send_full->send; 102 $send_full->send;
82} 103}
83 104
84sub rcv { 105sub rcv {
85 $_[0]{port} ? $_[0]{rcv}->get : undef; 106 my $self = shift;
107 my $data = $self->_rcv;
108
109 unless (length $data) {
110 my $blk;
111 $data .= $blk while length ($blk = $self->_rcv);
112 }
113
114 $data;
86} 115}
87 116
88sub feed { 117sub feed {
89 $_[0]{rcv}->put($_[1]); 118 $_[0]{rcv}->put($_[1]);
90} 119}
160 or die "unexpected read error: $!"; 189 or die "unexpected read error: $!";
161 190
162 slog 8, "<<< :$port ".dumpstr($data)."\n"; #d# 191 slog 8, "<<< :$port ".dumpstr($data)."\n"; #d#
163 192
164 ($port[$port] ||= do { 193 ($port[$port] ||= do {
165 my $vc = new_passive vc port => $port; 194 my $vc = catch vc port => $port;
166 async \&::serve, $vc; 195 async \&::serve, $vc;
167 $vc; 196 $vc;
168 })->feed($data); 197 })->feed($data);
169 } 198 }
170 } else { 199 } else {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines