ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.74 by root, Sun Feb 26 10:29:59 2012 UTC vs.
Revision 1.75 by root, Tue Feb 28 18:37:24 2012 UTC

39 39
40use AnyEvent::MP::Config (); 40use AnyEvent::MP::Config ();
41 41
42our $PROTOCOL_VERSION = 1; 42our $PROTOCOL_VERSION = 1;
43 43
44our @HOOK_CONNECT; # called at connect/accept time 44our @HOOK_GREET; # called at connect/accept time
45our @HOOK_GREETING; # called at greeting1 time 45our @HOOK_GREETED; # called at greeting1 time
46our @HOOK_CONNECTED; # called at data phase 46our @HOOK_CONNECT; # called at data phase
47our @HOOK_DESTROY; # called at destroy time 47our @HOOK_DESTROY; # called at destroy time
48our %HOOK_PROTOCOL = ( 48our %HOOK_PROTOCOL = (
49 "aemp-dataconn" => sub { 49 "aemp-dataconn" => sub {
50 require AnyEvent::MP::DataConn; 50 require AnyEvent::MP::DataConn;
51 &AnyEvent::MP::DataConn::_inject; 51 &AnyEvent::MP::DataConn::_inject;
52 }, 52 },
53); 53);
54 54
55=item $listener = mp_listener $host, $port, <constructor-args> 55=item $listener = mp_server $host, $port, <constructor-args>
56 56
57Creates a listener on the given host/port using 57Creates a listener on the given host/port using
58C<AnyEvent::Socket::tcp_server>. 58C<AnyEvent::Socket::tcp_server>.
59 59
60See C<new>, below, for constructor arguments. 60See C<new>, below, for constructor arguments.
96 ; 96 ;
97} 97}
98 98
99=item new AnyEvent::MP::Transport 99=item new AnyEvent::MP::Transport
100 100
101Create a new transport - usually used via C<mp_server> or C<mp_connect>
102instead.
103
101 # immediately starts negotiation 104 # immediately starts negotiation
102 my $transport = new AnyEvent::MP::Transport 105 my $transport = new AnyEvent::MP::Transport
103 # mandatory 106 # mandatory
104 fh => $filehandle, 107 fh => $filehandle,
105 local_id => $identifier, 108 local_id => $identifier,
106 on_recv => sub { receive-callback }, 109 on_recv => sub { receive-callback },
107 on_error => sub { error-callback }, 110 on_error => sub { error-callback },
108 111
109 # optional 112 # optional
110 on_eof => sub { clean-close-callback }, 113 on_greet => sub { before sending greeting },
114 on_greeted => sub { after receiving greeting },
111 on_connect => sub { successful-connect-callback }, 115 on_connect => sub { successful-connect-callback },
112 greeting => { key => value }, 116 greeting => { key => value },
113 117
114 # tls support 118 # tls support
115 tls_ctx => AnyEvent::TLS, 119 tls_ctx => AnyEvent::TLS,
116 peername => $peername, # for verification 120 peername => $peername, # for verification
117 ; 121 ;
118 122
119=cut 123=cut
120 124
121sub new { 125sub new {
162 ; 166 ;
163 167
164 my $greeting_kv = $self->{local_greeting} ||= {}; 168 my $greeting_kv = $self->{local_greeting} ||= {};
165 169
166 $greeting_kv->{tls} = "1.0" if $self->{tls_ctx}; 170 $greeting_kv->{tls} = "1.0" if $self->{tls_ctx};
167 $greeting_kv->{provider} = "AE-$AnyEvent::MP::VERSION"; # MP.pm might not be loaded, so best effort :( 171 $greeting_kv->{provider} = "AE-$AnyEvent::MP::Config::VERSION";
168 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; 172 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
169 173
170 my $protocol = $self->{protocol} || "aemp"; 174 my $protocol = $self->{protocol} || "aemp";
171 175
172 # can modify greeting_kv 176 # can modify greeting_kv
173 $_->($self) for $protocol eq "aemp" ? @HOOK_CONNECT : (); 177 $_->($self) for $protocol eq "aemp" ? @HOOK_GREET : ();
178 (delete $self->{on_greet})->($self)
179 if exists $self->{on_greet};
174 180
175 # send greeting 181 # send greeting
176 my $lgreeting1 = "$protocol;$PROTOCOL_VERSION" 182 my $lgreeting1 = "$protocol;$PROTOCOL_VERSION"
177 . ";$AnyEvent::MP::Kernel::NODE" 183 . ";$AnyEvent::MP::Kernel::NODE"
178 . ";" . (join ",", @$auth_rcv) 184 . ";" . (join ",", @$auth_rcv)
202 if ($protocol eq "aemp" and $aemp =~ /^aemp-\w+$/) { 208 if ($protocol eq "aemp" and $aemp =~ /^aemp-\w+$/) {
203 # maybe check for existence of the protocol handler? 209 # maybe check for existence of the protocol handler?
204 $self->{protocol} = $protocol = $aemp; 210 $self->{protocol} = $protocol = $aemp;
205 } 211 }
206 212
207 $_->($self) for $protocol eq "aemp" ? @HOOK_GREETING : (); 213 $_->($self) for $protocol eq "aemp" ? @HOOK_GREETED : ();
214 (delete $self->{on_greeted})->($self)
215 if exists $self->{on_greeted};
208 216
209 if ($aemp ne $protocol and $aemp ne "aemp") { 217 if ($aemp ne $protocol and $aemp ne "aemp") {
210 return $self->error ("unparsable greeting, expected '$protocol', got '$aemp'"); 218 return $self->error ("unparsable greeting, expected '$protocol', got '$aemp'");
211 } elsif ($version != $PROTOCOL_VERSION) { 219 } elsif ($version != $PROTOCOL_VERSION) {
212 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)"); 220 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)");
396 404
397 my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node}); 405 my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
398 Scalar::Util::weaken ($self->{node} = $node); 406 Scalar::Util::weaken ($self->{node} = $node);
399 $node->transport_connect ($self); 407 $node->transport_connect ($self);
400 408
401 $_->($self) for @HOOK_CONNECTED; 409 $_->($self) for @HOOK_CONNECT;
402 } 410 }
403 411
404 (delete $self->{release})->() 412 (delete $self->{release})->()
405 if exists $self->{release}; 413 if exists $self->{release};
414
415 (delete $self->{on_connect})->($self)
416 if exists $self->{on_connect};
406} 417}
407 418
408sub destroy { 419sub destroy {
409 my ($self) = @_; 420 my ($self) = @_;
410 421
412 if exists $self->{release}; 423 if exists $self->{release};
413 424
414 $self->{hdl}->destroy 425 $self->{hdl}->destroy
415 if $self->{hdl}; 426 if $self->{hdl};
416 427
428 (delete $self->{on_destroy})->($self)
429 if exists $self->{on_destroy};
417 $_->($self) for $self->{protocol} ? () : @HOOK_DESTROY; 430 $_->($self) for $self->{protocol} ? () : @HOOK_DESTROY;
418 431
419 $self->{protocol} = "destroyed"; # to keep hooks from invoked twice. 432 $self->{protocol} = "destroyed"; # to keep hooks from invoked twice.
420} 433}
421 434

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines