ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/vc.pm
(Generate patch)

Comparing syncmail/vc.pm (file contents):
Revision 1.2 by root, Sun Oct 28 03:51:24 2001 UTC vs.
Revision 1.3 by root, Sun Oct 28 20:52:25 2001 UTC

9BEGIN { *slog = \&::slog } 9BEGIN { *slog = \&::slog }
10 10
11use constant MAXMSG => 1024; 11use constant MAXMSG => 1024;
12use constant MAXQ => 10; 12use constant MAXQ => 10;
13 13
14use constant PROTVERSION => 1;
15
14my @send; 16my @send;
15my $send_full = new Coro::Signal; 17my $send_full = new Coro::Signal;
16my $send_empty = new Coro::Signal; 18my $send_empty = new Coro::Signal;
17 19
18my @port; 20my @port;
19my $port; 21my $port;
20 22
21sub dumpstr { 23sub dumpstr {
22 local $_ = shift; 24 local $_ = $_[0];
23 s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge; 25 s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge;
24 $_ = "\"$_\""; 26 $_ = "\"$_\"";
25 (substr $_, 60) = "\"..." if length($_) > 60; 27 (substr $_, 60) = "\"..." if length($_) > 60;
26 $_; 28 $_;
27} 29}
28 30
29sub new_passive { 31# catch (accept) an existing vc
32sub catch {
30 my $class = shift; 33 my $class = shift;
31 my $self = bless { 34 my $self = bless {
32 rcv => (new Coro::Channel 1e9), 35 rcv => (new Coro::Channel 1e9),
33 @_, 36 @_,
34 }, $class; 37 }, $class;
35 38
36 weaken ($port[$self->{port}] = $self); 39 weaken ($port[$self->{port}] = $self);
37 $self; 40 $self;
38} 41}
39 42
40sub new { 43# create (connect) a new vc
44sub create {
41 my $class = shift; 45 my $class = shift;
42 my $self = $class->new_passive(port => ($port += 2), @_); 46 my $self = $class->catch(port => ($port += 2), @_);
43 47
44 $self->snd("pri", $self->{pri}) if self->{pri}; 48 $self->snd("pri", $self->{pri}) if $self->{pri};
45 49
46 $self; 50 $self;
47} 51}
48 52
49sub DESTROY { 53sub DESTROY {
63 my $self = shift; 67 my $self = shift;
64 $self->{pri} = $_[0]; 68 $self->{pri} = $_[0];
65 $self->snd("pri", $_[0]); 69 $self->snd("pri", $_[0]);
66} 70}
67 71
72sub _snd {
73 my $self = shift;
74
75 slog 8, ">>> :$self->{port} ".dumpstr($_[0])."\n"; #d#
76 push @{$send[$self->{pri}]},
77 pack "nna*", $self->{port}, length $_[0], $_[0];
78
79}
80
81sub _rcv {
82 $_[0]{port} ? $_[0]{rcv}->get : undef;
83}
84
68sub snd { 85sub snd {
69 my $self = shift; 86 my $self = shift;
87 my ($length, $ofs);
70 88
71 $send_empty->wait while @send > MAXQ; 89 $send_empty->wait while @send > MAXQ;
72
73 for (@_) { 90 for (@_) {
74 slog 8, ">>> :$self->{port} ".dumpstr($_)."\n"; #d# 91 if (length && MAXMSG >= length) {
75 push @{$send[$self->{pri}]}, 92 $self->_snd($_);
76 pack "nna*", $self->{port}, length $_, $_; 93 } else {
94 $self->_snd("");
95 for (my $ofs = 0; $ofs < length; $ofs += MAXMSG) {
96 $send_empty->wait while @send > MAXQ;
97 $self->_snd(substr $_, $ofs, MAXMSG);
98 }
99 $self->_snd("");
100 }
77 } 101 }
78
79 $send_full->send; 102 $send_full->send;
80} 103}
81 104
82sub rcv { 105sub rcv {
83 $_[0]{port} ? $_[0]{rcv}->get : undef; 106 my $self = shift;
107 my $data = $self->_rcv;
108
109 unless (length $data) {
110 my $blk;
111 $data .= $blk while length ($blk = $self->_rcv);
112 }
113
114 $data;
84} 115}
85 116
86sub feed { 117sub feed {
87 $_[0]{rcv}->put($_[1]); 118 $_[0]{rcv}->put($_[1]);
88} 119}
158 or die "unexpected read error: $!"; 189 or die "unexpected read error: $!";
159 190
160 slog 8, "<<< :$port ".dumpstr($data)."\n"; #d# 191 slog 8, "<<< :$port ".dumpstr($data)."\n"; #d#
161 192
162 ($port[$port] ||= do { 193 ($port[$port] ||= do {
163 my $vc = new_passive vc port => $port; 194 my $vc = catch vc port => $port;
164 async \&::serve, $vc; 195 async \&::serve, $vc;
165 $vc; 196 $vc;
166 })->feed($data); 197 })->feed($data);
167 } 198 }
168 } else { 199 } else {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines