ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/vc.pm
Revision: 1.3
Committed: Sun Oct 28 20:52:25 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
Changes since 1.2: +43 -12 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 package vc;
2
3 use Convert::Scalar 'weaken';
4
5 use Coro;
6 use Coro::Event;
7 use Coro::Channel;
8
# Alias the main program's logger so it can be called here as plain "slog".
BEGIN { *slog = \&::slog }

use constant MAXMSG => 1024;      # largest payload snd puts into one frame
use constant MAXQ   => 10;        # limit that snd compares @send against

use constant PROTVERSION => 1;    # protocol number announced in the handshake

# Outgoing frames. $send[$pri] is an array of packed frames for that
# priority; the feeder in multiplex always drains lower indices first.
my @send;
my $send_full  = Coro::Signal->new;   # sent after frames have been queued
my $send_empty = Coro::Signal->new;   # sent by the feeder when @send is drained

my @port;   # port number => vc object (stored weakened by catch)
my $port;   # most recently allocated local port number (see create)
22
# dumpstr($string)
#
# Return a printable, double-quoted rendering of $string for log output.
# Every character outside \x20-\x7f and \xa0-\xff is replaced by a \x{hh}
# escape. If the quoted result is longer than 60 characters, everything
# from offset 60 on is replaced by the marker '"...'.
sub dumpstr {
    my ($text) = @_;

    $text =~ s{([^\x20-\x7f\xa0-\xff])}{ sprintf '\\x{%02x}', ord $1 }ge;
    $text = '"' . $text . '"';

    # keep the first 60 characters (opening quote included) and mark the cut
    $text = substr($text, 0, 60) . '"...'
        if length($text) > 60;

    return $text;
}
30
# catch (accept) an existing vc
#
# Class method: vc->catch(port => $n, ...). Builds the object from the given
# key/value pairs plus a fresh receive channel under "rcv" (a caller-supplied
# "rcv" would override it), registers it in @port under its port number and
# returns it. The @port entry is weakened, so the table alone does not keep
# the object alive.
sub catch {
    my ($class, @fields) = @_;

    my $self = bless {
        rcv => Coro::Channel->new(1e9),
        @fields,
    }, $class;

    my $slot = \$port[ $self->{port} ];
    $$slot = $self;
    weaken($$slot);

    return $self;
}
42
# create (connect) a new vc
#
# Class method: vc->create(key => value, ...). Allocates the next local port
# by advancing the port counter by two, builds the object through catch and,
# when a true "pri" was supplied, announces that priority to the peer.
# Returns the new object.
sub create {
    my ($class, @fields) = @_;

    $port += 2;
    my $self = $class->catch(port => $port, @fields);

    if ($self->{pri}) {
        $self->snd("pri", $self->{pri});
    }

    return $self;
}
52
# Destructor: closes the vc when the object is destroyed.
#
# "close" is also the name of a perl builtin, so the sub in this package has
# to be called by its qualified name (or with a leading "&") rather than as
# a bare close(...).
sub DESTROY {
    my ($self) = @_;

    vc::close($self);
}
56
# $vc->close
#
# Queue a close frame (port number followed by the length value 65535) on
# priority queue 0 and forget the port, which marks the object as closed.
# Does nothing when the port is already unset. Note that this only queues
# the frame; it does not send the $send_full signal itself.
sub close {
    my ($self) = @_;

    if (my $id = $self->{port}) {
        push @{ $send[0] }, pack("nn", $id, 65535);
        delete $self->{port};
    }
}
65
# $vc->pri($priority)
#
# Remember $priority as this vc's send priority (the index into @send used
# by _snd) and send the two messages "pri" and $priority over the vc.
sub pri {
    my ($self, $level) = @_;

    $self->{pri} = $level;
    $self->snd("pri", $level);
}
71
# $vc->_snd($chunk)
#
# Low-level send: log the chunk, pack it into one frame (16-bit port, 16-bit
# length, payload) and append the frame to the queue of this vc's priority.
# An unset priority selects queue 0.
sub _snd {
    my ($self, $chunk) = @_;

    slog(8, ">>> :$self->{port} " . dumpstr($chunk) . "\n"); #d#

    my $frame = pack "nna*", $self->{port}, length($chunk), $chunk;
    push @{ $send[ $self->{pri} ] }, $frame;
}
80
# $vc->_rcv
#
# Low-level receive: fetch the next block from the receive channel, or
# return undef without touching the channel once the port has been unset
# (i.e. the vc is closed).
sub _rcv {
    my ($self) = @_;

    return undef unless $self->{port};
    return $self->{rcv}->get;
}
84
# $vc->snd(@messages)
#
# Queue every argument as one message and then signal $send_full.
# A message of 1..MAXMSG bytes goes out as a single frame. Anything else
# (empty, or longer than MAXMSG) is framed as: an empty frame, the data in
# pieces of at most MAXMSG bytes, and a terminating empty frame. rcv
# reassembles that sequence on the other side.
#
# NOTE(review): "@send > MAXQ" compares the number of elements of @send,
# which _snd indexes by priority, so this looks like it counts priority
# slots rather than queued frames. Confirm the intended throttling.
sub snd {
    my ($self, @messages) = @_;

    $send_empty->wait while @send > MAXQ;

    for my $msg (@messages) {
        my $size = length $msg;

        if ($size && $size <= MAXMSG) {
            $self->_snd($msg);
            next;
        }

        # multi-part message, bracketed by empty frames
        $self->_snd("");
        my $pos = 0;
        while ($pos < $size) {
            $send_empty->wait while @send > MAXQ;
            $self->_snd(substr $msg, $pos, MAXMSG);
            $pos += MAXMSG;
        }
        $self->_snd("");
    }

    $send_full->send;
}
104
# $vc->rcv
#
# Return the next complete message. A non-empty first block is the whole
# message. An empty (or undefined) first block means a multi-part message:
# further blocks are appended until an empty or undefined block arrives.
# On a closed vc _rcv yields undef, so the result is undef.
sub rcv {
    my ($self) = @_;

    my $data = $self->_rcv;
    return $data if length $data;

    while (length(my $piece = $self->_rcv)) {
        $data .= $piece;
    }

    return $data;
}
116
# $vc->feed($block)
#
# Hand one received block to this vc by putting it into its receive
# channel, where _rcv picks it up.
sub feed {
    my ($self, $block) = @_;

    $self->{rcv}->put($block);
}
120
# snd_quit
#
# Ask the feeder in multiplex to stop: an undef entry on priority queue 0
# is its signal to leave the send loop. Sends $send_full so the request is
# picked up.
sub snd_quit {
    my $urgent = $send[0] ||= [];

    push @$urgent, undef;
    $send_full->send;
}
124
# multiplex($fh, $slave)
#
# Run the virtual-circuit protocol over $fh until the connection ends.
#
#   $fh     handle object providing timeout/print/read/sysread
#   $slave  true on the slave side; local ports then start from 1 instead
#           of 0 (create advances the counter by two, so both sides
#           allocate from disjoint port numbers)
#
# Wire format as used below:
#   handshake   16-bit length + ident string, sent first by each side
#   data frame  16-bit port, 16-bit length, payload
#   close       16-bit port, length value above 65000 (close sends 65535)
#   control     port 0; length value 0 = quit, 1 = nop (keepalive)
#
# Two coroutines are started: a feeder that writes queued frames, and a
# reader that dispatches incoming frames to their vc objects, creating a
# vc and a ::serve coroutine for ports not seen before. The sub then
# enters the event loop and returns after the reader calls unloop.
sub multiplex {
    my ($fh, $slave) = @_;

    $port = $slave ? 1 : 0;

    $fh->timeout($::TIMEOUT);

    # periodically send nop as keepalive signal to avoid the
    # rsync-on-slow-disk-timeout problem.
    my $nopper = Event->timer(parked => 1, cb => sub {
        push @{$send[0]}, pack "nn", 0, 1;
        $send_full->send;
    });

    # NOTE(review): $VERSION is unqualified, i.e. this package's own
    # variable, which nothing in the visible source assigns (TIMEOUT and
    # IDLE are read from main) - confirm whether $::VERSION was intended.
    my $ident = "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n";

    # the feeder
    async {
        $fh->print(pack "na*", length $ident, $ident);

      COMMAND:
        while (1) {
            # scan the queues from index 0 upwards and send one frame from
            # the first non-empty one, then rescan from the start
            for (@send) {
                if (@$_) {
                    my $frame = shift @$_;
                    if (defined $frame) {
                        $fh->print($frame);
                    } else {
                        # undef is the quit marker queued by snd_quit
                        last COMMAND;
                    }
                    # re-arm the keepalive timer after every frame written
                    $nopper->at(time+$::IDLE);
                    $nopper->start;
                    next COMMAND;
                }
            }
            # everything drained: release waiting senders, wait for more
            $send_empty->send;
            $send_full->wait;
        }

        # tell the peer we are quitting
        $fh->print(pack "nn", 0, 0);
    };

    # the reader
    async {
        eval {
            my ($lenbuf, $header, $data);

            $fh->read($lenbuf, 2) == 2
                or die "unexpected eof: $!";
            $fh->read($data, unpack "n", $lenbuf)
                or die "unexpected eof: $!";

            # report what was actually received; the message used to
            # interpolate $_, which is unrelated to the handshake data
            $data eq $ident
                or die "garbled input or version mismatch: ".dumpstr($data);

            while ($fh->read($header, 4) == 4) {
                my ($id, $len) = unpack "nn", $header;

                if ($id) {
                    if ($len > 65000) {
                        slog(8, "||| :$id closing\n");#d#
                        if ($port[$id]) {
                            # mark the vc closed, wake a pending reader,
                            # then drop it from the table
                            delete $port[$id]{port};
                            $port[$id]->feed(undef);
                            undef $port[$id];
                        }
                    } else {
                        # NOTE(review): the payload is fetched with sysread
                        # while the headers use read - confirm sysread
                        # cannot return a short count here.
                        $fh->sysread($data, $len) == $len
                            or die "unexpected read error: $!";

                        slog(8, "<<< :$id ".dumpstr($data)."\n"); #d#

                        # unknown port: the peer opened a new vc, so accept
                        # it and start a server coroutine for it
                        ($port[$id] ||= do {
                            my $vc = vc->catch(port => $id);
                            async \&::serve, $vc;
                            $vc;
                        })->feed($data);
                    }
                } else {
                    slog(8, "<<< :$id <$len>\n");

                    if ($len == 0) {
                        # peer quits: stop our feeder and leave the loop
                        snd_quit;
                        last;
                    } elsif ($len == 1) {
                        # nop
                    } else {
                        die "illegal control code received";
                    }
                }
            }
        };

        slog(0, "ERROR: $@") if $@;
        slog(9, "unlooping\n");#d#
        unloop;
    };

    loop;
}
221
222 1;
223