ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/vc.pm
Revision: 1.4
Committed: Mon Oct 29 00:37:41 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
CVS Tags: HEAD
Changes since 1.3: +7 -8 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 package vc;
2
3 use Convert::Scalar 'weaken';
4
5 use Coro;
6 use Coro::Event;
7 use Coro::Channel;
8
# Logging is delegated to the main program's slog(); alias it into this
# package at compile time so it can be called like a local function.
BEGIN { *slog = \&main::slog }

# Largest payload carried by a single frame; snd() splits anything longer.
use constant MAXMSG      => 1024;
# Threshold used by snd() when deciding whether to wait for the feeder.
use constant MAXQ        => 10;

# Protocol revision announced in the greeting built by multiplex().
use constant PROTVERSION => 1;

# Outgoing frames: $send[PRI] is an array of packed frames for that priority.
my @send;
# Signalled whenever something has been queued for the feeder.
my $send_full  = Coro::Signal->new;
# Signalled by the feeder once it finds every queue empty.
my $send_empty = Coro::Signal->new;

# Port number => vc object (weak references, see catch()).
my @port;
# Most recently allocated local port number (see create() and multiplex()).
my $port;
22
# Render a string for the debug log.
#
# Every character outside the ranges 0x20-0x7f and 0xa0-0xff is replaced by
# a \x{NN} escape, the result is wrapped in double quotes, and anything past
# the 60th character of the quoted form is replaced by a closing quote
# followed by "...".  Returns the formatted copy; the argument is untouched.
sub dumpstr {
    my $text = $_[0];

    $text =~ s{([^\x20-\x7f\xa0-\xff])}{ sprintf "\\x{%02x}", ord $1 }ge;
    $text = qq{"$text"};

    if (length($text) > 60) {
        substr($text, 60) = qq{"...};
    }

    return $text;
}
30
# catch (accept) an existing vc
#
# Constructor.  The key/value pairs after the class name become attributes
# of the new object (callers in this file always pass "port"); a receive
# channel is supplied under "rcv" unless the arguments override it.  The
# object is entered into @port under its port number through a weakened
# reference, so the table alone does not keep the object alive.
# Returns the new object.
sub catch {
    my ($class, @attrs) = @_;

    my $self = bless { rcv => Coro::Channel->new(1e9), @attrs }, $class;

    $port[ $self->{port} ] = $self;
    weaken($port[ $self->{port} ]);

    return $self;
}
42
# create (connect) a new vc
#
# Constructor for a locally initiated vc.  Advances the file-level $port
# counter by two to obtain a fresh port number and passes it, followed by
# the caller's own key/value pairs, to catch().  Returns the new object.
sub create {
    my ($class, @attrs) = @_;

    $port += 2;
    my $vc = $class->catch(port => $port, @attrs);

    return $vc;
}
50
# Destructor: closes the vc when the object is destroyed.  The call names
# this package's close() explicitly, because a plain close(...) would be
# Perl's builtin filehandle close.
sub DESTROY {
    vc::close(@_);
}
54
# Close this vc.
#
# While the object still has a port, a close frame (the port number followed
# by the length value 65535, which the reader in multiplex() treats as
# "closing") is appended to queue 0, the feeder is signalled, and the port
# entry is removed so that a later close() or DESTROY does nothing.
sub close {
    my ($self) = @_;

    if (my $portnum = $self->{port}) {
        slog(0, "closing port $portnum\n");#d#

        my $frame = pack "nn", $portnum, 65535;
        push @{ $send[0] }, $frame;
        $send_full->send;

        delete $self->{port};
    }
}
65
# Set the send priority of this vc.
#
# The value is used by _snd() as the index into @send; the feeder in
# multiplex() scans @send from index 0 upwards, so lower numbers are
# transmitted first.  Returns the value just stored.
sub pri {
    my ($self, $level) = @_;

    return $self->{pri} = $level;
}
70
# Queue one raw frame for this vc (internal helper for snd()).
#
# The frame is the port number and the payload length as 16-bit big-endian
# integers, followed by the payload bytes.  It is appended to the @send
# queue selected by the vc's priority.  The feeder is NOT signalled here;
# the caller does that.
sub _snd {
    my ($self, $payload) = @_;
    my $portnum = $self->{port};

    slog(8, ">>> :$portnum " . dumpstr($payload) . "\n"); #d#

    my $frame = pack "nna*", $portnum, length($payload), $payload;
    push @{ $send[ $self->{pri} ] }, $frame;
}
79
# Fetch the next raw frame payload for this vc (internal helper for rcv()).
#
# While the vc still has a port, one item is taken from its receive channel
# with get().  Once the port entry is gone the result is undef and the
# channel is not consulted.
sub _rcv {
    my ($self) = @_;

    return undef unless $self->{port};
    return $self->{rcv}->get;
}
83
# Flow control for snd(): suspend the caller while more than MAXQ frames are
# waiting in the send queues (all priorities counted together).
sub _snd_throttle {
    while () {
        my $queued = 0;
        for my $queue (@send) {
            $queued += @$queue if $queue;
        }
        return if $queued <= MAXQ;

        # Wake the feeder first: frames queued by _snd() have not been
        # announced yet, and the feeder only signals $send_empty after it
        # has drained every queue.
        $send_full->send;
        $send_empty->wait;
    }
}

# Send one or more messages over this vc.
#
# Each argument is one message.  A non-empty message of at most MAXMSG bytes
# travels as a single frame.  Anything else (empty, undefined, or longer
# than MAXMSG) is framed as: an empty frame, the data in MAXMSG-sized
# chunks, and a closing empty frame; rcv() reassembles that form.
#
# Before queueing, and again before every chunk of a long message, the call
# waits until the number of queued frames is no larger than MAXQ.  The
# earlier code compared the number of priority slots in @send with MAXQ, so
# the limit never took effect.  The feeder is signalled once at the end.
sub snd {
    my $self = shift;

    _snd_throttle();
    for my $msg (@_) {
        my $size = length($msg) || 0;

        if ($size && $size <= MAXMSG) {
            $self->_snd($msg);
        } else {
            $self->_snd("");
            for (my $ofs = 0; $ofs < $size; $ofs += MAXMSG) {
                _snd_throttle();
                $self->_snd(substr $msg, $ofs, MAXMSG);
            }
            $self->_snd("");
        }
    }
    $send_full->send;
}
103
# Receive one complete message from this vc.
#
# A non-empty first frame is the whole message.  An empty (or undefined)
# first frame means the message was sent in pieces: further frames are
# appended until the next empty or undefined one.  Returns the assembled
# message; the result is undef when the very first frame was undef and
# nothing followed it.
sub rcv {
    my ($self) = @_;

    my $msg = $self->_rcv;
    return $msg if length $msg;

    while () {
        my $chunk = $self->_rcv;
        last unless length $chunk;
        $msg .= $chunk;
    }

    return $msg;
}
115
# Hand one received payload to this vc by putting it on its receive
# channel.  The reader in multiplex() also feeds undef when the peer closes
# the port.
sub feed {
    my ($self, $payload) = @_;

    $self->{rcv}->put($payload);
}
119
# Ask the feeder to shut down: an undef entry on queue 0 makes the feeder in
# multiplex() leave its loop and write the final quit frame.
sub snd_quit {
    my $queue = ($send[0] ||= []);

    push @$queue, undef;
    $send_full->send;
}
123
# Run the multiplexer over one connection until either side quits.
#
#   $fh     handle object; timeout(), print(), read() and sysread() are
#           called on it
#   $slave  true on the slave end of the connection, false on the master
#
# Wire format as produced and parsed below:
#   greeting : 16-bit length, then the ident string
#   frame    : 16-bit port, 16-bit length, then <length> payload bytes
#   close    : 16-bit port, a length value above 65000, no payload
#   control  : port 0, with the length field as the code (0 = quit, 1 = nop)
#
# Two coroutines are started, a feeder that writes queued frames and a
# reader that dispatches incoming ones, and then the event loop is entered.
# The reader calls unloop when the peer quits, the stream ends, or an error
# is caught (errors are logged through slog).
sub multiplex {
    my ($fh, $slave) = @_;

    # create() advances $port in steps of two, so starting at 1 on one end
    # and 0 on the other keeps the two ends' port numbers apart.
    $port = $slave ? 1 : 0;

    $fh->timeout($::TIMEOUT);

    # periodically send nop as keepalive signal to avoid the
    # rsync-on-slow-disk-timeout problem.
    my $nopper = Event->timer(parked => 1, cb => sub {
        push @{ $send[0] }, pack "nn", 0, 1;
        $send_full->send;
    });

    # NOTE(review): $VERSION here is this package's own $VERSION, which is
    # not assigned anywhere in the visible file -- confirm it is set
    # elsewhere (or that $::VERSION was intended).  Left unchanged because
    # both ends must produce the same string.
    my $ident = "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n";

    # the feeder
    async {
        $fh->print(pack "na*", length($ident), $ident);

        COMMAND:
        while () {
            # always restart the scan at queue 0 after writing one frame
            for my $queue (@send) {
                next unless $queue && @$queue;

                my $frame = shift @$queue;
                last COMMAND unless defined $frame;    # queued by snd_quit

                $fh->print($frame);

                # re-arm the keepalive timer after every write
                $nopper->at(time + $::IDLE);
                $nopper->start;
                next COMMAND;
            }

            # nothing left to write: release waiting senders, then sleep
            $send_empty->send;
            $send_full->wait;
        }

        $fh->print(pack "nn", 0, 0);
    };

    # the reader
    async {
        eval {
            my ($header, $data);

            $fh->read($header, 2) == 2
                or die "unexpected eof: $!";
            $fh->read($data, unpack "n", $header)
                or die "unexpected eof: $!";

            # report what was actually received (this used to interpolate
            # $_, which has nothing to do with the greeting)
            $data eq $ident
                or die "garbled input or version mismatch: " . dumpstr($data);

            while ($fh->read($header, 4) == 4) {
                my ($portnum, $len) = unpack "nn", $header;

                if ($portnum) {
                    if ($len > 65000) {
                        slog(8, "||| :$portnum closing\n");#d#
                        if ($port[$portnum]) {
                            # drop the port first so the vc's own close()
                            # does not send a close frame back
                            delete $port[$portnum]{port};
                            $port[$portnum]->feed(undef);
                            undef $port[$portnum];
                        }
                    } else {
                        # NOTE(review): headers are read with read() but the
                        # payload with sysread() -- confirm the handle class
                        # treats the two consistently.
                        $fh->sysread($data, $len) == $len
                            or die "unexpected read error: $!";

                        slog(8, "<<< :$portnum " . dumpstr($data) . "\n"); #d#

                        # first frame on an unknown port: accept it and start
                        # a server coroutine that owns the new vc
                        unless ($port[$portnum]) {
                            async \&::serve, vc->catch(port => $portnum);
                        }

                        $port[$portnum]->feed($data);
                    }
                } else {
                    slog(8, "<<< :$portnum <$len>\n");

                    if ($len == 0) {
                        snd_quit();
                        last;
                    } elsif ($len == 1) {
                        # nop
                    } else {
                        die "illegal control code received";
                    }
                }
            }
        };

        slog(0, "ERROR: $@") if $@;
        slog(9, "unlooping\n");#d#
        unloop();
    };

    loop();
}
220
221 1;
222