ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/vc.pm
Revision: 1.1
Committed: Sat Oct 27 23:53:49 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 package vc;
2    
3     use Convert::Scalar 'weaken';
4    
5     use Coro;
6     use Coro::Event;
7     use Coro::Channel;
8    
9     BEGIN { *slog = \&::slog }
10    
# Package-global copy of the message size limit (same value as the MAXMSG
# constant below).
$MAXMSG = 1024;

use constant MAXMSG => 1024;
use constant MAXQ => 10;

# Outgoing frames: $send[$pri] is an array of already-packed frames for
# priority level $pri. The feeder in multiplex() scans the levels in index
# order, so lower indices are served first.
my @send;

# Signalled by producers after they queued something for the feeder.
my $send_full = Coro::Signal->new;

# Signalled by the feeder whenever it found all queues empty.
my $send_empty = Coro::Signal->new;

# Channel objects indexed by port number (entries are weakened in
# new_passive).
my @port;

# Most recently allocated local port number; new() advances it by two.
my $port;
22    
# Render a string for the debug log.
#
# Every character outside printable ASCII (0x20-0x7e) and 0xa0-0xff is
# replaced by a \x{..} escape, the result is wrapped in double quotes and,
# if longer than 60 characters, cut after 60 characters and finished with
# the marker '"...'.
#
#   $_[0] - the string to render (undef is rendered as an empty string)
#
# Returns the rendered string. Does not touch the caller's $_.
sub dumpstr {
    my ($str) = @_;
    $str = "" unless defined $str;

    # DEL (0x7f) is a control character as well, so the printable range
    # ends at 0x7e.
    $str =~ s/([^\x20-\x7e\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge;

    $str = "\"$str\"";

    # Keep the first 60 characters (opening quote included) and append a
    # closing quote plus ellipsis.
    substr($str, 60) = "\"..." if length($str) > 60;

    $str;
}
30    
# Create a channel object without announcing anything to the peer.
#
#   $class - class to bless into
#   @args  - key/value pairs stored in the object; callers pass at least
#            port => N. A caller-supplied "rcv" overrides the default
#            receive channel.
#
# The object is registered in @port under its port number. That entry is
# weakened, so the registry alone does not keep the object alive.
# Returns the new object.
sub new_passive {
    my ($class, @args) = @_;

    my $self = bless {
        rcv => Coro::Channel->new(100),
        @args,
    }, $class;

    my $slot = $self->{port};
    $port[$slot] = $self;
    weaken($port[$slot]);

    return $self;
}
41    
# Create a locally initiated channel.
#
#   $class - class to bless into
#   @_     - further key/value pairs handed to new_passive (e.g. pri => N);
#            they come after the generated port and therefore override it
#            if they contain a "port" key.
#
# A fresh port number is taken by advancing the file-level $port counter by
# two. If a true priority was supplied it is announced to the peer with a
# "pri" message, exactly like the pri() method does.
# Returns the new object.
sub new {
    my $class = shift;
    my $self = $class->new_passive(port => ($port += 2), @_);

    # The original tested "self->{pri}" (missing sigil), which looked at the
    # package hash %self and was therefore never true.
    $self->snd("pri", $self->{pri}) if $self->{pri};

    $self;
}
50    
# Destructor: closes the channel when the object goes away.
sub DESTROY {
    my ($self) = @_;

    # The & sigil is required: a plain close(...) would be parsed as the
    # builtin close.
    &close($self);
}
54    
# Close the channel: queue a close frame (port number followed by the
# length value 65535) on priority level 0 and forget the port, so calling
# this again does nothing.
#
# NOTE(review): unlike snd and snd_quit this does not signal $send_full, so
# the frame appears to wait until something else wakes the feeder --
# confirm whether that is intended.
sub close {
    my ($self) = @_;

    if ($self->{port}) {
        my $close_frame = pack "nn", $self->{port}, 65535;
        push @{ $send[0] }, $close_frame;
        delete $self->{port};
    }
}
63    
# Set the channel's priority and announce it to the peer.
#
#   $level - new priority; stored in the object, where snd uses it as the
#            index of the queue to push onto
#
# The announcement is the two messages "pri" and $level, already queued at
# the new priority.
sub pri {
    my ($self, $level) = @_;

    $self->{pri} = $level;
    $self->snd("pri", $level);
}
69    
# Queue one or more messages for transmission on this channel.
#
#   @_ (after the object) - the messages; each becomes one frame made of a
#                           16-bit port, a 16-bit length and the payload
#
# Frames are pushed onto the queue selected by the channel's priority
# (level 0 when none is set) and the feeder is woken afterwards. May block
# the calling coroutine on $send_empty first. Messages for a channel that
# has already been closed are dropped.
sub snd {
    my $self = shift;

    # NOTE(review): @send holds one queue per priority level, so this
    # compares the number of levels (not the number of queued messages)
    # against MAXQ -- confirm that this is the intended flow control.
    $send_empty->wait while @send > MAXQ;

    # After close the port is gone; packing an undefined port would emit a
    # port-0 frame, which the peer interprets as a control message.
    return unless $self->{port};

    for (@_) {
        # The original interpolated $vc->{port}, an unrelated and undefined
        # global, so the port never showed up in the log.
        slog(8, ">>> :$self->{port} ".dumpstr($_)."\n"); #d#
        push @{$send[$self->{pri} || 0]},
            pack "nna*", $self->{port}, length $_, $_;
    }

    $send_full->send;
}
83    
# Fetch the next message from the channel's receive queue.
#
# Returns undef once the channel no longer has a port (i.e. it was closed),
# otherwise whatever the receive queue's get method returns.
sub rcv {
    my ($self) = @_;

    return undef unless $self->{port};

    $self->{rcv}->get;
}
87    
# Hand an incoming message to the channel's receive queue.
#
#   $data - the payload; multiplex also passes undef here when the peer
#           closes the port
#
# Returns whatever the receive queue's put method returns.
sub feed {
    my ($self, $data) = @_;

    $self->{rcv}->put($data);
}
91    
# Ask the feeder to shut down: an undef entry on priority level 0 makes the
# feeder in multiplex leave its loop and send the final 0/0 frame.
sub snd_quit {
    my $control_queue = $send[0] ||= [];

    push @$control_queue, undef;
    $send_full->send;
}
95    
# Run the channel multiplexing protocol over $fh.
#
#   $fh    - handle connected to the peer; must provide timeout, print,
#            read and sysread (presumably a Coro::Handle -- TODO confirm)
#   $slave - true on the slave side: the local port counter then starts at
#            1 instead of 0, so the two sides allocate disjoint port numbers
#
# Two coroutines are started:
#   * the feeder writes the identification string and then drains the
#     @send queues, lowest index first;
#   * the reader checks the peer's identification string and dispatches
#     incoming frames to their channels, creating a passive channel (served
#     by ::serve) for ports it has not seen yet.
# Afterwards the event loop is entered; the reader calls unloop when the
# connection ends or an error occurs.
#
# Wire format, as packed/unpacked below: every frame starts with a 16-bit
# port and a 16-bit length. Port 0 is a control frame (length 0 = quit,
# 1 = nop). For other ports a length above 65000 means "port closed",
# anything else is followed by that many payload bytes.
sub multiplex {
    my ($fh, $slave) = @_;

    $port = $slave ? 1 : 0;

    $fh->timeout($::TIMEOUT);

    # periodically send nop as keepalive signal to avoid the
    # rsync-on-slow-disk-timeout problem.
    my $nopper = Event->timer(parked => 1, cb => sub {
        push @{$send[0]}, pack "nn", 0, 1; $send_full->send;
    });

    # NOTE(review): neither $VERSION nor PROTVERSION is defined in this
    # file; verify where they are expected to come from.
    my $ident = "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n";

    # the feeder
    async {
        $fh->print(pack "na*", length $ident, $ident);
        command:
        while() {
            for (@send) {
                if (@$_) {
                    my $vc = shift @$_;
                    if (defined $vc) {
                        $fh->print($vc);
                    } else {
                        # undef entry queued by snd_quit: stop feeding
                        last command;
                    }
                    # something was sent, so push the keepalive back
                    $nopper->at(time+$::IDLE); $nopper->start;
                    # rescan from the first queue
                    next command;
                }
            }
            # all queues drained: release blocked senders, wait for work
            $send_empty->send;
            $send_full->wait;
        }
        # tell the peer that we are done
        $fh->print(pack "nn", 0, 0);
    };

    # the reader
    async {
        eval {
            my ($header, $len, $data);

            # handshake: 16-bit length followed by the peer's ident string
            $fh->read($len, 2) == 2
                or die "unexpected eof: $!";
            $fh->read($data, unpack "n", $len)
                or die "unexpected eof: $!";

            # report what was actually received (the original interpolated
            # an unrelated $_ here)
            $data eq $ident
                or die "garbled input or version mismatch: ".dumpstr($data);

            while ($fh->read($header, 4) == 4) {
                my ($port, $len) = unpack "nn", $header;

                if ($port) {
                    if ($len > 65000) {
                        slog(8, "||| :$port closing\n");#d#
                        if ($port[$port]) {
                            # mark the channel closed and wake its reader
                            delete $port[$port]{port};
                            $port[$port]->feed(undef);
                            undef $port[$port];
                        }
                    } else {
                        $fh->sysread($data, $len) == $len
                            or die "unexpected read error: $!";

                        slog(8, "<<< :$port ".dumpstr($data)."\n"); #d#

                        # unknown port: the peer opened a new channel
                        ($port[$port] ||= do {
                            my $vc = new_passive vc port => $port;
                            async \&::serve, $vc;
                            $vc;
                        })->feed($data);
                    }
                } else {
                    slog(8, "<<< :$port <$len>\n");

                    if ($len == 0) {
                        # peer is done: stop our feeder and stop reading
                        snd_quit();
                        last;
                    } elsif ($len == 1) {
                        # nop
                    } else {
                        die "illegal control code received";
                    }
                }
            }
        };

        slog(0, "ERROR: $@") if $@;
        slog(9, "unlooping\n");#d#
        unloop;
    };

    loop;
}
192    
193     1;
194