ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.57 by root, Wed Nov 4 21:28:31 2009 UTC vs.
Revision 1.58 by root, Wed Nov 4 21:47:14 2009 UTC

162 $greeting_kv->{tls} = "1.0" if $self->{tls_ctx}; 162 $greeting_kv->{tls} = "1.0" if $self->{tls_ctx};
163 $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION"; 163 $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION";
164 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; 164 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
165 $greeting_kv->{timeout} = $self->{timeout}; 165 $greeting_kv->{timeout} = $self->{timeout};
166 166
167 my $protocol = $self->{protocol} || "aemp";
168
167 # can modify greeting_kv 169 # can modify greeting_kv
168 $_->($self) for @HOOK_CONNECT; 170 $_->($self) for $protocol eq "aemp" ? @HOOK_CONNECT : ();
169 171
170 # send greeting 172 # send greeting
171 my $lgreeting1 = "aemp;$PROTOCOL_VERSION" 173 my $lgreeting1 = "$protocol;$PROTOCOL_VERSION"
172 . ";$AnyEvent::MP::Kernel::NODE" 174 . ";$AnyEvent::MP::Kernel::NODE"
173 . ";" . (join ",", @$auth_rcv) 175 . ";" . (join ",", @$auth_rcv)
174 . ";" . (join ",", @$lframing) 176 . ";" . (join ",", @$lframing)
175 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv); 177 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
176 178
190 $self->{remote_greeting} = { 192 $self->{remote_greeting} = {
191 map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (), 193 map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
192 @kv 194 @kv
193 }; 195 };
194 196
195 $_->($self) for @HOOK_GREETING; 197 $_->($self) for $protocol eq "aemp" ? @HOOK_GREETING : ();
196 198
197 if ($aemp ne "aemp") { 199 if ($aemp ne $protocol) {
198 return $self->error ("unparsable greeting"); 200 return $self->error ("unparsable greeting, expected '$protocol', got '$aemp'");
199 } elsif ($version != $PROTOCOL_VERSION) { 201 } elsif ($version != $PROTOCOL_VERSION) {
200 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)"); 202 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)");
201 } elsif ($rnode eq $AnyEvent::MP::Kernel::NODE) { 203 } elsif ($rnode eq $AnyEvent::MP::Kernel::NODE) {
202 return $self->error ("I refuse to talk to myself"); 204 return $self->error ("I refuse to talk to myself");
203 } elsif ($AnyEvent::MP::Kernel::NODE{$rnode} && $AnyEvent::MP::Kernel::NODE{$rnode}{transport}) { 205 } elsif ($AnyEvent::MP::Kernel::NODE{$rnode} && $AnyEvent::MP::Kernel::NODE{$rnode}{transport}) {
296 298
297 my $queue = delete $self->{queue}; # we are connected 299 my $queue = delete $self->{queue}; # we are connected
298 300
299 $self->connected; 301 $self->connected;
300 302
303 if ($protocol eq "aemp") {
301 # send queued messages 304 # send queued messages
302 $self->send ($_) 305 $self->send ($_)
303 for @$queue; 306 for @$queue;
304 307
305 # receive handling 308 # receive handling
306 my $src_node = $self->{node}; 309 my $src_node = $self->{node};
307 my $rmsg; $rmsg = $self->{rmsg} = sub { 310 my $rmsg; $rmsg = $self->{rmsg} = sub {
308 $_[0]->push_read ($r_framing => $rmsg); 311 $_[0]->push_read ($r_framing => $rmsg);
309 312
310 local $AnyEvent::MP::Kernel::SRCNODE = $src_node; 313 local $AnyEvent::MP::Kernel::SRCNODE = $src_node;
311 AnyEvent::MP::Kernel::_inject (@{ $_[1] }); 314 AnyEvent::MP::Kernel::_inject (@{ $_[1] });
312 }; 315 };
313 $hdl->push_read ($r_framing => $rmsg); 316 $hdl->push_read ($r_framing => $rmsg);
314 317
315 Scalar::Util::weaken $rmsg; 318 Scalar::Util::weaken $rmsg;
316 Scalar::Util::weaken $src_node; 319 Scalar::Util::weaken $src_node;
320 }
317 }); 321 });
318 }); 322 });
319 }); 323 });
320 } 324 }
321 325
325sub error { 329sub error {
326 my ($self, $msg) = @_; 330 my ($self, $msg) = @_;
327 331
328 delete $self->{keepalive}; 332 delete $self->{keepalive};
329 333
334 if ($self->{protocol}) {
335 # TODO
336 } else {
330 $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} $msg");#d# 337 $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} $msg");#d#
331 338
332 $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg) 339 $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg)
333 if $self->{node} && $self->{node}{transport} == $self; 340 if $self->{node} && $self->{node}{transport} == $self;
341 }
334 342
335 (delete $self->{release})->() 343 (delete $self->{release})->()
336 if exists $self->{release}; 344 if exists $self->{release};
337 345
338# $AnyEvent::MP::Kernel::WARN->(7, "$self->{peerhost}:$self->{peerport}: $msg"); 346# $AnyEvent::MP::Kernel::WARN->(7, "$self->{peerhost}:$self->{peerport}: $msg");
345 delete $self->{keepalive}; 353 delete $self->{keepalive};
346 354
347 (delete $self->{release})->() 355 (delete $self->{release})->()
348 if exists $self->{release}; 356 if exists $self->{release};
349 357
358 if ($self->{protocol}) {
359 # TODO
360 } else {
350 $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}"); 361 $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}");
351 362
352 my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node}); 363 my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
353 Scalar::Util::weaken ($self->{node} = $node); 364 Scalar::Util::weaken ($self->{node} = $node);
354 $node->transport_connect ($self); 365 $node->transport_connect ($self);
355 366
356 $_->($self) for @HOOK_CONNECTED; 367 $_->($self) for @HOOK_CONNECTED;
368 }
357} 369}
358 370
359sub send { 371sub send {
360 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]); 372 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]);
361} 373}

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines