… | |
… | |
162 | $greeting_kv->{tls} = "1.0" if $self->{tls_ctx}; |
162 | $greeting_kv->{tls} = "1.0" if $self->{tls_ctx}; |
163 | $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION"; |
163 | $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION"; |
164 | $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; |
164 | $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; |
165 | $greeting_kv->{timeout} = $self->{timeout}; |
165 | $greeting_kv->{timeout} = $self->{timeout}; |
166 | |
166 | |
|
|
167 | my $protocol = $self->{protocol} || "aemp"; |
|
|
168 | |
167 | # can modify greeting_kv |
169 | # can modify greeting_kv |
168 | $_->($self) for @HOOK_CONNECT; |
170 | $_->($self) for $protocol eq "aemp" ? @HOOK_CONNECT : (); |
169 | |
171 | |
170 | # send greeting |
172 | # send greeting |
171 | my $lgreeting1 = "aemp;$PROTOCOL_VERSION" |
173 | my $lgreeting1 = "$protocol;$PROTOCOL_VERSION" |
172 | . ";$AnyEvent::MP::Kernel::NODE" |
174 | . ";$AnyEvent::MP::Kernel::NODE" |
173 | . ";" . (join ",", @$auth_rcv) |
175 | . ";" . (join ",", @$auth_rcv) |
174 | . ";" . (join ",", @$lframing) |
176 | . ";" . (join ",", @$lframing) |
175 | . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv); |
177 | . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv); |
176 | |
178 | |
… | |
… | |
190 | $self->{remote_greeting} = { |
192 | $self->{remote_greeting} = { |
191 | map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (), |
193 | map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (), |
192 | @kv |
194 | @kv |
193 | }; |
195 | }; |
194 | |
196 | |
195 | $_->($self) for @HOOK_GREETING; |
197 | $_->($self) for $protocol eq "aemp" ? @HOOK_GREETING : (); |
196 | |
198 | |
197 | if ($aemp ne "aemp") { |
199 | if ($aemp ne $protocol) { |
198 | return $self->error ("unparsable greeting"); |
200 | return $self->error ("unparsable greeting, expected '$protocol', got '$aemp'"); |
199 | } elsif ($version != $PROTOCOL_VERSION) { |
201 | } elsif ($version != $PROTOCOL_VERSION) { |
200 | return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)"); |
202 | return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)"); |
201 | } elsif ($rnode eq $AnyEvent::MP::Kernel::NODE) { |
203 | } elsif ($rnode eq $AnyEvent::MP::Kernel::NODE) { |
202 | return $self->error ("I refuse to talk to myself"); |
204 | return $self->error ("I refuse to talk to myself"); |
203 | } elsif ($AnyEvent::MP::Kernel::NODE{$rnode} && $AnyEvent::MP::Kernel::NODE{$rnode}{transport}) { |
205 | } elsif ($AnyEvent::MP::Kernel::NODE{$rnode} && $AnyEvent::MP::Kernel::NODE{$rnode}{transport}) { |
… | |
… | |
296 | |
298 | |
297 | my $queue = delete $self->{queue}; # we are connected |
299 | my $queue = delete $self->{queue}; # we are connected |
298 | |
300 | |
299 | $self->connected; |
301 | $self->connected; |
300 | |
302 | |
|
|
303 | if ($protocol eq "aemp") { |
301 | # send queued messages |
304 | # send queued messages |
302 | $self->send ($_) |
305 | $self->send ($_) |
303 | for @$queue; |
306 | for @$queue; |
304 | |
307 | |
305 | # receive handling |
308 | # receive handling |
306 | my $src_node = $self->{node}; |
309 | my $src_node = $self->{node}; |
307 | my $rmsg; $rmsg = $self->{rmsg} = sub { |
310 | my $rmsg; $rmsg = $self->{rmsg} = sub { |
308 | $_[0]->push_read ($r_framing => $rmsg); |
311 | $_[0]->push_read ($r_framing => $rmsg); |
309 | |
312 | |
310 | local $AnyEvent::MP::Kernel::SRCNODE = $src_node; |
313 | local $AnyEvent::MP::Kernel::SRCNODE = $src_node; |
311 | AnyEvent::MP::Kernel::_inject (@{ $_[1] }); |
314 | AnyEvent::MP::Kernel::_inject (@{ $_[1] }); |
312 | }; |
315 | }; |
313 | $hdl->push_read ($r_framing => $rmsg); |
316 | $hdl->push_read ($r_framing => $rmsg); |
314 | |
317 | |
315 | Scalar::Util::weaken $rmsg; |
318 | Scalar::Util::weaken $rmsg; |
316 | Scalar::Util::weaken $src_node; |
319 | Scalar::Util::weaken $src_node; |
|
|
320 | } |
317 | }); |
321 | }); |
318 | }); |
322 | }); |
319 | }); |
323 | }); |
320 | } |
324 | } |
321 | |
325 | |
… | |
… | |
325 | sub error { |
329 | sub error { |
326 | my ($self, $msg) = @_; |
330 | my ($self, $msg) = @_; |
327 | |
331 | |
328 | delete $self->{keepalive}; |
332 | delete $self->{keepalive}; |
329 | |
333 | |
|
|
334 | if ($self->{protocol}) { |
|
|
335 | # TODO |
|
|
336 | } else { |
330 | $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} $msg");#d# |
337 | $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} $msg");#d# |
331 | |
338 | |
332 | $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg) |
339 | $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg) |
333 | if $self->{node} && $self->{node}{transport} == $self; |
340 | if $self->{node} && $self->{node}{transport} == $self; |
|
|
341 | } |
334 | |
342 | |
335 | (delete $self->{release})->() |
343 | (delete $self->{release})->() |
336 | if exists $self->{release}; |
344 | if exists $self->{release}; |
337 | |
345 | |
338 | # $AnyEvent::MP::Kernel::WARN->(7, "$self->{peerhost}:$self->{peerport}: $msg"); |
346 | # $AnyEvent::MP::Kernel::WARN->(7, "$self->{peerhost}:$self->{peerport}: $msg"); |
… | |
… | |
345 | delete $self->{keepalive}; |
353 | delete $self->{keepalive}; |
346 | |
354 | |
347 | (delete $self->{release})->() |
355 | (delete $self->{release})->() |
348 | if exists $self->{release}; |
356 | if exists $self->{release}; |
349 | |
357 | |
|
|
358 | if ($self->{protocol}) { |
|
|
359 | # TODO |
|
|
360 | } else { |
350 | $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}"); |
361 | $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}"); |
351 | |
362 | |
352 | my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node}); |
363 | my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node}); |
353 | Scalar::Util::weaken ($self->{node} = $node); |
364 | Scalar::Util::weaken ($self->{node} = $node); |
354 | $node->transport_connect ($self); |
365 | $node->transport_connect ($self); |
355 | |
366 | |
356 | $_->($self) for @HOOK_CONNECTED; |
367 | $_->($self) for @HOOK_CONNECTED; |
|
|
368 | } |
357 | } |
369 | } |
358 | |
370 | |
359 | sub send { |
371 | sub send { |
360 | $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]); |
372 | $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]); |
361 | } |
373 | } |