ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.42
Committed: Sun Feb 26 11:12:54 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.41: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.42 This module maintains a fully-meshed network between global nodes and
12     tries to have connections with all nodes in the network.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37 root 1.22 use AnyEvent::MP::Transport ();
38 root 1.1
39 root 1.20 use base "Exporter";
40    
41     our @EXPORT = qw(
42     grp_reg
43     grp_get
44     grp_mon
45     );
46    
# Protocol version sent to peers by start_node; connect refuses peers whose
# integer version differs from ours.
our $GLOBAL_VERSION = 0;

# Callbacks that are invoked with the peer's node ID whenever a peer sends
# the "setup" message.
# take note: not public
our %ON_SETUP;

our %addr; # node ID => [address...] mapping

our (
    %port, # node ID => our rendezvous port on the other side
    %lreg, # local registry, name => [pid...]
    %lmon, # local registry monitoring, "name\0pid" => mon
    %greg, # global registry, name => pid => undef
    %gmon, # group monitoring, group => [$cb...]
);

# Incremented by mon_node when a node comes up, decremented when it goes down.
our $nodecnt;

$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");

#############################################################################
# seednodes

our (
    $MASTER, # our current master (which we regularly query for net updates)
    %SEEDME, # node ID => undef, for every node that asked us to seed it
);
68 root 1.8
# Returns those keys of %AnyEvent::MP::Kernel::NODE_SEED for which node_is_up
# is true (or their number, when called in scalar context).
sub up_seeds() {
    grep { node_is_up($_) } keys %AnyEvent::MP::Kernel::NODE_SEED
}
72    
# Tells whether the given node ID is one of the keys of
# %AnyEvent::MP::Kernel::NODE_SEED. The result is the number of matching
# keys in scalar context, so it can be used directly as a boolean.
sub node_is_seed($) {
    my ($id) = @_;

    grep { defined($_) && $_ eq $id } keys %AnyEvent::MP::Kernel::NODE_SEED
}
76    
# returns all (up) seed nodes, or all nodes if no seednodes are up/known
sub route_nodes {
    my @routes = up_seeds;

    # no seed is reachable, so fall back to every node that is up
    unless (@routes) {
        @routes = up_nodes;
    }

    return @routes;
}
83    
84 root 1.8 #############################################################################
85    
# Applies a membership change to the global view of a group and notifies
# the monitors installed for that group.
#
#   $group - name of the group
#   $add   - arrayref of port IDs that joined
#   $del   - arrayref of port IDs that left
#
# Removals are applied before additions. Every callback in $gmon{$group} is
# then called with (arrayref of current members, $add, $del).
sub _change {
    my ($group, $add, $del) = @_;

    my $members = ($greg{$group} ||= {});

    delete $members->{$_} for @$del;
    $members->{$_} = undef for @$add;

    my $current = [keys %$members];

    for my $notify (@{ $gmon{$group} }) {
        $notify->($current, $add, $del);
    }
}
97    
# Removes every port that belongs to the given node from all global groups.
#
# A port belongs to $node when its ID is either the node ID itself or the
# node ID followed by "#". Each affected group is updated through _change,
# so its monitors are told which ports went away.
sub unreg_groups($) {
    my ($node) = @_;

    my $qr = qr/^\Q$node\E(?:#|$)/;

    # Walk a snapshot of the group names instead of using each(): _change
    # runs monitor callbacks, and a callback that registers a port in a new
    # group would insert into %greg while it is being iterated.
    for my $group (keys %greg) {
        my $ports = $greg{$group}
            or next;

        # A fresh array per group: monitors receive a reference to it, which
        # must not change under them when the next group is processed.
        my @del = grep /$qr/, keys %$ports;

        _change($group, [], \@del)
            if @del;
    }
}
110    
# Merges a group registry received from another node into the global view.
#
#   $node   - ID of the node that sent the registry (currently unused)
#   $groups - hashref, group name => arrayref of port IDs
#
# Every listed port is added to its group via _change; nothing is removed.
sub set_groups($$) {
    my ($node, $groups) = @_;

    for my $group (keys %$groups) {
        _change($group, $groups->{$group}, []);
    }
}
118    
119 root 1.20 =item $guard = grp_reg $group, $port
120 root 1.4
121     Register the given (local!) port in the named global group C<$group>.
122    
123     The port will be unregistered automatically when the port is destroyed.
124    
125     When not called in void context, then a guard object will be returned that
126     will also cause the name to be unregistered when destroyed.
127    
128     =cut
129    
# unregister local port
#
# Drops $port from the local registry of $group, updates the global view and
# announces the removal (reg0) to all connected global nodes.
#
# This is reached both from the port monitor installed by grp_reg and from
# the guard grp_reg returns, so it can be called twice for one registration
# (e.g. the port dies and the guard is destroyed later). Only the first call
# does anything; a repeated call must not broadcast a second removal or
# notify the group monitors again.
sub unregister {
    my ($port, $group) = @_;

    my $key = "$group\x00$port";

    # no monitor entry means this registration was already torn down
    return unless exists $lmon{$key};
    delete $lmon{$key};

    @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };

    # do not keep (and later send to new peers) groups without members
    delete $lreg{$group}
        unless @{ $lreg{$group} };

    _change($group, [], [$port]);

    snd $_, reg0 => $group, $port
        for values %port;
}
142    
# register local port
#
# Croaks when $port is not local or is already a member of $group. Otherwise
# the port is monitored (so it gets unregistered when the monitor fires),
# stored in the local registry, announced to all connected global nodes
# (reg1) and added to the global view. Returns a guard that unregisters the
# port, unless called in void context.
sub grp_reg($$) {
    my ($group, $port) = @_;

    Carp::croak("AnyEvent::MP::Global::grp_reg can only be called for local ports, caught")
        unless port_is_local($port);

    for my $member (@{ $lreg{$group} }) {
        Carp::croak("'$group': group already registered, cannot register a second time")
            if $member eq $port;
    }

    $lmon{"$group\x00$port"} = mon($port, sub { unregister($port, $group) });
    push @{ $lreg{$group} }, $port;

    for my $peer (values %port) {
        snd($peer, reg1 => $group, $port);
    }

    _change($group, [$port], []);

    # a guard is only created when the caller is going to keep it
    return unless defined wantarray;

    AnyEvent::Util::guard { unregister($port, $group) }
}
163    
164 root 1.20 =item $ports = grp_get $group
165 root 1.15
166     Returns all the ports currently registered to the given group (as
167 root 1.20 read-only(!) array reference). When the group has no registered members,
168 root 1.15 returns C<undef>.
169    
170     =cut
171    
# Returns an arrayref with the ports currently known for the given group,
# or undef when the group has no members.
sub grp_get($) {
    my ($group) = @_;

    my $members = [keys %{ $greg{$group} }];

    return @$members ? $members : undef;
}
176    
177     =item $guard = grp_mon $group, $callback->($ports, $add, $del)
178    
179     Installs a monitor on the given group. Each time there is a change it
180     will be called with the current group members as an arrayref as the
181     first argument. The second argument only contains ports added, the third
182     argument only ports removed.
183    
184     Unlike C<grp_get>, all three arguments will always be array-refs, even if
185     the array is empty. The arrays must not be modified in any way.
186    
187     The first invocation will be with the first two arguments set to the
188     current members, as if all of them were just added, but only when the
189 root 1.39 group is actually non-empty.
190 root 1.20
191     Optionally returns a guard object that uninstalls the watcher when it is
192     destroyed.
193    
194     =cut
195    
# Installs $cb as a monitor for group $grp.
#
# The installation itself is handed to AnyEvent::MP::Kernel::delay. Once it
# runs, the callback is appended to $gmon{$grp} and, if the group has
# members at that point, called once with (members, members, []).
#
# Unless called in void context, a guard is returned that removes the
# callback again; destroying the guard before the delayed installation ran
# cancels the installation.
sub grp_mon($$) {
    my ($grp, $cb) = @_;

    AnyEvent::MP::Kernel::delay(sub {
        # the guard clears $cb when it is destroyed
        $cb or return;

        push @{ $gmon{$grp} }, $cb;

        my $members = grp_get($grp)
            or return;

        $cb->($members, $members, []);
    });

    return unless defined wantarray;

    AnyEvent::Util::guard {
        my @remaining = grep { $_ != $cb } @{ delete $gmon{$grp} };

        $gmon{$grp} = \@remaining
            if @remaining;

        undef $cb;
    }
}
212    
# Opens our side of the global protocol with the given node, unless that
# has already been done or the node is ourselves.
#
# Spawns AnyEvent::MP::Global::connect on $node (passing our protocol
# version and node ID), remembers the resulting port in %port and then sends
# it our listener addresses, the known node addresses, our local registry
# and finally "setup".
sub start_node {
    my ($node) = @_;

    # do not connect to ourselves, and connect only once per node
    return if $node eq $NODE || exists $port{$node};

    # establish connection
    my $port = spawn($node, "AnyEvent::MP::Global::connect", $GLOBAL_VERSION, $NODE);
    $port{$node} = $port;

    # when the monitor fires, forget the port and everything the node registered
    mon $port, sub {
        unreg_groups($node);
        delete $port{$node};
    };

    snd($port, addr => $AnyEvent::MP::Kernel::LISTENER);

    if (%addr) {
        snd($port, nodes => \%addr);
    }

    if (%lreg) {
        snd($port, set => \%lreg);
    }

    # tell the other side that we are in business now
    snd($port, "setup");
}
232 root 1.3
# other nodes connect via this
#
# Called with the peer's protocol version and node ID (see the spawn call in
# start_node). Dies when the integer parts of the versions differ. Otherwise
# the peer node is monitored and the handlers for the messages of the global
# protocol are installed on $SELF.
sub connect {
    my ($version, $node) = @_;

    die "node version mismatch (requested $version; we have $GLOBAL_VERSION)"
        unless int($version) == int($GLOBAL_VERSION);

    # monitor them, silently die
    mon $node, psub {
        delete $SEEDME{$node};
        kil $SELF;
    };

    # "setup": run the internal setup hooks for this peer
    my $on_setup = sub {
        for my $hook (values %ON_SETUP) {
            $hook->($node);
        }
    };

    # "addr": the peer tells us the addresses it can be reached at
    my $on_addr = sub {
        my ($addresses) = @_;

        $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
        $addr{$node} = $addresses;

        # delay broadcast by a random amount, to avoid nodes connecting to each other
        # at the same time.
        after(2 + rand(), sub {
            for my $slave (keys %SEEDME) {
                my $slave_port = $port{$slave}
                    or next;

                snd($slave_port, nodes => { $node => $addresses });
            }
        });
    };

    # "nodes": the peer tells us about nodes (id => addresses) it knows
    my $on_nodes = sub {
        my ($kv) = @_;

        use JSON::XS;#d#
        my $kv_txt = JSON::XS->new->encode ($kv);#d#
        $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

        for my $id (keys %$kv) {
            my $addresses = $kv->{$id};

            my $peer = AnyEvent::MP::Kernel::add_node($id);
            $peer->connect(@$addresses);
            start_node($id);
        }
    };

    # "set": the peer sends its complete local registry
    my $on_set = sub {
        my ($groups) = @_;

        set_groups($node, $groups);
    };

    # "find": the peer asks for the addresses of another node
    my $on_find = sub {
        my ($othernode) = @_;

        $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");

        if ($addr{$othernode}) {
            snd($port{$node}, nodes => { $othernode => $addr{$othernode} });
        }
    };

    # "reg0"/"reg1": a single port left/joined a group
    my $on_reg0 = sub {
        my ($group, $pid) = @_;

        _change($group, [], [$pid]);
    };

    my $on_reg1 = sub {
        my ($group, $pid) = @_;

        _change($group, [$pid], []);
    };

    # some node asks us to provide network updates
    my $on_seedme0 = sub {
        $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to NOT seed it.");#d#
        delete $SEEDME{$node};
    };

    my $on_seedme1 = sub {
        $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to seed it.");#d#
        $SEEDME{$node} = undef;

        # for good measure
        if (%addr) {
            snd($port{$node}, nodes => \%addr);
        }
    };

    rcv $SELF,
        setup   => $on_setup,
        addr    => $on_addr,
        nodes   => $on_nodes,
        set     => $on_set,
        find    => $on_find,
        reg0    => $on_reg0,
        reg1    => $on_reg1,
        seedme0 => $on_seedme0,
        seedme1 => $on_seedme1,
    ;
}
307 root 1.1
# Makes the given node our master; does nothing if it already is.
#
# The previous master, if there is one and it is still up, is sent
# "seedme0". The new master is sent "seedme1". When called with a false
# node ID, $MASTER is cleared and a warning is logged instead.
sub set_master($) {
    my ($new_master) = @_;

    return if $MASTER eq $new_master;

    # sign off from the old master, provided it is still reachable
    if ($MASTER && node_is_up($MASTER)) {
        snd($port{$MASTER}, "seedme0");
    }

    $MASTER = $new_master;

    if (!$MASTER) {
        $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node, cannot seed.");
        return;
    }

    snd($port{$MASTER}, "seedme1");
    $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
}
323    
# Reacts to a node coming up or going down.
#
#   $node  - ID of the node whose state changed
#   $is_up - true when the node came up, false when it went down
#
# On "up", the global protocol is started with the node and, if it is a seed
# node, it may become our master. On "down", everything the node registered
# is removed, its addresses are forgotten and the master is asked to find it
# again. In both cases a new master is selected when the current one is
# missing or no longer up.
sub mon_node {
    my ($node, $is_up) = @_;

    if ($is_up) {
        ++$nodecnt;
        start_node($node);

        if (node_is_seed($node)) {
            if (node_is_seed($MASTER)) {
                my @SEEDS = up_seeds;

                # Switch with probability 1/N, N being the number of seeds
                # that are up: replacing the current choice by the N-th
                # arrival with that chance is what keeps the selection
                # uniform over all seeds. (The comparison used to be
                # "1 < rand @SEEDS", i.e. (N-1)/N, which favours the most
                # recent seed more and more as N grows.)
                set_master($node)
                    if 1 > rand @SEEDS;
            } else {
                # a seed always beats a non-seed
                set_master($node);
            }
        }
    }

    # select a new(?) master, if required
    unless ($MASTER and node_is_up($MASTER)) {
        if (my @SEEDS = up_seeds) {
            set_master($SEEDS[rand @SEEDS]);
        } else {
            # select "last" non-seed node (undef when no node is up)
            my @up   = up_nodes;
            my $last = (sort @up)[-1];

            set_master($last);
        }
    }

    unless ($is_up) {
        --$nodecnt;
        #d#more_seeding unless $nodecnt;
        unreg_groups($node);

        # forget about the node
        delete $addr{$node};

        # ask our master for quick recovery
        snd($port{$MASTER}, find => $node)
            if $MASTER;
    }
}
369    
# Treat every node that is already up as if it had just come up ...
for my $node (up_nodes) {
    mon_node($node, 1);
}

# ... and hand mon_node to mon_nodes for all further changes.
mon_nodes \&mon_node;
374    
375     =back
376    
377     =head1 SEE ALSO
378    
379     L<AnyEvent::MP>.
380    
381     =head1 AUTHOR
382    
383     Marc Lehmann <schmorp@schmorp.de>
384     http://home.schmorp.de/
385    
386     =cut
387    
388     1
389