ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Node.pm
Revision: 1.5
Committed: Sun Aug 2 14:44:37 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-0_02
Changes since 1.4: +78 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Node - a single processing node (CPU/process...)
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Node;
8    
9     =head1 DESCRIPTION
10    
11     =cut
12    
13     package AnyEvent::MP::Node;
14    
15     use common::sense;
16    
17     use AE ();
18     use AnyEvent::Socket ();
19    
20     use AnyEvent::MP::Transport ();
21    
22     use base Exporter::;
23    
24     our $VERSION = '0.0';
25    
26 root 1.5 our $DEFAULT_PORT = "4040";
27    
28     sub normalise_noderef($) {
29     my ($noderef) = @_;
30    
31     my $cv = AE::cv;
32     my @res;
33    
34     $cv->begin (sub {
35     my %seen;
36     my @refs;
37     for (sort { $a->[0] <=> $b->[0] } @res) {
38     push @refs, $_->[1] unless $seen{$_->[1]}++
39     }
40     shift->send (join ",", @refs);
41     });
42    
43     $noderef = $DEFAULT_PORT unless length $noderef;
44    
45     my $idx;
46     for my $t (split /,/, $noderef) {
47     my $pri = ++$idx;
48    
49     #TODO: this should be outside normalise_noderef and in become_public
50     if ($t =~ /^\d*$/) {
51     require POSIX;
52     my $nodename = (POSIX::uname ())[1];
53    
54     $cv->begin;
55     AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub {
56     for (@_) {
57     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
58     push @res, [
59     $pri += 1e-5,
60     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
61     ];
62     }
63     $cv->end;
64     };
65    
66     # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename;
67     #
68     # for (@ipv4) {
69     # push @res, [
70     # $pri,
71     # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT,
72     # ];
73     # }
74     } else {
75     my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT"
76     or Carp::croak "$t: unparsable transport descriptor";
77    
78     $cv->begin;
79     AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
80     for (@_) {
81     my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
82     push @res, [
83     $pri += 1e-5,
84     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
85     ];
86     }
87     $cv->end;
88     }
89     }
90     }
91    
92     $cv->end;
93    
94     $cv
95     }
96    
97 root 1.1 sub new {
98     my ($class, $noderef) = @_;
99    
100     bless { noderef => $noderef }, $class
101     }
102    
103     package AnyEvent::MP::Node::Direct;
104    
105     use base "AnyEvent::MP::Node";
106    
107     sub send {
108     my ($self, $msg) = @_;
109    
110     if ($self->{transport}) {
111     $self->{transport}->send ($msg);
112     } elsif ($self->{queue}) {
113     push @{ $self->{queue} }, $msg;
114     } else {
115     $self->{queue} = [$msg];
116     $self->connect;
117     }
118     }
119    
120     sub set_transport {
121     my ($self, $transport) = @_;
122    
123     delete $self->{trial};
124     delete $self->{next_connect};
125    
126     if (
127     exists $self->{remote_uniq}
128     && $self->{remote_uniq} ne $transport->{remote_uniq}
129     ) {
130     # uniq changed, drop queue
131     delete $self->{queue};
132     #TODO: "DOWN"
133     }
134    
135     $self->{remote_uniq} = $transport->{remote_uniq};
136     $self->{transport} = $transport;
137    
138     $transport->send ($_)
139     for @{ delete $self->{queue} || [] };
140     }
141    
142     sub clr_transport {
143     my ($self) = @_;
144    
145     delete $self->{transport};
146 root 1.4
147     $self->connect;
148 root 1.1 }
149    
150     sub connect {
151     my ($self) = @_;
152    
153     Scalar::Util::weaken $self;
154    
155     unless (exists $self->{n_noderef}) {
156 root 1.5 return if $self->{n_noderef_}++;
157     (AnyEvent::MP::Node::normalise_noderef ($self->{noderef}))->cb (sub {
158 root 1.1 $self or return;
159 root 1.5 delete $self->{n_noderef_};
160 root 1.1 my $noderef = shift->recv;
161    
162     $self->{n_noderef} = $noderef;
163    
164 root 1.5 $AnyEvent::MP::Base::NODE{$_} = $self
165 root 1.1 for split /,/, $noderef;
166    
167     $self->connect;
168     });
169     return;
170     }
171    
172     $self->{retry} ||= [split /,/, $self->{n_noderef}];
173    
174     my $endpoint = shift @{ $self->{retry} };
175    
176     if (defined $endpoint) {
177     $self->{trial}{$endpoint} ||= do {
178     my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
179     or return;
180    
181     my ($w, $g);
182    
183 root 1.5 $w = AE::timer $AnyEvent::MP::Base::CONNECT_TIMEOUT, 0, sub {
184 root 1.1 delete $self->{trial}{$endpoint};
185     };
186     $g = AnyEvent::MP::Transport::mp_connect
187     $host, $port,
188     sub {
189     delete $self->{trial}{$endpoint}
190     unless @_;
191     $g = shift;
192     };
193     ;
194    
195     [$w, \$g]
196     };
197     } else {
198     delete $self->{retry};
199     }
200    
201 root 1.5 $self->{next_connect} = AE::timer $AnyEvent::MP::Base::CONNECT_INTERVAL, 0, sub {
202 root 1.1 $self->connect;
203     };
204     }
205    
206     package AnyEvent::MP::Node::Self;
207    
208     use base "AnyEvent::MP::Node";
209    
210     sub set_transport {
211     die "FATAL error, set_transport was called";
212     }
213    
214     sub send {
215 root 1.5 AnyEvent::MP::Base::_inject ($_[1]);
216 root 1.1 }
217    
218     =head1 SEE ALSO
219    
220     L<AnyEvent>.
221    
222     =head1 AUTHOR
223    
224     Marc Lehmann <schmorp@schmorp.de>
225     http://home.schmorp.de/
226    
227     =cut
228    
229     1
230