ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
Revision: 1.15
Committed: Sat Aug 8 21:56:29 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.14: +1 -16 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Node - a single processing node (CPU/process...)
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Node;
8
9 =head1 DESCRIPTION
10
11 =cut
12
13 package AnyEvent::MP::Node;
14
15 use common::sense;
16
17 use AE ();
18 use AnyEvent::Util ();
19 use AnyEvent::Socket ();
20
21 use AnyEvent::MP::Transport ();
22
23 use base Exporter::;
24
25 our $VERSION = '0.0';
26
# Constructor.
#
#   $class   - package name the new object is blessed into
#   $noderef - node reference; stored under the "noderef" key
#
# Returns the newly blessed hash reference.
sub new {
   my ($class, $noderef) = @_;

   my %node = (noderef => $noderef);

   return bless \%node, $class;
}
32
33 package AnyEvent::MP::Node::Direct;
34
35 use base "AnyEvent::MP::Node";
36
# Delivers a message to this node.
#
#   $msg - the message, handed unchanged to the transport or the queue
#
# With a transport attached the message goes straight to it. Otherwise it
# is appended to the pending queue; the very first queued message also
# creates the queue and triggers a connection attempt via ->connect.
sub send {
   my ($self, $msg) = @_;

   # connected: hand over directly
   return $self->{transport}->send ($msg)
      if $self->{transport};

   # a queue already exists, so a connect was already requested earlier
   return push @{ $self->{queue} }, $msg
      if $self->{queue};

   # first message without a transport: start queueing and connecting
   $self->{queue} = [$msg];
   $self->connect;
}
49
# Asks the remote node to kill one of its ports.
#
#   $port   - id of the port to kill
#   @reason - optional reason values, forwarded unchanged
#
# Sends the message ["", kil => $port, @reason] through ->send.
sub kill {
   my ($self, $port, @reason) = @_;

   my @request = ("", kil => $port, @reason);

   $self->send (\@request);
}
55
# Registers a monitoring callback for a port on this node.
#
#   $portid - id of the port to monitor
#   $cb     - callback, appended to the list kept in $self->{lmon}{$portid}
#
# A ["", mon1 => $portid] message is sent only when the list for that port
# was empty, i.e. for the first callback registered on the port.
sub monitor {
   my ($self, $portid, $cb) = @_;

   my $callbacks = ($self->{lmon}{$portid} ||= []);

   if (!@$callbacks) {
      $self->send (["", mon1 => $portid]);
   }

   push @$callbacks, $cb;
}
66
# Removes a monitoring callback previously registered for a port.
#
#   $portid - id of the monitored port
#   $cb     - the callback to remove (compared by reference address)
#
# Does nothing when no callback list exists for the port. When the last
# callback is removed, a ["", mon0 => $portid] message is sent and the
# now-empty list is dropped from $self->{lmon}.
sub unmonitor {
   my ($self, $portid, $cb) = @_;

   my $list = $self->{lmon}{$portid}
      or return;

   # numeric comparison of references compares their addresses
   @$list = grep $_ != $cb, @$list;

   unless (@$list) {
      $self->send (["", mon0 => $portid]);

      # the callback lists live under "lmon"; deleting from any other key
      # would leave the empty list behind and autovivify a stray hash
      delete $self->{lmon}{$portid};
   }
}
80
# Attaches an established transport to this node.
#
#   $transport - object with a ->send method; all queued messages are
#                flushed to it in order
#
# Any transport that is already attached is cleared first through
# ->clr_transport. All state of the connection attempt that just finished
# is then discarded: the pending trials, the reconnect timer and the list
# of endpoints still to be tried.
sub set_transport {
   my ($self, $transport) = @_;

   $self->clr_transport
      if $self->{transport};

   delete $self->{trial};
   delete $self->{next_connect};

   # Forget the partially consumed endpoint list as well. If it survived,
   # a later reconnect would resume from a stale (possibly empty) list and
   # could report failure without trying any endpoint; ->connect rebuilds
   # the list from the node reference when it is missing.
   delete $self->{retry};

   $self->{transport} = $transport;

   # flush everything that was queued while no transport was available
   $transport->send ($_)
      for @{ delete $self->{queue} || [] };
}
95
# Reports a failure of this node.
#
#   @reason - passed unchanged to every monitoring callback
#
# Drops the queue of pending messages, removes all registered monitors
# from $self->{lmon} and then invokes each of their callbacks once.
sub fail {
   my ($self, @reason) = @_;

   delete $self->{queue};

   my $monitors = delete $self->{lmon};

   if ($monitors) {
      # flatten the per-port lists before calling anything
      my @callbacks = map { @$_ } values %$monitors;

      $_->(@reason) for @callbacks;
   }
}
105
# Detaches the current transport and immediately starts reconnecting.
#
#   @reason - accepted for the caller's convenience; not used here
#
# Returns whatever ->connect returns.
sub clr_transport {
   my $self   = shift;
   my @reason = @_;

   delete $self->{transport};

   return $self->connect;
}
112
# Starts (or continues) connecting to this node.
#
# The node reference is split on "," into a list of endpoints kept in
# $self->{retry}. Each call takes the next endpoint from that list and
# starts a connection trial for it, unless one is already recorded in
# $self->{trial}. When the list is exhausted it is dropped and the node
# is failed with a transport_error. Except when an endpoint cannot be
# parsed, each call also (re)arms $self->{next_connect}, a timer that
# calls ->connect again after $AnyEvent::MP::Base::CONNECT_INTERVAL.
#
# An endpoint that cannot be parsed as host:port is reported through
# $AnyEvent::MP::Base::WARN and aborts this call without arming the timer.
sub connect {
   my ($self) = @_;

   # Load Scalar::Util here instead of relying on some other module having
   # loaded it already; the parenthesised call also compiles regardless of
   # whether the function is known at compile time.
   require Scalar::Util;

   # the callbacks below are stored inside $self; a weak reference keeps
   # them from holding the object alive
   Scalar::Util::weaken ($self);

   $self->{retry} ||= [split /,/, $self->{noderef}];

   my $endpoint = shift @{ $self->{retry} };

   if (defined $endpoint) {
      $self->{trial}{$endpoint} ||= do {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($endpoint)
            or return $AnyEvent::MP::Base::WARN->("$self->{noderef}: not a resolved node reference.");

         my ($w, $g);

         # give up on this trial after CONNECT_TIMEOUT
         $w = AE::timer ($AnyEvent::MP::Base::CONNECT_TIMEOUT, 0, sub {
            delete $self->{trial}{$endpoint};
         });

         $g = AnyEvent::MP::Transport::mp_connect (
            $host, $port,
            sub {
               # called without arguments: forget the trial
               delete $self->{trial}{$endpoint}
                  unless @_;

               # keep whatever was passed (or undef) in place of the guard
               $g = shift;
            },
         );

         # the trial entry keeps both the timeout timer and the guard alive
         [$w, \$g]
      };
   } else {
      # every endpoint has been used: report failure; the next call
      # rebuilds the endpoint list from the node reference
      delete $self->{retry};
      $self->fail (transport_error => $self->{noderef}, "unable to connect");
   }

   $self->{next_connect} = AE::timer ($AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {
      $self->connect;
   });
}
152
153 package AnyEvent::MP::Node::Slave;
154
155 use base "AnyEvent::MP::Node::Direct";
156
# Connection attempts to this kind of node always fail: every call
# reports a transport_error for the node reference through ->fail.
#
# Returns whatever ->fail returns.
sub connect {
   my $self = shift;

   my @reason = (
      transport_error => $self->{noderef},
      "unable to connect to slave node",
   );

   $self->fail (@reason);
}
162
163 package AnyEvent::MP::Node::Self;
164
165 use base "AnyEvent::MP::Node";
166
# This node class never takes a transport: calling this method is treated
# as a fatal programming error and always dies with a full stack trace.
sub set_transport {
   # Load Carp explicitly rather than relying on another module having
   # pulled it in; the parenthesised call also compiles without Carp being
   # known at compile time.
   require Carp;

   Carp::confess ("FATAL error, set_transport was called on local node");
}
170
# Delivers a message locally.
#
#   $msg - array reference; its elements become the argument list of
#          AnyEvent::MP::Base::_inject
#
# For the duration of the call $AnyEvent::MP::Base::SRCNODE is set to this
# node object; "local" restores the previous value afterwards.
sub send {
   my ($self, $msg) = @_;

   local $AnyEvent::MP::Base::SRCNODE = $self;

   AnyEvent::MP::Base::_inject (@$msg);
}
175
# Kills a local port.
#
#   $port   - id of the port to remove
#   @reason - passed unchanged to every monitor of the port
#
# Removes the port from %AnyEvent::MP::Base::PORT and ::PORT_DATA, takes
# its monitors out of ::LMON and calls each of them with @reason. If the
# port had no monitors and a reason was given, a warning is emitted
# through $AnyEvent::MP::Base::WARN instead.
sub kill {
   my ($self, $port, @reason) = @_;

   delete $AnyEvent::MP::Base::PORT{$port};
   delete $AnyEvent::MP::Base::PORT_DATA{$port};

   my $mon = delete $AnyEvent::MP::Base::LMON{$port};

   # nobody is watching, yet the port died for a reason: make it visible
   if (!$mon && @reason) {
      $AnyEvent::MP::Base::WARN->("unmonitored local port $port died with reason: @reason");
   }

   $_->(@reason) for values %$mon;
}
188
# Registers a monitoring callback for a local port.
#
#   $portid - id of the port to monitor
#   $cb     - callback; stored in %AnyEvent::MP::Base::LMON under the
#             port id, keyed by the numeric value of the reference
#
# If the port is not present in %AnyEvent::MP::Base::PORT the callback is
# called at once with a no_such_port reason and nothing is registered.
sub monitor {
   my ($self, $portid, $cb) = @_;

   unless (exists $AnyEvent::MP::Base::PORT{$portid}) {
      return $cb->(no_such_port => "cannot monitor nonexistent port");
   }

   my $key = $cb + 0;

   $AnyEvent::MP::Base::LMON{$portid}{$key} = $cb;
}
197
# Removes a monitoring callback from a local port.
#
#   $portid - id of the monitored port
#   $cb     - the callback to remove; looked up by the numeric value of
#             the reference, the same key ->monitor stores it under
#
# Returns the removed entry, if there was one.
sub unmonitor {
   my ($self, $portid, $cb) = @_;

   my $key = $cb + 0;

   delete $AnyEvent::MP::Base::LMON{$portid}{$key};
}
203
204 =head1 SEE ALSO
205
206 L<AnyEvent>.
207
208 =head1 AUTHOR
209
210 Marc Lehmann <schmorp@schmorp.de>
211 http://home.schmorp.de/
212
213 =cut
214
215 1
216