… | |
… | |
139 | |
139 | |
140 | my $config = AnyEvent::MP::Config::config; |
140 | my $config = AnyEvent::MP::Config::config; |
141 | |
141 | |
142 | my $latency = $config->{network_latency} || LATENCY; |
142 | my $latency = $config->{network_latency} || LATENCY; |
143 | |
143 | |
144 | $arg{secret} = $config->{secret} |
144 | $self->{secret} = $config->{secret} |
145 | unless exists $arg{secret}; |
145 | unless exists $self->{secret}; |
146 | |
146 | |
147 | $arg{timeout} = $config->{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT |
147 | $self->{timeout} = $config->{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT |
148 | unless exists $arg{timeout}; |
148 | unless exists $self->{timeout}; |
149 | |
149 | |
150 | $arg{timeout} -= $latency; |
150 | $self->{timeout} -= $latency; |
151 | |
151 | |
152 | $arg{timeout} = 1 + $latency |
152 | $self->{timeout} = 1 + $latency |
153 | if $arg{timeout} < 1 + $latency; |
153 | if $self->{timeout} < 1 + $latency; |
154 | |
154 | |
155 | my $secret = $arg{secret}; |
155 | my $secret = $self->{secret}; |
156 | |
156 | |
157 | if (exists $config->{cert}) { |
157 | if (exists $config->{cert}) { |
158 | $arg{tls_ctx} = { |
158 | $self->{tls_ctx} = { |
159 | sslv2 => 0, |
159 | sslv2 => 0, |
160 | sslv3 => 0, |
160 | sslv3 => 0, |
161 | tlsv1 => 1, |
161 | tlsv1 => 1, |
162 | verify => 1, |
162 | verify => 1, |
163 | cert => $config->{cert}, |
163 | cert => $config->{cert}, |
… | |
… | |
165 | verify_require_client_cert => 1, |
165 | verify_require_client_cert => 1, |
166 | }; |
166 | }; |
167 | } |
167 | } |
168 | |
168 | |
169 | $self->{hdl} = new AnyEvent::Handle |
169 | $self->{hdl} = new AnyEvent::Handle |
170 | fh => delete $arg{fh}, |
170 | fh => delete $self->{fh}, |
171 | autocork => 1, |
171 | autocork => 1, |
172 | no_delay => 1, |
172 | no_delay => 1, |
173 | on_error => sub { |
173 | on_error => sub { |
174 | $self->error ($_[2]); |
174 | $self->error ($_[2]); |
175 | }, |
175 | }, |
176 | rtimeout => $latency, |
176 | rtimeout => $latency, |
177 | peername => delete $arg{peername}, |
177 | peername => delete $self->{peername}, |
178 | ; |
178 | ; |
179 | |
179 | |
180 | my $greeting_kv = $self->{greeting} ||= {}; |
180 | my $greeting_kv = $self->{greeting} ||= {}; |
181 | |
181 | |
182 | $self->{local_node} ||= $AnyEvent::MP::Kernel::NODE; |
182 | $self->{local_node} ||= $AnyEvent::MP::Kernel::NODE; |
183 | |
183 | |
184 | $greeting_kv->{tls} = "1.0" if $arg{tls_ctx}; |
184 | $greeting_kv->{tls} = "1.0" if $self->{tls_ctx}; |
185 | $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION"; |
185 | $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION"; |
186 | $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; |
186 | $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; |
187 | $greeting_kv->{timeout} = $arg{timeout}; |
187 | $greeting_kv->{timeout} = $self->{timeout}; |
188 | |
188 | |
189 | # send greeting |
189 | # send greeting |
190 | my $lgreeting1 = "aemp;$PROTOCOL_VERSION" |
190 | my $lgreeting1 = "aemp;$PROTOCOL_VERSION" |
191 | . ";$self->{local_node}" |
191 | . ";$self->{local_node}" |
192 | . ";" . (join ",", @AUTH_RCV) |
192 | . ";" . (join ",", @AUTH_RCV) |
… | |
… | |
207 | if ($aemp ne "aemp") { |
207 | if ($aemp ne "aemp") { |
208 | return $self->error ("unparsable greeting"); |
208 | return $self->error ("unparsable greeting"); |
209 | } elsif ($version != $PROTOCOL_VERSION) { |
209 | } elsif ($version != $PROTOCOL_VERSION) { |
210 | return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)"); |
210 | return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)"); |
211 | } elsif ($rnode eq $self->{local_node}) { |
211 | } elsif ($rnode eq $self->{local_node}) { |
|
|
212 | AnyEvent::MP::Global::avoid_seed ($self->{seed}) |
|
|
213 | if exists $self->{seed}; |
|
|
214 | |
212 | return $self->error ("I refuse to talk to myself"); |
215 | return $self->error ("I refuse to talk to myself"); |
213 | } elsif ($AnyEvent::MP::Kernel::NODE{$rnode} && $AnyEvent::MP::Kernel::NODE{$rnode}{transport}) { |
216 | } elsif ($AnyEvent::MP::Kernel::NODE{$rnode} && $AnyEvent::MP::Kernel::NODE{$rnode}{transport}) { |
214 | return $self->error ("$rnode already connected, not connecting again."); |
217 | return $self->error ("$rnode already connected, not connecting again."); |
215 | } |
218 | } |
216 | |
219 | |
… | |
… | |
299 | |
302 | |
300 | $hdl->rbuf_max (undef); |
303 | $hdl->rbuf_max (undef); |
301 | my $queue = delete $self->{queue}; # we are connected |
304 | my $queue = delete $self->{queue}; # we are connected |
302 | |
305 | |
303 | $self->{hdl}->rtimeout ($self->{remote_greeting}{timeout}); |
306 | $self->{hdl}->rtimeout ($self->{remote_greeting}{timeout}); |
304 | $self->{hdl}->wtimeout ($arg{timeout} - LATENCY); |
307 | $self->{hdl}->wtimeout ($self->{timeout} - LATENCY); |
305 | $self->{hdl}->on_wtimeout (sub { $self->send ([]) }); |
308 | $self->{hdl}->on_wtimeout (sub { $self->send ([]) }); |
306 | |
309 | |
307 | $self->connected; |
310 | $self->connected; |
308 | |
311 | |
309 | # send queued messages |
312 | # send queued messages |
… | |
… | |
365 | } |
368 | } |
366 | |
369 | |
367 | sub destroy { |
370 | sub destroy { |
368 | my ($self) = @_; |
371 | my ($self) = @_; |
369 | |
372 | |
|
|
373 | (delete $self->{release})->() |
|
|
374 | if exists $self->{release}; |
|
|
375 | |
370 | $self->{hdl}->destroy |
376 | $self->{hdl}->destroy |
371 | if $self->{hdl}; |
377 | if $self->{hdl}; |
372 | } |
378 | } |
373 | |
379 | |
374 | sub DESTROY { |
380 | sub DESTROY { |