ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
Revision: 1.29
Committed: Fri Aug 28 00:31:14 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.28: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Node - a single processing node (CPU/process...)
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Node;
8    
9     =head1 DESCRIPTION
10    
11     =cut
12    
13     package AnyEvent::MP::Node;
14    
15     use common::sense;
16    
17     use AE ();
18 root 1.6 use AnyEvent::Util ();
19 root 1.1 use AnyEvent::Socket ();
20    
21     use AnyEvent::MP::Transport ();
22    
23     sub new {
24 root 1.28 my ($self, $id) = @_;
25 root 1.1
26 root 1.28 $self = bless { id => $id }, $self;
27 root 1.1
28 root 1.27 $self->init;
29 root 1.18 $self->transport_reset;
30 root 1.1
31 root 1.18 $self
32     }
33 root 1.1
34 root 1.27 sub init {
35     #
36     }
37    
38 root 1.1 sub send {
39 root 1.19 &{ shift->{send} }
40 root 1.1 }
41    
42 root 1.27 # nodes reachable via the network
43 root 1.18 package AnyEvent::MP::Node::External;
44 root 1.7
45 root 1.18 use base "AnyEvent::MP::Node";
46 root 1.7
47 root 1.18 # called at init time, mostly sets {send}
48     sub transport_reset {
49     my ($self) = @_;
50 root 1.6
51 root 1.18 delete $self->{transport};
52 root 1.6
53 root 1.19 Scalar::Util::weaken $self;
54    
55 root 1.18 $self->{send} = sub {
56 root 1.19 push @{$self->{queue}}, shift;
57     $self->connect;
58 root 1.18 };
59 root 1.23
60     $self->connect
61     if $self->{autoconnect};
62 root 1.6 }
63    
64 root 1.18 # called only after successful handshake
65     sub transport_error {
66     my ($self, @reason) = @_;
67 root 1.6
68 root 1.21 my $no_transport = !$self->{transport};
69    
70     delete $self->{connect_w};
71     delete $self->{connect_to};
72 root 1.20
73 root 1.18 delete $self->{queue};
74     $self->transport_reset;
75 root 1.6
76 root 1.21 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
77     unless $no_transport;
78 root 1.20
79 root 1.18 if (my $mon = delete $self->{lmon}) {
80     $_->(@reason) for map @$_, values %$mon;
81 root 1.6 }
82     }
83    
84 root 1.18 # called after handshake was successful
85     sub transport_connect {
86 root 1.1 my ($self, $transport) = @_;
87    
88 root 1.28 delete $self->{trial};
89    
90 root 1.20 $self->transport_error (transport_error => "switched connections")
91 root 1.6 if $self->{transport};
92    
93 root 1.18 delete $self->{connect_w};
94     delete $self->{connect_to};
95 root 1.8
96 root 1.15 $self->{transport} = $transport;
97 root 1.1
98 root 1.19 my $transport_send = $transport->can ("send");
99    
100 root 1.18 $self->{send} = sub {
101 root 1.19 $transport_send->($transport, $_[0]);
102 root 1.18 };
103    
104 root 1.20 AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);
105    
106 root 1.1 $transport->send ($_)
107     for @{ delete $self->{queue} || [] };
108     }
109    
110 root 1.18 sub connect {
111 root 1.28 my ($self, @addresses) = @_;
112 root 1.18
113 root 1.23 return if $self->{transport};
114    
115 root 1.18 Scalar::Util::weaken $self;
116 root 1.8
117 root 1.18 $self->{connect_to} ||= AE::timer
118     $AnyEvent::MP::Config::CFG{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT,
119     0,
120     sub {
121 root 1.28 $self->transport_error (transport_error => $self->{id}, "unable to connect");
122 root 1.18 };
123 root 1.1
124 root 1.29 return unless @addresses;
125    
126 root 1.28 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} with [@addresses]");
127    
128 root 1.18 unless ($self->{connect_w}) {
129     my @endpoints;
130    
131     $self->{connect_w} = AE::timer
132 root 1.26 rand,
133 root 1.18 $AnyEvent::MP::Config::CFG{connect_interval} || $AnyEvent::MP::Kernel::CONNECT_INTERVAL,
134     sub {
135 root 1.28 @endpoints = @addresses
136 root 1.18 unless @endpoints;
137    
138     my $endpoint = shift @endpoints;
139    
140 root 1.28 $AnyEvent::MP::Kernel::WARN->(9, "connecting to $self->{id} at $endpoint");
141    
142     $self->{trial}{$endpoint} ||= do {
143 root 1.18 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
144 root 1.28 or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{id}: not a resolved node reference.");
145 root 1.18
146     AnyEvent::MP::Transport::mp_connect
147     $host, $port,
148 root 1.28 sub { delete $self->{trial}{$endpoint} },
149 root 1.18 };
150     }
151     ;
152 root 1.6 }
153 root 1.8 }
154    
155 root 1.18 sub kill {
156     my ($self, $port, @reason) = @_;
157 root 1.4
158 root 1.18 $self->send (["", kil => $port, @reason]);
159 root 1.1 }
160    
161 root 1.18 sub monitor {
162     my ($self, $portid, $cb) = @_;
163    
164     my $list = $self->{lmon}{$portid} ||= [];
165    
166     $self->send (["", mon1 => $portid])
167 root 1.24 unless @$list || !length $portid;
168 root 1.1
169 root 1.18 push @$list, $cb;
170     }
171 root 1.1
172 root 1.18 sub unmonitor {
173     my ($self, $portid, $cb) = @_;
174 root 1.1
175 root 1.18 my $list = $self->{lmon}{$portid}
176     or return;
177 root 1.1
178 root 1.18 @$list = grep $_ != $cb, @$list;
179 root 1.1
180 root 1.18 unless (@$list) {
181     $self->send (["", mon0 => $portid]);
182     delete $self->{monitor}{$portid};
183 root 1.1 }
184 root 1.18 }
185 root 1.1
186 root 1.27 # used for direct slave connections as well
187 root 1.18 package AnyEvent::MP::Node::Direct;
188    
189     use base "AnyEvent::MP::Node::External";
190 root 1.1
191     package AnyEvent::MP::Node::Self;
192    
193     use base "AnyEvent::MP::Node";
194    
195 root 1.20 sub connect {
196     # we are trivially connected
197     }
198    
199 root 1.18 sub transport_reset {
200     my ($self) = @_;
201 root 1.1
202 root 1.19 Scalar::Util::weaken $self;
203    
204 root 1.18 $self->{send} = sub {
205 root 1.19 local $AnyEvent::MP::Kernel::SRCNODE = $self;
206     AnyEvent::MP::Kernel::_inject (@{ $_[0] });
207 root 1.18 };
208 root 1.6 }
209    
210 root 1.28 sub transport_connect {
211     my ($self, $tp) = @_;
212    
213     $AnyEvent::MP::Kernel::WARN->(9, "I refuse to talk to myself ($tp->{peerhost}:$tp->{peerport})");
214     }
215    
216 root 1.7 sub kill {
217     my ($self, $port, @reason) = @_;
218    
219 root 1.18 delete $AnyEvent::MP::Kernel::PORT{$port};
220     delete $AnyEvent::MP::Kernel::PORT_DATA{$port};
221 root 1.7
222 root 1.18 my $mon = delete $AnyEvent::MP::Kernel::LMON{$port}
223 root 1.8 or !@reason
224 root 1.25 or $AnyEvent::MP::Kernel::WARN->(2, "unmonitored local port $port died with reason: @reason");
225 root 1.7
226     $_->(@reason) for values %$mon;
227     }
228    
229 root 1.6 sub monitor {
230     my ($self, $portid, $cb) = @_;
231    
232 root 1.14 return $cb->(no_such_port => "cannot monitor nonexistent port")
233 root 1.18 unless exists $AnyEvent::MP::Kernel::PORT{$portid};
234 root 1.6
235 root 1.18 $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0} = $cb;
236 root 1.6 }
237    
238     sub unmonitor {
239     my ($self, $portid, $cb) = @_;
240    
241 root 1.18 delete $AnyEvent::MP::Kernel::LMON{$portid}{$cb+0};
242 root 1.1 }
243    
244     =head1 SEE ALSO
245    
246 root 1.17 L<AnyEvent::MP>.
247 root 1.1
248     =head1 AUTHOR
249    
250     Marc Lehmann <schmorp@schmorp.de>
251     http://home.schmorp.de/
252    
253     =cut
254    
255     1
256