… | |
… | |
9 | BEGIN { *slog = \&::slog } |
9 | BEGIN { *slog = \&::slog } |
10 | |
10 | |
11 | use constant MAXMSG => 1024; |
11 | use constant MAXMSG => 1024; |
12 | use constant MAXQ => 10; |
12 | use constant MAXQ => 10; |
13 | |
13 | |
|
|
14 | use constant PROTVERSION => 1; |
|
|
15 | |
14 | my @send; |
16 | my @send; |
15 | my $send_full = new Coro::Signal; |
17 | my $send_full = new Coro::Signal; |
16 | my $send_empty = new Coro::Signal; |
18 | my $send_empty = new Coro::Signal; |
17 | |
19 | |
18 | my @port; |
20 | my @port; |
19 | my $port; |
21 | my $port; |
20 | |
22 | |
21 | sub dumpstr { |
23 | sub dumpstr { |
22 | local $_ = shift; |
24 | local $_ = $_[0]; |
23 | s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge; |
25 | s/([^\x20-\x7f\xa0-\xff])/sprintf "\\x{%02x}", ord $1/ge; |
24 | $_ = "\"$_\""; |
26 | $_ = "\"$_\""; |
25 | (substr $_, 60) = "\"..." if length($_) > 60; |
27 | (substr $_, 60) = "\"..." if length($_) > 60; |
26 | $_; |
28 | $_; |
27 | } |
29 | } |
28 | |
30 | |
29 | sub new_passive { |
31 | # catch (accept) an existing vc |
|
|
32 | sub catch { |
30 | my $class = shift; |
33 | my $class = shift; |
31 | my $self = bless { |
34 | my $self = bless { |
32 | rcv => (new Coro::Channel 1e9), |
35 | rcv => (new Coro::Channel 1e9), |
33 | @_, |
36 | @_, |
34 | }, $class; |
37 | }, $class; |
35 | |
38 | |
36 | weaken ($port[$self->{port}] = $self); |
39 | weaken ($port[$self->{port}] = $self); |
37 | $self; |
40 | $self; |
38 | } |
41 | } |
39 | |
42 | |
40 | sub new { |
43 | # create (connect) a new vc |
|
|
44 | sub create { |
41 | my $class = shift; |
45 | my $class = shift; |
42 | my $self = $class->new_passive(port => ($port += 2), @_); |
46 | my $self = $class->catch(port => ($port += 2), @_); |
43 | |
47 | |
44 | $self->snd("pri", $self->{pri}) if self->{pri}; |
48 | $self->snd("pri", $self->{pri}) if $self->{pri}; |
45 | |
49 | |
46 | $self; |
50 | $self; |
47 | } |
51 | } |
48 | |
52 | |
49 | sub DESTROY { |
53 | sub DESTROY { |
… | |
… | |
63 | my $self = shift; |
67 | my $self = shift; |
64 | $self->{pri} = $_[0]; |
68 | $self->{pri} = $_[0]; |
65 | $self->snd("pri", $_[0]); |
69 | $self->snd("pri", $_[0]); |
66 | } |
70 | } |
67 | |
71 | |
|
|
72 | sub _snd { |
|
|
73 | my $self = shift; |
|
|
74 | |
|
|
75 | slog 8, ">>> :$self->{port} ".dumpstr($_[0])."\n"; #d# |
|
|
76 | push @{$send[$self->{pri}]}, |
|
|
77 | pack "nna*", $self->{port}, length $_[0], $_[0]; |
|
|
78 | |
|
|
79 | } |
|
|
80 | |
|
|
81 | sub _rcv { |
|
|
82 | $_[0]{port} ? $_[0]{rcv}->get : undef; |
|
|
83 | } |
|
|
84 | |
68 | sub snd { |
85 | sub snd { |
69 | my $self = shift; |
86 | my $self = shift; |
|
|
87 | my ($length, $ofs); |
70 | |
88 | |
71 | $send_empty->wait while @send > MAXQ; |
89 | $send_empty->wait while @send > MAXQ; |
72 | |
|
|
73 | for (@_) { |
90 | for (@_) { |
74 | slog 8, ">>> :$self->{port} ".dumpstr($_)."\n"; #d# |
91 | if (length && MAXMSG >= length) { |
75 | push @{$send[$self->{pri}]}, |
92 | $self->_snd($_); |
76 | pack "nna*", $self->{port}, length $_, $_; |
93 | } else { |
|
|
94 | $self->_snd(""); |
|
|
95 | for (my $ofs = 0; $ofs < length; $ofs += MAXMSG) { |
|
|
96 | $send_empty->wait while @send > MAXQ; |
|
|
97 | $self->_snd(substr $_, $ofs, MAXMSG); |
|
|
98 | } |
|
|
99 | $self->_snd(""); |
|
|
100 | } |
77 | } |
101 | } |
78 | |
|
|
79 | $send_full->send; |
102 | $send_full->send; |
80 | } |
103 | } |
81 | |
104 | |
82 | sub rcv { |
105 | sub rcv { |
83 | $_[0]{port} ? $_[0]{rcv}->get : undef; |
106 | my $self = shift; |
|
|
107 | my $data = $self->_rcv; |
|
|
108 | |
|
|
109 | unless (length $data) { |
|
|
110 | my $blk; |
|
|
111 | $data .= $blk while length ($blk = $self->_rcv); |
|
|
112 | } |
|
|
113 | |
|
|
114 | $data; |
84 | } |
115 | } |
85 | |
116 | |
86 | sub feed { |
117 | sub feed { |
87 | $_[0]{rcv}->put($_[1]); |
118 | $_[0]{rcv}->put($_[1]); |
88 | } |
119 | } |
… | |
… | |
158 | or die "unexpected read error: $!"; |
189 | or die "unexpected read error: $!"; |
159 | |
190 | |
160 | slog 8, "<<< :$port ".dumpstr($data)."\n"; #d# |
191 | slog 8, "<<< :$port ".dumpstr($data)."\n"; #d# |
161 | |
192 | |
162 | ($port[$port] ||= do { |
193 | ($port[$port] ||= do { |
163 | my $vc = new_passive vc port => $port; |
194 | my $vc = catch vc port => $port; |
164 | async \&::serve, $vc; |
195 | async \&::serve, $vc; |
165 | $vc; |
196 | $vc; |
166 | })->feed($data); |
197 | })->feed($data); |
167 | } |
198 | } |
168 | } else { |
199 | } else { |