ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.58 by root, Wed Nov 4 21:47:14 2009 UTC vs.
Revision 1.59 by root, Thu Nov 5 22:44:56 2009 UTC

43 43
44our @HOOK_CONNECT; # called at connect/accept time 44our @HOOK_CONNECT; # called at connect/accept time
45our @HOOK_GREETING; # called at greeting1 time 45our @HOOK_GREETING; # called at greeting1 time
46our @HOOK_CONNECTED; # called at data phase 46our @HOOK_CONNECTED; # called at data phase
47our @HOOK_DESTROY; # called at destroy time 47our @HOOK_DESTROY; # called at destroy time
48our %HOOK_PROTOCOL = (
49 "aemp-dataconn" => sub {
50 require AnyEvent::MP::DataConn;
51 &AnyEvent::MP::DataConn::_inject;
52 },
53);
48 54
49=item $listener = mp_listener $host, $port, <constructor-args> 55=item $listener = mp_listener $host, $port, <constructor-args>
50 56
51Creates a listener on the given host/port using 57Creates a listener on the given host/port using
52C<AnyEvent::Socket::tcp_server>. 58C<AnyEvent::Socket::tcp_server>.
114 120
115sub new { 121sub new {
116 my ($class, %arg) = @_; 122 my ($class, %arg) = @_;
117 123
118 my $self = bless \%arg, $class; 124 my $self = bless \%arg, $class;
119
120 $self->{queue} = [];
121 125
122 { 126 {
123 Scalar::Util::weaken (my $self = $self); 127 Scalar::Util::weaken (my $self = $self);
124 128
125 my $config = $AnyEvent::MP::Kernel::CONFIG; 129 my $config = $AnyEvent::MP::Kernel::CONFIG;
285 $hdl->rbuf_max (undef); 289 $hdl->rbuf_max (undef);
286 290
287 # we rely on TCP retransmit timeouts and keepalives 291 # we rely on TCP retransmit timeouts and keepalives
288 $self->{hdl}->rtimeout (undef); 292 $self->{hdl}->rtimeout (undef);
289 293
290 # except listener-less nodes, they need to continuously probe
291 unless (@$AnyEvent::MP::Kernel::LISTENER) {
292 $self->{hdl}->wtimeout ($timeout);
293 $self->{hdl}->on_wtimeout (sub { $self->send ([]) });
294 }
295
296 $self->{remote_greeting}{untrusted} = 1 294 $self->{remote_greeting}{untrusted} = 1
297 if $auth_method eq "tls_anon"; 295 if $auth_method eq "tls_anon";
298 296
299 my $queue = delete $self->{queue}; # we are connected
300
301 $self->connected; 297 $self->connected;
302 298
303 if ($protocol eq "aemp") { 299 if ($protocol eq "aemp") {
304 # send queued messages 300 # listener-less node need to continuously probe
305 $self->send ($_) 301 unless (@$AnyEvent::MP::Kernel::LISTENER) {
306 for @$queue; 302 $self->{hdl}->wtimeout ($timeout);
303 $self->{hdl}->on_wtimeout (sub { $self->send ([]) });
304 }
307 305
308 # receive handling 306 # receive handling
309 my $src_node = $self->{node}; 307 my $src_node = $self->{node};
310 my $rmsg; $rmsg = $self->{rmsg} = sub { 308 my $rmsg; $rmsg = $self->{rmsg} = sub {
311 $_[0]->push_read ($r_framing => $rmsg); 309 $_[0]->push_read ($r_framing => $rmsg);
330 my ($self, $msg) = @_; 328 my ($self, $msg) = @_;
331 329
332 delete $self->{keepalive}; 330 delete $self->{keepalive};
333 331
334 if ($self->{protocol}) { 332 if ($self->{protocol}) {
335 # TODO 333 $HOOK_PROTOCOL{$self->{protocol}}->($self, $msg);
336 } else { 334 } else {
337 $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} $msg");#d# 335 $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} $msg");#d#
338 336
339 $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg) 337 $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg)
340 if $self->{node} && $self->{node}{transport} == $self; 338 if $self->{node} && $self->{node}{transport} == $self;
354 352
355 (delete $self->{release})->() 353 (delete $self->{release})->()
356 if exists $self->{release}; 354 if exists $self->{release};
357 355
358 if ($self->{protocol}) { 356 if ($self->{protocol}) {
359 # TODO 357 $self->{hdl}->on_error (undef);
358 $HOOK_PROTOCOL{$self->{protocol}}->($self, undef);
360 } else { 359 } else {
361 $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}"); 360 $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}");
362 361
363 my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node}); 362 my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
364 Scalar::Util::weaken ($self->{node} = $node); 363 Scalar::Util::weaken ($self->{node} = $node);
379 if exists $self->{release}; 378 if exists $self->{release};
380 379
381 $self->{hdl}->destroy 380 $self->{hdl}->destroy
382 if $self->{hdl}; 381 if $self->{hdl};
383 382
384 $_->($self) for @HOOK_DESTROY; 383 $_->($self) for $self->{protocol} ? () : @HOOK_DESTROY;
385} 384}
386 385
387sub DESTROY { 386sub DESTROY {
388 my ($self) = @_; 387 my ($self) = @_;
389 388
392 391
393=back 392=back
394 393
395=head1 PROTOCOL 394=head1 PROTOCOL
396 395
397The AEMP protocol is relatively simple, and consists of three phases which 396The AEMP protocol is comparatively simple, and consists of three phases
398are symmetrical for both sides: greeting (followed by optionally switching 397which are symmetrical for both sides: greeting (followed by optionally
399to TLS mode), authentication and packet exchange. 398switching to TLS mode), authentication and packet exchange.
400 399
401The protocol is designed to allow both full-text and binary streams. 400The protocol is designed to allow both full-text and binary streams.
402 401
403The greeting consists of two text lines that are ended by either an ASCII 402The greeting consists of two text lines that are ended by either an ASCII
404CR LF pair, or a single ASCII LF (recommended). 403CR LF pair, or a single ASCII LF (recommended).

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines