… | |
… | |
300 | $self->{hdl}->rtimeout (undef); |
300 | $self->{hdl}->rtimeout (undef); |
301 | |
301 | |
302 | $self->{remote_greeting}{untrusted} = 1 |
302 | $self->{remote_greeting}{untrusted} = 1 |
303 | if $auth_method eq "tls_anon"; |
303 | if $auth_method eq "tls_anon"; |
304 | |
304 | |
305 | my $push_write = $hdl->can ("push_write"); |
|
|
306 | |
|
|
307 | $self->{send} = sub { |
|
|
308 | $push_write->($hdl, $s_framing => $_[0]); |
|
|
309 | }; |
|
|
310 | |
|
|
311 | if ($protocol eq "aemp" and $self->{hdl}) { |
305 | if ($protocol eq "aemp" and $self->{hdl}) { |
312 | # listener-less node need to continuously probe |
306 | # listener-less node need to continuously probe |
313 | unless (@$AnyEvent::MP::Kernel::LISTENER) { |
307 | unless (@$AnyEvent::MP::Kernel::LISTENER) { |
314 | $self->{hdl}->wtimeout ($timeout); |
308 | $self->{hdl}->wtimeout ($timeout); |
315 | $self->{hdl}->on_wtimeout (sub { $self->send ([]) }); |
309 | $self->{hdl}->on_wtimeout (sub { $self->send ([]) }); |
… | |
… | |
317 | |
311 | |
318 | # receive handling |
312 | # receive handling |
319 | my $src_node = $self->{node}; |
313 | my $src_node = $self->{node}; |
320 | Scalar::Util::weaken $src_node; |
314 | Scalar::Util::weaken $src_node; |
321 | |
315 | |
|
|
316 | # optimisation |
|
|
317 | my $push_write = $hdl->can ("push_write"); |
|
|
318 | my $push_read = $hdl->can ("push_read"); |
|
|
319 | |
322 | if ($r_framing eq "\njsonxyz") {#d# |
320 | if ($r_framing eq "json") { |
|
|
321 | my $coder = JSON::XS->new->utf8; |
|
|
322 | |
|
|
323 | $self->{send} = sub { |
|
|
324 | $push_write->($hdl, JSON::XS::encode_json $_[0]); |
|
|
325 | }; |
|
|
326 | |
|
|
327 | $hdl->on_read (sub { |
|
|
328 | local $AnyEvent::MP::Kernel::SRCNODE = $src_node; |
|
|
329 | |
|
|
330 | AnyEvent::MP::Kernel::_inject (@$_) |
|
|
331 | for $coder->incr_parse (delete $_[0]{rbuf}); |
|
|
332 | |
|
|
333 | () |
|
|
334 | }); |
323 | } else { |
335 | } else { |
324 | my $push_read = $hdl->can ("push_read"); |
336 | $self->{send} = sub { |
|
|
337 | $push_write->($hdl, $s_framing => $_[0]); |
|
|
338 | }; |
325 | |
339 | |
326 | my $rmsg; $rmsg = $self->{rmsg} = sub { |
340 | my $rmsg; $rmsg = $self->{rmsg} = sub { |
327 | $push_read->($_[0], $r_framing => $rmsg); |
341 | $push_read->($_[0], $r_framing => $rmsg); |
328 | |
342 | |
329 | local $AnyEvent::MP::Kernel::SRCNODE = $src_node; |
343 | local $AnyEvent::MP::Kernel::SRCNODE = $src_node; |