ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.48 by root, Thu Sep 3 20:16:36 2009 UTC vs.
Revision 1.49 by root, Fri Sep 4 21:01:22 2009 UTC

116 peername => $peername, # for verification 116 peername => $peername, # for verification
117 ; 117 ;
118 118
119=cut 119=cut
120 120
121sub LATENCY() { 5 } # assumed max. network latency
122
123our @FRAMINGS = qw(json storable); # the framing types we accept and send, in order of preference 121our @FRAMINGS = qw(json storable); # the framing types we accept and send, in order of preference
124our @AUTH_SND = qw(tls_md6_64_256 hmac_md6_64_256); # auth types we send 122our @AUTH_SND = qw(tls_md6_64_256 hmac_md6_64_256); # auth types we send
125our @AUTH_RCV = (@AUTH_SND, qw(tls_anon cleartext)); # auth types we accept 123our @AUTH_RCV = (@AUTH_SND, qw(tls_anon cleartext)); # auth types we accept
126 124
127#AnyEvent::Handle::register_write_type mp_record => sub { 125#AnyEvent::Handle::register_write_type mp_record => sub {
137 { 135 {
138 Scalar::Util::weaken (my $self = $self); 136 Scalar::Util::weaken (my $self = $self);
139 137
140 my $config = AnyEvent::MP::Config::config; 138 my $config = AnyEvent::MP::Config::config;
141 139
142 my $latency = $config->{network_latency} || LATENCY; 140 my $timeout = $self->{timeout} || $config->{monitor_timeout};
143 141
144 $self->{secret} = $config->{secret} 142 $self->{secret} = $config->{secret}
145 unless exists $self->{secret}; 143 unless exists $self->{secret};
146
147 $self->{timeout} = $config->{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT
148 unless exists $self->{timeout};
149
150 $self->{timeout} -= $latency;
151
152 $self->{timeout} = 1 + $latency
153 if $self->{timeout} < 1 + $latency;
154 144
155 my $secret = $self->{secret}; 145 my $secret = $self->{secret};
156 146
157 if (exists $config->{cert}) { 147 if (exists $config->{cert}) {
158 $self->{tls_ctx} = { 148 $self->{tls_ctx} = {
172 no_delay => 1, 162 no_delay => 1,
173 keepalive => 1, 163 keepalive => 1,
174 on_error => sub { 164 on_error => sub {
175 $self->error ($_[2]); 165 $self->error ($_[2]);
176 }, 166 },
177 rtimeout => $latency, 167 rtimeout => $timeout,
178 peername => delete $self->{peername}, 168 peername => delete $self->{peername},
179 ; 169 ;
180 170
181 my $greeting_kv = $self->{greeting} ||= {}; 171 my $greeting_kv = $self->{greeting} ||= {};
182 172
301 291
302 $self->{s_framing} = $s_framing; 292 $self->{s_framing} = $s_framing;
303 293
304 $hdl->rbuf_max (undef); 294 $hdl->rbuf_max (undef);
305 295
306 $self->{hdl}->rtimeout ($self->{remote_greeting}{timeout}); 296 # we rely on TCP retransmit timeouts and keepalives
297 $self->{hdl}->rtimeout (undef);
298
299 # except listener-less nodes, they need to continuously probe
300 unless (@$AnyEvent::MP::Kernel::LISTENER) {
307 $self->{hdl}->wtimeout ($self->{timeout} - LATENCY); 301 $self->{hdl}->wtimeout ($timeout);
308 $self->{hdl}->on_wtimeout (sub { $self->send ([]) }); 302 $self->{hdl}->on_wtimeout (sub { $self->send ([]) });
303 }
304
305 $self->{remote_greeting}{untrusted} = 1
306 if $auth_method eq "tls_anon";
309 307
310 my $queue = delete $self->{queue}; # we are connected 308 my $queue = delete $self->{queue}; # we are connected
311 309
312 $self->connected; 310 $self->connected;
313 311
457The remaining arguments are C<KEY=VALUE> pairs. The following key-value 455The remaining arguments are C<KEY=VALUE> pairs. The following key-value
458pairs are known at this time: 456pairs are known at this time:
459 457
460=over 4 458=over 4
461 459
462=item timeout=<seconds>
463
464The amount of time after which this node should be detected as dead unless
465some data has been received. The node is responsible to send traffic
466reasonably more often than this interval (such as every timeout minus
467five seconds).
468
469=item provider=<module-version> 460=item provider=<module-version>
470 461
471The software provider for this implementation. For AnyEvent::MP, this is 462The software provider for this implementation. For AnyEvent::MP, this is
472C<AE-0.0> or whatever version it currently is at. 463C<AE-0.0> or whatever version it currently is at.
473 464
633 < ["","AnyEvent::MP::_spawn","57Cs1CggVJjzYaQp13XXg4.c","AnyEvent::MP::Global::connect",0,"anon/57Cs1CggVJjzYaQp13XXg4"] 624 < ["","AnyEvent::MP::_spawn","57Cs1CggVJjzYaQp13XXg4.c","AnyEvent::MP::Global::connect",0,"anon/57Cs1CggVJjzYaQp13XXg4"]
634 ... 625 ...
635 626
636The shared secret in use was C<8ugxrtw6H5tKnfPWfaSr4HGhE8MoJXmzTT1BWq7sLutNcD0IbXprQlZjIbl7MBKoeklG3IEfY9GlJthC0pENzk>. 627The shared secret in use was C<8ugxrtw6H5tKnfPWfaSr4HGhE8MoJXmzTT1BWq7sLutNcD0IbXprQlZjIbl7MBKoeklG3IEfY9GlJthC0pENzk>.
637 628
629=head2 MONITORING
630
631Monitoring the connection itself is transport-specific. For TCP, all
632connection monitoring is currently left to TCP retransmit time-outs
633on a busy link, and TCP keepalive (which should be enabled) for idle
634connections.
635
636This is not sufficient for listener-less nodes, however: they need
637to regularly send data (30 seconds, or the monitoring interval, is
638recommended), so TCP actively probes.
639
640Future implementations of AnyEvent::Transport might query the kernel TCP
641buffer after a write timeout occurs, and if it is non-empty, shut down the
642connections, but this is an area of future research :)
643
644=head2 NODE PROTOCOL
645
646The transport simply transfers messages, but to implement a full node, a
647special node port must exist that understands a number of requests.
648
649If you are interested in implementing this, drop us a note so we finish
650the documentation.
651
638=head1 SEE ALSO 652=head1 SEE ALSO
639 653
640L<AnyEvent::MP>. 654L<AnyEvent::MP>.
641 655
642=head1 AUTHOR 656=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines