… | |
… | |
300 | $self->{hdl}->rtimeout (undef); |
300 | $self->{hdl}->rtimeout (undef); |
301 | |
301 | |
302 | $self->{remote_greeting}{untrusted} = 1 |
302 | $self->{remote_greeting}{untrusted} = 1 |
303 | if $auth_method eq "tls_anon"; |
303 | if $auth_method eq "tls_anon"; |
304 | |
304 | |
305 | $self->connected; |
305 | my $push_write = $hdl->can ("push_write"); |
|
|
306 | |
|
|
307 | $self->{send} = sub { |
|
|
308 | $push_write->($hdl, $s_framing => $_[0]); |
|
|
309 | }; |
306 | |
310 | |
307 | if ($protocol eq "aemp" and $self->{hdl}) { |
311 | if ($protocol eq "aemp" and $self->{hdl}) { |
308 | # listener-less node need to continuously probe |
312 | # listener-less node need to continuously probe |
309 | unless (@$AnyEvent::MP::Kernel::LISTENER) { |
313 | unless (@$AnyEvent::MP::Kernel::LISTENER) { |
310 | $self->{hdl}->wtimeout ($timeout); |
314 | $self->{hdl}->wtimeout ($timeout); |
… | |
… | |
315 | my $src_node = $self->{node}; |
319 | my $src_node = $self->{node}; |
316 | Scalar::Util::weaken $src_node; |
320 | Scalar::Util::weaken $src_node; |
317 | |
321 | |
318 | if ($r_framing eq "\njsonxyz") {#d# |
322 | if ($r_framing eq "\njsonxyz") {#d# |
319 | } else { |
323 | } else { |
|
|
324 | my $push_read = $hdl->can ("push_read"); |
|
|
325 | |
320 | my $rmsg; $rmsg = $self->{rmsg} = sub { |
326 | my $rmsg; $rmsg = $self->{rmsg} = sub { |
321 | $_[0]->push_read ($r_framing => $rmsg); |
327 | $push_read->($_[0], $r_framing => $rmsg); |
322 | |
328 | |
323 | local $AnyEvent::MP::Kernel::SRCNODE = $src_node; |
329 | local $AnyEvent::MP::Kernel::SRCNODE = $src_node; |
324 | AnyEvent::MP::Kernel::_inject (@{ $_[1] }); |
330 | AnyEvent::MP::Kernel::_inject (@{ $_[1] }); |
325 | }; |
331 | }; |
326 | eval { |
332 | eval { |
327 | $hdl->push_read ($r_framing => $rmsg); |
333 | $push_read->($_[0], $r_framing => $rmsg); |
328 | }; |
334 | }; |
329 | Scalar::Util::weaken $rmsg; |
335 | Scalar::Util::weaken $rmsg; |
330 | return $self->error ("$r_framing: unusable remote framing") |
336 | return $self->error ("$r_framing: unusable remote framing") |
331 | if $@; |
337 | if $@; |
332 | } |
338 | } |
333 | } |
339 | } |
|
|
340 | |
|
|
341 | $self->connected; |
334 | }); |
342 | }); |
335 | }); |
343 | }); |
336 | }); |
344 | }); |
337 | } |
345 | } |
338 | |
346 | |
… | |
… | |
380 | |
388 | |
381 | (delete $self->{release})->() |
389 | (delete $self->{release})->() |
382 | if exists $self->{release}; |
390 | if exists $self->{release}; |
383 | } |
391 | } |
384 | |
392 | |
385 | sub send { |
|
|
386 | $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]); |
|
|
387 | } |
|
|
388 | |
|
|
389 | sub destroy { |
393 | sub destroy { |
390 | my ($self) = @_; |
394 | my ($self) = @_; |
391 | |
395 | |
392 | (delete $self->{release})->() |
396 | (delete $self->{release})->() |
393 | if exists $self->{release}; |
397 | if exists $self->{release}; |