… | |
… | |
43 | |
43 | |
44 | our @HOOK_CONNECT; # called at connect/accept time |
44 | our @HOOK_CONNECT; # called at connect/accept time |
45 | our @HOOK_GREETING; # called at greeting1 time |
45 | our @HOOK_GREETING; # called at greeting1 time |
46 | our @HOOK_CONNECTED; # called at data phase |
46 | our @HOOK_CONNECTED; # called at data phase |
47 | our @HOOK_DESTROY; # called at destroy time |
47 | our @HOOK_DESTROY; # called at destroy time |
|
|
48 | our %HOOK_PROTOCOL = ( |
|
|
49 | "aemp-dataconn" => sub { |
|
|
50 | require AnyEvent::MP::DataConn; |
|
|
51 | &AnyEvent::MP::DataConn::_inject; |
|
|
52 | }, |
|
|
53 | ); |
48 | |
54 | |
49 | =item $listener = mp_listener $host, $port, <constructor-args> |
55 | =item $listener = mp_listener $host, $port, <constructor-args> |
50 | |
56 | |
51 | Creates a listener on the given host/port using |
57 | Creates a listener on the given host/port using |
52 | C<AnyEvent::Socket::tcp_server>. |
58 | C<AnyEvent::Socket::tcp_server>. |
… | |
… | |
114 | |
120 | |
115 | sub new { |
121 | sub new { |
116 | my ($class, %arg) = @_; |
122 | my ($class, %arg) = @_; |
117 | |
123 | |
118 | my $self = bless \%arg, $class; |
124 | my $self = bless \%arg, $class; |
119 | |
|
|
120 | $self->{queue} = []; |
|
|
121 | |
125 | |
122 | { |
126 | { |
123 | Scalar::Util::weaken (my $self = $self); |
127 | Scalar::Util::weaken (my $self = $self); |
124 | |
128 | |
125 | my $config = $AnyEvent::MP::Kernel::CONFIG; |
129 | my $config = $AnyEvent::MP::Kernel::CONFIG; |
… | |
… | |
285 | $hdl->rbuf_max (undef); |
289 | $hdl->rbuf_max (undef); |
286 | |
290 | |
287 | # we rely on TCP retransmit timeouts and keepalives |
291 | # we rely on TCP retransmit timeouts and keepalives |
288 | $self->{hdl}->rtimeout (undef); |
292 | $self->{hdl}->rtimeout (undef); |
289 | |
293 | |
290 | # except listener-less nodes, they need to continuously probe |
|
|
291 | unless (@$AnyEvent::MP::Kernel::LISTENER) { |
|
|
292 | $self->{hdl}->wtimeout ($timeout); |
|
|
293 | $self->{hdl}->on_wtimeout (sub { $self->send ([]) }); |
|
|
294 | } |
|
|
295 | |
|
|
296 | $self->{remote_greeting}{untrusted} = 1 |
294 | $self->{remote_greeting}{untrusted} = 1 |
297 | if $auth_method eq "tls_anon"; |
295 | if $auth_method eq "tls_anon"; |
298 | |
296 | |
299 | my $queue = delete $self->{queue}; # we are connected |
|
|
300 | |
|
|
301 | $self->connected; |
297 | $self->connected; |
302 | |
298 | |
303 | if ($protocol eq "aemp") { |
299 | if ($protocol eq "aemp") { |
304 | # send queued messages |
300 | # listener-less node need to continuously probe |
305 | $self->send ($_) |
301 | unless (@$AnyEvent::MP::Kernel::LISTENER) { |
306 | for @$queue; |
302 | $self->{hdl}->wtimeout ($timeout); |
|
|
303 | $self->{hdl}->on_wtimeout (sub { $self->send ([]) }); |
|
|
304 | } |
307 | |
305 | |
308 | # receive handling |
306 | # receive handling |
309 | my $src_node = $self->{node}; |
307 | my $src_node = $self->{node}; |
310 | my $rmsg; $rmsg = $self->{rmsg} = sub { |
308 | my $rmsg; $rmsg = $self->{rmsg} = sub { |
311 | $_[0]->push_read ($r_framing => $rmsg); |
309 | $_[0]->push_read ($r_framing => $rmsg); |
… | |
… | |
330 | my ($self, $msg) = @_; |
328 | my ($self, $msg) = @_; |
331 | |
329 | |
332 | delete $self->{keepalive}; |
330 | delete $self->{keepalive}; |
333 | |
331 | |
334 | if ($self->{protocol}) { |
332 | if ($self->{protocol}) { |
335 | # TODO |
333 | $HOOK_PROTOCOL{$self->{protocol}}->($self, $msg); |
336 | } else { |
334 | } else { |
337 | $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} $msg");#d# |
335 | $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} $msg");#d# |
338 | |
336 | |
339 | $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg) |
337 | $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg) |
340 | if $self->{node} && $self->{node}{transport} == $self; |
338 | if $self->{node} && $self->{node}{transport} == $self; |
… | |
… | |
354 | |
352 | |
355 | (delete $self->{release})->() |
353 | (delete $self->{release})->() |
356 | if exists $self->{release}; |
354 | if exists $self->{release}; |
357 | |
355 | |
358 | if ($self->{protocol}) { |
356 | if ($self->{protocol}) { |
359 | # TODO |
357 | $self->{hdl}->on_error (undef); |
|
|
358 | $HOOK_PROTOCOL{$self->{protocol}}->($self, undef); |
360 | } else { |
359 | } else { |
361 | $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}"); |
360 | $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}"); |
362 | |
361 | |
363 | my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node}); |
362 | my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node}); |
364 | Scalar::Util::weaken ($self->{node} = $node); |
363 | Scalar::Util::weaken ($self->{node} = $node); |
… | |
… | |
379 | if exists $self->{release}; |
378 | if exists $self->{release}; |
380 | |
379 | |
381 | $self->{hdl}->destroy |
380 | $self->{hdl}->destroy |
382 | if $self->{hdl}; |
381 | if $self->{hdl}; |
383 | |
382 | |
384 | $_->($self) for @HOOK_DESTROY; |
383 | $_->($self) for $self->{protocol} ? () : @HOOK_DESTROY; |
385 | } |
384 | } |
386 | |
385 | |
387 | sub DESTROY { |
386 | sub DESTROY { |
388 | my ($self) = @_; |
387 | my ($self) = @_; |
389 | |
388 | |
… | |
… | |
392 | |
391 | |
393 | =back |
392 | =back |
394 | |
393 | |
395 | =head1 PROTOCOL |
394 | =head1 PROTOCOL |
396 | |
395 | |
397 | The AEMP protocol is relatively simple, and consists of three phases which |
396 | The AEMP protocol is comparatively simple, and consists of three phases |
398 | are symmetrical for both sides: greeting (followed by optionally switching |
397 | which are symmetrical for both sides: greeting (followed by optionally |
399 | to TLS mode), authentication and packet exchange. |
398 | switching to TLS mode), authentication and packet exchange. |
400 | |
399 | |
401 | The protocol is designed to allow both full-text and binary streams. |
400 | The protocol is designed to allow both full-text and binary streams. |
402 | |
401 | |
403 | The greeting consists of two text lines that are ended by either an ASCII |
402 | The greeting consists of two text lines that are ended by either an ASCII |
404 | CR LF pair, or a single ASCII LF (recommended). |
403 | CR LF pair, or a single ASCII LF (recommended). |