ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/vc.pm
Revision: 1.4
Committed: Mon Oct 29 00:37:41 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
CVS Tags: HEAD
Changes since 1.3: +7 -8 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 package vc;
2    
3     use Convert::Scalar 'weaken';
4    
5     use Coro;
6     use Coro::Event;
7     use Coro::Channel;
8    
# Make the main program's logger callable as plain slog() inside this package.
BEGIN { *slog = \&::slog }

# Largest payload put into a single frame; snd() splits longer messages.
use constant MAXMSG => 1024;
# Threshold snd() compares @send against before waiting on $send_empty.
use constant MAXQ   => 10;

# Protocol version advertised in the ident line built by multiplex().
use constant PROTVERSION => 1;

# Outgoing frames: $send[$pri] is an array of packed frames for that priority.
my @send;
# Signalled after frames have been queued; the feeder in multiplex() waits on it.
my $send_full  = Coro::Signal->new;
# Signalled by the feeder when it finds no queued frame; snd() waits on it.
my $send_empty = Coro::Signal->new;

# Port number => vc object (stored as a weak reference by catch()).
my @port;
# Last port number handed out by create(); seeded by multiplex().
my $port;
22    
# Render a string for log output.
#
# Characters outside \x20-\x7f and \xa0-\xff are replaced by a \x{NN}
# escape, the result is wrapped in double quotes, and anything past 60
# characters (counting the opening quote) is replaced by '"...'.
# Returns the rendered copy; the argument is not modified.
sub dumpstr {
    my ($text) = @_;

    $text =~ s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge;

    my $quoted = "\"$text\"";
    if (length($quoted) > 60) {
        $quoted = substr($quoted, 0, 60) . "\"...";
    }

    return $quoted;
}
30    
# catch (accept) an existing vc
#
# Class method. Takes key/value pairs (callers in this file pass
# "port => N") that become fields of the new object and may override the
# default "rcv" channel. The object is registered in @port under its port
# number through a weak reference, so the table alone does not keep it alive.
# Returns the new object.
sub catch {
    my ($class, @fields) = @_;

    my $self = bless {
        rcv => Coro::Channel->new(1e9),
        @fields,
    }, $class;

    my $slot = $self->{port};
    $port[$slot] = $self;
    weaken($port[$slot]);

    return $self;
}
42    
# create (connect) a new vc
#
# Class method. Advances the file-level port counter by two and hands the
# resulting number to catch() together with any extra key/value pairs; a
# "port" key among those pairs comes later in the list and therefore wins.
# Returns the new object.
sub create {
    my ($class, @fields) = @_;

    $port += 2;
    my $vc = $class->catch(port => $port, @fields);

    return $vc;
}
50    
# Destructor: closes the vc by calling this package's close() with the same
# arguments. The & sigil is needed so that the user-defined sub is called
# rather than Perl's builtin close.
sub DESTROY {
    return &close(@_);
}
54    
# Close this vc if it still has a port.
#
# Queues a frame carrying the port number and the length value 65535 on
# priority queue 0, wakes the feeder, and removes the "port" field so that a
# second call does nothing.
sub close {
    my ($self) = @_;

    if (my $number = $self->{port}) {
        slog(0, "closing port $self->{port}\n"); #d#
        push @{ $send[0] }, pack("nn", $number, 65535);
        $send_full->send;
        delete $self->{port};
    }
}
65    
# Set the priority of this vc, i.e. the index of the @send queue that _snd()
# pushes its frames onto. Returns the value just stored.
sub pri {
    my ($self, $level) = @_;

    return $self->{pri} = $level;
}
70    
# Queue a single frame for this vc.
#
# The frame is the port number and the payload length as two 16-bit
# big-endian integers, followed by the payload. It is appended to the queue
# selected by the object's "pri" field. The feeder is not woken here.
sub _snd {
    my ($self, $payload) = @_;

    slog(8, ">>> :$self->{port} ".dumpstr($payload)."\n"); #d#

    my $frame = pack "nna*", $self->{port}, length($payload), $payload;

    return push @{ $send[ $self->{pri} ] }, $frame;
}
79    
# Fetch the next raw frame payload from this vc's receive channel.
# Returns undef without touching the channel once the "port" field is gone
# (or false).
sub _rcv {
    my ($self) = @_;

    return undef unless $self->{port};
    return $self->{rcv}->get;
}
83    
# Queue one or more messages for sending on this vc.
#
# Each argument is one message. A non-empty message of at most MAXMSG bytes
# is queued as a single frame. Anything else (an empty message, or one
# longer than MAXMSG) is queued as an empty frame, the message cut into
# MAXMSG-sized frames, and a closing empty frame; rcv() puts such a sequence
# back together. After all messages are queued the feeder is woken through
# $send_full.
sub snd {
    my $self = shift;

    # NOTE(review): @send holds one queue per priority, so this compares the
    # number of priority levels, not the number of queued frames, with MAXQ.
    # Confirm that this is the intended back-pressure condition.
    $send_empty->wait while @send > MAXQ;

    # A named lexical is used instead of the global $_ because the loop body
    # can block in ->wait, during which other code gets to run.
    for my $msg (@_) {
        my $len = length $msg;

        if ($len && $len <= MAXMSG) {
            $self->_snd($msg);
        }
        else {
            $self->_snd("");
            for (my $ofs = 0; $ofs < $len; $ofs += MAXMSG) {
                $send_empty->wait while @send > MAXQ;
                $self->_snd(substr $msg, $ofs, MAXMSG);
            }
            $self->_snd("");
        }
    }

    $send_full->send;
}
103    
# Receive one complete message from this vc.
#
# A non-empty first frame is the whole message. An empty (or undefined)
# first frame starts a multi-frame message: the following frames are
# concatenated until another empty or undefined frame arrives.
# Returns the message; undef if the very first frame was undef and nothing
# followed it.
sub rcv {
    my ($self) = @_;

    my $message = $self->_rcv;
    return $message if length $message;

    while (1) {
        my $part = $self->_rcv;
        last unless length $part;
        $message .= $part;
    }

    return $message;
}
115    
# Hand one received frame payload (or undef) to this vc by putting it on the
# object's "rcv" channel. Returns whatever the channel's put() returns.
sub feed {
    my ($self, $payload) = @_;

    return $self->{rcv}->put($payload);
}
119    
# Ask the feeder in multiplex() to stop: an undef entry on priority queue 0
# makes it leave its send loop. The feeder is then woken through $send_full.
sub snd_quit {
    push @{ $send[0] }, undef;
    return $send_full->send;
}
123    
# Run the multiplexing protocol over $fh until the connection is shut down.
#
#   $fh     handle object providing timeout/print/read/sysread
#           (presumably a Coro::Handle -- confirm with the caller)
#   $slave  true: locally created ports are odd (the counter starts at 1);
#           false: they are even (the counter starts at 0)
#
# Wire format, as written and parsed below: each side first sends a 16-bit
# length followed by an ident line, which must match the local one exactly.
# After that every frame is a 16-bit port and a 16-bit length, both
# big-endian. For a non-zero port a length above 65000 closes that port; any
# other length is followed by that many payload bytes. Port 0 carries control
# codes in the length field: 0 = quit, 1 = nop.
#
# Two coroutines are started (a feeder writing queued frames and a reader
# dispatching incoming ones), then the event loop runs until the reader
# calls unloop.
sub multiplex {
    my ($fh, $slave) = @_;

    $port = $slave ? 1 : 0;

    $fh->timeout($::TIMEOUT);

    # periodically send nop as keepalive signal to avoid the
    # rsync-on-slow-disk-timeout problem.
    my $nopper = Event->timer(parked => 1, cb => sub {
        push @{ $send[0] }, pack "nn", 0, 1;
        $send_full->send;
    });

    # NOTE(review): $VERSION is this package's variable; it is not assigned
    # in the code visible here -- confirm where it gets its value.
    my $ident = "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n";

    # the feeder
    async {
        $fh->print(pack "na*", length $ident, $ident);

        command:
        while () {
            # Scan the queues in index order and write the first frame found.
            # A named lexical is used because print may block and let other
            # code run while the loop variable is live.
            for my $queue (@send) {
                if (@$queue) {
                    my $frame = shift @$queue;

                    # an undef entry is the quit request from snd_quit()
                    last command unless defined $frame;

                    $fh->print($frame);

                    # something was just sent: push the keepalive back
                    $nopper->at(time + $::IDLE);
                    $nopper->start;
                    next command;
                }
            }

            # nothing queued: tell producers, then sleep until woken
            $send_empty->send;
            $send_full->wait;
        }

        # control code 0 tells the peer that this side is done
        $fh->print(pack "nn", 0, 0);
    };

    # the reader
    async {
        eval {
            my ($hdr, $data);

            $fh->read($hdr, 2) == 2
                or die "unexpected eof: $!";
            my $ident_len = unpack "n", $hdr;
            $fh->read($data, $ident_len) == $ident_len
                or die "unexpected eof: $!";

            # report what was actually received, escaped for the log
            $data eq $ident
                or die "garbled input or version mismatch: ".dumpstr($data);

            while ($fh->read($hdr, 4) == 4) {
                my ($port, $len) = unpack "nn", $hdr;

                if ($port) {
                    if ($len > 65000) {
                        slog(8, "||| :$port closing\n"); #d#
                        if ($port[$port]) {
                            # drop the port first so the vc reads as closed,
                            # then wake a reader blocked on its channel
                            delete $port[$port]{port};
                            $port[$port]->feed(undef);
                            undef $port[$port];
                        }
                    } else {
                        # NOTE(review): headers are read with read(), the
                        # payload with sysread() -- confirm the handle class
                        # allows mixing the two.
                        $fh->sysread($data, $len) == $len
                            or die "unexpected read error: $!";

                        slog(8, "<<< :$port ".dumpstr($data)."\n"); #d#

                        # first frame for an unknown port: accept it and
                        # start a server coroutine for the new vc
                        unless ($port[$port]) {
                            async \&::serve, vc->catch(port => $port);
                        }

                        $port[$port]->feed($data);
                    }
                } else {
                    slog(8, "<<< :$port <$len>\n");

                    if ($len == 0) {
                        # peer is done: stop our feeder and leave the loop
                        snd_quit();
                        last;
                    } elsif ($len == 1) {
                        # nop
                    } else {
                        die "illegal control code received";
                    }
                }
            }
        };

        slog(0, "ERROR: $@") if $@;
        slog(9, "unlooping\n"); #d#
        unloop;
    };

    loop;
}
220    
221     1;
222