ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.51 by root, Mon Sep 7 20:00:38 2009 UTC vs.
Revision 1.52 by root, Tue Sep 8 01:38:16 2009 UTC

38use AnyEvent::Handle 4.92 (); 38use AnyEvent::Handle 4.92 ();
39 39
40use AnyEvent::MP::Config (); 40use AnyEvent::MP::Config ();
41 41
42our $PROTOCOL_VERSION = 0; 42our $PROTOCOL_VERSION = 0;
43
44our @HOOK_CONNECT; # called at connect/accept time
45our @HOOK_GREETING; # called at greeting1 time
46our @HOOK_CONNECTED; # called at data phase
47our @HOOK_DESTROY; # called at destroy time
43 48
44=item $listener = mp_listener $host, $port, <constructor-args> 49=item $listener = mp_listener $host, $port, <constructor-args>
45 50
46Creates a listener on the given host/port using 51Creates a listener on the given host/port using
47C<AnyEvent::Socket::tcp_server>. 52C<AnyEvent::Socket::tcp_server>.
76 my $release = pop; 81 my $release = pop;
77 my ($host, $port, @args) = @_; 82 my ($host, $port, @args) = @_;
78 83
79 new AnyEvent::MP::Transport 84 new AnyEvent::MP::Transport
80 connect => [$host, $port], 85 connect => [$host, $port],
86 peerhost => $host,
87 peerport => $port,
81 release => $release, 88 release => $release,
82 @args, 89 @args,
83 ; 90 ;
84} 91}
85 92
138 verify_require_client_cert => 1, 145 verify_require_client_cert => 1,
139 }; 146 };
140 } 147 }
141 148
142 $self->{hdl} = new AnyEvent::Handle 149 $self->{hdl} = new AnyEvent::Handle
143 ($self->{fh} ? (fh => $self->{fh}) : (connect => $self->{connect})), 150 +($self->{fh} ? (fh => $self->{fh}) : (connect => $self->{connect})),
144 autocork => 1, 151 autocork => 1,
145 no_delay => 1, 152 no_delay => 1,
146 keepalive => 1, 153 keepalive => 1,
147 on_error => sub { 154 on_error => sub {
148 $self->error ($_[2]); 155 $self->error ($_[2]);
149 }, 156 },
150 on_connect => sub {
151 $self->{peerhost} = $_[1];
152 $self->{peerport} = $_[2];
153 $self->{peeraddr} = AnyEvent::Socket::format_hostport $_[1], $_[2];
154 },
155 rtimeout => $timeout, 157 rtimeout => $timeout,
156 ; 158 ;
157 159
158 my $greeting_kv = $self->{greeting} ||= {}; 160 my $greeting_kv = $self->{local_greeting} ||= {};
159
160 $self->{local_node} ||= $AnyEvent::MP::Kernel::NODE;
161 161
162 $greeting_kv->{tls} = "1.0" if $self->{tls_ctx}; 162 $greeting_kv->{tls} = "1.0" if $self->{tls_ctx};
163 $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION"; 163 $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION";
164 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; 164 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
165 $greeting_kv->{timeout} = $self->{timeout}; 165 $greeting_kv->{timeout} = $self->{timeout};
166 166
167 # can modify greeting_kv
168 $_->($self) for @HOOK_CONNECT;
169
167 # send greeting 170 # send greeting
168 my $lgreeting1 = "aemp;$PROTOCOL_VERSION" 171 my $lgreeting1 = "aemp;$PROTOCOL_VERSION"
169 . ";$self->{local_node}" 172 . ";$AnyEvent::MP::Kernel::NODE"
170 . ";" . (join ",", @$auth_rcv) 173 . ";" . (join ",", @$auth_rcv)
171 . ";" . (join ",", @$lframing) 174 . ";" . (join ",", @$lframing)
172 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv); 175 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
173 176
174 my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Kernel::nonce (66), ""; 177 my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Kernel::nonce (66), "";
312 315
313 Scalar::Util::weaken $rmsg; 316 Scalar::Util::weaken $rmsg;
314 Scalar::Util::weaken $src_node; 317 Scalar::Util::weaken $src_node;
315 }); 318 });
316 }); 319 });
320
321 $_->($self) for @HOOK_GREETING;
317 }); 322 });
318 } 323 }
319 324
320 $self 325 $self
321} 326}
348 $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}"); 353 $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}");
349 354
350 my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node}); 355 my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
351 Scalar::Util::weaken ($self->{node} = $node); 356 Scalar::Util::weaken ($self->{node} = $node);
352 $node->transport_connect ($self); 357 $node->transport_connect ($self);
358
359 $_->($self) for @HOOK_CONNECTED;
353} 360}
354 361
355sub send { 362sub send {
356 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]); 363 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]);
357} 364}
362 (delete $self->{release})->() 369 (delete $self->{release})->()
363 if exists $self->{release}; 370 if exists $self->{release};
364 371
365 $self->{hdl}->destroy 372 $self->{hdl}->destroy
366 if $self->{hdl}; 373 if $self->{hdl};
374
375 $_->($self) for @HOOK_DESTROY;
367} 376}
368 377
369sub DESTROY { 378sub DESTROY {
370 my ($self) = @_; 379 my ($self) = @_;
371 380

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines