… | |
… | |
6 | use Coro::Event; |
6 | use Coro::Event; |
7 | use Coro::Channel; |
7 | use Coro::Channel; |
8 | |
8 | |
9 | BEGIN { *slog = \&::slog } |
9 | BEGIN { *slog = \&::slog } |
10 | |
10 | |
11 | $MAXMSG = 1024; |
|
|
12 | |
|
|
13 | use constant MAXMSG => 1024; |
11 | use constant MAXMSG => 1024; |
14 | use constant MAXQ => 10; |
12 | use constant MAXQ => 10; |
|
|
13 | |
|
|
14 | use constant PROTVERSION => 1; |
15 | |
15 | |
16 | my @send; |
16 | my @send; |
17 | my $send_full = new Coro::Signal; |
17 | my $send_full = new Coro::Signal; |
18 | my $send_empty = new Coro::Signal; |
18 | my $send_empty = new Coro::Signal; |
19 | |
19 | |
20 | my @port; |
20 | my @port; |
21 | my $port; |
21 | my $port; |
22 | |
22 | |
23 | sub dumpstr { |
23 | sub dumpstr { |
24 | local $_ = shift; |
24 | local $_ = $_[0]; |
25 | s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge; |
25 | s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge; |
26 | $_ = "\"$_\""; |
26 | $_ = "\"$_\""; |
27 | (substr $_, 60) = "\"..." if length($_) > 60; |
27 | (substr $_, 60) = "\"..." if length($_) > 60; |
28 | $_; |
28 | $_; |
29 | } |
29 | } |
30 | |
30 | |
31 | sub new_passive { |
31 | # catch (accept) an existing vc |
|
|
32 | sub catch { |
32 | my $class = shift; |
33 | my $class = shift; |
33 | my $self = bless { |
34 | my $self = bless { |
34 | rcv => (new Coro::Channel 100), |
35 | rcv => (new Coro::Channel 1e9), |
35 | @_, |
36 | @_, |
36 | }, $class; |
37 | }, $class; |
37 | |
38 | |
38 | weaken ($port[$self->{port}] = $self); |
39 | weaken ($port[$self->{port}] = $self); |
39 | $self; |
40 | $self; |
40 | } |
41 | } |
41 | |
42 | |
42 | sub new { |
43 | # create (connect) a new vc |
|
|
44 | sub create { |
43 | my $class = shift; |
45 | my $class = shift; |
44 | my $self = $class->new_passive(port => ($port += 2), @_); |
46 | my $self = $class->catch(port => ($port += 2), @_); |
45 | |
47 | |
46 | $self->snd("pri", $self->{pri}) if self->{pri}; |
48 | $self->snd("pri", $self->{pri}) if $self->{pri}; |
47 | |
49 | |
48 | $self; |
50 | $self; |
49 | } |
51 | } |
50 | |
52 | |
51 | sub DESTROY { |
53 | sub DESTROY { |
… | |
… | |
65 | my $self = shift; |
67 | my $self = shift; |
66 | $self->{pri} = $_[0]; |
68 | $self->{pri} = $_[0]; |
67 | $self->snd("pri", $_[0]); |
69 | $self->snd("pri", $_[0]); |
68 | } |
70 | } |
69 | |
71 | |
|
|
72 | sub _snd { |
|
|
73 | my $self = shift; |
|
|
74 | |
|
|
75 | slog 8, ">>> :$self->{port} ".dumpstr($_[0])."\n"; #d# |
|
|
76 | push @{$send[$self->{pri}]}, |
|
|
77 | pack "nna*", $self->{port}, length $_[0], $_[0]; |
|
|
78 | |
|
|
79 | } |
|
|
80 | |
|
|
81 | sub _rcv { |
|
|
82 | $_[0]{port} ? $_[0]{rcv}->get : undef; |
|
|
83 | } |
|
|
84 | |
70 | sub snd { |
85 | sub snd { |
71 | my $self = shift; |
86 | my $self = shift; |
|
|
87 | my ($length, $ofs); |
72 | |
88 | |
73 | $send_empty->wait while @send > MAXQ; |
89 | $send_empty->wait while @send > MAXQ; |
74 | |
|
|
75 | for (@_) { |
90 | for (@_) { |
76 | slog 8, ">>> :$vc->{port} ".dumpstr($_)."\n"; #d# |
91 | if (length && MAXMSG >= length) { |
77 | push @{$send[$self->{pri}]}, |
92 | $self->_snd($_); |
78 | pack "nna*", $self->{port}, length $_, $_; |
93 | } else { |
|
|
94 | $self->_snd(""); |
|
|
95 | for (my $ofs = 0; $ofs < length; $ofs += MAXMSG) { |
|
|
96 | $send_empty->wait while @send > MAXQ; |
|
|
97 | $self->_snd(substr $_, $ofs, MAXMSG); |
|
|
98 | } |
|
|
99 | $self->_snd(""); |
|
|
100 | } |
79 | } |
101 | } |
80 | |
|
|
81 | $send_full->send; |
102 | $send_full->send; |
82 | } |
103 | } |
83 | |
104 | |
84 | sub rcv { |
105 | sub rcv { |
85 | $_[0]{port} ? $_[0]{rcv}->get : undef; |
106 | my $self = shift; |
|
|
107 | my $data = $self->_rcv; |
|
|
108 | |
|
|
109 | unless (length $data) { |
|
|
110 | my $blk; |
|
|
111 | $data .= $blk while length ($blk = $self->_rcv); |
|
|
112 | } |
|
|
113 | |
|
|
114 | $data; |
86 | } |
115 | } |
87 | |
116 | |
88 | sub feed { |
117 | sub feed { |
89 | $_[0]{rcv}->put($_[1]); |
118 | $_[0]{rcv}->put($_[1]); |
90 | } |
119 | } |
… | |
… | |
160 | or die "unexpected read error: $!"; |
189 | or die "unexpected read error: $!"; |
161 | |
190 | |
162 | slog 8, "<<< :$port ".dumpstr($data)."\n"; #d# |
191 | slog 8, "<<< :$port ".dumpstr($data)."\n"; #d# |
163 | |
192 | |
164 | ($port[$port] ||= do { |
193 | ($port[$port] ||= do { |
165 | my $vc = new_passive vc port => $port; |
194 | my $vc = catch vc port => $port; |
166 | async \&::serve, $vc; |
195 | async \&::serve, $vc; |
167 | $vc; |
196 | $vc; |
168 | })->feed($data); |
197 | })->feed($data); |
169 | } |
198 | } |
170 | } else { |
199 | } else { |