ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/vc.pm
Revision: 1.3
Committed: Sun Oct 28 20:52:25 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
Changes since 1.2: +43 -12 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 package vc;
2    
3     use Convert::Scalar 'weaken';
4    
5     use Coro;
6     use Coro::Event;
7     use Coro::Channel;
8    
9     BEGIN { *slog = \&::slog }
10    
11     use constant MAXMSG => 1024;
12     use constant MAXQ => 10;
13    
14 root 1.3 use constant PROTVERSION => 1;
15    
16 root 1.1 my @send;
17     my $send_full = new Coro::Signal;
18     my $send_empty = new Coro::Signal;
19    
20     my @port;
21     my $port;
22    
23     sub dumpstr {
24 root 1.3 local $_ = $_[0];
25 root 1.1 s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge;
26     $_ = "\"$_\"";
27     (substr $_, 60) = "\"..." if length($_) > 60;
28     $_;
29     }
30    
31 root 1.3 # catch (accept) an existing vc
32     sub catch {
33 root 1.1 my $class = shift;
34     my $self = bless {
35 root 1.2 rcv => (new Coro::Channel 1e9),
36 root 1.1 @_,
37     }, $class;
38    
39     weaken ($port[$self->{port}] = $self);
40     $self;
41     }
42    
43 root 1.3 # create (connect) a new vc
44     sub create {
45 root 1.1 my $class = shift;
46 root 1.3 my $self = $class->catch(port => ($port += 2), @_);
47 root 1.1
48 root 1.3 $self->snd("pri", $self->{pri}) if $self->{pri};
49 root 1.1
50     $self;
51     }
52    
53     sub DESTROY {
54     &close;
55     }
56    
57     sub close {
58     my $self = shift;
59    
60     if ($self->{port}) {
61     push @{$send[0]}, pack "nn", $self->{port}, 65535;
62     delete $self->{port};
63     }
64     }
65    
66     sub pri {
67     my $self = shift;
68     $self->{pri} = $_[0];
69     $self->snd("pri", $_[0]);
70     }
71    
72 root 1.3 sub _snd {
73     my $self = shift;
74    
75     slog 8, ">>> :$self->{port} ".dumpstr($_[0])."\n"; #d#
76     push @{$send[$self->{pri}]},
77     pack "nna*", $self->{port}, length $_[0], $_[0];
78    
79     }
80    
81     sub _rcv {
82     $_[0]{port} ? $_[0]{rcv}->get : undef;
83     }
84    
85 root 1.1 sub snd {
86     my $self = shift;
87 root 1.3 my ($length, $ofs);
88 root 1.1
89     $send_empty->wait while @send > MAXQ;
90     for (@_) {
91 root 1.3 if (length && MAXMSG >= length) {
92     $self->_snd($_);
93     } else {
94     $self->_snd("");
95     for (my $ofs = 0; $ofs < length; $ofs += MAXMSG) {
96     $send_empty->wait while @send > MAXQ;
97     $self->_snd(substr $_, $ofs, MAXMSG);
98     }
99     $self->_snd("");
100     }
101 root 1.1 }
102     $send_full->send;
103     }
104    
105     sub rcv {
106 root 1.3 my $self = shift;
107     my $data = $self->_rcv;
108    
109     unless (length $data) {
110     my $blk;
111     $data .= $blk while length ($blk = $self->_rcv);
112     }
113    
114     $data;
115 root 1.1 }
116    
117     sub feed {
118     $_[0]{rcv}->put($_[1]);
119     }
120    
121     sub snd_quit {
122     push @{$send[0]}, undef; $send_full->send;
123     }
124    
125     sub multiplex {
126     my ($fh, $slave) = @_;
127    
128     $port = $slave ? 1 : 0;
129    
130     $fh->timeout($::TIMEOUT);
131    
132     # periodically send nop as keepalive signal to avoid the
133     # rsync-on-slow-disk-timeout problem.
134     my $nopper = Event->timer(parked => 1, cb => sub {
135     push @{$send[0]}, pack "nn", 0, 1; $send_full->send;
136     });
137    
138     my $ident = "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n";
139    
140     # the feeder
141     async {
142     my $id = "a";
143     $fh->print(pack "na*", length $ident, $ident);
144     command:
145     while() {
146     for (@send) {
147     if (@$_) {
148     my $vc = shift @$_;
149     if (defined $vc) {
150     $fh->print($vc);
151     } else {
152     last command;
153     }
154     $nopper->at(time+$::IDLE); $nopper->start;
155     next command;
156     }
157     }
158     $send_empty->send;
159     $send_full->wait;
160     }
161     $fh->print(pack "nn", 0, 0);
162     };
163    
164     async {
165     eval {
166     my ($port, $len, $data);
167    
168     $fh->read($len, 2) == 2
169     or die "unexpected eof: $!";
170     $fh->read($data, unpack "n", $len)
171     or die "unexpected eof: $!";
172    
173     $data eq $ident
174     or die "garbled input or version mismatch: $_";
175    
176     while ($fh->read($port, 4) == 4) {
177     my ($port, $len) = unpack "nn", $port;
178    
179     if ($port) {
180     if ($len > 65000) {
181     slog 8, "||| :$port closing\n";#d#
182     if ($port[$port]) {
183     delete $port[$port]{port};
184     $port[$port]->feed(undef);
185     undef $port[$port];
186     }
187     } else {
188     $fh->sysread($data, $len) == $len
189     or die "unexpected read error: $!";
190    
191     slog 8, "<<< :$port ".dumpstr($data)."\n"; #d#
192    
193     ($port[$port] ||= do {
194 root 1.3 my $vc = catch vc port => $port;
195 root 1.1 async \&::serve, $vc;
196     $vc;
197     })->feed($data);
198     }
199     } else {
200     slog 8, "<<< :$port <$len>\n";
201    
202     if ($len == 0) {
203     snd_quit;
204     last;
205     } elsif ($len == 1) {
206     # nop
207     } else {
208     die "illegal control code received";
209     }
210     }
211     }
212     };
213    
214     slog 0, "ERROR: $@" if $@;
215     slog 9, "unlooping\n";#d#
216     unloop;
217     };
218    
219     loop;
220     }
221    
222     1;
223