ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
Revision: 1.17
Committed: Mon Aug 10 01:37:19 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.16: +1 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Node - a single processing node (CPU/process...)
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Node;
8    
9     =head1 DESCRIPTION
10    
11     =cut
12    
13     package AnyEvent::MP::Node;
14    
15     use common::sense;
16    
17     use AE ();
18 root 1.6 use AnyEvent::Util ();
19 root 1.1 use AnyEvent::Socket ();
20    
21     use AnyEvent::MP::Transport ();
22    
# Constructor: returns a hash-based object blessed into $class that keeps
# the given node reference string under its "noderef" key.
sub new {
   my $class   = shift;
   my $noderef = shift;

   my %node = (noderef => $noderef);

   bless \%node, $class
}
28    
29     package AnyEvent::MP::Node::Direct;
30    
31     use base "AnyEvent::MP::Node";
32    
# Deliver $msg to the remote node.
#
# With a transport attached, the message is handed to it right away.
# Without one, the message is appended to the pending queue; creating
# that queue (i.e. the first message sent while disconnected) also
# triggers a connection attempt via connect.
sub send {
   my ($self, $msg) = @_;

   return $self->{transport}->send ($msg)
      if $self->{transport};

   return push @{ $self->{queue} }, $msg
      if $self->{queue};

   $self->{queue} = [$msg];
   $self->connect
}
45    
# Ask the remote node to kill $port: sends a "kil" message, addressed to
# the empty port id, that carries the port and the optional @reason.
sub kill {
   my $self = shift;
   my ($port, @reason) = @_;

   my @request = ("", "kil", $port, @reason);

   $self->send (\@request);
}
51    
# Register $cb as a monitor callback for the remote port $portid.
#
# Callbacks are collected per port in $self->{lmon}. A "mon1" request is
# sent to the remote node only when the port has no callbacks registered
# yet, i.e. for the first monitor of that port.
sub monitor {
   my ($self, $portid, $cb) = @_;

   my $callbacks = ($self->{lmon}{$portid} ||= []);

   if (!@$callbacks) {
      $self->send (["", "mon1", $portid]);
   }

   push @$callbacks, $cb;
}
62    
# Remove the monitor callback $cb for the remote port $portid.
#
# Does nothing when no callbacks are registered for the port. Callbacks
# are compared by reference address. When the last callback is removed,
# the port's entry is dropped from $self->{lmon} (the same table monitor
# fills) and a "mon0" request is sent to the remote node.
sub unmonitor {
   my ($self, $portid, $cb) = @_;

   my $list = $self->{lmon}{$portid}
      or return;

   @$list = grep $_ != $cb, @$list;

   unless (@$list) {
      # drop the entry before sending, so the table is already consistent
      # should send end up failing the node
      delete $self->{lmon}{$portid};
      $self->send (["", mon0 => $portid]);
   }
}
76    
# Attach $transport to this node.
#
# A previously attached transport is cleared first (via clr_transport).
# All connection-attempt state (trial, retry, next_connect) is then
# discarded, the new transport is stored, and every message queued while
# no transport was available is sent through it in queue order.
sub set_transport {
   my ($self, $transport) = @_;

   if ($self->{transport}) {
      $self->clr_transport;
   }

   # a transport exists now, so pending connection attempts are obsolete
   delete $self->{$_}
      for qw(trial retry next_connect);

   $self->{transport} = $transport;

   my $pending = delete $self->{queue} || [];
   $transport->send ($_)
      for @$pending;
}
92    
# Fail the node with @reason.
#
# Drops all queued messages, removes the whole table of monitor callbacks
# and invokes every callback that was registered in it with @reason.
sub fail {
   my $self   = shift;
   my @reason = @_;

   delete $self->{queue};

   my $monitors = delete $self->{lmon};

   if ($monitors) {
      # collect all callbacks first, then notify them
      my @callbacks = map { @$_ } values %$monitors;

      $_->(@reason)
         for @callbacks;
   }
}
102    
# Detach the current transport and start connecting again.
#
# Any further arguments (the reason for clearing) are accepted but unused.
sub clr_transport {
   my $self = shift;

   delete $self->{transport};

   $self->connect;
}
109    
# Start (or continue) connecting to the node.
#
# The noderef is split on commas into a list of endpoints that is kept in
# $self->{retry}; each call takes the next endpoint off that list. Unless
# a trial for that endpoint is already recorded in $self->{trial}, the
# endpoint is parsed into host and port and a connection attempt is
# started together with a CONNECT_TIMEOUT timer that discards the trial.
# When the list is exhausted the node is failed with a transport_error.
#
# Finally a CONNECT_INTERVAL timer is armed that clears the retry list
# and calls connect again. If the endpoint cannot be parsed, a warning is
# issued through $AnyEvent::MP::Base::WARN and the sub returns right away,
# without arming that timer.
#
# NOTE(review): because the timer clears {retry}, a timer-driven connect
# starts over at the first endpoint of the noderef - confirm that this is
# the intended retry order.
sub connect {
   my $self = shift;

   # the callbacks below are stored inside $self and refer to $self, so
   # only a weak reference is kept in this scope
   Scalar::Util::weaken ($self);

   unless ($self->{retry}) {
      $self->{retry} = [ split /,/, $self->{noderef} ];
   }

   my $endpoint = shift @{ $self->{retry} };

   if (!defined $endpoint) {
      $self->fail (transport_error => $self->{noderef}, "unable to connect");
   } else {
      $self->{trial}{$endpoint} ||= do {
         my ($host, $port) = AnyEvent::Socket::parse_hostport ($endpoint)
            or return $AnyEvent::MP::Base::WARN->("$self->{noderef}: not a resolved node reference.");

         my ($timeout, $guard);

         # give up on this trial when it takes too long
         $timeout = AE::timer ($AnyEvent::MP::Base::CONNECT_TIMEOUT, 0, sub {
            delete $self->{trial}{$endpoint};
         });

         $guard = AnyEvent::MP::Transport::mp_connect ($host, $port, sub {
            # called without arguments: discard the trial
            delete $self->{trial}{$endpoint}
               unless @_;
            $guard = shift;
         });

         [$timeout, \$guard]
      };
   }

   $self->{next_connect} = AE::timer ($AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {
      delete $self->{retry};
      $self->connect;
   });
}
149    
150 root 1.13 package AnyEvent::MP::Node::Slave;
151    
152     use base "AnyEvent::MP::Node::Direct";
153    
# Slave nodes are never connected to from this side: instead of starting
# a connection attempt, the node is failed with a transport_error.
sub connect {
   my $self = shift;

   my @reason = (
      "transport_error",
      $self->{noderef},
      "unable to connect to slave node",
   );

   $self->fail (@reason);
}
159    
160 root 1.1 package AnyEvent::MP::Node::Self;
161    
162     use base "AnyEvent::MP::Node";
163    
# The local node has no transport: calling this is a programming error
# and always dies with a full stack backtrace.
sub set_transport {
   Carp::confess ("FATAL error, set_transport was called on local node");
}
167    
# Deliver a message addressed to the local node: the elements of the
# message array are passed to AnyEvent::MP::Base::_inject, with
# $AnyEvent::MP::Base::SRCNODE set to this node for the duration of the
# call.
sub send {
   my ($self, $msg) = @_;

   local $AnyEvent::MP::Base::SRCNODE = $self;

   AnyEvent::MP::Base::_inject (@$msg);
}
172    
# Kill the local port $port.
#
# Removes the port and its data from the global port tables and takes its
# monitor callbacks out of %AnyEvent::MP::Base::LMON. Every monitor
# callback is invoked with @reason. When nobody monitors the port and a
# reason was given, a warning is issued through $AnyEvent::MP::Base::WARN
# instead.
sub kill {
   my ($self, $port, @reason) = @_;

   delete $AnyEvent::MP::Base::PORT{$port};
   delete $AnyEvent::MP::Base::PORT_DATA{$port};

   my $monitors = delete $AnyEvent::MP::Base::LMON{$port};

   if ($monitors) {
      $_->(@reason)
         for values %$monitors;
   } elsif (@reason) {
      $AnyEvent::MP::Base::WARN->("unmonitored local port $port died with reason: @reason");
   }
}
185    
# Register $cb as a monitor callback for the local port $portid.
#
# When the port does not exist, the callback is invoked immediately with
# a no_such_port error and nothing is registered. Otherwise the callback
# is stored in %AnyEvent::MP::Base::LMON under the port id, keyed by its
# numified reference.
sub monitor {
   my ($self, $portid, $cb) = @_;

   if (exists $AnyEvent::MP::Base::PORT{$portid}) {
      return $AnyEvent::MP::Base::LMON{$portid}{$cb+0} = $cb;
   }

   return $cb->(no_such_port => "cannot monitor nonexistent port");
}
194    
# Remove the monitor callback $cb for the local port $portid and return
# the removed callback (undef when it was not registered).
#
# Ports without any registered monitor are left untouched: no entry is
# created for them in %AnyEvent::MP::Base::LMON. When the last callback
# of a port is removed, the port's now empty table is dropped as well, so
# an unmonitored port never keeps a (true) entry in LMON.
sub unmonitor {
   my ($self, $portid, $cb) = @_;

   my $removed;

   if (my $mon = $AnyEvent::MP::Base::LMON{$portid}) {
      $removed = delete $mon->{$cb+0};

      delete $AnyEvent::MP::Base::LMON{$portid}
         unless %$mon;
   }

   $removed
}
200    
201     =head1 SEE ALSO
202    
203 root 1.17 L<AnyEvent::MP>.
204 root 1.1
205     =head1 AUTHOR
206    
207     Marc Lehmann <schmorp@schmorp.de>
208     http://home.schmorp.de/
209    
210     =cut
211    
212     1
213