ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.55 by root, Wed Feb 29 19:23:44 2012 UTC vs.
Revision 1.59 by root, Sat Mar 3 11:38:43 2012 UTC

92 delete $self->{trial}; 92 delete $self->{trial};
93 93
94 $self->transport_error (transport_error => $self->{id}, "switched connections") 94 $self->transport_error (transport_error => $self->{id}, "switched connections")
95 if $self->{transport}; 95 if $self->{transport};
96 96
97 delete $self->{connect_addr};
98 delete $self->{connect_w}; 97 delete $self->{connect_w};
99 delete $self->{connect_to}; 98 delete $self->{connect_to};
100 99
101 $self->{transport} = $transport; 100 $self->{transport} = $transport;
102 101
116 return if $self->{transport}; 115 return if $self->{transport};
117 return if $self->{connect_w}; 116 return if $self->{connect_w};
118 117
119 Scalar::Util::weaken $self; 118 Scalar::Util::weaken $self;
120 119
120 $self->{connect_to} ||= AE::timer $AnyEvent::MP::Kernel::CONFIG->{connect_interval}, 0, sub {
121 $self->transport_error (transport_error => $self->{id}, "unable to connect");
122 };
123
124 # maybe @$addresses?
125 my $addresses = $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}};
126
127 if ($addresses) {
128 $self->connect_to ($addresses);
129 } else {
130 # on global nodes, all bets are off now - we either know the node, or we don't
131 unless ($AnyEvent::MP::Kernel::GLOBAL) {
132 AnyEvent::MP::Kernel::g_find ($self->{id});
133 }
134 }
135}
136
137sub connect_to {
138 my ($self, $addresses) = @_;
139
140 return if $self->{transport};
141 return if $self->{connect_w};
142
143 return unless @$addresses;
144
145 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@$addresses]");
146
147 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
148 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
149
150 $interval = ($monitor - $interval) / @$addresses
151 if ($monitor - $interval) / @$addresses < $interval;
152
153 $interval = 0.4 if $interval < 0.4;
154
155 my @endpoints = reverse @$addresses;
156
157 $self->{connect_w} = AE::timer 0, $interval * (0.9 + 0.1 * rand), sub {
158 my $endpoint = pop @endpoints;
159
160 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint");
161
162 $self->{trial}{$endpoint} ||= do {
163 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
164 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference.");
165
166 AnyEvent::MP::Transport::mp_connect
167 $host, $port,
168 sub { delete $self->{trial}{$endpoint} },
169 };
170 };
171}
172
173sub connect {
174 my ($self) = @_;
175
176 return if $self->{transport};
177 return if $self->{connect_w};
178
179 Scalar::Util::weaken $self;
180
121 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}; 181 my $monitor = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};
122 182
123 $self->{connect_to} ||= AE::timer $monitor, 0, sub { 183 $self->{connect_to} ||= AE::timer $monitor, 0, sub {
124 $self->transport_error (transport_error => $self->{id}, "unable to connect"); 184 $self->transport_error (transport_error => $self->{id}, "unable to connect");
125 }; 185 };
126 186
127 # maybe @$addresses? 187 # maybe @$addresses?
128 my @addresses = @{ 188 my @addresses = @{ $AnyEvent::MP::Kernel::GLOBAL_DB{"'l"}{$self->{id}} };
129 $AnyEvent::MP::Kernel::GLOBAL_ADDR->{$self->{id}}
130 || $AnyEvent::MP::Kernel::NODE_ADDR->{$self->{id}}
131 };
132 189
133 unless (@addresses) { 190 unless (@addresses) {
134 # on global nodes, all bets are off now - we either know the node, or we don't 191 # on global nodes, all bets are off now - we either know the node, or we don't
135 unless ($AnyEvent::MP::Kernel::GLOBAL) { 192 unless ($AnyEvent::MP::Kernel::GLOBAL) {
136 $self->{connect_w} = AnyEvent::MP::Kernel::global_req ( 193 AnyEvent::MP::Kernel::g_find ($self->{id});
137 g_find => $self->{id},
138 sub {
139 return unless $self; # just to be sure
140 return unless @{ $_[0] };
141
142 local $AnyEvent::MP::Kernel::GLOBAL_ADDR->{$self->{id}} = $_[0]; #d# UGLY
143 delete $self->{connect_w};
144 $self->connect;
145 }
146 );
147 } 194 }
148
149 return; 195 return;
150 } 196 }
151
152 $self->{connect_addr} = \@addresses; # a bit weird, but efficient
153 197
154 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]"); 198 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]");
155 199
156 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval}; 200 my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
157 201

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines