… | |
… | |
6 | use Coro::Event; |
6 | use Coro::Event; |
7 | use Coro::Channel; |
7 | use Coro::Channel; |
8 | |
8 | |
9 | BEGIN { *slog = \&::slog } |
9 | BEGIN { *slog = \&::slog } |
10 | |
10 | |
11 | $MAXMSG = 1024; |
|
|
12 | |
|
|
13 | use constant MAXMSG => 1024; |
11 | use constant MAXMSG => 1024; |
14 | use constant MAXQ => 10; |
12 | use constant MAXQ => 10; |
|
|
13 | |
|
|
14 | use constant PROTVERSION => 1; |
15 | |
15 | |
16 | my @send; |
16 | my @send; |
17 | my $send_full = new Coro::Signal; |
17 | my $send_full = new Coro::Signal; |
18 | my $send_empty = new Coro::Signal; |
18 | my $send_empty = new Coro::Signal; |
19 | |
19 | |
20 | my @port; |
20 | my @port; |
21 | my $port; |
21 | my $port; |
22 | |
22 | |
23 | sub dumpstr { |
23 | sub dumpstr { |
24 | local $_ = shift; |
24 | local $_ = $_[0]; |
25 | s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge; |
25 | s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge; |
26 | $_ = "\"$_\""; |
26 | $_ = "\"$_\""; |
27 | (substr $_, 60) = "\"..." if length($_) > 60; |
27 | (substr $_, 60) = "\"..." if length($_) > 60; |
28 | $_; |
28 | $_; |
29 | } |
29 | } |
30 | |
30 | |
31 | sub new_passive { |
31 | # catch (accept) an existing vc |
|
|
32 | sub catch { |
32 | my $class = shift; |
33 | my $class = shift; |
33 | my $self = bless { |
34 | my $self = bless { |
34 | rcv => (new Coro::Channel 100), |
35 | rcv => (new Coro::Channel 1e9), |
35 | @_, |
36 | @_, |
36 | }, $class; |
37 | }, $class; |
37 | |
38 | |
38 | weaken ($port[$self->{port}] = $self); |
39 | weaken ($port[$self->{port}] = $self); |
39 | $self; |
40 | $self; |
40 | } |
41 | } |
41 | |
42 | |
42 | sub new { |
43 | # create (connect) a new vc |
|
|
44 | sub create { |
43 | my $class = shift; |
45 | my $class = shift; |
44 | my $self = $class->new_passive(port => ($port += 2), @_); |
46 | my $self = $class->catch(port => ($port += 2), @_); |
45 | |
|
|
46 | $self->snd("pri", $self->{pri}) if self->{pri}; |
|
|
47 | |
47 | |
48 | $self; |
48 | $self; |
49 | } |
49 | } |
50 | |
50 | |
51 | sub DESTROY { |
51 | sub DESTROY { |
… | |
… | |
54 | |
54 | |
55 | sub close { |
55 | sub close { |
56 | my $self = shift; |
56 | my $self = shift; |
57 | |
57 | |
58 | if ($self->{port}) { |
58 | if ($self->{port}) { |
|
|
59 | slog 0, "closing port $self->{port}\n";#d# |
59 | push @{$send[0]}, pack "nn", $self->{port}, 65535; |
60 | push @{$send[0]}, pack "nn", $self->{port}, 65535; |
|
|
61 | $send_full->send; |
60 | delete $self->{port}; |
62 | delete $self->{port}; |
61 | } |
63 | } |
62 | } |
64 | } |
63 | |
65 | |
64 | sub pri { |
66 | sub pri { |
65 | my $self = shift; |
67 | my $self = shift; |
66 | $self->{pri} = $_[0]; |
68 | $self->{pri} = $_[0]; |
67 | $self->snd("pri", $_[0]); |
69 | } |
|
|
70 | |
|
|
71 | sub _snd { |
|
|
72 | my $self = shift; |
|
|
73 | |
|
|
74 | slog 8, ">>> :$self->{port} ".dumpstr($_[0])."\n"; #d# |
|
|
75 | push @{$send[$self->{pri}]}, |
|
|
76 | pack "nna*", $self->{port}, length $_[0], $_[0]; |
|
|
77 | |
|
|
78 | } |
|
|
79 | |
|
|
80 | sub _rcv { |
|
|
81 | $_[0]{port} ? $_[0]{rcv}->get : undef; |
68 | } |
82 | } |
69 | |
83 | |
70 | sub snd { |
84 | sub snd { |
71 | my $self = shift; |
85 | my $self = shift; |
|
|
86 | my ($length, $ofs); |
72 | |
87 | |
73 | $send_empty->wait while @send > MAXQ; |
88 | $send_empty->wait while @send > MAXQ; |
74 | |
|
|
75 | for (@_) { |
89 | for (@_) { |
76 | slog 8, ">>> :$vc->{port} ".dumpstr($_)."\n"; #d# |
90 | if (length && MAXMSG >= length) { |
77 | push @{$send[$self->{pri}]}, |
91 | $self->_snd($_); |
78 | pack "nna*", $self->{port}, length $_, $_; |
92 | } else { |
|
|
93 | $self->_snd(""); |
|
|
94 | for (my $ofs = 0; $ofs < length; $ofs += MAXMSG) { |
|
|
95 | $send_empty->wait while @send > MAXQ; |
|
|
96 | $self->_snd(substr $_, $ofs, MAXMSG); |
|
|
97 | } |
|
|
98 | $self->_snd(""); |
|
|
99 | } |
79 | } |
100 | } |
80 | |
|
|
81 | $send_full->send; |
101 | $send_full->send; |
82 | } |
102 | } |
83 | |
103 | |
84 | sub rcv { |
104 | sub rcv { |
85 | $_[0]{port} ? $_[0]{rcv}->get : undef; |
105 | my $self = shift; |
|
|
106 | my $data = $self->_rcv; |
|
|
107 | |
|
|
108 | unless (length $data) { |
|
|
109 | my $blk; |
|
|
110 | $data .= $blk while length ($blk = $self->_rcv); |
|
|
111 | } |
|
|
112 | |
|
|
113 | $data; |
86 | } |
114 | } |
87 | |
115 | |
88 | sub feed { |
116 | sub feed { |
89 | $_[0]{rcv}->put($_[1]); |
117 | $_[0]{rcv}->put($_[1]); |
90 | } |
118 | } |
… | |
… | |
159 | $fh->sysread($data, $len) == $len |
187 | $fh->sysread($data, $len) == $len |
160 | or die "unexpected read error: $!"; |
188 | or die "unexpected read error: $!"; |
161 | |
189 | |
162 | slog 8, "<<< :$port ".dumpstr($data)."\n"; #d# |
190 | slog 8, "<<< :$port ".dumpstr($data)."\n"; #d# |
163 | |
191 | |
164 | ($port[$port] ||= do { |
192 | unless ($port[$port]) { |
165 | my $vc = new_passive vc port => $port; |
|
|
166 | async \&::serve, $vc; |
193 | async \&::serve, catch vc port => $port; |
167 | $vc; |
194 | } |
|
|
195 | |
168 | })->feed($data); |
196 | $port[$port]->feed($data); |
169 | } |
197 | } |
170 | } else { |
198 | } else { |
171 | slog 8, "<<< :$port <$len>\n"; |
199 | slog 8, "<<< :$port <$len>\n"; |
172 | |
200 | |
173 | if ($len == 0) { |
201 | if ($len == 0) { |