ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/vc.pm
(Generate patch)

Comparing syncmail/vc.pm (file contents):
Revision 1.1 by root, Sat Oct 27 23:53:49 2001 UTC vs.
Revision 1.4 by root, Mon Oct 29 00:37:41 2001 UTC

6use Coro::Event; 6use Coro::Event;
7use Coro::Channel; 7use Coro::Channel;
8 8
9BEGIN { *slog = \&::slog } 9BEGIN { *slog = \&::slog }
10 10
11$MAXMSG = 1024;
12
13use constant MAXMSG => 1024; 11use constant MAXMSG => 1024;
14use constant MAXQ => 10; 12use constant MAXQ => 10;
13
14use constant PROTVERSION => 1;
15 15
16my @send; 16my @send;
17my $send_full = new Coro::Signal; 17my $send_full = new Coro::Signal;
18my $send_empty = new Coro::Signal; 18my $send_empty = new Coro::Signal;
19 19
20my @port; 20my @port;
21my $port; 21my $port;
22 22
23sub dumpstr { 23sub dumpstr {
24 local $_ = shift; 24 local $_ = $_[0];
25 s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge; 25 s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge;
26 $_ = "\"$_\""; 26 $_ = "\"$_\"";
27 (substr $_, 60) = "\"..." if length($_) > 60; 27 (substr $_, 60) = "\"..." if length($_) > 60;
28 $_; 28 $_;
29} 29}
30 30
31sub new_passive { 31# catch (accept) an existing vc
32sub catch {
32 my $class = shift; 33 my $class = shift;
33 my $self = bless { 34 my $self = bless {
34 rcv => (new Coro::Channel 100), 35 rcv => (new Coro::Channel 1e9),
35 @_, 36 @_,
36 }, $class; 37 }, $class;
37 38
38 weaken ($port[$self->{port}] = $self); 39 weaken ($port[$self->{port}] = $self);
39 $self; 40 $self;
40} 41}
41 42
42sub new { 43# create (connect) a new vc
44sub create {
43 my $class = shift; 45 my $class = shift;
44 my $self = $class->new_passive(port => ($port += 2), @_); 46 my $self = $class->catch(port => ($port += 2), @_);
45
46 $self->snd("pri", $self->{pri}) if self->{pri};
47 47
48 $self; 48 $self;
49} 49}
50 50
51sub DESTROY { 51sub DESTROY {
54 54
55sub close { 55sub close {
56 my $self = shift; 56 my $self = shift;
57 57
58 if ($self->{port}) { 58 if ($self->{port}) {
59 slog 0, "closing port $self->{port}\n";#d#
59 push @{$send[0]}, pack "nn", $self->{port}, 65535; 60 push @{$send[0]}, pack "nn", $self->{port}, 65535;
61 $send_full->send;
60 delete $self->{port}; 62 delete $self->{port};
61 } 63 }
62} 64}
63 65
64sub pri { 66sub pri {
65 my $self = shift; 67 my $self = shift;
66 $self->{pri} = $_[0]; 68 $self->{pri} = $_[0];
67 $self->snd("pri", $_[0]); 69}
70
71sub _snd {
72 my $self = shift;
73
74 slog 8, ">>> :$self->{port} ".dumpstr($_[0])."\n"; #d#
75 push @{$send[$self->{pri}]},
76 pack "nna*", $self->{port}, length $_[0], $_[0];
77
78}
79
80sub _rcv {
81 $_[0]{port} ? $_[0]{rcv}->get : undef;
68} 82}
69 83
70sub snd { 84sub snd {
71 my $self = shift; 85 my $self = shift;
86 my ($length, $ofs);
72 87
73 $send_empty->wait while @send > MAXQ; 88 $send_empty->wait while @send > MAXQ;
74
75 for (@_) { 89 for (@_) {
76 slog 8, ">>> :$vc->{port} ".dumpstr($_)."\n"; #d# 90 if (length && MAXMSG >= length) {
77 push @{$send[$self->{pri}]}, 91 $self->_snd($_);
78 pack "nna*", $self->{port}, length $_, $_; 92 } else {
93 $self->_snd("");
94 for (my $ofs = 0; $ofs < length; $ofs += MAXMSG) {
95 $send_empty->wait while @send > MAXQ;
96 $self->_snd(substr $_, $ofs, MAXMSG);
97 }
98 $self->_snd("");
99 }
79 } 100 }
80
81 $send_full->send; 101 $send_full->send;
82} 102}
83 103
84sub rcv { 104sub rcv {
85 $_[0]{port} ? $_[0]{rcv}->get : undef; 105 my $self = shift;
106 my $data = $self->_rcv;
107
108 unless (length $data) {
109 my $blk;
110 $data .= $blk while length ($blk = $self->_rcv);
111 }
112
113 $data;
86} 114}
87 115
88sub feed { 116sub feed {
89 $_[0]{rcv}->put($_[1]); 117 $_[0]{rcv}->put($_[1]);
90} 118}
159 $fh->sysread($data, $len) == $len 187 $fh->sysread($data, $len) == $len
160 or die "unexpected read error: $!"; 188 or die "unexpected read error: $!";
161 189
162 slog 8, "<<< :$port ".dumpstr($data)."\n"; #d# 190 slog 8, "<<< :$port ".dumpstr($data)."\n"; #d#
163 191
164 ($port[$port] ||= do { 192 unless ($port[$port]) {
165 my $vc = new_passive vc port => $port;
166 async \&::serve, $vc; 193 async \&::serve, catch vc port => $port;
167 $vc; 194 }
195
168 })->feed($data); 196 $port[$port]->feed($data);
169 } 197 }
170 } else { 198 } else {
171 slog 8, "<<< :$port <$len>\n"; 199 slog 8, "<<< :$port <$len>\n";
172 200
173 if ($len == 0) { 201 if ($len == 0) {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines