… | |
… | |
308 | $self->{hdl}->wtimeout ($timeout); |
308 | $self->{hdl}->wtimeout ($timeout); |
309 | $self->{hdl}->on_wtimeout (sub { $self->send ([]) }); |
309 | $self->{hdl}->on_wtimeout (sub { $self->send ([]) }); |
310 | } |
310 | } |
311 | |
311 | |
312 | # receive handling |
312 | # receive handling |
313 | my $src_node = $self->{node}; |
|
|
314 | Scalar::Util::weaken $src_node; |
|
|
315 | |
313 | |
316 | # optimisation |
|
|
317 | my $push_write = $hdl->can ("push_write"); |
314 | my $push_write = $hdl->can ("push_write"); |
318 | my $push_read = $hdl->can ("push_read"); |
315 | my $push_read = $hdl->can ("push_read"); |
319 | |
316 | |
320 | if ($s_framing eq "json") { |
317 | if ($s_framing eq "json") { |
321 | $self->{send} = sub { |
318 | $self->{send} = sub { |
… | |
… | |
329 | |
326 | |
330 | if ($r_framing eq "json") { |
327 | if ($r_framing eq "json") { |
331 | my $coder = JSON::XS->new->utf8; |
328 | my $coder = JSON::XS->new->utf8; |
332 | |
329 | |
333 | $hdl->on_read (sub { |
330 | $hdl->on_read (sub { |
334 | local $AnyEvent::MP::Kernel::SRCNODE = $src_node; |
331 | local $AnyEvent::MP::Kernel::SRCNODE = $self->{node}; |
335 | |
332 | |
336 | AnyEvent::MP::Kernel::_inject (@$_) |
333 | AnyEvent::MP::Kernel::_inject (@$_) |
337 | for $coder->incr_parse (delete $_[0]{rbuf}); |
334 | for $coder->incr_parse (delete $_[0]{rbuf}); |
338 | |
335 | |
339 | () |
336 | () |
340 | }); |
337 | }); |
341 | } else { |
338 | } else { |
342 | my $rmsg; $rmsg = $self->{rmsg} = sub { |
339 | my $rmsg; $rmsg = $self->{rmsg} = sub { |
343 | $push_read->($_[0], $r_framing => $rmsg); |
340 | $push_read->($_[0], $r_framing => $rmsg); |
344 | |
341 | |
345 | local $AnyEvent::MP::Kernel::SRCNODE = $src_node; |
342 | local $AnyEvent::MP::Kernel::SRCNODE = $self->{node}; |
346 | AnyEvent::MP::Kernel::_inject (@{ $_[1] }); |
343 | AnyEvent::MP::Kernel::_inject (@{ $_[1] }); |
347 | }; |
344 | }; |
348 | eval { |
345 | eval { |
349 | $push_read->($_[0], $r_framing => $rmsg); |
346 | $push_read->($_[0], $r_framing => $rmsg); |
350 | }; |
347 | }; |