ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing cvsroot/AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.78 by root, Sun Mar 4 18:48:27 2012 UTC vs.
Revision 1.79 by root, Wed Mar 21 15:22:16 2012 UTC

300 300
301 if ($rauth2 ne $rauth) { 301 if ($rauth2 ne $rauth) {
302 return $self->error ("authentication failure/shared secret mismatch"); 302 return $self->error ("authentication failure/shared secret mismatch");
303 } 303 }
304 304
305 $self->{r_framing} = $r_framing;
305 $self->{s_framing} = $s_framing; 306 $self->{s_framing} = $s_framing;
306 307
307 $hdl->rbuf_max (undef); 308 $hdl->rbuf_max (undef);
308 309
309 # we rely on TCP retransmit timeouts and keepalives 310 # we rely on TCP retransmit timeouts and keepalives
318# $self->{hdl}->wtimeout ($timeout); 319# $self->{hdl}->wtimeout ($timeout);
319# $self->{hdl}->on_wtimeout (sub { $self->{send}->([]) }); 320# $self->{hdl}->on_wtimeout (sub { $self->{send}->([]) });
320# } 321# }
321 322
322 # receive handling 323 # receive handling
323 324 $self->set_snd_framing;
324 my $push_write = $hdl->can ("push_write"); 325 $self->set_rcv_framing;
325 my $push_read = $hdl->can ("push_read");
326
327 if ($s_framing eq "json") {
328 $self->{send} = sub {
329 $push_write->($hdl, JSON::XS::encode_json $_[0]);
330 };
331 } else {
332 $self->{send} = sub {
333 $push_write->($hdl, $s_framing => $_[0]);
334 };
335 }
336
337 if ($r_framing eq "json") {
338 my $coder = JSON::XS->new->utf8;
339
340 $hdl->on_read (sub {
341 local $AnyEvent::MP::Kernel::SRCNODE = $self->{node};
342
343 AnyEvent::MP::Kernel::_inject (@$_)
344 for $coder->incr_parse (delete $_[0]{rbuf});
345
346 ()
347 });
348 } else {
349 my $rmsg; $rmsg = $self->{rmsg} = sub {
350 $push_read->($_[0], $r_framing => $rmsg);
351
352 local $AnyEvent::MP::Kernel::SRCNODE = $self->{node};
353 AnyEvent::MP::Kernel::_inject (@{ $_[1] });
354 };
355 eval {
356 $push_read->($_[0], $r_framing => $rmsg);
357 };
358 Scalar::Util::weaken $rmsg;
359 return $self->error ("$r_framing: unusable remote framing")
360 if $@;
361 }
362 } 326 }
363 327
364 $self->connected; 328 $self->connected;
365 }); 329 });
366 }); 330 });
368 } 332 }
369 333
370 $self 334 $self
371} 335}
372 336
337sub set_snd_framing {
338 my ($self) = @_;
339
340 my $framing = $self->{s_framing};
341 my $hdl = $self->{hdl};
342 my $push_write = $hdl->can ("push_write");
343
344 if ($framing eq "json") {
345 $self->{send} = sub {
346 $push_write->($hdl, JSON::XS::encode_json $_[0]);
347 };
348 } else {
349 $self->{send} = sub {
350 $push_write->($hdl, $framing => $_[0]);
351 };
352 }
353}
354
355sub set_rcv_framing {
356 my ($self) = @_;
357
358 my $node = $self->{remote_node};
359 my $framing = $self->{r_framing};
360 my $hdl = $self->{hdl};
361 my $push_read = $hdl->can ("push_read");
362
363 if ($framing eq "json") {
364 my $coder = JSON::XS->new->utf8;
365
366 $hdl->on_read (sub {
367 $AnyEvent::MP::Kernel::SRCNODE = $node;
368
369 AnyEvent::MP::Kernel::_inject (@$_)
370 for $coder->incr_parse (delete $_[0]{rbuf});
371
372 ()
373 });
374 } else {
375 my $rmsg; $rmsg = $self->{rmsg} = sub {
376 $push_read->($_[0], $framing => $rmsg);
377
378 $AnyEvent::MP::Kernel::SRCNODE = $node;
379 AnyEvent::MP::Kernel::_inject (@{ $_[1] });
380 };
381 eval {
382 $push_read->($hdl, $framing => $rmsg);
383 };
384 Scalar::Util::weaken $rmsg;
385 return $self->error ("$framing: unusable remote framing")
386 if $@;
387 }
388}
389
373sub error { 390sub error {
374 my ($self, $msg) = @_; 391 my ($self, $msg) = @_;
375 392
376 delete $self->{keepalive}; 393 delete $self->{keepalive};
377 394

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines