ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.19 by root, Tue Aug 4 21:36:28 2009 UTC vs.
Revision 1.25 by root, Thu Aug 6 10:21:48 2009 UTC

131 { 131 {
132 Scalar::Util::weaken (my $self = $self); 132 Scalar::Util::weaken (my $self = $self);
133 133
134 $arg{secret} = AnyEvent::MP::Base::default_secret () 134 $arg{secret} = AnyEvent::MP::Base::default_secret ()
135 unless exists $arg{secret}; 135 unless exists $arg{secret};
136
137 $arg{timeout} = 30
138 unless exists $arg{timeout};
139
140 my $keepalive = (int $arg{timeout} * 0.75) || 1;
136 141
137 my $secret = $arg{secret}; 142 my $secret = $arg{secret};
138 143
139 if ($secret =~ /-----BEGIN RSA PRIVATE KEY-----.*-----END RSA PRIVATE KEY-----.*-----BEGIN CERTIFICATE-----.*-----END CERTIFICATE-----/s) { 144 if ($secret =~ /-----BEGIN RSA PRIVATE KEY-----.*-----END RSA PRIVATE KEY-----.*-----BEGIN CERTIFICATE-----.*-----END CERTIFICATE-----/s) {
140 # assume TLS mode 145 # assume TLS mode
154 autocork => 1, 159 autocork => 1,
155 no_delay => 1, 160 no_delay => 1,
156 on_error => sub { 161 on_error => sub {
157 $self->error ($_[2]); 162 $self->error ($_[2]);
158 }, 163 },
164 timeout => $AnyEvent::MP::Base::CONNECT_TIMEOUT,
159 peername => delete $arg{peername}, 165 peername => delete $arg{peername},
160 ; 166 ;
161 167
162 my $greeting_kv = $self->{greeting} ||= {}; 168 my $greeting_kv = $self->{greeting} ||= {};
169
170 $self->{local_node} = $AnyEvent::MP::Base::NODE;
171
163 $greeting_kv->{"tls"} = "1.0" 172 $greeting_kv->{"tls"} = "1.0" if $arg{tls_ctx};
164 if $arg{tls_ctx};
165 $greeting_kv->{provider} = "AE-$VERSION"; 173 $greeting_kv->{provider} = "AE-$VERSION";
166 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; 174 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
175 $greeting_kv->{maxidle} = $keepalive;
167 176
168 # send greeting 177 # send greeting
169 my $lgreeting1 = "aemp;$PROTOCOL_VERSION" 178 my $lgreeting1 = "aemp;$PROTOCOL_VERSION"
170 . ";$AnyEvent::MP::Base::UNIQ" 179 . ";$AnyEvent::MP::Base::UNIQ"
171 . ";$AnyEvent::MP::Base::NODE" 180 . ";$self->{local_node}"
172 . ";" . (join ",", @AUTH_RCV) 181 . ";" . (join ",", @AUTH_RCV)
173 . ";" . (join ",", @FRAMINGS) 182 . ";" . (join ",", @FRAMINGS)
174 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv); 183 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
175 184
176 my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (33), ""; 185 my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (33), "";
263 $self->{s_framing} = $s_framing; 272 $self->{s_framing} = $s_framing;
264 273
265 $hdl->rbuf_max (undef); 274 $hdl->rbuf_max (undef);
266 my $queue = delete $self->{queue}; # we are connected 275 my $queue = delete $self->{queue}; # we are connected
267 276
277 $self->{hdl}->timeout ($self->{remote_greeting}{keepalive} + 5)
278 if $self->{remote_greeting}{keepalive};
279
268 $self->connected; 280 $self->connected;
269 281
270 my $src_node = $self->{node}; 282 my $src_node = $self->{node};
271 283
272 $hdl->push_write ($self->{s_framing} => $_) 284 $self->send ($_)
273 for @$queue; 285 for @$queue;
274 286
275 my $rmsg; $rmsg = sub { 287 my $rmsg; $rmsg = sub {
276 $_[0]->push_read ($r_framing => $rmsg); 288 $_[0]->push_read ($r_framing => $rmsg);
277 289
278 local $AnyEvent::MP::Base::SRCNODE = $src_node; 290 local $AnyEvent::MP::Base::SRCNODE = $src_node;
279 AnyEvent::MP::Base::_inject (@{ $_[1] }); 291 AnyEvent::MP::Base::_inject (@{ $_[1] });
280 }; 292 };
289 301
290sub error { 302sub error {
291 my ($self, $msg) = @_; 303 my ($self, $msg) = @_;
292 304
293 if ($self->{node} && $self->{node}{transport} == $self) { 305 if ($self->{node} && $self->{node}{transport} == $self) {
306 #TODO: store error, but do not instantly fail
294 $self->{node}->fail (transport_error => $msg); 307 $self->{node}->fail (transport_error => $self->{node}{noderef}, $msg);
295 $self->{node}->clr_transport; 308 $self->{node}->clr_transport;
296 } 309 }
297 $AnyEvent::MP::Base::WARN->("$self->{peerhost}:$self->{peerport}: $msg"); 310 $AnyEvent::MP::Base::WARN->("$self->{peerhost}:$self->{peerport}: $msg");
298 $self->destroy; 311 $self->destroy;
299} 312}
300 313
301sub connected { 314sub connected {
302 my ($self) = @_; 315 my ($self) = @_;
303 316
317 if (ref $AnyEvent::MP::Base::SLAVE) {
318 # first connect with a master node
319 my $via = $self->{remote_node};
320 $via =~ s/,/!/g;
321 $AnyEvent::MP::Base::NODE .= "\@$via";
322 $AnyEvent::MP::Base::NODE{$AnyEvent::MP::Base::NODE} = $AnyEvent::MP::Base::NODE{""};
323 $AnyEvent::MP::Base::SLAVE->();
324 }
325
326 if ($self->{local_node} ne $AnyEvent::MP::Base::NODE) {
327 # node changed its name since first greeting
328 $self->send (["", iam => $AnyEvent::MP::Base::NODE]);
329 }
330
304 my $node = AnyEvent::MP::Base::add_node ($self->{remote_node}); 331 my $node = AnyEvent::MP::Base::add_node ($self->{remote_node});
305 Scalar::Util::weaken ($self->{node} = $node); 332 Scalar::Util::weaken ($self->{node} = $node);
306 $node->set_transport ($self); 333 $node->set_transport ($self);
307} 334}
308 335
412 439
413=item tls=<major>.<minor> 440=item tls=<major>.<minor>
414 441
415Indicates that the other side supports TLS (version should be 1.0) and 442Indicates that the other side supports TLS (version should be 1.0) and
416wishes to do a TLS handshake. 443wishes to do a TLS handshake.
444
445=item maxidle=<seconds>
446
447The maximum amount of time the node will not sent data, i.e., idle. This
448can be used to close the conenction when no data has been received for a
449too-long time (say, maxidle + 5 seconds).
417 450
418=back 451=back
419 452
420=head3 Second Greeting Line 453=head3 Second Greeting Line
421 454
433 p/I122ql7kJR8lumW3lXlXCeBnyDAvz8NQo3x5IFowE4 466 p/I122ql7kJR8lumW3lXlXCeBnyDAvz8NQo3x5IFowE4
434 467
435=head2 TLS handshake 468=head2 TLS handshake
436 469
437I<< If, after the handshake, both sides indicate interest in TLS, then the 470I<< If, after the handshake, both sides indicate interest in TLS, then the
438connection B<must> use TLS, or fail.>> 471connection B<must> use TLS, or fail. >>
439 472
440Both sides compare their nonces, and the side who sent the lower nonce 473Both sides compare their nonces, and the side who sent the lower nonce
441value ("string" comparison on the raw octet values) becomes the client, 474value ("string" comparison on the raw octet values) becomes the client,
442and the one with the higher nonce the server. 475and the one with the higher nonce the server.
443 476

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines