ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
Revision: 1.30
Committed: Fri Aug 28 20:57:42 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.29: +3 -6 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Node - a single processing node (CPU/process...)
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Node;
8    
9     =head1 DESCRIPTION
10    
11     =cut
12    
13     package AnyEvent::MP::Node;
14    
15     use common::sense;
16    
17     use AE ();
18 root 1.6 use AnyEvent::Util ();
19 root 1.1 use AnyEvent::Socket ();
20    
21     use AnyEvent::MP::Transport ();
22    
23     sub new {
24 root 1.28 my ($self, $id) = @_;
25 root 1.1
26 root 1.28 $self = bless { id => $id }, $self;
27 root 1.1
28 root 1.27 $self->init;
29 root 1.18 $self->transport_reset;
30 root 1.1
31 root 1.18 $self
32     }
33 root 1.1
34 root 1.27 sub init {
35     #
36     }
37    
38 root 1.1 sub send {
39 root 1.19 &{ shift->{send} }
40 root 1.1 }
41    
42 root 1.27 # nodes reachable via the network
43 root 1.18 package AnyEvent::MP::Node::External;
44 root 1.7
45 root 1.18 use base "AnyEvent::MP::Node";
46 root 1.7
47 root 1.18 # called at init time, mostly sets {send}
48     sub transport_reset {
49     my ($self) = @_;
50 root 1.6
51 root 1.18 delete $self->{transport};
52 root 1.6
53 root 1.19 Scalar::Util::weaken $self;
54    
55 root 1.18 $self->{send} = sub {
56 root 1.19 push @{$self->{queue}}, shift;
57     $self->connect;
58 root 1.18 };
59 root 1.6 }
60    
61 root 1.18 # called only after successful handshake
62     sub transport_error {
63     my ($self, @reason) = @_;
64 root 1.6
65 root 1.21 my $no_transport = !$self->{transport};
66    
67     delete $self->{connect_w};
68     delete $self->{connect_to};
69 root 1.20
70 root 1.18 delete $self->{queue};
71     $self->transport_reset;
72 root 1.6
73 root 1.18 if (my $mon = delete $self->{lmon}) {
74     $_->(@reason) for map @$_, values %$mon;
75 root 1.6 }
76 root 1.30
77     AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
78     unless $no_transport;
79 root 1.6 }
80    
81 root 1.18 # called after handshake was successful
82     sub transport_connect {
83 root 1.1 my ($self, $transport) = @_;
84    
85 root 1.28 delete $self->{trial};
86    
87 root 1.20 $self->transport_error (transport_error => "switched connections")
88 root 1.6 if $self->{transport};
89    
90 root 1.18 delete $self->{connect_w};
91     delete $self->{connect_to};
92 root 1.8
93 root 1.15 $self->{transport} = $transport;
94 root 1.1
95 root 1.19 my $transport_send = $transport->can ("send");
96    
97 root 1.18 $self->{send} = sub {
98 root 1.19 $transport_send->($transport, $_[0]);
99 root 1.18 };
100    
101 root 1.20 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
102    
103 root 1.1 $transport->send ($_)
104     for @{ delete $self->{queue} || [] };
105     }
106    
107 root 1.18 sub connect {
108 root 1.28 my ($self, @addresses) = @_;
109 root 1.18
110 root 1.23 return if $self->{transport};
111    
112 root 1.18 Scalar::Util::weaken $self;
113 root 1.8
114 root 1.18 $self->{connect_to} ||= AE::timer
115     $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT,
116     0,
117     sub {
118 root 1.28 $self->transport_error (transport_error => $self->{id}, "unable to connect");
119 root 1.18 };
120 root 1.1
121 root 1.29 return unless @addresses;
122    
123 root 1.28 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]");
124    
125 root 1.18 unless ($self->{connect_w}) {
126     my @endpoints;
127    
128     $self->{connect_w} = AE::timer
129 root 1.26 rand,
130 root 1.18 $AnyEvent::MP::Config::CFG{connect_interval} || $AnyEvent::MP::Kernel::CONNECT_INTERVAL,
131     sub {
132 root 1.28 @endpoints = @addresses
133 root 1.18 unless @endpoints;
134    
135     my $endpoint = shift @endpoints;
136    
137 root 1.28 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint");
138    
139     $self->{trial}{$endpoint} ||= do {
140 root 1.18 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
141 root 1.28 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference.");
142 root 1.18
143     AnyEvent::MP::Transport::mp_connect
144     $host, $port,
145 root 1.28 sub { delete $self->{trial}{$endpoint} },
146 root 1.18 };
147     }
148     ;
149 root 1.6 }
150 root 1.8 }
151    
152 root 1.18 sub kill {
153     my ($self, $port, @reason) = @_;
154 root 1.4
155 root 1.18 $self->send (["", kil => $port, @reason]);
156 root 1.1 }
157    
158 root 1.18 sub monitor {
159     my ($self, $portid, $cb) = @_;
160    
161     my $list = $self->{lmon}{$portid} ||= [];
162    
163     $self->send (["", mon1 => $portid])
164 root 1.24 unless @$list || !length $portid;
165 root 1.1
166 root 1.18 push @$list, $cb;
167     }
168 root 1.1
169 root 1.18 sub unmonitor {
170     my ($self, $portid, $cb) = @_;
171 root 1.1
172 root 1.18 my $list = $self->{lmon}{$portid}
173     or return;
174 root 1.1
175 root 1.18 @$list = grep $_ != $cb, @$list;
176 root 1.1
177 root 1.18 unless (@$list) {
178     $self->send (["", mon0 => $portid]);
179     delete $self->{monitor}{$portid};
180 root 1.1 }
181 root 1.18 }
182 root 1.1
183 root 1.27 # used for direct slave connections as well
184 root 1.18 package AnyEvent::MP::Node::Direct;
185    
186     use base "AnyEvent::MP::Node::External";
187 root 1.1
188     package AnyEvent::MP::Node::Self;
189    
190     use base "AnyEvent::MP::Node";
191    
192 root 1.20 sub connect {
193     # we are trivially connected
194     }
195    
196 root 1.18 sub transport_reset {
197     my ($self) = @_;
198 root 1.1
199 root 1.19 Scalar::Util::weaken $self;
200    
201 root 1.18 $self->{send} = sub {
202 root 1.19 local $AnyEvent::MP::Kernel::SRCNODE = $self;
203     AnyEvent::MP::Kernel::_inject (@{ $_[0] });
204 root 1.18 };
205 root 1.6 }
206    
207 root 1.28 sub transport_connect {
208     my ($self, $tp) = @_;
209    
210     $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})");
211     }
212    
213 root 1.7 sub kill {
214     my ($self, $port, @reason) = @_;
215    
216 root 1.18 delete $AnyEvent::MP::Kernel::PORT{$port};
217     delete $AnyEvent::MP::Kernel::PORT_DATA{$port};
218 root 1.7
219 root 1.18 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port}
220 root 1.8 or !@reason
221 root 1.25 or $AnyEvent::MP::Kernel::WARN->(2, "unmonitored local port $port died with reason: @reason");
222 root 1.7
223     $_->(@reason) for values %$mon;
224     }
225    
226 root 1.6 sub monitor {
227     my ($self, $portid, $cb) = @_;
228    
229 root 1.14 return $cb->(no_such_port => "cannot monitor nonexistent port")
230 root 1.18 unless exists $AnyEvent::MP::Kernel::PORT{$portid};
231 root 1.6
232 root 1.18 $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0} = $cb;
233 root 1.6 }
234    
235     sub unmonitor {
236     my ($self, $portid, $cb) = @_;
237    
238 root 1.18 delete $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0};
239 root 1.1 }
240    
241     =head1 SEE ALSO
242    
243 root 1.17 L<AnyEvent::MP>.
244 root 1.1
245     =head1 AUTHOR
246    
247     Marc Lehmann <schmorp@schmorp.de>
248     http://home.schmorp.de/
249    
250     =cut
251    
252     1
253