ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.7 by root, Tue Aug 4 14:10:51 2009 UTC vs.
Revision 1.8 by root, Tue Aug 4 18:33:30 2009 UTC

120 120
121sub kill { 121sub kill {
122 my ($self, $port, @reason) = @_; 122 my ($self, $port, @reason) = @_;
123 123
124 $self->send (["", kil => $port, @reason]); 124 $self->send (["", kil => $port, @reason]);
125
126# delete $AnyEvent::MP::Base::PORT{$port};
127
128# my $mon = delete $AnyEvent::MP::Base::LMON{$port}
129# or return;
130
131# $_->(@reason) for values %$mon;
132} 125}
133 126
134sub monitor { 127sub monitor {
135 my ($self, $portid, $cb) = @_; 128 my ($self, $portid, $cb) = @_;
136 129
137 return $cb->() 130 return $cb->("node failed conenction")
138 if $self->{failed}; 131 if $self->{failed};
139 132
140 my $list = $self->{lmon}{$portid} ||= []; 133 my $list = $self->{lmon}{$portid} ||= [];
141 134
142 $self->send (["", mon1 => $portid]) 135 $self->send (["", mon1 => $portid])
162sub set_transport { 155sub set_transport {
163 my ($self, $transport) = @_; 156 my ($self, $transport) = @_;
164 157
165 $self->clr_transport 158 $self->clr_transport
166 if $self->{transport}; 159 if $self->{transport};
167
168 delete $self->{trial};
169 delete $self->{next_connect};
170 delete $self->{failed};
171 160
172 if ( 161 if (
173 exists $self->{remote_uniq} 162 exists $self->{remote_uniq}
174 && $self->{remote_uniq} ne $transport->{remote_uniq} 163 && $self->{remote_uniq} ne $transport->{remote_uniq}
175 ) { 164 ) {
176 # uniq changed, drop queue 165 # uniq changed, different node
166 $self->fail ("node restart detected");
167 }
168
169 delete $self->{trial};
170 delete $self->{next_connect};
177 delete $self->{queue}; 171 delete $self->{failed};
178 #TODO: "DOWN"
179 }
180 172
181 $self->{remote_uniq} = $transport->{remote_uniq}; 173 $self->{remote_uniq} = $transport->{remote_uniq};
182 $self->{transport} = $transport; 174 $self->{transport} = $transport;
183 175
184 $transport->send ($_) 176 $transport->send ($_)
185 for @{ delete $self->{queue} || [] }; 177 for @{ delete $self->{queue} || [] };
186} 178}
187 179
180sub fail {
181 my ($self, @reason) = @_;
182
183 delete $self->{queue};
184
185 $self->{failed} = 1;
186
187 if (my $mon = delete $self->{lmon}) {
188 $_->(@reason) for map @$_, values %$mon;
189 }
190}
191
188sub clr_transport { 192sub clr_transport {
189 my ($self) = @_; 193 my ($self, @reason) = @_;
190 194
191 delete $self->{transport}; 195 delete $self->{transport};
192 $self->{failed} = 1;
193
194 if (my $mon = delete $self->{monitor}) {
195 $_->() for map @$_, values %$mon;
196 }
197
198 $self->connect; 196 $self->connect;
199} 197}
200 198
201sub connect { 199sub connect {
202 my ($self) = @_; 200 my ($self) = @_;
269 267
270sub kill { 268sub kill {
271 my ($self, $port, @reason) = @_; 269 my ($self, $port, @reason) = @_;
272 270
273 delete $AnyEvent::MP::Base::PORT{$port}; 271 delete $AnyEvent::MP::Base::PORT{$port};
272 delete $AnyEvent::MP::Base::PORT_DATA{$port};
274 273
275 my $mon = delete $AnyEvent::MP::Base::LMON{$port} 274 my $mon = delete $AnyEvent::MP::Base::LMON{$port}
276 or return; 275 or !@reason
276 or $AnyEvent::MP::Base::WARN->("unmonitored local port $port died with reason: @reason");
277 277
278 $_->(@reason) for values %$mon; 278 $_->(@reason) for values %$mon;
279} 279}
280 280
281sub monitor { 281sub monitor {

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines