… | |
… | |
300 | |
300 | |
301 | if ($rauth2 ne $rauth) { |
301 | if ($rauth2 ne $rauth) { |
302 | return $self->error ("authentication failure/shared secret mismatch"); |
302 | return $self->error ("authentication failure/shared secret mismatch"); |
303 | } |
303 | } |
304 | |
304 | |
|
|
305 | $self->{r_framing} = $r_framing; |
305 | $self->{s_framing} = $s_framing; |
306 | $self->{s_framing} = $s_framing; |
306 | |
307 | |
307 | $hdl->rbuf_max (undef); |
308 | $hdl->rbuf_max (undef); |
308 | |
309 | |
309 | # we rely on TCP retransmit timeouts and keepalives |
310 | # we rely on TCP retransmit timeouts and keepalives |
… | |
… | |
318 | # $self->{hdl}->wtimeout ($timeout); |
319 | # $self->{hdl}->wtimeout ($timeout); |
319 | # $self->{hdl}->on_wtimeout (sub { $self->{send}->([]) }); |
320 | # $self->{hdl}->on_wtimeout (sub { $self->{send}->([]) }); |
320 | # } |
321 | # } |
321 | |
322 | |
322 | # receive handling |
323 | # receive handling |
323 | |
324 | $self->set_snd_framing; |
324 | my $push_write = $hdl->can ("push_write"); |
325 | $self->set_rcv_framing; |
325 | my $push_read = $hdl->can ("push_read"); |
|
|
326 | |
|
|
327 | if ($s_framing eq "json") { |
|
|
328 | $self->{send} = sub { |
|
|
329 | $push_write->($hdl, JSON::XS::encode_json $_[0]); |
|
|
330 | }; |
|
|
331 | } else { |
|
|
332 | $self->{send} = sub { |
|
|
333 | $push_write->($hdl, $s_framing => $_[0]); |
|
|
334 | }; |
|
|
335 | } |
|
|
336 | |
|
|
337 | if ($r_framing eq "json") { |
|
|
338 | my $coder = JSON::XS->new->utf8; |
|
|
339 | |
|
|
340 | $hdl->on_read (sub { |
|
|
341 | local $AnyEvent::MP::Kernel::SRCNODE = $self->{node}; |
|
|
342 | |
|
|
343 | AnyEvent::MP::Kernel::_inject (@$_) |
|
|
344 | for $coder->incr_parse (delete $_[0]{rbuf}); |
|
|
345 | |
|
|
346 | () |
|
|
347 | }); |
|
|
348 | } else { |
|
|
349 | my $rmsg; $rmsg = $self->{rmsg} = sub { |
|
|
350 | $push_read->($_[0], $r_framing => $rmsg); |
|
|
351 | |
|
|
352 | local $AnyEvent::MP::Kernel::SRCNODE = $self->{node}; |
|
|
353 | AnyEvent::MP::Kernel::_inject (@{ $_[1] }); |
|
|
354 | }; |
|
|
355 | eval { |
|
|
356 | $push_read->($_[0], $r_framing => $rmsg); |
|
|
357 | }; |
|
|
358 | Scalar::Util::weaken $rmsg; |
|
|
359 | return $self->error ("$r_framing: unusable remote framing") |
|
|
360 | if $@; |
|
|
361 | } |
|
|
362 | } |
326 | } |
363 | |
327 | |
364 | $self->connected; |
328 | $self->connected; |
365 | }); |
329 | }); |
366 | }); |
330 | }); |
… | |
… | |
368 | } |
332 | } |
369 | |
333 | |
370 | $self |
334 | $self |
371 | } |
335 | } |
372 | |
336 | |
|
|
337 | sub set_snd_framing { |
|
|
338 | my ($self) = @_; |
|
|
339 | |
|
|
340 | my $framing = $self->{s_framing}; |
|
|
341 | my $hdl = $self->{hdl}; |
|
|
342 | my $push_write = $hdl->can ("push_write"); |
|
|
343 | |
|
|
344 | if ($framing eq "json") { |
|
|
345 | $self->{send} = sub { |
|
|
346 | $push_write->($hdl, JSON::XS::encode_json $_[0]); |
|
|
347 | }; |
|
|
348 | } else { |
|
|
349 | $self->{send} = sub { |
|
|
350 | $push_write->($hdl, $framing => $_[0]); |
|
|
351 | }; |
|
|
352 | } |
|
|
353 | } |
|
|
354 | |
|
|
355 | sub set_rcv_framing { |
|
|
356 | my ($self) = @_; |
|
|
357 | |
|
|
358 | my $node = $self->{remote_node}; |
|
|
359 | my $framing = $self->{r_framing}; |
|
|
360 | my $hdl = $self->{hdl}; |
|
|
361 | my $push_read = $hdl->can ("push_read"); |
|
|
362 | |
|
|
363 | if ($framing eq "json") { |
|
|
364 | my $coder = JSON::XS->new->utf8; |
|
|
365 | |
|
|
366 | $hdl->on_read (sub { |
|
|
367 | $AnyEvent::MP::Kernel::SRCNODE = $node; |
|
|
368 | |
|
|
369 | AnyEvent::MP::Kernel::_inject (@$_) |
|
|
370 | for $coder->incr_parse (delete $_[0]{rbuf}); |
|
|
371 | |
|
|
372 | () |
|
|
373 | }); |
|
|
374 | } else { |
|
|
375 | my $rmsg; $rmsg = $self->{rmsg} = sub { |
|
|
376 | $push_read->($_[0], $framing => $rmsg); |
|
|
377 | |
|
|
378 | $AnyEvent::MP::Kernel::SRCNODE = $node; |
|
|
379 | AnyEvent::MP::Kernel::_inject (@{ $_[1] }); |
|
|
380 | }; |
|
|
381 | eval { |
|
|
382 | $push_read->($hdl, $framing => $rmsg); |
|
|
383 | }; |
|
|
384 | Scalar::Util::weaken $rmsg; |
|
|
385 | return $self->error ("$framing: unusable remote framing") |
|
|
386 | if $@; |
|
|
387 | } |
|
|
388 | } |
|
|
389 | |
373 | sub error { |
390 | sub error { |
374 | my ($self, $msg) = @_; |
391 | my ($self, $msg) = @_; |
375 | |
392 | |
376 | delete $self->{keepalive}; |
393 | delete $self->{keepalive}; |
377 | |
394 | |