ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.41 by root, Fri Aug 28 16:37:30 2009 UTC vs.
Revision 1.42 by root, Fri Aug 28 20:57:42 2009 UTC

139 139
140 my $config = AnyEvent::MP::Config::config; 140 my $config = AnyEvent::MP::Config::config;
141 141
142 my $latency = $config->{network_latency} || LATENCY; 142 my $latency = $config->{network_latency} || LATENCY;
143 143
144 $arg{secret} = $config->{secret} 144 $self->{secret} = $config->{secret}
145 unless exists $arg{secret}; 145 unless exists $self->{secret};
146 146
147 $arg{timeout} = $config->{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT 147 $self->{timeout} = $config->{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT
148 unless exists $arg{timeout}; 148 unless exists $self->{timeout};
149 149
150 $arg{timeout} -= $latency; 150 $self->{timeout} -= $latency;
151 151
152 $arg{timeout} = 1 + $latency 152 $self->{timeout} = 1 + $latency
153 if $arg{timeout} < 1 + $latency; 153 if $self->{timeout} < 1 + $latency;
154 154
155 my $secret = $arg{secret}; 155 my $secret = $self->{secret};
156 156
157 if (exists $config->{cert}) { 157 if (exists $config->{cert}) {
158 $arg{tls_ctx} = { 158 $self->{tls_ctx} = {
159 sslv2 => 0, 159 sslv2 => 0,
160 sslv3 => 0, 160 sslv3 => 0,
161 tlsv1 => 1, 161 tlsv1 => 1,
162 verify => 1, 162 verify => 1,
163 cert => $config->{cert}, 163 cert => $config->{cert},
165 verify_require_client_cert => 1, 165 verify_require_client_cert => 1,
166 }; 166 };
167 } 167 }
168 168
169 $self->{hdl} = new AnyEvent::Handle 169 $self->{hdl} = new AnyEvent::Handle
170 fh => delete $arg{fh}, 170 fh => delete $self->{fh},
171 autocork => 1, 171 autocork => 1,
172 no_delay => 1, 172 no_delay => 1,
173 on_error => sub { 173 on_error => sub {
174 $self->error ($_[2]); 174 $self->error ($_[2]);
175 }, 175 },
176 rtimeout => $latency, 176 rtimeout => $latency,
177 peername => delete $arg{peername}, 177 peername => delete $self->{peername},
178 ; 178 ;
179 179
180 my $greeting_kv = $self->{greeting} ||= {}; 180 my $greeting_kv = $self->{greeting} ||= {};
181 181
182 $self->{local_node} ||= $AnyEvent::MP::Kernel::NODE; 182 $self->{local_node} ||= $AnyEvent::MP::Kernel::NODE;
183 183
184 $greeting_kv->{tls} = "1.0" if $arg{tls_ctx}; 184 $greeting_kv->{tls} = "1.0" if $self->{tls_ctx};
185 $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION"; 185 $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION";
186 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; 186 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
187 $greeting_kv->{timeout} = $arg{timeout}; 187 $greeting_kv->{timeout} = $self->{timeout};
188 188
189 # send greeting 189 # send greeting
190 my $lgreeting1 = "aemp;$PROTOCOL_VERSION" 190 my $lgreeting1 = "aemp;$PROTOCOL_VERSION"
191 . ";$self->{local_node}" 191 . ";$self->{local_node}"
192 . ";" . (join ",", @AUTH_RCV) 192 . ";" . (join ",", @AUTH_RCV)
207 if ($aemp ne "aemp") { 207 if ($aemp ne "aemp") {
208 return $self->error ("unparsable greeting"); 208 return $self->error ("unparsable greeting");
209 } elsif ($version != $PROTOCOL_VERSION) { 209 } elsif ($version != $PROTOCOL_VERSION) {
210 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)"); 210 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)");
211 } elsif ($rnode eq $self->{local_node}) { 211 } elsif ($rnode eq $self->{local_node}) {
212 AnyEvent::MP::Global::avoid_seed ($self->{seed})
213 if exists $self->{seed};
214
212 return $self->error ("I refuse to talk to myself"); 215 return $self->error ("I refuse to talk to myself");
213 } elsif ($AnyEvent::MP::Kernel::NODE{$rnode} && $AnyEvent::MP::Kernel::NODE{$rnode}{transport}) { 216 } elsif ($AnyEvent::MP::Kernel::NODE{$rnode} && $AnyEvent::MP::Kernel::NODE{$rnode}{transport}) {
214 return $self->error ("$rnode already connected, not connecting again."); 217 return $self->error ("$rnode already connected, not connecting again.");
215 } 218 }
216 219
299 302
300 $hdl->rbuf_max (undef); 303 $hdl->rbuf_max (undef);
301 my $queue = delete $self->{queue}; # we are connected 304 my $queue = delete $self->{queue}; # we are connected
302 305
303 $self->{hdl}->rtimeout ($self->{remote_greeting}{timeout}); 306 $self->{hdl}->rtimeout ($self->{remote_greeting}{timeout});
304 $self->{hdl}->wtimeout ($arg{timeout} - LATENCY); 307 $self->{hdl}->wtimeout ($self->{timeout} - LATENCY);
305 $self->{hdl}->on_wtimeout (sub { $self->send ([]) }); 308 $self->{hdl}->on_wtimeout (sub { $self->send ([]) });
306 309
307 $self->connected; 310 $self->connected;
308 311
309 # send queued messages 312 # send queued messages
365} 368}
366 369
367sub destroy { 370sub destroy {
368 my ($self) = @_; 371 my ($self) = @_;
369 372
373 (delete $self->{release})->()
374 if exists $self->{release};
375
370 $self->{hdl}->destroy 376 $self->{hdl}->destroy
371 if $self->{hdl}; 377 if $self->{hdl};
372} 378}
373 379
374sub DESTROY { 380sub DESTROY {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines