ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/vc.pm
Revision: 1.1
Committed: Sat Oct 27 23:53:49 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# Content
1 package vc;
2
3 use Convert::Scalar 'weaken';
4
5 use Coro;
6 use Coro::Event;
7 use Coro::Channel;
8
9 BEGIN { *slog = \&::slog }
10
11 $MAXMSG = 1024;
12
13 use constant MAXMSG => 1024;
14 use constant MAXQ => 10;
15
16 my @send;
17 my $send_full = new Coro::Signal;
18 my $send_empty = new Coro::Signal;
19
20 my @port;
21 my $port;
22
23 sub dumpstr {
24 local $_ = shift;
25 s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge;
26 $_ = "\"$_\"";
27 (substr $_, 60) = "\"..." if length($_) > 60;
28 $_;
29 }
30
31 sub new_passive {
32 my $class = shift;
33 my $self = bless {
34 rcv => (new Coro::Channel 100),
35 @_,
36 }, $class;
37
38 weaken ($port[$self->{port}] = $self);
39 $self;
40 }
41
42 sub new {
43 my $class = shift;
44 my $self = $class->new_passive(port => ($port += 2), @_);
45
46 $self->snd("pri", $self->{pri}) if self->{pri};
47
48 $self;
49 }
50
51 sub DESTROY {
52 &close;
53 }
54
55 sub close {
56 my $self = shift;
57
58 if ($self->{port}) {
59 push @{$send[0]}, pack "nn", $self->{port}, 65535;
60 delete $self->{port};
61 }
62 }
63
64 sub pri {
65 my $self = shift;
66 $self->{pri} = $_[0];
67 $self->snd("pri", $_[0]);
68 }
69
70 sub snd {
71 my $self = shift;
72
73 $send_empty->wait while @send > MAXQ;
74
75 for (@_) {
76 slog 8, ">>> :$vc->{port} ".dumpstr($_)."\n"; #d#
77 push @{$send[$self->{pri}]},
78 pack "nna*", $self->{port}, length $_, $_;
79 }
80
81 $send_full->send;
82 }
83
84 sub rcv {
85 $_[0]{port} ? $_[0]{rcv}->get : undef;
86 }
87
88 sub feed {
89 $_[0]{rcv}->put($_[1]);
90 }
91
92 sub snd_quit {
93 push @{$send[0]}, undef; $send_full->send;
94 }
95
96 sub multiplex {
97 my ($fh, $slave) = @_;
98
99 $port = $slave ? 1 : 0;
100
101 $fh->timeout($::TIMEOUT);
102
103 # periodically send nop as keepalive signal to avoid the
104 # rsync-on-slow-disk-timeout problem.
105 my $nopper = Event->timer(parked => 1, cb => sub {
106 push @{$send[0]}, pack "nn", 0, 1; $send_full->send;
107 });
108
109 my $ident = "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n";
110
111 # the feeder
112 async {
113 my $id = "a";
114 $fh->print(pack "na*", length $ident, $ident);
115 command:
116 while() {
117 for (@send) {
118 if (@$_) {
119 my $vc = shift @$_;
120 if (defined $vc) {
121 $fh->print($vc);
122 } else {
123 last command;
124 }
125 $nopper->at(time+$::IDLE); $nopper->start;
126 next command;
127 }
128 }
129 $send_empty->send;
130 $send_full->wait;
131 }
132 $fh->print(pack "nn", 0, 0);
133 };
134
135 async {
136 eval {
137 my ($port, $len, $data);
138
139 $fh->read($len, 2) == 2
140 or die "unexpected eof: $!";
141 $fh->read($data, unpack "n", $len)
142 or die "unexpected eof: $!";
143
144 $data eq $ident
145 or die "garbled input or version mismatch: $_";
146
147 while ($fh->read($port, 4) == 4) {
148 my ($port, $len) = unpack "nn", $port;
149
150 if ($port) {
151 if ($len > 65000) {
152 slog 8, "||| :$port closing\n";#d#
153 if ($port[$port]) {
154 delete $port[$port]{port};
155 $port[$port]->feed(undef);
156 undef $port[$port];
157 }
158 } else {
159 $fh->sysread($data, $len) == $len
160 or die "unexpected read error: $!";
161
162 slog 8, "<<< :$port ".dumpstr($data)."\n"; #d#
163
164 ($port[$port] ||= do {
165 my $vc = new_passive vc port => $port;
166 async \&::serve, $vc;
167 $vc;
168 })->feed($data);
169 }
170 } else {
171 slog 8, "<<< :$port <$len>\n";
172
173 if ($len == 0) {
174 snd_quit;
175 last;
176 } elsif ($len == 1) {
177 # nop
178 } else {
179 die "illegal control code received";
180 }
181 }
182 }
183 };
184
185 slog 0, "ERROR: $@" if $@;
186 slog 9, "unlooping\n";#d#
187 unloop;
188 };
189
190 loop;
191 }
192
193 1;
194