… | |
… | |
116 | }, $class; |
116 | }, $class; |
117 | |
117 | |
118 | { |
118 | { |
119 | Scalar::Util::weaken (my $self = $self); |
119 | Scalar::Util::weaken (my $self = $self); |
120 | |
120 | |
|
|
121 | our $ENDMESSAGE = qr<\012(EndMessage|Data)\012>; |
|
|
122 | |
|
|
123 | # these are declared here for performance reasons |
|
|
124 | my ($k, $v, $type); |
|
|
125 | my $rdata; |
|
|
126 | |
|
|
127 | my $on_read = sub { |
|
|
128 | my ($hdl) = @_; |
|
|
129 | |
|
|
130 | # we only carve out whole messages here |
|
|
131 | while ($hdl->{rbuf} =~ /\012(EndMessage|Data)\012/) { |
|
|
132 | # remember end marker |
|
|
133 | $rdata = $1 eq "Data" |
|
|
134 | or $1 eq "EndMessage" |
|
|
135 | or die "protocol error, expected message end, got $1\n"; |
|
|
136 | |
|
|
137 | my @lines = split /\012/, substr $hdl->{rbuf}, 0, $-[0]; |
|
|
138 | |
|
|
139 | substr $hdl->{rbuf}, 0, $+[0], ""; # remove pkg |
|
|
140 | |
|
|
141 | $type = shift @lines; |
|
|
142 | $type = ($TOLC{$type} ||= tolc $type); |
|
|
143 | |
|
|
144 | my %kv; |
|
|
145 | |
|
|
146 | for (@lines) { |
|
|
147 | ($k, $v) = split /=/, $_, 2; |
|
|
148 | $k = ($TOLC{$k} ||= tolc $k); |
|
|
149 | |
|
|
150 | if ($k =~ /\./) { |
|
|
151 | # generic, slow case |
|
|
152 | my @k = split /\./, $k; |
|
|
153 | my $ro = \\%kv; |
|
|
154 | |
|
|
155 | while (@k) { |
|
|
156 | $k = shift @k; |
|
|
157 | if ($k =~ /^\d+$/) { |
|
|
158 | $ro = \$$ro->[$k]; |
|
|
159 | } else { |
|
|
160 | $ro = \$$ro->{$k}; |
|
|
161 | } |
|
|
162 | } |
|
|
163 | |
|
|
164 | $$ro = $v; |
|
|
165 | |
|
|
166 | next; |
|
|
167 | } |
|
|
168 | |
|
|
169 | # special comon case, for performance only |
|
|
170 | $kv{$k} = $v; |
|
|
171 | } |
|
|
172 | |
|
|
173 | if ($rdata) { |
|
|
174 | $_[0]->push_read (chunk => delete $kv{data_length}, sub { |
|
|
175 | $rdata = \$_[1]; |
|
|
176 | $self->recv ($type, \%kv, $rdata); |
|
|
177 | }); |
|
|
178 | |
|
|
179 | last; # do not tgry to parse more messages |
|
|
180 | } else { |
|
|
181 | $self->recv ($type, \%kv); |
|
|
182 | } |
|
|
183 | } |
|
|
184 | }; |
|
|
185 | |
121 | $self->{hdl} = new AnyEvent::Handle |
186 | $self->{hdl} = new AnyEvent::Handle |
122 | connect => [$self->{host} => $self->{port}], |
187 | connect => [$self->{host} => $self->{port}], |
123 | timeout => $self->{timeout}, |
188 | timeout => $self->{timeout}, |
124 | on_error => sub { |
189 | on_error => sub { |
125 | warn "@_\n";#d# |
190 | warn "@_\n";#d# |
126 | exit 1; |
191 | exit 1; |
127 | }, |
192 | }, |
128 | on_read => sub { $self->on_read (@_) }, |
193 | on_read => $on_read, |
129 | on_eof => $self->{on_eof} || sub { }; |
194 | on_eof => $self->{on_eof} || sub { }, |
|
|
195 | ; |
130 | |
196 | |
131 | Scalar::Util::weaken ($self->{hdl}{fcp} = $self); |
197 | Scalar::Util::weaken ($self->{hdl}{fcp} = $self); |
132 | } |
198 | } |
133 | |
199 | |
134 | $self->send_msg (client_hello => |
200 | $self->send_msg (client_hello => |
… | |
… | |
251 | $cb->($self, $type, $kv, @extra) |
317 | $cb->($self, $type, $kv, @extra) |
252 | and shift @{ $self->{queue} }; |
318 | and shift @{ $self->{queue} }; |
253 | } else { |
319 | } else { |
254 | $self->default_recv ($type, $kv, @extra); |
320 | $self->default_recv ($type, $kv, @extra); |
255 | } |
321 | } |
256 | } |
|
|
257 | |
|
|
258 | sub on_read { |
|
|
259 | my ($self) = @_; |
|
|
260 | |
|
|
261 | my ($k, $v, $type); |
|
|
262 | my %kv; |
|
|
263 | my $rdata; |
|
|
264 | |
|
|
265 | my $hdr_cb; $hdr_cb = sub { |
|
|
266 | if (($v = index $_[1], "=") >= 0) { |
|
|
267 | $k = substr $_[1], 0, $v; |
|
|
268 | $v = substr $_[1], $v + 1; |
|
|
269 | $k = ($TOLC{$k} ||= tolc $k); |
|
|
270 | |
|
|
271 | if ($k !~ /\./) { |
|
|
272 | # special case common case, for performance only |
|
|
273 | $kv{$k} = $v; |
|
|
274 | } else { |
|
|
275 | my @k = split /\./, $k; |
|
|
276 | my $ro = \\%kv; |
|
|
277 | |
|
|
278 | while (@k) { |
|
|
279 | $k = shift @k; |
|
|
280 | if ($k =~ /^\d+$/) { |
|
|
281 | $ro = \$$ro->[$k]; |
|
|
282 | } else { |
|
|
283 | $ro = \$$ro->{$k}; |
|
|
284 | } |
|
|
285 | } |
|
|
286 | |
|
|
287 | $$ro = $v; |
|
|
288 | } |
|
|
289 | |
|
|
290 | $_[0]->push_read (line => $hdr_cb); |
|
|
291 | } elsif ($_[1] eq "Data") { |
|
|
292 | $_[0]->push_read (chunk => delete $kv{data_length}, sub { |
|
|
293 | $rdata = \$_[1]; |
|
|
294 | $self->recv ($type, \%kv, $rdata); |
|
|
295 | }); |
|
|
296 | } elsif ($_[1] eq "EndMessage") { |
|
|
297 | $self->recv ($type, \%kv); |
|
|
298 | } else { |
|
|
299 | die "protocol error, expected message end, got $_[1]\n";#d# |
|
|
300 | } |
|
|
301 | }; |
|
|
302 | |
|
|
303 | $self->{hdl}->push_read (line => sub { |
|
|
304 | $type = ($TOLC{$_[1]} ||= tolc $_[1]); |
|
|
305 | $_[0]->push_read (line => $hdr_cb); |
|
|
306 | }); |
|
|
307 | } |
322 | } |
308 | |
323 | |
309 | sub default_recv { |
324 | sub default_recv { |
310 | my ($self, $type, $kv, $rdata) = @_; |
325 | my ($self, $type, $kv, $rdata) = @_; |
311 | |
326 | |