ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.38 by root, Thu Aug 27 07:12:48 2009 UTC vs.
Revision 1.39 by root, Thu Aug 27 21:29:37 2009 UTC

39 39
40use AnyEvent::MP::Config (); 40use AnyEvent::MP::Config ();
41 41
42our $PROTOCOL_VERSION = 0; 42our $PROTOCOL_VERSION = 0;
43 43
44=item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport) 44=item $listener = mp_listener $host, $port, <constructor-args>
45 45
46Creates a listener on the given host/port using 46Creates a listener on the given host/port using
47C<AnyEvent::Socket::tcp_server>. 47C<AnyEvent::Socket::tcp_server>.
48 48
49See C<new>, below, for constructor arguments. 49See C<new>, below, for constructor arguments.
51Defaults for peerhost, peerport and fh are provided. 51Defaults for peerhost, peerport and fh are provided.
52 52
53=cut 53=cut
54 54
55sub mp_server($$@) { 55sub mp_server($$@) {
56 my $cb = pop;
57 my ($host, $port, @args) = @_; 56 my ($host, $port, @args) = @_;
58 57
59 AnyEvent::Socket::tcp_server $host, $port, sub { 58 AnyEvent::Socket::tcp_server $host, $port, sub {
60 my ($fh, $host, $port) = @_; 59 my ($fh, $host, $port) = @_;
61 60
62 $cb->(new AnyEvent::MP::Transport 61 my $tp = new AnyEvent::MP::Transport
63 fh => $fh, 62 fh => $fh,
64 peerhost => $host, 63 peerhost => $host,
65 peerport => $port, 64 peerport => $port,
66 @args, 65 @args,
67 ); 66 ;
67 $tp->{keepalive} = $tp;
68 } 68 }
69} 69}
70 70
71=item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport) 71=item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
72 72
206 206
207 if ($aemp ne "aemp") { 207 if ($aemp ne "aemp") {
208 return $self->error ("unparsable greeting"); 208 return $self->error ("unparsable greeting");
209 } elsif ($version != $PROTOCOL_VERSION) { 209 } elsif ($version != $PROTOCOL_VERSION) {
210 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)"); 210 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)");
211 } elsif ($rnode eq $self->{local_node}) {
212 return $self->error ("I refuse to talk to myself");
213 } elsif ($AnyEvent::MP::Kernel::NODE{$rnode} && $AnyEvent::MP::Kernel::NODE{$rnode}{transport}) {
214 return $self->error ("$rnode already connected, not connecting again.");
211 } 215 }
212 216
213 my $s_auth; 217 my $s_auth;
214 for my $auth_ (split /,/, $auths) { 218 for my $auth_ (split /,/, $auths) {
215 if (grep $auth_ eq $_, @AUTH_SND) { 219 if (grep $auth_ eq $_, @AUTH_SND) {
316} 320}
317 321
318sub error { 322sub error {
319 my ($self, $msg) = @_; 323 my ($self, $msg) = @_;
320 324
325 delete $self->{keepalive};
326
327 $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} $msg");#d#
328
321 $self->{node}->transport_error (transport_error => $self->{node}{noderef}, $msg) 329 $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg)
322 if $self->{node} && $self->{node}{transport} == $self; 330 if $self->{node} && $self->{node}{transport} == $self;
323 331
324 (delete $self->{release})->() 332 (delete $self->{release})->()
325 if exists $self->{release}; 333 if exists $self->{release};
326 334
329} 337}
330 338
331sub connected { 339sub connected {
332 my ($self) = @_; 340 my ($self) = @_;
333 341
342 delete $self->{keepalive};
343
334 (delete $self->{release})->() 344 (delete $self->{release})->()
335 if exists $self->{release}; 345 if exists $self->{release};
336 346
347 $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}");
348
337 my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node}); 349 my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
338
339 Scalar::Util::weaken ($self->{node} = $node); 350 Scalar::Util::weaken ($self->{node} = $node);
340 $node->transport_connect ($self); 351 $node->transport_connect ($self);
341} 352}
342 353
343sub send { 354sub send {
399 410
400The protocol version supported by this end, currently C<0>. If the 411The protocol version supported by this end, currently C<0>. If the
401versions don't match then no communication is possible. Minor extensions 412versions don't match then no communication is possible. Minor extensions
402are supposed to be handled through additional key-value pairs. 413are supposed to be handled through additional key-value pairs.
403 414
404=item the node endpoint descriptors 415=item the node id
405 416
406for public nodes, this is a comma-separated list of protocol endpoints, 417This is the node ID of the connecting node.
407i.e., the noderef. For slave nodes, this is a unique identifier of the
408form C<slave/nonce>.
409 418
410=item the acceptable authentication methods 419=item the acceptable authentication methods
411 420
412A comma-separated list of authentication methods supported by the 421A comma-separated list of authentication methods supported by the
413node. Note that AnyEvent::MP supports a C<hex_secret> authentication 422node. Note that AnyEvent::MP supports a C<hex_secret> authentication
434The software provider for this implementation. For AnyEvent::MP, this is 443The software provider for this implementation. For AnyEvent::MP, this is
435C<AE-0.0> or whatever version it currently is at. 444C<AE-0.0> or whatever version it currently is at.
436 445
437=item peeraddr=<host>:<port> 446=item peeraddr=<host>:<port>
438 447
439The peer address (socket address of the other side) as seen locally, in the same format 448The peer address (socket address of the other side) as seen locally.
440as noderef endpoints.
441 449
442=item tls=<major>.<minor> 450=item tls=<major>.<minor>
443 451
444Indicates that the other side supports TLS (version should be 1.0) and 452Indicates that the other side supports TLS (version should be 1.0) and
445wishes to do a TLS handshake. 453wishes to do a TLS handshake.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines