ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.13 by root, Wed Aug 5 22:40:51 2009 UTC vs.
Revision 1.16 by root, Sun Aug 9 16:08:16 2009 UTC

19use AnyEvent::Socket (); 19use AnyEvent::Socket ();
20 20
21use AnyEvent::MP::Transport (); 21use AnyEvent::MP::Transport ();
22 22
23use base Exporter::; 23use base Exporter::;
24
25our $VERSION = '0.0';
26 24
27sub new { 25sub new {
28 my ($class, $noderef) = @_; 26 my ($class, $noderef) = @_;
29 27
30 bless { noderef => $noderef }, $class 28 bless { noderef => $noderef }, $class
54} 52}
55 53
56sub monitor { 54sub monitor {
57 my ($self, $portid, $cb) = @_; 55 my ($self, $portid, $cb) = @_;
58 56
59 return $cb->(transport_error => "node unreachable")
60 if $self->{failed};
61
62 my $list = $self->{lmon}{$portid} ||= []; 57 my $list = $self->{lmon}{$portid} ||= [];
63 58
64 $self->send (["", mon1 => $portid]) 59 $self->send (["", mon1 => $portid])
65 unless @$list; 60 unless @$list;
66 61
85 my ($self, $transport) = @_; 80 my ($self, $transport) = @_;
86 81
87 $self->clr_transport 82 $self->clr_transport
88 if $self->{transport}; 83 if $self->{transport};
89 84
90 if (
91 exists $self->{remote_uniq}
92 && $self->{remote_uniq} ne $transport->{remote_uniq}
93 ) {
94 # uniq changed, different node
95 $self->fail (transport_error => $self->{noderef}, "node was restarted");
96 }
97
98 delete $self->{trial}; 85 delete $self->{trial};
86 delete $self->{retry};
99 delete $self->{next_connect}; 87 delete $self->{next_connect};
100 delete $self->{failed};
101 88
102 $self->{remote_uniq} = $transport->{remote_uniq};
103 $self->{transport} = $transport; 89 $self->{transport} = $transport;
104 90
105 $transport->send ($_) 91 $transport->send ($_)
106 for @{ delete $self->{queue} || [] }; 92 for @{ delete $self->{queue} || [] };
107} 93}
108 94
109sub fail { 95sub fail {
110 my ($self, @reason) = @_; 96 my ($self, @reason) = @_;
111 97
112 delete $self->{queue}; 98 delete $self->{queue};
113
114 $self->{failed} = 1;
115 99
116 if (my $mon = delete $self->{lmon}) { 100 if (my $mon = delete $self->{lmon}) {
117 $_->(@reason) for map @$_, values %$mon; 101 $_->(@reason) for map @$_, values %$mon;
118 } 102 }
119} 103}
154 ; 138 ;
155 139
156 [$w, \$g] 140 [$w, \$g]
157 }; 141 };
158 } else { 142 } else {
143 $self->fail (transport_error => $self->{noderef}, "unable to connect");
144 }
145
146 $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {
159 delete $self->{retry}; 147 delete $self->{retry};
160 $self->fail (transport_error => $self->{noderef}, "unable to connect");
161 }
162
163 $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {
164 $self->connect; 148 $self->connect;
165 }; 149 };
166} 150}
167 151
168package AnyEvent::MP::Node::Slave; 152package AnyEvent::MP::Node::Slave;
202} 186}
203 187
204sub monitor { 188sub monitor {
205 my ($self, $portid, $cb) = @_; 189 my ($self, $portid, $cb) = @_;
206 190
207 return $cb->() 191 return $cb->(no_such_port => "cannot monitor nonexistent port")
208 unless exists $AnyEvent::MP::Base::PORT{$portid}; 192 unless exists $AnyEvent::MP::Base::PORT{$portid};
209 193
210 $AnyEvent::MP::Base::LMON{$portid}{$cb+0} = $cb; 194 $AnyEvent::MP::Base::LMON{$portid}{$cb+0} = $cb;
211} 195}
212 196

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines