… | |
… | |
71 | =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport) |
71 | =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport) |
72 | |
72 | |
73 | =cut |
73 | =cut |
74 | |
74 | |
75 | sub mp_connect { |
75 | sub mp_connect { |
76 | my $cb = pop; |
76 | my $release = pop; |
77 | my ($host, $port, @args) = @_; |
77 | my ($host, $port, @args) = @_; |
78 | |
78 | |
|
|
79 | my $state; |
|
|
80 | |
79 | AnyEvent::Socket::tcp_connect $host, $port, sub { |
81 | $state = AnyEvent::Socket::tcp_connect $host, $port, sub { |
80 | my ($fh, $nhost, $nport) = @_; |
82 | my ($fh, $nhost, $nport) = @_; |
81 | |
83 | |
82 | return $cb->() unless $fh; |
84 | return $release->() unless $fh; |
83 | |
85 | |
84 | $cb->(new AnyEvent::MP::Transport |
86 | $state = new AnyEvent::MP::Transport |
85 | fh => $fh, |
87 | fh => $fh, |
86 | peername => $host, |
88 | peername => $host, |
87 | peerhost => $nhost, |
89 | peerhost => $nhost, |
88 | peerport => $nport, |
90 | peerport => $nport, |
|
|
91 | release => $release, |
89 | @args, |
92 | @args, |
90 | ); |
93 | ; |
91 | } |
94 | }; |
|
|
95 | |
|
|
96 | \$state |
92 | } |
97 | } |
93 | |
98 | |
94 | =item new AnyEvent::MP::Transport |
99 | =item new AnyEvent::MP::Transport |
95 | |
100 | |
96 | # immediately starts negotiation |
101 | # immediately starts negotiation |
… | |
… | |
132 | { |
137 | { |
133 | Scalar::Util::weaken (my $self = $self); |
138 | Scalar::Util::weaken (my $self = $self); |
134 | |
139 | |
135 | my $config = AnyEvent::MP::Config::node_config; |
140 | my $config = AnyEvent::MP::Config::node_config; |
136 | |
141 | |
|
|
142 | my $latency = $config->{network_latency} || LATENCY; |
|
|
143 | |
137 | $arg{secret} = $config->{secret} |
144 | $arg{secret} = $config->{secret} |
138 | unless exists $arg{secret}; |
145 | unless exists $arg{secret}; |
139 | |
146 | |
140 | $arg{timeout} = 30 |
147 | $arg{timeout} = $config->{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT |
141 | unless exists $arg{timeout}; |
148 | unless exists $arg{timeout}; |
142 | |
149 | |
|
|
150 | $arg{timeout} -= $latency; |
|
|
151 | |
143 | $arg{timeout} = 1 + LATENCY |
152 | $arg{timeout} = 1 + $latency |
144 | if $arg{timeout} < 1 + LATENCY; |
153 | if $arg{timeout} < 1 + $latency; |
145 | |
154 | |
146 | my $secret = $arg{secret}; |
155 | my $secret = $arg{secret}; |
147 | |
156 | |
148 | if (exists $config->{cert}) { |
157 | if (exists $config->{cert}) { |
149 | $arg{tls_ctx} = { |
158 | $arg{tls_ctx} = { |
… | |
… | |
162 | autocork => 1, |
171 | autocork => 1, |
163 | no_delay => 1, |
172 | no_delay => 1, |
164 | on_error => sub { |
173 | on_error => sub { |
165 | $self->error ($_[2]); |
174 | $self->error ($_[2]); |
166 | }, |
175 | }, |
167 | rtimeout => $AnyEvent::MP::Base::CONNECT_TIMEOUT, |
176 | rtimeout => $latency, |
168 | peername => delete $arg{peername}, |
177 | peername => delete $arg{peername}, |
169 | ; |
178 | ; |
170 | |
179 | |
171 | my $greeting_kv = $self->{greeting} ||= {}; |
180 | my $greeting_kv = $self->{greeting} ||= {}; |
172 | |
181 | |
173 | $self->{local_node} = $AnyEvent::MP::Base::NODE; |
182 | $self->{local_node} = $AnyEvent::MP::Kernel::NODE; |
174 | |
183 | |
175 | $greeting_kv->{"tls"} = "1.0" if $arg{tls_ctx}; |
184 | $greeting_kv->{"tls"} = "1.0" if $arg{tls_ctx}; |
176 | $greeting_kv->{provider} = "AE-$AnyEvent::MP::Base::VERSION"; |
185 | $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION"; |
177 | $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; |
186 | $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; |
178 | $greeting_kv->{timeout} = $arg{timeout}; |
187 | $greeting_kv->{timeout} = $arg{timeout}; |
179 | |
188 | |
180 | # send greeting |
189 | # send greeting |
181 | my $lgreeting1 = "aemp;$PROTOCOL_VERSION" |
190 | my $lgreeting1 = "aemp;$PROTOCOL_VERSION" |
182 | . ";$self->{local_node}" |
191 | . ";$self->{local_node}" |
183 | . ";" . (join ",", @AUTH_RCV) |
192 | . ";" . (join ",", @AUTH_RCV) |
184 | . ";" . (join ",", @FRAMINGS) |
193 | . ";" . (join ",", @FRAMINGS) |
185 | . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv); |
194 | . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv); |
186 | |
195 | |
187 | my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (66), ""; |
196 | my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Kernel::nonce (66), ""; |
188 | |
197 | |
189 | $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012"); |
198 | $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012"); |
190 | |
199 | |
191 | # expect greeting |
200 | # expect greeting |
192 | $self->{hdl}->rbuf_max (4 * 1024); |
201 | $self->{hdl}->rbuf_max (4 * 1024); |
… | |
… | |
292 | my $src_node = $self->{node}; |
301 | my $src_node = $self->{node}; |
293 | |
302 | |
294 | my $rmsg; $rmsg = sub { |
303 | my $rmsg; $rmsg = sub { |
295 | $_[0]->push_read ($r_framing => $rmsg); |
304 | $_[0]->push_read ($r_framing => $rmsg); |
296 | |
305 | |
297 | local $AnyEvent::MP::Base::SRCNODE = $src_node; |
306 | local $AnyEvent::MP::Kernel::SRCNODE = $src_node; |
298 | AnyEvent::MP::Base::_inject (@{ $_[1] }); |
307 | AnyEvent::MP::Kernel::_inject (@{ $_[1] }); |
299 | }; |
308 | }; |
300 | $hdl->push_read ($r_framing => $rmsg); |
309 | $hdl->push_read ($r_framing => $rmsg); |
301 | }); |
310 | }); |
302 | }); |
311 | }); |
303 | }); |
312 | }); |
… | |
… | |
307 | } |
316 | } |
308 | |
317 | |
309 | sub error { |
318 | sub error { |
310 | my ($self, $msg) = @_; |
319 | my ($self, $msg) = @_; |
311 | |
320 | |
|
|
321 | $self->{node}->transport_error (transport_error => $self->{node}{noderef}, $msg) |
312 | if ($self->{node} && $self->{node}{transport} == $self) { |
322 | if $self->{node} && $self->{node}{transport} == $self; |
313 | #TODO: store error, but do not instantly fail |
323 | |
314 | $self->{node}->fail (transport_error => $self->{node}{noderef}, $msg); |
324 | (delete $self->{release})->() |
315 | $self->{node}->clr_transport; |
325 | if exists $self->{release}; |
316 | } |
326 | |
317 | $AnyEvent::MP::Base::WARN->("$self->{peerhost}:$self->{peerport}: $msg"); |
327 | $AnyEvent::MP::Kernel::WARN->("$self->{peerhost}:$self->{peerport}: $msg"); |
318 | $self->destroy; |
328 | $self->destroy; |
319 | } |
329 | } |
320 | |
330 | |
321 | sub connected { |
331 | sub connected { |
322 | my ($self) = @_; |
332 | my ($self) = @_; |
323 | |
333 | |
324 | if (ref $AnyEvent::MP::Base::SLAVE) { |
334 | (delete $self->{release})->() |
|
|
335 | if exists $self->{release}; |
|
|
336 | |
325 | # first connect with a master node |
337 | # first connect with a master node |
326 | my $via = $self->{remote_node}; |
338 | $AnyEvent::MP::Kernel::SLAVE->($self->{remote_node}) |
327 | $via =~ s/,/!/g; |
|
|
328 | $AnyEvent::MP::Base::NODE .= "\@$via"; |
|
|
329 | $AnyEvent::MP::Base::NODE{$AnyEvent::MP::Base::NODE} = $AnyEvent::MP::Base::NODE{""}; |
|
|
330 | $AnyEvent::MP::Base::SLAVE->(1); |
339 | if ref $AnyEvent::MP::Kernel::SLAVE; |
331 | } |
|
|
332 | |
340 | |
333 | if ($self->{local_node} ne $AnyEvent::MP::Base::NODE) { |
|
|
334 | # node changed its name since first greeting |
|
|
335 | $self->send (["", iam => $AnyEvent::MP::Base::NODE]); |
|
|
336 | } |
|
|
337 | |
|
|
338 | my $node = AnyEvent::MP::Base::add_node ($self->{remote_node}); |
341 | my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node}); |
339 | Scalar::Util::weaken ($self->{node} = $node); |
342 | Scalar::Util::weaken ($self->{node} = $node); |
340 | $node->set_transport ($self); |
343 | $node->transport_connect ($self); |
341 | } |
344 | } |
342 | |
345 | |
343 | sub send { |
346 | sub send { |
344 | $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]); |
347 | $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]); |
345 | } |
348 | } |