ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Node.pm (file contents):
Revision 1.10 by root, Tue Aug 4 23:35:51 2009 UTC vs.
Revision 1.17 by root, Mon Aug 10 01:37:19 2009 UTC

17use AE (); 17use AE ();
18use AnyEvent::Util (); 18use AnyEvent::Util ();
19use AnyEvent::Socket (); 19use AnyEvent::Socket ();
20 20
21use AnyEvent::MP::Transport (); 21use AnyEvent::MP::Transport ();
22
23use base Exporter::;
24
25our $VERSION = '0.0';
26 22
27sub new { 23sub new {
28 my ($class, $noderef) = @_; 24 my ($class, $noderef) = @_;
29 25
30 bless { noderef => $noderef }, $class 26 bless { noderef => $noderef }, $class
54} 50}
55 51
56sub monitor { 52sub monitor {
57 my ($self, $portid, $cb) = @_; 53 my ($self, $portid, $cb) = @_;
58 54
59 return $cb->("node failed conenction")
60 if $self->{failed};
61
62 my $list = $self->{lmon}{$portid} ||= []; 55 my $list = $self->{lmon}{$portid} ||= [];
63 56
64 $self->send (["", mon1 => $portid]) 57 $self->send (["", mon1 => $portid])
65 unless @$list; 58 unless @$list;
66 59
85 my ($self, $transport) = @_; 78 my ($self, $transport) = @_;
86 79
87 $self->clr_transport 80 $self->clr_transport
88 if $self->{transport}; 81 if $self->{transport};
89 82
90 if (
91 exists $self->{remote_uniq}
92 && $self->{remote_uniq} ne $transport->{remote_uniq}
93 ) {
94 # uniq changed, different node
95 $self->fail ("node restart detected");
96 }
97
98 delete $self->{trial}; 83 delete $self->{trial};
84 delete $self->{retry};
99 delete $self->{next_connect}; 85 delete $self->{next_connect};
100 delete $self->{failed};
101 86
102 $self->{remote_uniq} = $transport->{remote_uniq};
103 $self->{transport} = $transport; 87 $self->{transport} = $transport;
104 88
105 $transport->send ($_) 89 $transport->send ($_)
106 for @{ delete $self->{queue} || [] }; 90 for @{ delete $self->{queue} || [] };
107} 91}
108 92
109sub fail { 93sub fail {
110 my ($self, @reason) = @_; 94 my ($self, @reason) = @_;
111 95
112 delete $self->{queue}; 96 delete $self->{queue};
113
114 $self->{failed} = 1;
115 97
116 if (my $mon = delete $self->{lmon}) { 98 if (my $mon = delete $self->{lmon}) {
117 $_->(@reason) for map @$_, values %$mon; 99 $_->(@reason) for map @$_, values %$mon;
118 } 100 }
119} 101}
154 ; 136 ;
155 137
156 [$w, \$g] 138 [$w, \$g]
157 }; 139 };
158 } else { 140 } else {
141 $self->fail (transport_error => $self->{noderef}, "unable to connect");
142 }
143
144 $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {
159 delete $self->{retry}; 145 delete $self->{retry};
160 $self->fail (transport_error => $self->{noderef}, "unable to connect");
161 }
162
163 $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {
164 $self->connect; 146 $self->connect;
165 }; 147 };
166} 148}
167 149
150package AnyEvent::MP::Node::Slave;
151
152use base "AnyEvent::MP::Node::Direct";
153
154sub connect {
155 my ($self) = @_;
156
157 $self->fail (transport_error => $self->{noderef}, "unable to connect to slave node");
158}
159
168package AnyEvent::MP::Node::Self; 160package AnyEvent::MP::Node::Self;
169 161
170use base "AnyEvent::MP::Node"; 162use base "AnyEvent::MP::Node";
171 163
172sub set_transport { 164sub set_transport {
173 die "FATAL error, set_transport was called"; 165 Carp::confess "FATAL error, set_transport was called on local node";
174} 166}
175 167
176sub send { 168sub send {
177 local $AnyEvent::MP::Base::SRCNODE = $_[0]; 169 local $AnyEvent::MP::Base::SRCNODE = $_[0];
178 AnyEvent::MP::Base::_inject (@{ $_[1] }); 170 AnyEvent::MP::Base::_inject (@{ $_[1] });
192} 184}
193 185
194sub monitor { 186sub monitor {
195 my ($self, $portid, $cb) = @_; 187 my ($self, $portid, $cb) = @_;
196 188
197 return $cb->() 189 return $cb->(no_such_port => "cannot monitor nonexistent port")
198 unless exists $AnyEvent::MP::Base::PORT{$portid}; 190 unless exists $AnyEvent::MP::Base::PORT{$portid};
199 191
200 $AnyEvent::MP::Base::LMON{$portid}{$cb+0} = $cb; 192 $AnyEvent::MP::Base::LMON{$portid}{$cb+0} = $cb;
201} 193}
202 194
206 delete $AnyEvent::MP::Base::LMON{$portid}{$cb+0}; 198 delete $AnyEvent::MP::Base::LMON{$portid}{$cb+0};
207} 199}
208 200
209=head1 SEE ALSO 201=head1 SEE ALSO
210 202
211L<AnyEvent>. 203L<AnyEvent::MP>.
212 204
213=head1 AUTHOR 205=head1 AUTHOR
214 206
215 Marc Lehmann <schmorp@schmorp.de> 207 Marc Lehmann <schmorp@schmorp.de>
216 http://home.schmorp.de/ 208 http://home.schmorp.de/

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines