ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/syncmail/vc.pm
Revision: 1.2
Committed: Sun Oct 28 03:51:24 2001 UTC (22 years, 6 months ago) by root
Branch: MAIN
Changes since 1.1: +2 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 package vc;
2    
3     use Convert::Scalar 'weaken';
4    
5     use Coro;
6     use Coro::Event;
7     use Coro::Channel;
8    
# Make the main program's logger callable as a plain function in this package.
BEGIN { *slog = \&main::slog }

# MAXMSG is not referenced by the code visible in this file.
use constant MAXMSG => 1024;
# Threshold compared against the size of @send in snd().
use constant MAXQ   => 10;

# Outgoing frames, one queue (array ref) per priority: $send[$pri].
# Queue 0 also carries control frames, close frames and the quit marker.
my @send;
# Signalled by producers after queueing; the feeder coroutine waits on it.
my $send_full  = Coro::Signal->new;
# Signalled by the feeder once it found every queue empty.
my $send_empty = Coro::Signal->new;

# Port number -> channel object (weak reference, see new_passive).
my @port;
# Last locally allocated port number; seeded by multiplex, stepped by new.
my $port;
20    
# Render a string for log output.
#
# Every character outside \x20-\x7f and \xa0-\xff is replaced by a
# \x{..} escape, the result is wrapped in double quotes, and anything
# longer than 60 characters (quotes included) is cut at 60 and finished
# with '"...'.
#
# Takes the string to render and returns the printable representation.
sub dumpstr {
    my $text = shift;

    $text =~ s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge;

    my $quoted = "\"$text\"";
    substr($quoted, 60) = "\"..." if length($quoted) > 60;

    return $quoted;
}
28    
# Construct a channel object without announcing anything to the peer.
#
# Arguments after the class name are key/value pairs stored in the
# object; they are applied after the default receive channel, so a
# caller-supplied "rcv" overrides it.  The object is registered in
# @port under $self->{port}; that table entry is weakened so the table
# alone does not keep the channel alive.
#
# Returns the new object.
sub new_passive {
    my $class = shift;

    my %fields = (
        rcv => Coro::Channel->new(1e9),
        @_,
    );
    my $self = bless \%fields, $class;

    my $slot = $self->{port};
    $port[$slot] = $self;
    weaken($port[$slot]);

    return $self;
}
39    
# Construct a locally initiated channel.
#
# Allocates the next local port number by stepping the file-level $port
# counter by two (multiplex seeds it with 1 for the slave side and 0
# otherwise, so the two ends never allocate the same number), then
# builds the object through new_passive.  Remaining arguments are
# key/value pairs stored in the object.
#
# If a true "pri" was supplied, it is announced to the peer with the
# same two messages the pri method sends.
#
# Returns the new object.
sub new {
    my $class = shift;
    my $self = $class->new_passive(port => ($port += 2), @_);

    # The original tested the bareword `self->{pri}`, i.e. the (empty)
    # package hash %self, so the priority was never announced.
    $self->snd("pri", $self->{pri}) if $self->{pri};

    return $self;
}
48    
# Destructor: closes the channel if it is still open.
sub DESTROY {
    # Package-qualified on purpose: a bare close(...) would be parsed as
    # the filehandle builtin rather than this package's close.
    vc::close(@_);
}
52    
# Close the channel from the local side.
#
# While the object still has a port, a close frame (port number plus the
# length value 65535) is appended to queue 0 and the port is removed
# from the object, which makes further calls no-ops and makes rcv
# return undef.
#
# NOTE(review): unlike snd and snd_quit this does not signal
# $send_full, so the frame is only written once something else wakes
# the feeder -- confirm that this is intended.
sub close {
    my ($self) = @_;

    if ($self->{port}) {
        my $frame = pack("nn", $self->{port}, 65535);
        push @{ $send[0] }, $frame;
        delete $self->{port};
    }
}
61    
# Set the channel's priority and announce it to the peer.
#
# The value is stored in $self->{pri} (snd uses it to choose the
# outgoing queue) and then sent as two messages: the literal "pri"
# followed by the value.  Returns whatever snd returns.
sub pri {
    my ($self, $level) = @_;

    $self->{pri} = $level;

    return $self->snd("pri", $level);
}
67    
# Queue one or more messages for transmission on this channel.
#
# Each argument becomes one frame: port number and payload length as
# 16-bit big-endian integers, followed by the payload.  Frames go to
# the queue selected by $self->{pri}; the feeder coroutine is woken
# afterwards.
#
# Dies, before queueing anything, if a payload is longer than 65000
# bytes.  The reader in multiplex treats any length above 65000 as a
# close request and the length field is only 16 bits wide, so such a
# frame would desynchronise the stream instead of being delivered.
sub snd {
    my $self = shift;

    # Largest length the reader still interprets as data.
    my $max_len = 65000;

    for my $msg (@_) {
        die "vc::snd: message of " . length($msg)
          . " bytes exceeds the $max_len byte frame limit"
            if length($msg) > $max_len;
    }

    # NOTE(review): @send holds one queue per priority, so this compares
    # the number of priority levels, not the number of queued frames,
    # with MAXQ -- confirm whether frame-based throttling was intended.
    $send_empty->wait while @send > MAXQ;

    for (@_) {
        slog(8, ">>> :$self->{port} ".dumpstr($_)."\n"); #d#
        push @{$send[$self->{pri}]},
            pack "nna*", $self->{port}, length $_, $_;
    }

    $send_full->send;
}
81    
# Fetch the next message from the channel's receive queue.
#
# Returns undef without touching the queue once the channel has no port
# any more (it was closed); otherwise returns whatever the queue's get
# method returns.
sub rcv {
    my ($self) = @_;

    return undef unless $self->{port};

    return $self->{rcv}->get;
}
85    
# Deliver an incoming message to the channel's receive queue.
#
# Takes the message (the reader in multiplex passes undef when the peer
# closed the channel) and returns whatever the queue's put returns.
sub feed {
    my ($self, $message) = @_;

    return $self->{rcv}->put($message);
}
89    
# Ask the feeder coroutine to shut down.
#
# Appends undef to queue 0 -- the feeder leaves its loop when it shifts
# an undefined entry -- and wakes the feeder.
sub snd_quit {
    push(@{ $send[0] }, undef);
    $send_full->send;
}
93    
# Run the channel multiplexer on a connected handle until the peer
# quits or an error occurs.
#
# Arguments:
#   $fh    - handle object providing timeout, print, read and sysread
#   $slave - true on the slave side; seeds the port counter with 1
#            instead of 0, so both ends allocate disjoint port numbers
#
# Wire format, as written and parsed below:
#   handshake: 16-bit length, then the ident line; the peer's ident
#              must be identical to ours
#   frame:     16-bit port, 16-bit length, then <length> payload bytes
#   port 0:    control frame without payload; length 0 = quit,
#              length 1 = nop (keepalive)
#   port > 0 and length > 65000: close request for that port
#
# Two coroutines are started, a feeder that writes queued frames and a
# reader that dispatches incoming ones, and then the event loop is
# entered; the reader leaves it again via unloop.
#
# NOTE(review): $VERSION and PROTVERSION are not defined in the part of
# the file visible here -- confirm where they come from.
sub multiplex {
    my ($fh, $slave) = @_;

    $port = $slave ? 1 : 0;

    $fh->timeout($::TIMEOUT);

    # periodically send nop as keepalive signal to avoid the
    # rsync-on-slow-disk-timeout problem.
    my $nopper = Event->timer(parked => 1, cb => sub {
        push @{$send[0]}, pack "nn", 0, 1;
        $send_full->send;
    });

    my $ident = "\@SYNCMAIL VERSION $VERSION PROTOCOL ".PROTVERSION."\n";

    # the feeder
    async {
        $fh->print(pack "na*", length $ident, $ident);

        COMMAND:
        while () {
            # Always restart the scan at queue 0 after writing a frame,
            # so lower-numbered queues are drained first.
            for (@send) {
                if (@$_) {
                    my $frame = shift @$_;
                    if (defined $frame) {
                        $fh->print($frame);
                    } else {
                        # undef is the quit marker queued by snd_quit
                        last COMMAND;
                    }
                    # re-arm the keepalive after every frame written
                    $nopper->at(time+$::IDLE); $nopper->start;
                    next COMMAND;
                }
            }
            # nothing left to write: release waiting senders, then sleep
            $send_empty->send;
            $send_full->wait;
        }
        # tell the peer we are done
        $fh->print(pack "nn", 0, 0);
    };

    # the reader
    async {
        eval {
            my ($header, $len, $data);

            $fh->read($len, 2) == 2
                or die "unexpected eof: $!";
            $fh->read($data, unpack "n", $len)
                or die "unexpected eof: $!";

            # Report what was actually received; the original message
            # interpolated the unrelated $_ here.
            $data eq $ident
                or die "garbled input or version mismatch: " . dumpstr($data);

            while ($fh->read($header, 4) == 4) {
                my ($id, $len) = unpack "nn", $header;

                if ($id) {
                    if ($len > 65000) {
                        slog(8, "||| :$id closing\n");#d#
                        if ($port[$id]) {
                            # Drop the port first so the channel's own
                            # close does not answer with a close frame.
                            delete $port[$id]{port};
                            $port[$id]->feed(undef);
                            undef $port[$id];
                        }
                    } else {
                        # NOTE(review): the header is fetched with read
                        # but the payload with sysread, which may return
                        # fewer bytes than requested -- confirm against
                        # the handle class in use.
                        $fh->sysread($data, $len) == $len
                            or die "unexpected read error: $!";

                        slog(8, "<<< :$id ".dumpstr($data)."\n"); #d#

                        # Unknown port: the peer opened a new channel, so
                        # create the passive end and start a server
                        # coroutine for it.
                        ($port[$id] ||= do {
                            my $vc = vc->new_passive(port => $id);
                            async \&::serve, $vc;
                            $vc;
                        })->feed($data);
                    }
                } else {
                    slog(8, "<<< :$id <$len>\n");

                    if ($len == 0) {
                        snd_quit;
                        last;
                    } elsif ($len == 1) {
                        # nop
                    } else {
                        die "illegal control code received";
                    }
                }
            }
        };

        slog(0, "ERROR: $@") if $@;
        slog(9, "unlooping\n");#d#
        unloop;
    };

    loop;
}
190    
191     1;
192