ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/vc.pm
Revision: 1.2
Committed: Sun Oct 28 03:51:24 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
Changes since 1.1: +2 -4 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 package vc;
2
3 use Convert::Scalar 'weaken';
4
5 use Coro;
6 use Coro::Event;
7 use Coro::Channel;
8
9 BEGIN { *slog = \&::slog }
10
# Tunables.
# NOTE(review): MAXMSG is not referenced anywhere in the visible code.
use constant MAXMSG => 1024;
# Threshold that snd() compares the size of @send against.
use constant MAXQ   => 10;

# Outgoing frames: $send[$pri] is an array of packed frames queued at
# priority index $pri. An undef entry in queue 0 is the quit marker.
my @send;
# Signalled by producers after queueing; the feeder coroutine waits on it.
my $send_full  = Coro::Signal->new;
# Signalled by the feeder once every queue is drained; snd() waits on it.
my $send_empty = Coro::Signal->new;

# $port[$n] is the (weakly referenced) vc object bound to port number $n.
my @port;
# Last locally allocated port number; seeded by multiplex(), bumped by new().
my $port;
20
# Render a string for log output.
#
# Every character outside \x20-\x7f and \xa0-\xff is replaced by a
# \x{..} escape, the result is wrapped in double quotes, and anything
# longer than 60 characters (quotes included) is cut at 60 characters and
# terminated with `"...`.
#
# Takes the string to dump; returns the printable representation.
sub dumpstr {
    my ($text) = @_;

    $text =~ s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge;

    my $quoted = '"' . $text . '"';
    if (length($quoted) > 60) {
        $quoted = substr($quoted, 0, 60) . '"...';
    }

    return $quoted;
}
28
# Construct a vc object for a port number chosen by the caller.
#
# Arguments: the class, followed by key => value pairs that become object
# fields (they may override the default `rcv` channel). The object is
# registered in @port under $self->{port}; that slot is weakened so the
# registry alone does not keep the object alive.
#
# Returns the new object.
sub new_passive {
    my ($class, @fields) = @_;

    my $self = bless {
        rcv => Coro::Channel->new(1e9),
        @fields,
    }, $class;

    my $slot = $self->{port};
    $port[$slot] = $self;
    weaken($port[$slot]);

    return $self;
}
39
# Construct a vc object on a freshly allocated local port.
#
# Arguments: the class, followed by key => value pairs that become object
# fields (e.g. pri => N). The port counter is advanced by two for every
# object created here. If a priority was supplied, it is announced
# immediately by sending "pri" followed by the priority value.
#
# Returns the new object.
sub new {
    my ($class, @fields) = @_;

    my $self = $class->new_passive(port => ($port += 2), @fields);

    # Must test the object's own field; a bareword `self` would be resolved
    # as a package hash and the priority would never be announced.
    $self->snd("pri", $self->{pri}) if $self->{pri};

    return $self;
}
48
# Destructor: closes the channel when the object is garbage collected.
sub DESTROY {
    my ($self) = @_;

    # Fully qualified so this can never be parsed as the close() builtin.
    vc::close($self);
}
52
# Close this channel.
#
# If the object still has a port, a frame carrying that port and the
# length value 65535 is appended to queue 0 and the port field is removed,
# so a second call does nothing. The feeder is not signalled here; the
# frame goes out the next time the feeder runs.
sub close {
    my ($self) = @_;

    my $port_no = $self->{port};
    return $port_no unless $port_no;    # already closed

    push @{ $send[0] }, pack("nn", $port_no, 65535);
    return delete $self->{port};
}
61
# Set this channel's priority and announce it.
#
# The new value is stored first, so the "pri" announcement itself is
# already queued at the new priority.
sub pri {
    my ($self, $level) = @_;

    $self->{pri} = $level;
    return $self->snd("pri", $level);
}
67
# Queue one frame per argument on this channel and wake the feeder.
#
# Each frame is packed as: 16-bit port, 16-bit payload length, payload.
# Frames go into the queue selected by the channel's priority (queue 0
# when no priority has been set).
#
# Sending on a channel without a port (closed locally or by the peer) is
# a silent no-op, mirroring rcv() which returns undef in that state.
# Without this guard the frame would be packed with port 0, which the
# receiving side interprets as a control message.
#
# NOTE(review): the wait condition compares the number of priority queues
# (scalar @send) with MAXQ, not the number of queued frames - confirm
# whether that is the intended back-pressure.
# NOTE(review): the length field is 16 bits and the reader treats lengths
# above 65000 as a close marker, so payloads that long cannot be
# represented; callers presumably keep messages short - verify.
sub snd {
    my ($self, @messages) = @_;

    return unless $self->{port};

    $send_empty->wait while @send > MAXQ;

    # The channel may have been closed while we were waiting.
    my $port_no = $self->{port}
        or return;
    my $pri = $self->{pri} || 0;

    for my $msg (@messages) {
        slog(8, ">>> :$port_no " . dumpstr($msg) . "\n"); #d#
        push @{ $send[$pri] },
            pack("nna*", $port_no, length($msg), $msg);
    }

    $send_full->send;
}
81
# Fetch the next received item from this channel.
#
# Returns undef without touching the receive channel once the object no
# longer has a port; otherwise returns whatever the receive channel's
# get() yields.
sub rcv {
    my ($self) = @_;

    # Explicit undef (not a bare return) so list-context callers still
    # receive a single undef element.
    return undef unless $self->{port};

    return $self->{rcv}->get;
}
85
# Hand an incoming item to this channel's receive queue.
# Used by the reader in multiplex(); undef is fed when the peer closes.
sub feed {
    my ($self, $item) = @_;

    return $self->{rcv}->put($item);
}
89
# Ask the feeder to shut down: an undef entry in queue 0 is its quit
# marker. The feeder is woken so it notices the marker.
sub snd_quit {
    my $control_queue = ($send[0] ||= []);
    push @$control_queue, undef;

    $send_full->send;
}
93
# Run the multiplexing protocol over $fh until the connection ends.
#
# Arguments:
#   $fh    - handle to the peer; must support timeout/print/read/sysread
#            (NOTE(review): presumably a Coro::Handle - confirm).
#   $slave - true on the slave side. Local ports then start from 1 instead
#            of 0, so with new()'s step of two the two sides allocate
#            disjoint port numbers.
#
# Wire format, as produced and consumed below:
#   handshake: 16-bit length + ident string, sent by each side
#   frame:     16-bit port, 16-bit length, then <length> payload bytes
#   port 0 is the control channel and carries no payload:
#              length 0 = quit, length 1 = nop (keepalive)
#   port != 0 with length > 65000 = that port was closed by the sender
#
# Two coroutines are started - a feeder that writes queued frames and a
# reader that dispatches incoming frames - and then the event loop is
# entered. The reader calls unloop when it finishes or fails.
sub multiplex {
    my ($fh, $slave) = @_;

    $port = $slave ? 1 : 0;

    $fh->timeout($::TIMEOUT);

    # periodically send nop as keepalive signal to avoid the
    # rsync-on-slow-disk-timeout problem.
    my $nopper = Event->timer(parked => 1, cb => sub {
        push @{$send[0]}, pack "nn", 0, 1;
        $send_full->send;
    });

    # NOTE(review): $VERSION and PROTVERSION are not defined in the visible
    # code; presumably provided elsewhere - confirm. Left untouched because
    # both peers must build the identical string.
    my $ident = "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n";

    # the feeder
    async {
        $fh->print(pack "na*", length($ident), $ident);

        COMMAND:
        while (1) {
            # Always rescan from the first queue after sending one frame,
            # so lower-indexed queues are served first.
            for my $queue (@send) {
                next unless $queue && @$queue;

                my $frame = shift @$queue;
                last COMMAND unless defined $frame;    # quit marker

                $fh->print($frame);

                # re-arm the keepalive timer after every frame written
                $nopper->at(time() + $::IDLE);
                $nopper->start;
                next COMMAND;
            }

            # everything drained: release waiting senders, sleep for work
            $send_empty->send;
            $send_full->wait;
        }

        # tell the peer we are done
        $fh->print(pack "nn", 0, 0);
    };

    # the reader
    async {
        eval {
            my ($header, $data);

            $fh->read($header, 2) == 2
                or die "unexpected eof: $!";
            my $ident_len = unpack "n", $header;
            $fh->read($data, $ident_len) == $ident_len
                or die "unexpected eof: $!";

            $data eq $ident
                or die "garbled input or version mismatch: " . dumpstr($data);

            while ($fh->read($header, 4) == 4) {
                my ($id, $len) = unpack "nn", $header;

                if ($id) {
                    if ($len > 65000) {
                        slog(8, "||| :$id closing\n"); #d#
                        if ($port[$id]) {
                            # Drop the port first so the object does not
                            # queue a close frame of its own, then wake any
                            # reader with undef.
                            delete $port[$id]{port};
                            $port[$id]->feed(undef);
                            undef $port[$id];
                        }
                    } else {
                        $fh->sysread($data, $len) == $len
                            or die "unexpected read error: $!";

                        slog(8, "<<< :$id " . dumpstr($data) . "\n"); #d#

                        # First frame on an unknown port: create the
                        # channel and start a server coroutine for it.
                        ($port[$id] ||= do {
                            my $vc = vc->new_passive(port => $id);
                            async \&::serve, $vc;
                            $vc;
                        })->feed($data);
                    }
                } else {
                    slog(8, "<<< :$id <$len>\n");

                    if ($len == 0) {
                        snd_quit();
                        last;
                    } elsif ($len == 1) {
                        # nop
                    } else {
                        die "illegal control code received";
                    }
                }
            }
        };

        slog(0, "ERROR: $@") if $@;
        slog(9, "unlooping\n"); #d#
        unloop;
    };

    loop;
}
190
191 1;
192