ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.54 by root, Wed Feb 29 18:44:59 2012 UTC vs.
Revision 1.59 by root, Sat Mar 3 11:38:43 2012 UTC

92 delete $self->{trial}; 92 delete $self->{trial};
93 93
94 $self->transport_error (transport_error => $self->{id}, "switched connections") 94 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 95 if $self->{transport};
96 96
97 delete $self->{connect_addr};
98 delete $self->{connect_w}; 97 delete $self->{connect_w};
99 delete $self->{connect_to}; 98 delete $self->{connect_to};
100 99
101 $self->{transport} = $transport; 100 $self->{transport} = $transport;
102 101
112 111
113sub connect { 112sub connect {
114 my ($self) = @_; 113 my ($self) = @_;
115 114
116 return if $self->{transport}; 115 return if $self->{transport};
116 return if $self->{connect_w};
117
118 Scalar::Util::weaken $self;
119
120 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
121 $self->transport_error (transport_error => $self->{id}, "unable to connect");
122 };
123
124 # maybe @$addresses?
125 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
126
127 if ($addresses) {
128 $self->connect_to ($addresses);
129 } else {
130 # on global nodes, all bets are off now - we either know the node, or we don't
131 unless ($AnyEvent::MP::Kernel::GLOBAL) {
132 AnyEvent::MP::Kernel::g_find ($self->{id});
133 }
134 }
135}
136
137sub connect_to {
138 my ($self, $addresses) = @_;
139
140 return if $self->{transport};
141 return if $self->{connect_w};
142
143 return unless @$addresses;
144
145 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@$addresses]");
146
147 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
148 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
149
150 $interval = ($monitor - $interval) / @$addresses
151 if ($monitor - $interval) / @$addresses < $interval;
152
153 $interval = 0.4 if $interval < 0.4;
154
155 my @endpoints = reverse @$addresses;
156
157 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
158 my $endpoint = pop @endpoints;
159
160 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint");
161
162 $self->{trial}{$endpoint} ||= do {
163 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
164 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference.");
165
166 AnyEvent::MP::Transport::mp_connect
167 $host, $port,
168 sub { delete $self->{trial}{$endpoint} },
169 };
170 };
171}
172
173sub connect {
174 my ($self) = @_;
175
176 return if $self->{transport};
177 return if $self->{connect_w};
117 178
118 Scalar::Util::weaken $self; 179 Scalar::Util::weaken $self;
119 180
120 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 181 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
121 182
122 $self->{connect_to} ||= AE::timer $monitor, 0, sub { 183 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
123 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 184 $self->transport_error (transport_error => $self->{id}, "unable to connect");
124 }; 185 };
125 186
126 # maybe @$addresses? 187 # maybe @$addresses?
127 my @addresses = @{ 188 my @addresses = @{ $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}} };
128 $AnyEvent::MP::Kernel::GLOBAL_ADDR->{$self->{id}}
129 || $AnyEvent::MP::Kernel::NODE_ADDR->{$self->{id}}
130 };
131 189
132 unless (@addresses) { 190 unless (@addresses) {
133 # on global nodes, all betsa re off now - we either know the node, or we don't 191 # on global nodes, all bets are off now - we either know the node, or we don't
134 unless ($AnyEvent::MP::Kernel::GLOBAL) { 192 unless ($AnyEvent::MP::Kernel::GLOBAL) {
135 $self->{connect_w} = AnyEvent::MP::Kernel::global_req ( 193 AnyEvent::MP::Kernel::g_find ($self->{id});
136 g_find => $self->{id},
137 sub {
138 return unless $self; # just to be sure
139 return unless @{ $_[0] };
140
141 local $AnyEvent::MP::Kernel::GLOBAL_ADDR->{$self->{id}} = $_[0]; #d# UGLY
142 $self->connect;
143 }
144 );
145 } 194 }
146
147 return; 195 return;
148 } 196 }
149
150 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
151 197
152 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 198 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]");
153 199
154 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 200 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
155 201
158 204
159 $interval = 0.4 if $interval < 0.4; 205 $interval = 0.4 if $interval < 0.4;
160 206
161 my @endpoints; 207 my @endpoints;
162 208
163 $self->{connect_w} = AE::timer 0.050 * rand, $interval * (0.9 + 0.1 * rand), sub { 209 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
164 @endpoints = @addresses 210 @endpoints = @addresses
165 unless @endpoints; 211 unless @endpoints;
166 212
167 my $endpoint = shift @endpoints; 213 my $endpoint = shift @endpoints;
168 214

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines