ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.14 by root, Sat Aug 8 00:22:16 2009 UTC vs.
Revision 1.17 by root, Mon Aug 10 01:37:19 2009 UTC

17use AE (); 17use AE ();
18use AnyEvent::Util (); 18use AnyEvent::Util ();
19use AnyEvent::Socket (); 19use AnyEvent::Socket ();
20 20
21use AnyEvent::MP::Transport (); 21use AnyEvent::MP::Transport ();
22
23use base Exporter::;
24
25our $VERSION = '0.0';
26 22
27sub new { 23sub new {
28 my ($class, $noderef) = @_; 24 my ($class, $noderef) = @_;
29 25
30 bless { noderef => $noderef }, $class 26 bless { noderef => $noderef }, $class
54} 50}
55 51
56sub monitor { 52sub monitor {
57 my ($self, $portid, $cb) = @_; 53 my ($self, $portid, $cb) = @_;
58 54
59 return $cb->(transport_error => "node unreachable")
60 if $self->{failed};
61
62 my $list = $self->{lmon}{$portid} ||= []; 55 my $list = $self->{lmon}{$portid} ||= [];
63 56
64 $self->send (["", mon1 => $portid]) 57 $self->send (["", mon1 => $portid])
65 unless @$list; 58 unless @$list;
66 59
85 my ($self, $transport) = @_; 78 my ($self, $transport) = @_;
86 79
87 $self->clr_transport 80 $self->clr_transport
88 if $self->{transport}; 81 if $self->{transport};
89 82
90 if (
91 exists $self->{remote_uniq}
92 && $self->{remote_uniq} ne $transport->{remote_uniq}
93 ) {
94 # uniq changed, different node
95 $self->fail (transport_error => $self->{noderef}, "node was restarted");
96 }
97
98 delete $self->{trial}; 83 delete $self->{trial};
84 delete $self->{retry};
99 delete $self->{next_connect}; 85 delete $self->{next_connect};
100 delete $self->{failed};
101 86
102 $self->{remote_uniq} = $transport->{remote_uniq};
103 $self->{transport} = $transport; 87 $self->{transport} = $transport;
104 88
105 $transport->send ($_) 89 $transport->send ($_)
106 for @{ delete $self->{queue} || [] }; 90 for @{ delete $self->{queue} || [] };
107} 91}
108 92
109sub fail { 93sub fail {
110 my ($self, @reason) = @_; 94 my ($self, @reason) = @_;
111 95
112 delete $self->{queue}; 96 delete $self->{queue};
113
114 $self->{failed} = 1;
115 97
116 if (my $mon = delete $self->{lmon}) { 98 if (my $mon = delete $self->{lmon}) {
117 $_->(@reason) for map @$_, values %$mon; 99 $_->(@reason) for map @$_, values %$mon;
118 } 100 }
119} 101}
154 ; 136 ;
155 137
156 [$w, \$g] 138 [$w, \$g]
157 }; 139 };
158 } else { 140 } else {
141 $self->fail (transport_error => $self->{noderef}, "unable to connect");
142 }
143
144 $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {
159 delete $self->{retry}; 145 delete $self->{retry};
160 $self->fail (transport_error => $self->{noderef}, "unable to connect");
161 }
162
163 $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {
164 $self->connect; 146 $self->connect;
165 }; 147 };
166} 148}
167 149
168package AnyEvent::MP::Node::Slave; 150package AnyEvent::MP::Node::Slave;
216 delete $AnyEvent::MP::Base::LMON{$portid}{$cb+0}; 198 delete $AnyEvent::MP::Base::LMON{$portid}{$cb+0};
217} 199}
218 200
219=head1 SEE ALSO 201=head1 SEE ALSO
220 202
221L<AnyEvent>. 203L<AnyEvent::MP>.
222 204
223=head1 AUTHOR 205=head1 AUTHOR
224 206
225 Marc Lehmann <schmorp@schmorp.de> 207 Marc Lehmann <schmorp@schmorp.de>
226 http://home.schmorp.de/ 208 http://home.schmorp.de/

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines