ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.6 by root, Mon Aug 3 21:35:03 2009 UTC vs.
Revision 1.8 by root, Tue Aug 4 18:33:30 2009 UTC

116 $self->{queue} = [$msg]; 116 $self->{queue} = [$msg];
117 $self->connect; 117 $self->connect;
118 } 118 }
119} 119}
120 120
121sub kill {
122 my ($self, $port, @reason) = @_;
123
124 $self->send (["", kil => $port, @reason]);
125}
126
121sub monitor { 127sub monitor {
122 my ($self, $portid, $cb) = @_; 128 my ($self, $portid, $cb) = @_;
123 129
124 return $cb->() 130 return $cb->("node failed conenction")
125 if $self->{failed}; 131 if $self->{failed};
126 132
127 my $list = $self->{lmon}{$portid} ||= []; 133 my $list = $self->{lmon}{$portid} ||= [];
128 134
129 $self->send (["", mon1 => $portid]) 135 $self->send (["", mon1 => $portid])
149sub set_transport { 155sub set_transport {
150 my ($self, $transport) = @_; 156 my ($self, $transport) = @_;
151 157
152 $self->clr_transport 158 $self->clr_transport
153 if $self->{transport}; 159 if $self->{transport};
154
155 delete $self->{trial};
156 delete $self->{next_connect};
157 delete $self->{failed};
158 160
159 if ( 161 if (
160 exists $self->{remote_uniq} 162 exists $self->{remote_uniq}
161 && $self->{remote_uniq} ne $transport->{remote_uniq} 163 && $self->{remote_uniq} ne $transport->{remote_uniq}
162 ) { 164 ) {
163 # uniq changed, drop queue 165 # uniq changed, different node
166 $self->fail ("node restart detected");
167 }
168
169 delete $self->{trial};
170 delete $self->{next_connect};
164 delete $self->{queue}; 171 delete $self->{failed};
165 #TODO: "DOWN"
166 }
167 172
168 $self->{remote_uniq} = $transport->{remote_uniq}; 173 $self->{remote_uniq} = $transport->{remote_uniq};
169 $self->{transport} = $transport; 174 $self->{transport} = $transport;
170 175
171 $transport->send ($_) 176 $transport->send ($_)
172 for @{ delete $self->{queue} || [] }; 177 for @{ delete $self->{queue} || [] };
173} 178}
174 179
180sub fail {
181 my ($self, @reason) = @_;
182
183 delete $self->{queue};
184
185 $self->{failed} = 1;
186
187 if (my $mon = delete $self->{lmon}) {
188 $_->(@reason) for map @$_, values %$mon;
189 }
190}
191
175sub clr_transport { 192sub clr_transport {
176 my ($self) = @_; 193 my ($self, @reason) = @_;
177 194
178 delete $self->{transport}; 195 delete $self->{transport};
179 $self->{failed} = 1;
180
181 if (my $mon = delete $self->{monitor}) {
182 $_->() for map @$_, values %$mon;
183 }
184
185 $self->connect; 196 $self->connect;
186} 197}
187 198
188sub connect { 199sub connect {
189 my ($self) = @_; 200 my ($self) = @_;
252sub send { 263sub send {
253 local $AnyEvent::MP::Base::SRCNODE = $_[0]; 264 local $AnyEvent::MP::Base::SRCNODE = $_[0];
254 AnyEvent::MP::Base::_inject (@{ $_[1] }); 265 AnyEvent::MP::Base::_inject (@{ $_[1] });
255} 266}
256 267
268sub kill {
269 my ($self, $port, @reason) = @_;
270
271 delete $AnyEvent::MP::Base::PORT{$port};
272 delete $AnyEvent::MP::Base::PORT_DATA{$port};
273
274 my $mon = delete $AnyEvent::MP::Base::LMON{$port}
275 or !@reason
276 or $AnyEvent::MP::Base::WARN->("unmonitored local port $port died with reason: @reason");
277
278 $_->(@reason) for values %$mon;
279}
280
257sub monitor { 281sub monitor {
258 my ($self, $portid, $cb) = @_; 282 my ($self, $portid, $cb) = @_;
259 283
260 return $cb->() 284 return $cb->()
261 unless exists $AnyEvent::MP::Base::PORT{$portid}; 285 unless exists $AnyEvent::MP::Base::PORT{$portid};

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines