ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
Revision: 1.13
Committed: Wed Aug 5 22:40:51 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.12: +11 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Node - a single processing node (CPU/process...)
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Node;
8
9 =head1 DESCRIPTION
10
11 =cut
12
13 package AnyEvent::MP::Node;
14
15 use common::sense;
16
17 use AE ();
18 use AnyEvent::Util ();
19 use AnyEvent::Socket ();
20
21 use AnyEvent::MP::Transport ();
22
23 use base Exporter::;
24
25 our $VERSION = '0.0';
26
27 sub new {
28 my ($class, $noderef) = @_;
29
30 bless { noderef => $noderef }, $class
31 }
32
33 package AnyEvent::MP::Node::Direct;
34
35 use base "AnyEvent::MP::Node";
36
37 sub send {
38 my ($self, $msg) = @_;
39
40 if ($self->{transport}) {
41 $self->{transport}->send ($msg);
42 } elsif ($self->{queue}) {
43 push @{ $self->{queue} }, $msg;
44 } else {
45 $self->{queue} = [$msg];
46 $self->connect;
47 }
48 }
49
50 sub kill {
51 my ($self, $port, @reason) = @_;
52
53 $self->send (["", kil => $port, @reason]);
54 }
55
56 sub monitor {
57 my ($self, $portid, $cb) = @_;
58
59 return $cb->(transport_error => "node unreachable")
60 if $self->{failed};
61
62 my $list = $self->{lmon}{$portid} ||= [];
63
64 $self->send (["", mon1 => $portid])
65 unless @$list;
66
67 push @$list, $cb;
68 }
69
70 sub unmonitor {
71 my ($self, $portid, $cb) = @_;
72
73 my $list = $self->{lmon}{$portid}
74 or return;
75
76 @$list = grep $_ != $cb, @$list;
77
78 unless (@$list) {
79 $self->send (["", mon0 => $portid]);
80 delete $self->{monitor}{$portid};
81 }
82 }
83
84 sub set_transport {
85 my ($self, $transport) = @_;
86
87 $self->clr_transport
88 if $self->{transport};
89
90 if (
91 exists $self->{remote_uniq}
92 && $self->{remote_uniq} ne $transport->{remote_uniq}
93 ) {
94 # uniq changed, different node
95 $self->fail (transport_error => $self->{noderef}, "node was restarted");
96 }
97
98 delete $self->{trial};
99 delete $self->{next_connect};
100 delete $self->{failed};
101
102 $self->{remote_uniq} = $transport->{remote_uniq};
103 $self->{transport} = $transport;
104
105 $transport->send ($_)
106 for @{ delete $self->{queue} || [] };
107 }
108
109 sub fail {
110 my ($self, @reason) = @_;
111
112 delete $self->{queue};
113
114 $self->{failed} = 1;
115
116 if (my $mon = delete $self->{lmon}) {
117 $_->(@reason) for map @$_, values %$mon;
118 }
119 }
120
121 sub clr_transport {
122 my ($self, @reason) = @_;
123
124 delete $self->{transport};
125 $self->connect;
126 }
127
128 sub connect {
129 my ($self) = @_;
130
131 Scalar::Util::weaken $self;
132
133 $self->{retry} ||= [split /,/, $self->{noderef}];
134
135 my $endpoint = shift @{ $self->{retry} };
136
137 if (defined $endpoint) {
138 $self->{trial}{$endpoint} ||= do {
139 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
140 or return $AnyEvent::MP::Base::WARN->("$self->{noderef}: not a resolved node reference.");
141
142 my ($w, $g);
143
144 $w = AE::timer $AnyEvent::MP::Base::CONNECT_TIMEOUT, 0, sub {
145 delete $self->{trial}{$endpoint};
146 };
147 $g = AnyEvent::MP::Transport::mp_connect
148 $host, $port,
149 sub {
150 delete $self->{trial}{$endpoint}
151 unless @_;
152 $g = shift;
153 };
154 ;
155
156 [$w, \$g]
157 };
158 } else {
159 delete $self->{retry};
160 $self->fail (transport_error => $self->{noderef}, "unable to connect");
161 }
162
163 $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {
164 $self->connect;
165 };
166 }
167
168 package AnyEvent::MP::Node::Slave;
169
170 use base "AnyEvent::MP::Node::Direct";
171
172 sub connect {
173 my ($self) = @_;
174
175 $self->fail (transport_error => $self->{noderef}, "unable to connect to slave node");
176 }
177
178 package AnyEvent::MP::Node::Self;
179
180 use base "AnyEvent::MP::Node";
181
182 sub set_transport {
183 Carp::confess "FATAL error, set_transport was called on local node";
184 }
185
186 sub send {
187 local $AnyEvent::MP::Base::SRCNODE = $_[0];
188 AnyEvent::MP::Base::_inject (@{ $_[1] });
189 }
190
191 sub kill {
192 my ($self, $port, @reason) = @_;
193
194 delete $AnyEvent::MP::Base::PORT{$port};
195 delete $AnyEvent::MP::Base::PORT_DATA{$port};
196
197 my $mon = delete $AnyEvent::MP::Base::LMON{$port}
198 or !@reason
199 or $AnyEvent::MP::Base::WARN->("unmonitored local port $port died with reason: @reason");
200
201 $_->(@reason) for values %$mon;
202 }
203
204 sub monitor {
205 my ($self, $portid, $cb) = @_;
206
207 return $cb->()
208 unless exists $AnyEvent::MP::Base::PORT{$portid};
209
210 $AnyEvent::MP::Base::LMON{$portid}{$cb+0} = $cb;
211 }
212
213 sub unmonitor {
214 my ($self, $portid, $cb) = @_;
215
216 delete $AnyEvent::MP::Base::LMON{$portid}{$cb+0};
217 }
218
219 =head1 SEE ALSO
220
221 L<AnyEvent>.
222
223 =head1 AUTHOR
224
225 Marc Lehmann <schmorp@schmorp.de>
226 http://home.schmorp.de/
227
228 =cut
229
230 1
231