… | |
… | |
315 | |
315 | |
316 | # optimisation |
316 | # optimisation |
317 | my $push_write = $hdl->can ("push_write"); |
317 | my $push_write = $hdl->can ("push_write"); |
318 | my $push_read = $hdl->can ("push_read"); |
318 | my $push_read = $hdl->can ("push_read"); |
319 | |
319 | |
320 | if ($r_framing eq "json") { |
320 | if ($s_framing eq "json") { |
321 | my $coder = JSON::XS->new->utf8; |
|
|
322 | |
|
|
323 | $self->{send} = sub { |
321 | $self->{send} = sub { |
324 | $push_write->($hdl, JSON::XS::encode_json $_[0]); |
322 | $push_write->($hdl, JSON::XS::encode_json $_[0]); |
325 | }; |
323 | }; |
326 | |
|
|
327 | $hdl->on_read (sub { |
|
|
328 | local $AnyEvent::MP::Kernel::SRCNODE = $src_node; |
|
|
329 | |
|
|
330 | AnyEvent::MP::Kernel::_inject (@$_) |
|
|
331 | for $coder->incr_parse (delete $_[0]{rbuf}); |
|
|
332 | |
|
|
333 | () |
|
|
334 | }); |
|
|
335 | } else { |
324 | } else { |
336 | $self->{send} = sub { |
325 | $self->{send} = sub { |
337 | $push_write->($hdl, $s_framing => $_[0]); |
326 | $push_write->($hdl, $s_framing => $_[0]); |
338 | }; |
327 | }; |
|
|
328 | } |
339 | |
329 | |
|
|
330 | if ($r_framing eq "json") { |
|
|
331 | my $coder = JSON::XS->new->utf8; |
|
|
332 | |
|
|
333 | $hdl->on_read (sub { |
|
|
334 | local $AnyEvent::MP::Kernel::SRCNODE = $src_node; |
|
|
335 | |
|
|
336 | AnyEvent::MP::Kernel::_inject (@$_) |
|
|
337 | for $coder->incr_parse (delete $_[0]{rbuf}); |
|
|
338 | |
|
|
339 | () |
|
|
340 | }); |
|
|
341 | } else { |
340 | my $rmsg; $rmsg = $self->{rmsg} = sub { |
342 | my $rmsg; $rmsg = $self->{rmsg} = sub { |
341 | $push_read->($_[0], $r_framing => $rmsg); |
343 | $push_read->($_[0], $r_framing => $rmsg); |
342 | |
344 | |
343 | local $AnyEvent::MP::Kernel::SRCNODE = $src_node; |
345 | local $AnyEvent::MP::Kernel::SRCNODE = $src_node; |
344 | AnyEvent::MP::Kernel::_inject (@{ $_[1] }); |
346 | AnyEvent::MP::Kernel::_inject (@{ $_[1] }); |