ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.43
Committed: Tue Feb 28 18:37:24 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.42: +3 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.42 This module maintains a fully-meshed network between global nodes and
12     tries to have connections with all nodes in the network.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37 root 1.22 use AnyEvent::MP::Transport ();
38 root 1.1
39 root 1.20 use base "Exporter";
40    
41     our @EXPORT = qw(
42     grp_reg
43     grp_get
44     grp_mon
45     );
46    
47 root 1.31 our $GLOBAL_VERSION = 0;
48    
49 root 1.35 our %ON_SETUP; # take note: not public
50    
51 root 1.8 our %addr; # port ID => [address...] mapping
52    
53 root 1.4 our %port; # our rendezvous port on the other side
54     our %lreg; # local registry, name => [pid...]
55 root 1.20 our %lmon; # local registry monitoring name,pid => mon
56     our %greg; # global regstry, name => pid => undef
57 root 1.22 our %gmon; # group monitoring, group => [$cb...]
58 root 1.4
59 root 1.8 our $nodecnt;
60    
61 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
62    
63 root 1.8 #############################################################################
64     # seednodes
65    
66 root 1.26 our $MASTER; # our current master (which we regularly query for net updates)
67 root 1.41 our %SEEDME;
68 root 1.8
69 root 1.26 sub up_seeds() {
70 root 1.41 grep node_is_up $_, keys %AnyEvent::MP::Kernel::NODE_SEED
71 root 1.26 }
72    
73     sub node_is_seed($) {
74 root 1.41 grep $_ eq $_[0], grep defined, keys %AnyEvent::MP::Kernel::NODE_SEED
75 root 1.26 }
76    
77 root 1.22 # returns all (up) seed nodes, or all nodes if no seednodes are up/known
78 root 1.26 sub route_nodes {
79     my @seeds = up_seeds;
80 root 1.22 @seeds = up_nodes unless @seeds;
81     @seeds
82     }
83    
84 root 1.8 #############################################################################
85    
86 root 1.20 sub _change {
87     my ($group, $add, $del) = @_;
88    
89     my $kv = $greg{$group} ||= {};
90    
91     delete @$kv{@$del};
92     @$kv{@$add} = ();
93    
94     my $ports = [keys %$kv];
95     $_->($ports, $add, $del) for @{ $gmon{$group} };
96     }
97    
98 root 1.4 sub unreg_groups($) {
99 root 1.8 my ($node) = @_;
100 root 1.4
101 root 1.8 my $qr = qr/^\Q$node\E(?:#|$)/;
102 root 1.20 my @del;
103 root 1.4
104 root 1.20 while (my ($group, $ports) = each %greg) {
105     @del = grep /$qr/, keys %$ports;
106     _change $group, [], \@del
107     if @del;
108 root 1.4 }
109     }
110    
111     sub set_groups($$) {
112 root 1.8 my ($node, $lreg) = @_;
113 root 1.5
114 root 1.6 while (my ($k, $v) = each %$lreg) {
115 root 1.20 _change $k, $v, [];
116 root 1.6 }
117 root 1.4 }
118    
119 root 1.20 =item $guard = grp_reg $group, $port
120 root 1.4
121     Register the given (local!) port in the named global group C<$group>.
122    
123     The port will be unregistered automatically when the port is destroyed.
124    
125     When not called in void context, then a guard object will be returned that
126     will also cause the name to be unregistered when destroyed.
127    
128     =cut
129    
130     # unregister local port
131     sub unregister {
132     my ($port, $group) = @_;
133    
134     delete $lmon{"$group\x00$port"};
135     @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
136    
137 root 1.20 _change $group, [], [$port];
138 root 1.4
139 root 1.20 snd $_, reg0 => $group, $port
140 root 1.4 for values %port;
141     }
142    
143     # register local port
144 root 1.20 sub grp_reg($$) {
145     my ($group, $port) = @_;
146 root 1.4
147     port_is_local $port
148 root 1.20 or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";
149    
150     grep $_ eq $port, @{ $lreg{$group} }
151     and Carp::croak "'$group': group already registered, cannot register a second time";
152 root 1.4
153     $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
154     push @{ $lreg{$group} }, $port;
155    
156 root 1.20 snd $_, reg1 => $group, $port
157 root 1.4 for values %port;
158    
159 root 1.20 _change $group, [$port], [];
160 root 1.4
161 elmex 1.33 defined wantarray && AnyEvent::Util::guard { unregister $port, $group }
162 root 1.4 }
163    
164 root 1.20 =item $ports = grp_get $group
165 root 1.15
166     Returns all the ports currently registered to the given group (as
167 root 1.20 read-only(!) array reference). When the group has no registered members,
168 root 1.15 return C<undef>.
169    
170     =cut
171    
172 root 1.20 sub grp_get($) {
173     my @ports = keys %{ $greg{$_[0]} };
174     @ports ? \@ports : undef
175     }
176    
177     =item $guard = grp_mon $group, $callback->($ports, $add, $del)
178    
179     Installs a monitor on the given group. Each time there is a change it
180     will be called with the current group members as an arrayref as the
181     first argument. The second argument only contains ports added, the third
182     argument only ports removed.
183    
184     Unlike C<grp_get>, all three arguments will always be array-refs, even if
185     the array is empty. None of the arrays must be modified in any way.
186    
187     The first invocation will be with the first two arguments set to the
188     current members, as if all of them were just added, but only when the
189 root 1.39 group is actually non-empty.
190 root 1.20
191     Optionally returns a guard object that uninstalls the watcher when it is
192     destroyed.
193    
194     =cut
195    
196     sub grp_mon($$) {
197     my ($grp, $cb) = @_;
198    
199     AnyEvent::MP::Kernel::delay sub {
200     return unless $cb;
201    
202     push @{ $gmon{$grp} }, $cb;
203     $cb->(((grp_get $grp) || return) x 2, []);
204     };
205    
206 root 1.30 defined wantarray && AnyEvent::Util::guard {
207 root 1.20 my @mon = grep $_ != $cb, @{ delete $gmon{$grp} };
208     $gmon{$grp} = \@mon if @mon;
209     undef $cb;
210     }
211 root 1.15 }
212    
213 root 1.4 sub start_node {
214 root 1.8 my ($node) = @_;
215 root 1.4
216 root 1.8 return if exists $port{$node};
217     return if $node eq $NODE; # do not connect to ourselves
218 root 1.4
219     # establish connection
220 root 1.31 my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", $GLOBAL_VERSION, $NODE;
221 root 1.8
222 root 1.4 mon $port, sub {
223 root 1.8 unreg_groups $node;
224     delete $port{$node};
225 root 1.4 };
226 root 1.8
227 root 1.10 snd $port, addr => $AnyEvent::MP::Kernel::LISTENER;
228 root 1.17 snd $port, nodes => \%addr if %addr;
229 root 1.8 snd $port, set => \%lreg if %lreg;
230 root 1.35 snd $port, "setup"; # tell the other side that we are in business now
231 root 1.4 }
232 root 1.3
233 root 1.5 # other nodes connect via this
234 root 1.3 sub connect {
235 root 1.8 my ($version, $node) = @_;
236 root 1.1
237 root 1.31 (int $version) == (int $GLOBAL_VERSION)
238     or die "node version mismatch (requested $version; we have $GLOBAL_VERSION)";
239    
240 root 1.3 # monitor them, silently die
241 root 1.26 mon $node, psub {
242     delete $SEEDME{$node};
243     kil $SELF;
244     };
245 root 1.3
246 root 1.4 rcv $SELF,
247 root 1.35 setup => sub {
248     $_->($node) for values %ON_SETUP;
249     },
250 root 1.8 addr => sub {
251     my $addresses = shift;
252 root 1.14 $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
253 root 1.8 $addr{$node} = $addresses;
254 root 1.9
255 root 1.38 # delay broadcast by a random amount, to avoid nodes connecting to each other
256     # at the same time.
257 root 1.40 after 2 + rand, sub {
258 root 1.38 for my $slave (keys %SEEDME) {
259 root 1.40 snd $port{$slave} || next, nodes => { $node => $addresses };
260 root 1.38 }
261     };
262 root 1.8 },
263 root 1.17 nodes => sub {
264 root 1.8 my ($kv) = @_;
265    
266     use JSON::XS;#d#
267     my $kv_txt = JSON::XS->new->encode ($kv);#d#
268     $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#
269    
270     while (my ($id, $addresses) = each %$kv) {
271     my $node = AnyEvent::MP::Kernel::add_node $id;
272 root 1.43 #$node->connect (@$addresses);
273     #d#
274     die;
275 root 1.8 start_node $id;
276 root 1.4 }
277     },
278 root 1.35 set => sub {
279     set_groups $node, shift;
280     },
281 root 1.16 find => sub {
282 root 1.8 my ($othernode) = @_;
283    
284     $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
285 root 1.17 snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
286 root 1.8 if $addr{$othernode};
287     },
288 root 1.20 reg0 => sub {
289     _change $_[0], [], [$_[1]];
290     },
291     reg1 => sub {
292     _change $_[0], [$_[1]], [];
293     },
294 root 1.26
295     # some node asks us to provide network updates
296     seedme0 => sub {
297 root 1.36 $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to NOT seed it.");#d#
298 root 1.26 delete $SEEDME{$node};
299     },
300     seedme1 => sub {
301 root 1.36 $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to seed it.");#d#
302 root 1.26 $SEEDME{$node} = ();
303    
304     # for good measure
305     snd $port{$node}, nodes => \%addr if %addr;
306     },
307 root 1.4 ;
308 root 1.3 }
309 root 1.1
310 root 1.29 sub set_master($) {
311     return if $MASTER eq $_[0];
312    
313     snd $port{$MASTER}, "seedme0"
314     if $MASTER && node_is_up $MASTER;
315    
316     $MASTER = $_[0];
317    
318     if ($MASTER) {
319     snd $port{$MASTER}, "seedme1";
320     $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
321     } else {
322 root 1.37 $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node, cannot seed.");
323 root 1.29 }
324     }
325    
326 root 1.1 sub mon_node {
327 root 1.8 my ($node, $is_up) = @_;
328 root 1.1
329     if ($is_up) {
330 root 1.8 ++$nodecnt;
331     start_node $node;
332 root 1.26
333 root 1.29 if (node_is_seed $node) {
334     if (node_is_seed $MASTER) {
335     my @SEEDS = up_seeds;
336    
337     # switching here with lower chance roughly hopefully still gives us
338     # an equal selection.
339     set_master $node
340     if 1 < rand @SEEDS;
341     } else {
342     # a seed always beats a non-seed
343     set_master $node;
344     }
345     }
346 root 1.26 }
347    
348 root 1.29 # select a new(?) master, if required
349     unless ($MASTER and node_is_up $MASTER) {
350     if (my @SEEDS = up_seeds) {
351     set_master $SEEDS[rand @SEEDS];
352 root 1.26 } else {
353 root 1.29 # select "last" non-seed node
354     set_master +(sort +up_nodes)[-1];
355 root 1.26 }
356     }
357    
358     unless ($is_up) {
359 root 1.8 --$nodecnt;
360 root 1.41 #d#more_seeding unless $nodecnt;
361 root 1.8 unreg_groups $node;
362    
363     # forget about the node
364     delete $addr{$node};
365 root 1.26
366     # ask our master for quick recovery
367     snd $port{$MASTER}, find => $node
368     if $MASTER;
369 root 1.1 }
370     }
371    
372     mon_node $_, 1
373     for up_nodes;
374    
375     mon_nodes \&mon_node;
376    
377     =back
378    
379     =head1 SEE ALSO
380    
381     L<AnyEvent::MP>.
382    
383     =head1 AUTHOR
384    
385     Marc Lehmann <schmorp@schmorp.de>
386     http://home.schmorp.de/
387    
388     =cut
389    
390     1
391