ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.20 by root, Tue Aug 4 23:16:57 2009 UTC vs.
Revision 1.25 by root, Thu Aug 6 10:21:48 2009 UTC

131 { 131 {
132 Scalar::Util::weaken (my $self = $self); 132 Scalar::Util::weaken (my $self = $self);
133 133
134 $arg{secret} = AnyEvent::MP::Base::default_secret () 134 $arg{secret} = AnyEvent::MP::Base::default_secret ()
135 unless exists $arg{secret}; 135 unless exists $arg{secret};
136
137 $arg{timeout} = 30
138 unless exists $arg{timeout};
139
140 my $keepalive = (int $arg{timeout} * 0.75) || 1;
136 141
137 my $secret = $arg{secret}; 142 my $secret = $arg{secret};
138 143
139 if ($secret =~ /-----BEGIN RSA PRIVATE KEY-----.*-----END RSA PRIVATE KEY-----.*-----BEGIN CERTIFICATE-----.*-----END CERTIFICATE-----/s) { 144 if ($secret =~ /-----BEGIN RSA PRIVATE KEY-----.*-----END RSA PRIVATE KEY-----.*-----BEGIN CERTIFICATE-----.*-----END CERTIFICATE-----/s) {
140 # assume TLS mode 145 # assume TLS mode
154 autocork => 1, 159 autocork => 1,
155 no_delay => 1, 160 no_delay => 1,
156 on_error => sub { 161 on_error => sub {
157 $self->error ($_[2]); 162 $self->error ($_[2]);
158 }, 163 },
164 timeout => $AnyEvent::MP::Base::CONNECT_TIMEOUT,
159 peername => delete $arg{peername}, 165 peername => delete $arg{peername},
160 ; 166 ;
161 167
162 my $greeting_kv = $self->{greeting} ||= {}; 168 my $greeting_kv = $self->{greeting} ||= {};
169
170 $self->{local_node} = $AnyEvent::MP::Base::NODE;
171
163 $greeting_kv->{"tls"} = "1.0" 172 $greeting_kv->{"tls"} = "1.0" if $arg{tls_ctx};
164 if $arg{tls_ctx};
165 $greeting_kv->{provider} = "AE-$VERSION"; 173 $greeting_kv->{provider} = "AE-$VERSION";
166 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; 174 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
175 $greeting_kv->{maxidle} = $keepalive;
167 176
168 # send greeting 177 # send greeting
169 my $lgreeting1 = "aemp;$PROTOCOL_VERSION" 178 my $lgreeting1 = "aemp;$PROTOCOL_VERSION"
170 . ";$AnyEvent::MP::Base::UNIQ" 179 . ";$AnyEvent::MP::Base::UNIQ"
171 . ";$AnyEvent::MP::Base::NODE" 180 . ";$self->{local_node}"
172 . ";" . (join ",", @AUTH_RCV) 181 . ";" . (join ",", @AUTH_RCV)
173 . ";" . (join ",", @FRAMINGS) 182 . ";" . (join ",", @FRAMINGS)
174 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv); 183 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
175 184
176 my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (33), ""; 185 my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (33), "";
263 $self->{s_framing} = $s_framing; 272 $self->{s_framing} = $s_framing;
264 273
265 $hdl->rbuf_max (undef); 274 $hdl->rbuf_max (undef);
266 my $queue = delete $self->{queue}; # we are connected 275 my $queue = delete $self->{queue}; # we are connected
267 276
277 $self->{hdl}->timeout ($self->{remote_greeting}{keepalive} + 5)
278 if $self->{remote_greeting}{keepalive};
279
268 $self->connected; 280 $self->connected;
269 281
270 my $src_node = $self->{node}; 282 my $src_node = $self->{node};
271 283
272 $hdl->push_write ($self->{s_framing} => $_) 284 $self->send ($_)
273 for @$queue; 285 for @$queue;
274 286
275 my $rmsg; $rmsg = sub { 287 my $rmsg; $rmsg = sub {
276 $_[0]->push_read ($r_framing => $rmsg); 288 $_[0]->push_read ($r_framing => $rmsg);
277 289
278 local $AnyEvent::MP::Base::SRCNODE = $src_node; 290 local $AnyEvent::MP::Base::SRCNODE = $src_node;
279 AnyEvent::MP::Base::_inject (@{ $_[1] }); 291 AnyEvent::MP::Base::_inject (@{ $_[1] });
280 }; 292 };
289 301
290sub error { 302sub error {
291 my ($self, $msg) = @_; 303 my ($self, $msg) = @_;
292 304
293 if ($self->{node} && $self->{node}{transport} == $self) { 305 if ($self->{node} && $self->{node}{transport} == $self) {
306 #TODO: store error, but do not instantly fail
294 $self->{node}->fail (transport_error => $msg); 307 $self->{node}->fail (transport_error => $self->{node}{noderef}, $msg);
295 $self->{node}->clr_transport; 308 $self->{node}->clr_transport;
296 } 309 }
297 $AnyEvent::MP::Base::WARN->("$self->{peerhost}:$self->{peerport}: $msg"); 310 $AnyEvent::MP::Base::WARN->("$self->{peerhost}:$self->{peerport}: $msg");
298 $self->destroy; 311 $self->destroy;
299} 312}
300 313
301sub connected { 314sub connected {
302 my ($self) = @_; 315 my ($self) = @_;
303 316
317 if (ref $AnyEvent::MP::Base::SLAVE) {
318 # first connect with a master node
319 my $via = $self->{remote_node};
320 $via =~ s/,/!/g;
321 $AnyEvent::MP::Base::NODE .= "\@$via";
322 $AnyEvent::MP::Base::NODE{$AnyEvent::MP::Base::NODE} = $AnyEvent::MP::Base::NODE{""};
323 $AnyEvent::MP::Base::SLAVE->();
324 }
325
326 if ($self->{local_node} ne $AnyEvent::MP::Base::NODE) {
327 # node changed its name since first greeting
328 $self->send (["", iam => $AnyEvent::MP::Base::NODE]);
329 }
330
304 my $node = AnyEvent::MP::Base::add_node ($self->{remote_node}); 331 my $node = AnyEvent::MP::Base::add_node ($self->{remote_node});
305 Scalar::Util::weaken ($self->{node} = $node); 332 Scalar::Util::weaken ($self->{node} = $node);
306 $node->set_transport ($self); 333 $node->set_transport ($self);
307} 334}
308 335
412 439
413=item tls=<major>.<minor> 440=item tls=<major>.<minor>
414 441
415Indicates that the other side supports TLS (version should be 1.0) and 442Indicates that the other side supports TLS (version should be 1.0) and
416wishes to do a TLS handshake. 443wishes to do a TLS handshake.
444
445=item maxidle=<seconds>
446
447The maximum amount of time the node will not sent data, i.e., idle. This
448can be used to close the conenction when no data has been received for a
449too-long time (say, maxidle + 5 seconds).
417 450
418=back 451=back
419 452
420=head3 Second Greeting Line 453=head3 Second Greeting Line
421 454

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines