ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing cvsroot/AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.30 by root, Mon Aug 10 19:19:30 2009 UTC vs.
Revision 1.31 by root, Wed Aug 12 21:39:59 2009 UTC

71=item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport) 71=item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
72 72
73=cut 73=cut
74 74
75sub mp_connect { 75sub mp_connect {
76 my $cb = pop; 76 my $release = pop;
77 my ($host, $port, @args) = @_; 77 my ($host, $port, @args) = @_;
78 78
79 my $state;
80
79 AnyEvent::Socket::tcp_connect $host, $port, sub { 81 $state = AnyEvent::Socket::tcp_connect $host, $port, sub {
80 my ($fh, $nhost, $nport) = @_; 82 my ($fh, $nhost, $nport) = @_;
81 83
82 return $cb->() unless $fh; 84 return $release->() unless $fh;
83 85
84 $cb->(new AnyEvent::MP::Transport 86 $state = new AnyEvent::MP::Transport
85 fh => $fh, 87 fh => $fh,
86 peername => $host, 88 peername => $host,
87 peerhost => $nhost, 89 peerhost => $nhost,
88 peerport => $nport, 90 peerport => $nport,
91 release => $release,
89 @args, 92 @args,
90 ); 93 ;
91 } 94 };
95
96 \$state
92} 97}
93 98
94=item new AnyEvent::MP::Transport 99=item new AnyEvent::MP::Transport
95 100
96 # immediately starts negotiation 101 # immediately starts negotiation
132 { 137 {
133 Scalar::Util::weaken (my $self = $self); 138 Scalar::Util::weaken (my $self = $self);
134 139
135 my $config = AnyEvent::MP::Config::node_config; 140 my $config = AnyEvent::MP::Config::node_config;
136 141
142 my $latency = $config->{network_latency} || LATENCY;
143
137 $arg{secret} = $config->{secret} 144 $arg{secret} = $config->{secret}
138 unless exists $arg{secret}; 145 unless exists $arg{secret};
139 146
140 $arg{timeout} = 30 147 $arg{timeout} = $config->{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT
141 unless exists $arg{timeout}; 148 unless exists $arg{timeout};
142 149
150 $arg{timeout} -= $latency;
151
143 $arg{timeout} = 1 + LATENCY 152 $arg{timeout} = 1 + $latency
144 if $arg{timeout} < 1 + LATENCY; 153 if $arg{timeout} < 1 + $latency;
145 154
146 my $secret = $arg{secret}; 155 my $secret = $arg{secret};
147 156
148 if (exists $config->{cert}) { 157 if (exists $config->{cert}) {
149 $arg{tls_ctx} = { 158 $arg{tls_ctx} = {
162 autocork => 1, 171 autocork => 1,
163 no_delay => 1, 172 no_delay => 1,
164 on_error => sub { 173 on_error => sub {
165 $self->error ($_[2]); 174 $self->error ($_[2]);
166 }, 175 },
167 rtimeout => $AnyEvent::MP::Base::CONNECT_TIMEOUT, 176 rtimeout => $latency,
168 peername => delete $arg{peername}, 177 peername => delete $arg{peername},
169 ; 178 ;
170 179
171 my $greeting_kv = $self->{greeting} ||= {}; 180 my $greeting_kv = $self->{greeting} ||= {};
172 181
173 $self->{local_node} = $AnyEvent::MP::Base::NODE; 182 $self->{local_node} = $AnyEvent::MP::Kernel::NODE;
174 183
175 $greeting_kv->{"tls"} = "1.0" if $arg{tls_ctx}; 184 $greeting_kv->{"tls"} = "1.0" if $arg{tls_ctx};
176 $greeting_kv->{provider} = "AE-$AnyEvent::MP::Base::VERSION"; 185 $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION";
177 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; 186 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
178 $greeting_kv->{timeout} = $arg{timeout}; 187 $greeting_kv->{timeout} = $arg{timeout};
179 188
180 # send greeting 189 # send greeting
181 my $lgreeting1 = "aemp;$PROTOCOL_VERSION" 190 my $lgreeting1 = "aemp;$PROTOCOL_VERSION"
182 . ";$self->{local_node}" 191 . ";$self->{local_node}"
183 . ";" . (join ",", @AUTH_RCV) 192 . ";" . (join ",", @AUTH_RCV)
184 . ";" . (join ",", @FRAMINGS) 193 . ";" . (join ",", @FRAMINGS)
185 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv); 194 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
186 195
187 my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (66), ""; 196 my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Kernel::nonce (66), "";
188 197
189 $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012"); 198 $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");
190 199
191 # expect greeting 200 # expect greeting
192 $self->{hdl}->rbuf_max (4 * 1024); 201 $self->{hdl}->rbuf_max (4 * 1024);
292 my $src_node = $self->{node}; 301 my $src_node = $self->{node};
293 302
294 my $rmsg; $rmsg = sub { 303 my $rmsg; $rmsg = sub {
295 $_[0]->push_read ($r_framing => $rmsg); 304 $_[0]->push_read ($r_framing => $rmsg);
296 305
297 local $AnyEvent::MP::Base::SRCNODE = $src_node; 306 local $AnyEvent::MP::Kernel::SRCNODE = $src_node;
298 AnyEvent::MP::Base::_inject (@{ $_[1] }); 307 AnyEvent::MP::Kernel::_inject (@{ $_[1] });
299 }; 308 };
300 $hdl->push_read ($r_framing => $rmsg); 309 $hdl->push_read ($r_framing => $rmsg);
301 }); 310 });
302 }); 311 });
303 }); 312 });
307} 316}
308 317
309sub error { 318sub error {
310 my ($self, $msg) = @_; 319 my ($self, $msg) = @_;
311 320
321 $self->{node}->transport_error (transport_error => $self->{node}{noderef}, $msg)
312 if ($self->{node} && $self->{node}{transport} == $self) { 322 if $self->{node} && $self->{node}{transport} == $self;
313 #TODO: store error, but do not instantly fail 323
314 $self->{node}->fail (transport_error => $self->{node}{noderef}, $msg); 324 (delete $self->{release})->()
315 $self->{node}->clr_transport; 325 if exists $self->{release};
316 } 326
317 $AnyEvent::MP::Base::WARN->("$self->{peerhost}:$self->{peerport}: $msg"); 327 $AnyEvent::MP::Kernel::WARN->("$self->{peerhost}:$self->{peerport}: $msg");
318 $self->destroy; 328 $self->destroy;
319} 329}
320 330
321sub connected { 331sub connected {
322 my ($self) = @_; 332 my ($self) = @_;
323 333
324 if (ref $AnyEvent::MP::Base::SLAVE) { 334 (delete $self->{release})->()
335 if exists $self->{release};
336
325 # first connect with a master node 337 # first connect with a master node
326 my $via = $self->{remote_node}; 338 $AnyEvent::MP::Kernel::SLAVE->($self->{remote_node})
327 $via =~ s/,/!/g;
328 $AnyEvent::MP::Base::NODE .= "\@$via";
329 $AnyEvent::MP::Base::NODE{$AnyEvent::MP::Base::NODE} = $AnyEvent::MP::Base::NODE{""};
330 $AnyEvent::MP::Base::SLAVE->(1); 339 if ref $AnyEvent::MP::Kernel::SLAVE;
331 }
332 340
333 if ($self->{local_node} ne $AnyEvent::MP::Base::NODE) {
334 # node changed its name since first greeting
335 $self->send (["", iam => $AnyEvent::MP::Base::NODE]);
336 }
337
338 my $node = AnyEvent::MP::Base::add_node ($self->{remote_node}); 341 my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
339 Scalar::Util::weaken ($self->{node} = $node); 342 Scalar::Util::weaken ($self->{node} = $node);
340 $node->set_transport ($self); 343 $node->transport_connect ($self);
341} 344}
342 345
343sub send { 346sub send {
344 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]); 347 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]);
345} 348}

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines