ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.11
Committed: Fri Aug 28 00:22:04 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.10: +3 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8     # -OR-
9     aemp addservice AnyEvent::MP::Global::
10    
11     =head1 DESCRIPTION
12    
13     This module provides an assortment of network-global functions: group name
14     registration and non-local locks.
15    
16     It will also try to build and maintain a full mesh of all network nodes.
17    
18 root 1.3 While it isn't mandatory to run the global services, running it on one
19     node will automatically run it on all nodes.
20    
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31     use MIME::Base64 ();
32    
33 root 1.8 use AnyEvent ();
34 root 1.4 use AnyEvent::Util ();
35    
36 root 1.1 use AnyEvent::MP;
37     use AnyEvent::MP::Kernel;
38    
39     our $VERSION = $AnyEvent::MP::VERSION;
40    
41 root 1.8 our %addr; # port ID => [address...] mapping
42    
43 root 1.4 our %port; # our rendezvous port on the other side
44     our %lreg; # local registry, name => [pid...]
45     our %lmon; # local rgeistry monitoring name,pid => mon
46     our %greg; # global regstry, name => [pid...]
47    
48 root 1.8 our $nodecnt;
49    
50 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
51    
52 root 1.8 #############################################################################
53     # seednodes
54    
55     our @SEEDS;
56     our $SEED_WATCHER;
57    
58     sub seed_connect {
59     my ($seed) = @_;
60    
61     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
62     or Carp::croak "$seed: unparsable seed address";
63    
64     # ughhh
65     my $tp; $tp = AnyEvent::MP::Transport::mp_connect $host, $port,
66     release => \&more_seeding,
67     sub {
68     $tp = shift;
69     $tp->{keepalive} = $tp;
70     },
71     ;
72     }
73    
74     sub more_seeding {
75     return if $nodecnt;
76     return unless @SEEDS;
77    
78     $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");
79    
80     seed_connect $SEEDS[rand @SEEDS];
81     }
82    
83     sub set_seeds(@) {
84     @SEEDS = @_;
85    
86     $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;
87    
88     seed_connect $_
89     for @SEEDS;
90     }
91    
92     #############################################################################
93    
94 root 1.4 sub unreg_groups($) {
95 root 1.8 my ($node) = @_;
96 root 1.4
97 root 1.8 my $qr = qr/^\Q$node\E(?:#|$)/;
98 root 1.4
99     for my $group (values %greg) {
100     @$group = grep $_ !~ $qr, @$group;
101     }
102     }
103    
104     sub set_groups($$) {
105 root 1.8 my ($node, $lreg) = @_;
106 root 1.5
107 root 1.6 while (my ($k, $v) = each %$lreg) {
108     push @{ $greg{$k} }, @$v;
109     }
110 root 1.4 }
111    
112 root 1.7 =item $ports = find $group
113    
114     Returns all the ports currently registered to the given group (as
115     read-only array reference). When the group has no registered members,
116     return C<undef>.
117    
118     =cut
119    
120     sub find($) {
121     @{ $greg{$_[0]} }
122     ? $greg{$_[0]}
123     : undef
124     }
125    
126 root 1.4 =item $guard = register $port, $group
127    
128     Register the given (local!) port in the named global group C<$group>.
129    
130     The port will be unregistered automatically when the port is destroyed.
131    
132     When not called in void context, then a guard object will be returned that
133     will also cause the name to be unregistered when destroyed.
134    
135     =cut
136    
137     # register port from any node
138     sub _register {
139     my ($port, $group) = @_;
140    
141     push @{ $greg{$group} }, $port;
142     }
143    
144 root 1.6 # unregister port from any node
145     sub _unregister {
146     my ($port, $group) = @_;
147    
148     @{ $greg{$group} } = grep $_ ne $port, @{ $greg{$group} };
149     }
150    
151 root 1.4 # unregister local port
152     sub unregister {
153     my ($port, $group) = @_;
154    
155     delete $lmon{"$group\x00$port"};
156     @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
157    
158     _unregister $port, $group;
159    
160     snd $_, reg0 => $port, $group
161     for values %port;
162     }
163    
164     # register local port
165     sub register($$) {
166     my ($port, $group) = @_;
167    
168     port_is_local $port
169     or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";
170    
171     $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
172     push @{ $lreg{$group} }, $port;
173    
174     snd $_, reg1 => $port, $group
175     for values %port;
176    
177     _register $port, $group;
178    
179     wantarray && AnyEvent::Util::guard { unregister $port, $group }
180     }
181    
182     sub start_node {
183 root 1.8 my ($node) = @_;
184 root 1.4
185 root 1.8 return if exists $port{$node};
186     return if $node eq $NODE; # do not connect to ourselves
187 root 1.4
188     # establish connection
189 root 1.8 my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
190    
191 root 1.4 mon $port, sub {
192 root 1.8 unreg_groups $node;
193     delete $port{$node};
194 root 1.4 };
195 root 1.8
196 root 1.11 use Data::Dumper; warn Dumper ["addr => ", $AnyEvent::MP::Kernel::LISTENER];#d#
197 root 1.10 snd $port, addr => $AnyEvent::MP::Kernel::LISTENER;
198 root 1.8 snd $port, connect_nodes => \%addr if %addr;
199     snd $port, set => \%lreg if %lreg;
200 root 1.4 }
201 root 1.3
202 root 1.5 # other nodes connect via this
203 root 1.3 sub connect {
204 root 1.8 my ($version, $node) = @_;
205 root 1.1
206 root 1.3 # monitor them, silently die
207 root 1.8 mon $node, psub { kil $SELF };
208 root 1.3
209 root 1.4 rcv $SELF,
210 root 1.8 addr => sub {
211     my $addresses = shift;
212     $AnyEvent::MP::Kernel::WARN->(9, "$node told us it's addresses (@$addresses).");
213     $addr{$node} = $addresses;
214 root 1.9
215     # to help listener-less nodes, we broadcast new addresses to them unconditionally
216     #TODO: should be done by a node finding out about a listener-less one
217     if (@$addresses) {
218     for my $other (values %AnyEvent::MP::NODE) {
219     if ($other->{transport}) {
220     if ($addr{$other->{id}}) {
221 root 1.11 if (!@{ $addr{$other->{id}} }) {
222 root 1.9 $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
223 root 1.11 snd $port{$other->{id}}, connect_nodes => { $node => $addresses };
224 root 1.9 }
225     }
226     }
227     }
228     }
229 root 1.8 },
230 root 1.4 connect_nodes => sub {
231 root 1.8 my ($kv) = @_;
232    
233     use JSON::XS;#d#
234     my $kv_txt = JSON::XS->new->encode ($kv);#d#
235     $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#
236    
237     while (my ($id, $addresses) = each %$kv) {
238     my $node = AnyEvent::MP::Kernel::add_node $id;
239     $node->connect (@$addresses);
240     start_node $id;
241 root 1.4 }
242     },
243 root 1.8 find_node => sub {
244     my ($othernode) = @_;
245    
246     $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
247     snd $port{$node}, connect_nodes => { $othernode => $addr{$othernode} }
248     if $addr{$othernode};
249     },
250 root 1.4 set => sub {
251 root 1.8 set_groups $node, shift;
252 root 1.4 },
253     reg1 => \&_register,
254     reg0 => \&_unregister,
255     ;
256 root 1.3 }
257 root 1.1
258     sub mon_node {
259 root 1.8 my ($node, $is_up) = @_;
260 root 1.1
261     if ($is_up) {
262 root 1.8 ++$nodecnt;
263     start_node $node;
264 root 1.2 } else {
265 root 1.8 --$nodecnt;
266     more_seeding unless $nodecnt;
267     unreg_groups $node;
268    
269     # forget about the node
270     delete $addr{$node};
271     # ask other nodes if they know the node
272     snd $_, find_node => $node
273     for values %port;
274 root 1.1 }
275 root 1.8 #warn "node<$node,$is_up>\n";#d#
276 root 1.1 }
277    
278     mon_node $_, 1
279     for up_nodes;
280    
281     mon_nodes \&mon_node;
282    
283     =back
284    
285     =head1 SEE ALSO
286    
287     L<AnyEvent::MP>.
288    
289     =head1 AUTHOR
290    
291     Marc Lehmann <schmorp@schmorp.de>
292     http://home.schmorp.de/
293    
294     =cut
295    
296     1
297