ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
Revision: 1.27
Committed: Thu Aug 27 07:12:48 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.26: +10 -46 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Node - a single processing node (CPU/process...)
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Node;
8    
9     =head1 DESCRIPTION
10    
11     =cut
12    
13     package AnyEvent::MP::Node;
14    
15     use common::sense;
16    
17     use AE ();
18 root 1.6 use AnyEvent::Util ();
19 root 1.1 use AnyEvent::Socket ();
20    
21     use AnyEvent::MP::Transport ();
22    
# Constructor.  Blesses a fresh hash that stores the node reference under
# {noderef} into the invoking class, then runs the init and transport_reset
# hooks (in that order) and returns the new object.
#
#   $noderef - stored verbatim, no validation happens here
sub new {
   my $class   = shift;
   my $noderef = shift;

   my $node = bless { noderef => $noderef }, $class;

   $node->init;
   $node->transport_reset;

   return $node;
}
33 root 1.1
# Initialisation hook, called by new right after the object has been
# blessed.  This implementation intentionally does nothing and returns
# nothing.
sub init {
   return;
}
37    
# Passes all arguments after the invocant on to the code reference that is
# currently stored in {send} and returns whatever that handler returns.
sub send {
   my $self = shift;

   my $handler = $self->{send};

   $handler->(@_);
}
41    
42 root 1.27 # nodes reachable via the network
43 root 1.18 package AnyEvent::MP::Node::External;
44 root 1.7
45 root 1.18 use base "AnyEvent::MP::Node";
46 root 1.7
# Puts the node into its "no transport" state: forgets {transport} and
# installs a {send} handler that appends the message to {queue} and then
# calls connect.  When {autoconnect} is set, connect is also called right
# away.
sub transport_reset {
   my $self = shift;

   delete $self->{transport};

   # the handler below is stored inside $self, so it may only hold a weak
   # reference to it
   Scalar::Util::weaken $self;

   $self->{send} = sub {
      my ($msg) = @_;

      push @{ $self->{queue} }, $msg;
      $self->connect;
   };

   $self->{autoconnect}
      and $self->connect;
}
63    
# Handles a failed or lost connection.  @reason is handed on unchanged.
#
# Drops both connect timers and the pending message queue, then calls
# transport_reset.  A node event with status 0 plus @reason is injected into
# the kernel only when a transport was installed on entry.  Finally every
# callback registered under {lmon} is invoked with @reason; {lmon} itself is
# removed before the callbacks run.
sub transport_error {
   my $self   = shift;
   my @reason = @_;

   # must be sampled first - transport_reset below removes {transport}
   my $had_transport = !!$self->{transport};

   delete $self->{$_}
      for qw(connect_w connect_to queue);

   $self->transport_reset;

   AnyEvent::MP::Kernel::_inject_nodeevent ($self, 0, @reason)
      if $had_transport;

   my $monitors = delete $self->{lmon}
      or return;

   # collect all callbacks up front, then notify them one by one
   my @callbacks = map @$_, values %$monitors;
   $_->(@reason) for @callbacks;
}
83    
# Installs $transport as the node's active transport.
#
# If another transport is still installed, transport_error is called first
# with the reason "switched connections".  Afterwards the connect timers are
# dropped, {send} is replaced by a handler that forwards its first argument
# to the transport's send method, a node event with status 1 is injected
# into the kernel, and every message still waiting in {queue} is sent over
# the new transport.
sub transport_connect {
   my $self      = shift;
   my $transport = shift;

   if ($self->{transport}) {
      $self->transport_error (transport_error => "switched connections");
   }

   delete $self->{$_}
      for qw(connect_w connect_to);

   $self->{transport} = $transport;

   # resolve the method once so the send handler avoids a lookup per message
   my $send_method = $transport->can ("send");

   $self->{send} = sub { $send_method->($transport, $_[0]) };

   AnyEvent::MP::Kernel::_inject_nodeevent ($self, 1);

   my $pending = delete $self->{queue} || [];

   $transport->send ($_)
      for @$pending;
}
107    
# Starts the connection attempts for this node, unless they are already
# running.
#
# Returns immediately when a transport is installed or when the node
# reference starts with "slave/".  Otherwise two timers are created, each
# only if it does not exist yet:
#
#   {connect_to} - fires once after the configured monitor timeout and
#                  reports "unable to connect" through transport_error
#   {connect_w}  - first fires after a random delay below one second, then
#                  repeats with the configured connect interval; each tick
#                  takes the next comma-separated endpoint from {noderef}
#                  and starts an mp_connect to it, unless an attempt for
#                  that endpoint is still registered
sub connect {
   my $self = shift;

   return if $self->{transport};

   # just ignore connect requests for slave nodes - let's hope it connects to us instead
   return if $self->{noderef} =~ /^slave\//;

   # the timer callbacks are stored inside $self, so they may only hold a
   # weak reference to it
   Scalar::Util::weaken $self;

   unless ($self->{connect_to}) {
      my $timeout = $AnyEvent::MP::Config::CFG{monitor_timeout}
                 || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT;

      $self->{connect_to} = AE::timer ($timeout, 0, sub {
         $self->transport_error (transport_error => $self->{noderef}, "unable to connect");
      });
   }

   # a connect watcher is already cycling through the endpoints
   return if $self->{connect_w};

   my @pending; # endpoints not yet tried in the current round
   my %attempt; # endpoint => value returned by mp_connect for it

   my $interval = $AnyEvent::MP::Config::CFG{connect_interval}
               || $AnyEvent::MP::Kernel::CONNECT_INTERVAL;

   $self->{connect_w} = AE::timer (rand, $interval, sub {
      # start a new round once every endpoint has had its turn
      @pending = split /,/, $self->{noderef}
         unless @pending;

      my $endpoint = shift @pending;

      $attempt{$endpoint} ||= do {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($endpoint)
            or return $AnyEvent::MP::Kernel::WARN->(1, "$self->{noderef}: not a resolved node reference.");

         # the callback passed last unregisters the attempt again
         AnyEvent::MP::Transport::mp_connect ($host, $port, sub { delete $attempt{$endpoint} });
      };
   });
}
151    
# Sends a "kil" message for $port, followed by @reason, through this node's
# send method and returns whatever send returns.
sub kill {
   my $self = shift;
   my ($port, @reason) = @_;

   my $msg = ["", kil => $port, @reason];

   $self->send ($msg);
}
157    
# Registers $cb as a local monitor callback for $portid.
#
# A "mon1" message is sent only for the first callback on a port, and only
# when the port id is non-empty.  Returns the number of callbacks now
# registered for that port id.
sub monitor {
   my $self = shift;
   my ($portid, $cb) = @_;

   my $callbacks = $self->{lmon}{$portid} ||= [];

   if (!@$callbacks && length $portid) {
      $self->send (["", mon1 => $portid]);
   }

   push @$callbacks, $cb;
}
168 root 1.1
# Removes the callback $cb from the local monitors registered for $portid.
#
# Does nothing when no monitor list exists for that port id.  Once the last
# callback is gone, a "mon0" message is sent for the port and the now empty
# list is removed from {lmon}.
sub unmonitor {
   my ($self, $portid, $cb) = @_;

   my $list = $self->{lmon}{$portid}
      or return;

   # code references compare by address in numeric context
   @$list = grep $_ != $cb, @$list;

   unless (@$list) {
      # NOTE(review): monitor skips "mon1" for an empty port id, while
      # "mon0" is sent unconditionally here - confirm this is intended
      $self->send (["", mon0 => $portid]);

      # the lists are kept under {lmon}; deleting from {monitor} instead
      # left the empty list behind and autovivified an unused hash
      delete $self->{lmon}{$portid};
   }
}
182 root 1.1
183 root 1.27 # used for direct slave connections as well
184 root 1.18 package AnyEvent::MP::Node::Direct;
185    
186     use base "AnyEvent::MP::Node::External";
187 root 1.1
188     package AnyEvent::MP::Node::Self;
189    
190     use base "AnyEvent::MP::Node";
191    
# Nothing to do here: we are trivially connected to ourselves.
sub connect {
   return;
}
195    
# Installs the {send} handler for the local node: the message (an array
# reference) is unpacked and handed straight to the kernel's _inject, with
# $AnyEvent::MP::Kernel::SRCNODE pointing at this node for the duration of
# the call.  Returns the installed handler.
sub transport_reset {
   my $self = shift;

   # the handler is stored inside $self, so it may only hold a weak
   # reference to it
   Scalar::Util::weaken $self;

   my $deliver = sub {
      my ($msg) = @_;

      local $AnyEvent::MP::Kernel::SRCNODE = $self;
      AnyEvent::MP::Kernel::_inject (@$msg);
   };

   $self->{send} = $deliver;
}
206    
# Removes the local port $port from the kernel's PORT and PORT_DATA tables
# and notifies its monitors.
#
# Every callback registered for the port in the kernel's LMON table is
# called with @reason, and the port's LMON entry is removed.  When there is
# no such entry and a reason was given, a warning is emitted through the
# kernel's WARN callback instead.
sub kill {
   my $self = shift;
   my ($port, @reason) = @_;

   delete $AnyEvent::MP::Kernel::PORT{$port};
   delete $AnyEvent::MP::Kernel::PORT_DATA{$port};

   my $mon = delete $AnyEvent::MP::Kernel::LMON{$port};

   unless ($mon) {
      # nobody was watching - only worth a warning if there was a reason
      $AnyEvent::MP::Kernel::WARN->(2, "unmonitored local port $port died with reason: @reason")
         if @reason;

      return;
   }

   $_->(@reason) for values %$mon;
}
219    
# Registers $cb as a monitor for the local port $portid.
#
# If the port is not present in the kernel's PORT table, the callback is
# invoked at once with (no_such_port => "cannot monitor nonexistent port")
# and its result is returned.  Otherwise the callback is stored in the
# kernel's LMON table, keyed by its numeric reference value, and returned.
sub monitor {
   my $self = shift;
   my ($portid, $cb) = @_;

   if (exists $AnyEvent::MP::Kernel::PORT{$portid}) {
      my $key = $cb + 0;

      return $AnyEvent::MP::Kernel::LMON{$portid}{$key} = $cb;
   }

   $cb->(no_such_port => "cannot monitor nonexistent port");
}
228    
# Removes the monitor callback $cb that was registered for the local port
# $portid.  Does nothing when no monitors are recorded for that port.
sub unmonitor {
   my ($self, $portid, $cb) = @_;

   # look the port up first: deleting through a missing key would
   # autovivify an empty LMON entry for it, and kill would then treat the
   # port as monitored and skip its "unmonitored local port" warning
   my $mon = $AnyEvent::MP::Kernel::LMON{$portid}
      or return;

   # callbacks are keyed by their numeric reference value (see monitor)
   delete $mon->{$cb+0};
}
234    
235     =head1 SEE ALSO
236    
237 root 1.17 L<AnyEvent::MP>.
238 root 1.1
239     =head1 AUTHOR
240    
241     Marc Lehmann <schmorp@schmorp.de>
242     http://home.schmorp.de/
243    
244     =cut
245    
246     1
247