ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.9 by root, Tue Aug 4 23:16:57 2009 UTC vs.
Revision 1.15 by root, Sat Aug 8 21:56:29 2009 UTC

54} 54}
55 55
56sub monitor { 56sub monitor {
57 my ($self, $portid, $cb) = @_; 57 my ($self, $portid, $cb) = @_;
58 58
59 return $cb->("node failed conenction")
60 if $self->{failed};
61
62 my $list = $self->{lmon}{$portid} ||= []; 59 my $list = $self->{lmon}{$portid} ||= [];
63 60
64 $self->send (["", mon1 => $portid]) 61 $self->send (["", mon1 => $portid])
65 unless @$list; 62 unless @$list;
66 63
85 my ($self, $transport) = @_; 82 my ($self, $transport) = @_;
86 83
87 $self->clr_transport 84 $self->clr_transport
88 if $self->{transport}; 85 if $self->{transport};
89 86
90 if (
91 exists $self->{remote_uniq}
92 && $self->{remote_uniq} ne $transport->{remote_uniq}
93 ) {
94 # uniq changed, different node
95 $self->fail ("node restart detected");
96 }
97
98 delete $self->{trial}; 87 delete $self->{trial};
99 delete $self->{next_connect}; 88 delete $self->{next_connect};
100 delete $self->{failed};
101 89
102 $self->{remote_uniq} = $transport->{remote_uniq};
103 $self->{transport} = $transport; 90 $self->{transport} = $transport;
104 91
105 $transport->send ($_) 92 $transport->send ($_)
106 for @{ delete $self->{queue} || [] }; 93 for @{ delete $self->{queue} || [] };
107} 94}
108 95
109sub fail { 96sub fail {
110 my ($self, @reason) = @_; 97 my ($self, @reason) = @_;
111 98
112 delete $self->{queue}; 99 delete $self->{queue};
113
114 $self->{failed} = 1;
115 100
116 if (my $mon = delete $self->{lmon}) { 101 if (my $mon = delete $self->{lmon}) {
117 $_->(@reason) for map @$_, values %$mon; 102 $_->(@reason) for map @$_, values %$mon;
118 } 103 }
119} 104}
155 140
156 [$w, \$g] 141 [$w, \$g]
157 }; 142 };
158 } else { 143 } else {
159 delete $self->{retry}; 144 delete $self->{retry};
145 $self->fail (transport_error => $self->{noderef}, "unable to connect");
160 } 146 }
161 147
162 $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub { 148 $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {
163 $self->connect; 149 $self->connect;
164 }; 150 };
165} 151}
166 152
153package AnyEvent::MP::Node::Slave;
154
155use base "AnyEvent::MP::Node::Direct";
156
157sub connect {
158 my ($self) = @_;
159
160 $self->fail (transport_error => $self->{noderef}, "unable to connect to slave node");
161}
162
167package AnyEvent::MP::Node::Self; 163package AnyEvent::MP::Node::Self;
168 164
169use base "AnyEvent::MP::Node"; 165use base "AnyEvent::MP::Node";
170 166
171sub set_transport { 167sub set_transport {
172 die "FATAL error, set_transport was called"; 168 Carp::confess "FATAL error, set_transport was called on local node";
173} 169}
174 170
175sub send { 171sub send {
176 local $AnyEvent::MP::Base::SRCNODE = $_[0]; 172 local $AnyEvent::MP::Base::SRCNODE = $_[0];
177 AnyEvent::MP::Base::_inject (@{ $_[1] }); 173 AnyEvent::MP::Base::_inject (@{ $_[1] });
191} 187}
192 188
193sub monitor { 189sub monitor {
194 my ($self, $portid, $cb) = @_; 190 my ($self, $portid, $cb) = @_;
195 191
196 return $cb->() 192 return $cb->(no_such_port => "cannot monitor nonexistent port")
197 unless exists $AnyEvent::MP::Base::PORT{$portid}; 193 unless exists $AnyEvent::MP::Base::PORT{$portid};
198 194
199 $AnyEvent::MP::Base::LMON{$portid}{$cb+0} = $cb; 195 $AnyEvent::MP::Base::LMON{$portid}{$cb+0} = $cb;
200} 196}
201 197

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines