ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.40
Committed: Mon May 3 14:20:51 2010 UTC (14 years, 1 month ago) by root
Branch: MAIN
CVS Tags: rel-1_29, rel-1_30
Changes since 1.39: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.15 This module maintains a fully-meshed network, if possible, and tries to
12     ensure that we are connected to at least one other node.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37 root 1.22 use AnyEvent::MP::Transport ();
38 root 1.1
39 root 1.20 use base "Exporter";
40    
41     our @EXPORT = qw(
42     grp_reg
43     grp_get
44     grp_mon
45     );
46    
our $GLOBAL_VERSION = 0; # compared (as integer) against the peer's version in connect

our %ON_SETUP; # take note: not public

our %addr; # node ID => [address...] mapping

our %port; # node ID => our rendezvous port on the other side
our %lreg; # local registry, name => [pid...]
our %lmon; # local registry monitoring, "name\x00pid" => mon
our %greg; # global registry, name => pid => undef
our %gmon; # group monitoring, group => [$cb...]

our $nodecnt; # maintained by mon_node: ++ on node up, -- on node down

$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");

#############################################################################
# seednodes

our $MASTER; # our current master (which we regularly query for net updates)

our %SEEDME;       # $node => undef, nodes that asked us (seedme1) for network updates
our @SEEDS;        # seed addresses still to be tried, refilled from %SEEDS by more_seeding
our %SEEDS;        # seed address => node ID (undef until learned); also "is this a seed?"
our %SEED_CONNECT; # seed address => true while a connection (attempt) exists
our $SEED_WATCHER; # timer that re-runs more_seeding
73    
# Transport hooks that keep the seed bookkeeping (%SEEDS, %SEED_CONNECT)
# in sync. Each hook receives the transport object as its first argument.

# connection attempt to a seed address: flag it in %SEED_CONNECT
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;

   my $peer = $transport->{local_greeting}{peeraddr};
   return unless exists $SEEDS{$peer};

   $SEED_CONNECT{$peer} = 2;
};

# greeting seen: tentatively remember which node answers on this seed address
push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
   my ($transport) = @_;

   # we rely on untrusted data here (the remote node name)
   # this is hopefully ok, as we override it on successful
   # connects, and this can at most be used for DOSing,
   # which is easy when you can do MITM.
   my $peer = $transport->{local_greeting}{peeraddr};
   return unless exists $SEEDS{$peer};

   $SEEDS{$peer} ||= $transport->{remote_node};
};

# fully connected: the node name is now authoritative, so overwrite it
push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
   my ($transport) = @_;

   my $peer = $transport->{local_greeting}{peeraddr};
   return unless exists $SEEDS{$peer};

   $SEEDS{$peer} = $transport->{remote_node};
};

# transport gone: forget the connection marker for its peer address
push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
   my ($transport) = @_;

   delete $SEED_CONNECT{ $transport->{local_greeting}{peeraddr} };

   # check if we contacted ourselves, so nuke this seed
   if (exists $transport->{seed} && $transport->{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
      # $AnyEvent::MP::Kernel::WARN->(0,"avoiding seed $_[0]{seed}\n");#d#
      delete $SEEDS{ $transport->{seed} };
   }
};
105    
# Try to connect to the seed address $seed ("host:port" style).
#
# Croaks if the address cannot be parsed. Returns without doing anything
# when %SEED_CONNECT already has a true entry for this seed, or when the
# node name recorded for it in %SEEDS is currently up.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport($seed)
      or Carp::croak("$seed: unparsable seed address");

   if ($SEED_CONNECT{$seed}) {
      return;
   }

   my $known_node = $SEEDS{$seed};
   if (defined $known_node && node_is_up($known_node)) {
      return;
   }

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   # ughhh
   # the slot holds whatever mp_connect returns until the callback fires,
   # after which it is just the marker value 1.
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect(
      $host, $port,
      seed => $seed,
      sub {
         $SEED_CONNECT{$seed} = 1;
      },
   );
}
125    
# Timer callback: re-arm $SEED_WATCHER and try to connect to one randomly
# picked seed that has not been tried in the current round.
sub more_seeding {
   # scale the configured interval by the size of %AnyEvent::MP::Kernel::NODE
   # once we have seen any node, jitter it, and never go below one second
   my $factor = $nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1;
   my $int    = List::Util::max(
      1,
      $AnyEvent::MP::Kernel::CONFIG->{connect_interval} * $factor - rand(),
   );

   $SEED_WATCHER = AE::timer($int, 0, \&more_seeding);

   # start a new round when every seed has been tried
   unless (@SEEDS) {
      @SEEDS = keys %SEEDS;
   }

   return unless @SEEDS;

   my $index  = rand @SEEDS;
   my ($seed) = splice @SEEDS, $index, 1;

   seed_connect($seed);
}
139    
# Register the given seed addresses and kick off seeding.
#
# Every address is entered into %SEEDS with an undefined node name, the
# periodic seed timer is created if it does not exist yet, and one
# slightly delayed more_seeding run is scheduled per known seed.
sub set_seeds(@) {
   $SEEDS{$_} = undef
      for @_;

   $SEED_WATCHER ||= AE::timer(5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding);

   my $rounds = keys %SEEDS;

   after(0.100 * rand(), \&more_seeding)
      for 1 .. $rounds;
}
148    
# Returns the node names recorded in %SEEDS for which node_is_up is true.
sub up_seeds() {
   grep { node_is_up($_) } values %SEEDS
}
152    
# Returns the entries of %SEEDS whose (defined) node name equals the given
# node; in scalar/boolean context that is the number of matches.
sub node_is_seed($) {
   my ($wanted) = @_;

   grep { defined $_ && $_ eq $wanted } values %SEEDS
}
156    
# returns all (up) seed nodes, or all nodes if no seednodes are up/known
sub route_nodes {
   my @candidates = up_seeds();

   if (!@candidates) {
      @candidates = up_nodes();
   }

   @candidates
}
163    
164 root 1.8 #############################################################################
165    
# Apply a membership change to the global registry and notify monitors.
#
#   $group - group name
#   $add   - arrayref of port IDs to add
#   $del   - arrayref of port IDs to remove
#
# Removals are applied before additions. Every callback registered for
# $group in %gmon is then invoked as $cb->($ports, $add, $del), where
# $ports is an arrayref with the members after the change.
sub _change {
   my ($group, $add, $del) = @_;

   my $members = $greg{$group} ||= {};

   delete @$members{@$del};
   @$members{@$add} = ();

   # Work on a snapshot of the monitor list. Looping over
   # @{ $gmon{$group} } directly would autovivify an empty %gmon entry for
   # every group nobody monitors, and a callback that drops its grp_mon
   # guard replaces that array while we are still walking it.
   my @monitors = @{ $gmon{$group} || [] }
      or return;

   my $ports = [keys %$members];

   $_->($ports, $add, $del)
      for @monitors;
}
177    
# Remove every port that belongs to $node from all global groups.
#
# A port belongs to $node when its ID is the node ID itself or starts
# with "$node#". Each affected group gets one _change call with the list
# of removed ports.
sub unreg_groups($) {
   my ($node) = @_;

   my $qr = qr/^\Q$node\E(?:#|$)/;

   # Iterate over a snapshot of the group names: _change runs user
   # callbacks, and those may register ports and thereby insert into
   # %greg, which is not allowed while an each() iteration is active.
   for my $group (keys %greg) {
      my $ports = $greg{$group}
         or next;

      # fresh array per group - callbacks receive a reference to it and
      # must not see it change when the next group is processed.
      my @del = grep /$qr/, keys %$ports;

      _change($group, [], \@del)
         if @del;
   }
}
190    
# Merge a peer's complete local registry ($lreg: group name => arrayref
# of port IDs) into the global registry, one _change call per group.
# $node is accepted for symmetry with the caller but not used here.
sub set_groups($$) {
   my ($node, $lreg) = @_;

   for my $group (keys %$lreg) {
      _change($group, $lreg->{$group}, []);
   }
}
198    
199 root 1.20 =item $guard = grp_reg $group, $port
200 root 1.4
201     Register the given (local!) port in the named global group C<$group>.
202    
203     The port will be unregistered automatically when the port is destroyed.
204    
205     When not called in void context, then a guard object will be returned that
206     will also cause the name to be unregistered when destroyed.
207    
208     =cut
209    
# unregister local port
#
# Removes $port from the local registry entry of $group, updates the
# global registry and tells all connected nodes (reg0). Calling it for a
# port that is not (or no longer) registered in $group only drops the
# port monitor, so running it twice - once when the port dies and once
# when a grp_reg guard is destroyed later - does not broadcast a second
# removal or trigger a spurious group notification.
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};

   my $local = $lreg{$group} || [];

   return
      unless grep $_ eq $port, @$local;

   @$local = grep $_ ne $port, @$local;

   # do not keep (and later transmit via "set") empty groups
   delete $lreg{$group}
      unless @$local;

   _change($group, [], [$port]);

   snd($_, reg0 => $group, $port)
      for values %port;
}
222    
# register local port
#
# Croaks unless $port is a local port, and also when it is already listed
# under $group in the local registry. Otherwise the port is monitored (so
# it gets unregistered when it dies), recorded locally, announced to all
# connected nodes (reg1) and entered into the global registry. In
# non-void context a guard is returned that unregisters on destruction.
sub grp_reg($$) {
   my ($group, $port) = @_;

   unless (port_is_local($port)) {
      Carp::croak("AnyEvent::MP::Global::grp_reg can only be called for local ports, caught");
   }

   if (grep { $_ eq $port } @{ $lreg{$group} }) {
      Carp::croak("'$group': group already registered, cannot register a second time");
   }

   my $key = "$group\x00$port";
   $lmon{$key} = mon($port, sub { unregister($port, $group) });

   push @{ $lreg{$group} }, $port;

   for my $rendezvous (values %port) {
      snd($rendezvous, reg1 => $group, $port);
   }

   _change($group, [$port], []);

   defined wantarray && AnyEvent::Util::guard { unregister($port, $group) }
}
243    
244 root 1.20 =item $ports = grp_get $group
245 root 1.15
246     Returns all the ports currently registered to the given group (as
247 root 1.20 read-only(!) array reference). When the group has no registered members,
248 root 1.15 return C<undef>.
249    
250     =cut
251    
# Returns an arrayref with the port IDs currently registered in the given
# group, or undef when the group is unknown or has no members.
#
# The registry is only read: looking up a group that does not exist must
# not create an (empty) entry for it in %greg.
sub grp_get($) {
   my $members = $greg{$_[0]}
      or return undef;

   my @ports = keys %$members;
   @ports ? \@ports : undef
}
256    
257     =item $guard = grp_mon $group, $callback->($ports, $add, $del)
258    
259     Installs a monitor on the given group. Each time there is a change it
260     will be called with the current group members as an arrayref as the
261     first argument. The second argument only contains ports added, the third
262     argument only ports removed.
263    
264     Unlike C<grp_get>, all three arguments will always be array-refs, even if
265     the array is empty. None of the arrays must be modified in any way.
266    
267     The first invocation will be with the first two arguments set to the
268     current members, as if all of them were just added, but only when the
269 root 1.39 group is actually non-empty.
270 root 1.20
271     Optionally returns a guard object that uninstalls the watcher when it is
272     destroyed.
273    
274     =cut
275    
# Install $cb as monitor for group $grp.
#
# Installation is deferred via AnyEvent::MP::Kernel::delay. Once it runs,
# $cb is appended to the group's monitor list and - only if the group
# currently has members - called once as $cb->($ports, $ports, []).
#
# In non-void context a guard is returned. Destroying it removes $cb from
# the monitor list and also cancels an installation that is still pending.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   AnyEvent::MP::Kernel::delay(sub {
      # the guard was destroyed before we got a chance to run
      return unless $cb;

      push @{ $gmon{$grp} }, $cb;

      my $ports = grp_get($grp)
         or return;

      $cb->($ports, $ports, []);
   });

   defined wantarray && AnyEvent::Util::guard {
      # The list may not exist (yet, or anymore): the guard can fire before
      # the delayed installation above, or after the list was emptied.
      # Fall back to an empty list so that $cb is always cleared below.
      my @mon = grep $_ != $cb, @{ (delete $gmon{$grp}) || [] };
      $gmon{$grp} = \@mon if @mon;
      undef $cb;
   }
}
292    
# Establish our side of the global-service connection to $node.
#
# Does nothing if we already have a rendezvous port for that node or if
# the node is ourselves. Otherwise a connect port is spawned on the
# remote node, monitored, and sent our current state.
sub start_node {
   my ($node) = @_;

   # already connected, or do not connect to ourselves
   return if exists($port{$node}) or $node eq $NODE;

   # establish connection
   my $rendezvous = spawn($node, "AnyEvent::MP::Global::connect", $GLOBAL_VERSION, $NODE);
   $port{$node} = $rendezvous;

   # when the remote port goes away, forget the node's ports and our handle
   mon($rendezvous, sub {
      unreg_groups($node);
      delete $port{$node};
   });

   snd($rendezvous, addr => $AnyEvent::MP::Kernel::LISTENER);

   if (%addr) {
      snd($rendezvous, nodes => \%addr);
   }

   if (%lreg) {
      snd($rendezvous, set => \%lreg);
   }

   # tell the other side that we are in business now
   snd($rendezvous, "setup");
}
312 root 1.3
# other nodes connect via this
#
# Body of the port that start_node spawns on us: $version is the peer's
# $GLOBAL_VERSION, $node the peer's node ID. Dies on a version mismatch,
# monitors the peer node (killing this port when the monitor fires) and
# then installs the message handlers of the global protocol on $SELF.
#
# Replies go to $port{$node}, our own rendezvous port on the peer. That
# entry may be missing, so every handler that replies checks it first,
# the same way the delayed broadcast in "addr" skips slaves without one.
sub connect {
   my ($version, $node) = @_;

   (int $version) == (int $GLOBAL_VERSION)
      or die "node version mismatch (requested $version; we have $GLOBAL_VERSION)";

   # monitor them, silently die
   mon $node, psub {
      delete $SEEDME{$node};
      kil $SELF;
   };

   rcv $SELF,
      # peer has sent its initial state
      setup => sub {
         $_->($node) for values %ON_SETUP;
      },
      # peer tells us its listening addresses
      addr => sub {
         my $addresses = shift;
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
         $addr{$node} = $addresses;

         # delay broadcast by a random amount, to avoid nodes connecting to each other
         # at the same time.
         after 2 + rand, sub {
            for my $slave (keys %SEEDME) {
               snd $port{$slave} || next, nodes => { $node => $addresses };
            }
         };
      },
      # peer tells us about other nodes (node ID => [address...])
      nodes => sub {
         my ($kv) = @_;

         use JSON::XS;#d#
         my $kv_txt = JSON::XS->new->encode ($kv);#d#
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

         while (my ($id, $addresses) = each %$kv) {
            # named $peer so it does not shadow the connecting $node
            my $peer = AnyEvent::MP::Kernel::add_node $id;
            $peer->connect (@$addresses);
            start_node $id;
         }
      },
      # peer sends its complete local registry
      set => sub {
         set_groups $node, shift;
      },
      # peer asks for the addresses of another node
      find => sub {
         my ($othernode) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
         snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
            if $addr{$othernode} && $port{$node};
      },
      # peer unregistered / registered a single port
      reg0 => sub {
         _change $_[0], [], [$_[1]];
      },
      reg1 => sub {
         _change $_[0], [$_[1]], [];
      },

      # some node asks us to provide network updates
      seedme0 => sub {
         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to NOT seed it.");#d#
         delete $SEEDME{$node};
      },
      seedme1 => sub {
         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to seed it.");#d#
         $SEEDME{$node} = ();

         # for good measure
         snd $port{$node}, nodes => \%addr
            if %addr && $port{$node};
      },
   ;
}
387 root 1.1
# Select the given node as our master, i.e. the node we ask for network
# updates. An undefined/false argument means "no master available".
#
# The previous master (if still up) is told to stop seeding us (seedme0),
# the new one is asked to start (seedme1). Nothing happens when the
# master does not change.
sub set_master($) {
   return if $MASTER eq $_[0];

   # only talk to the old master if we still have a rendezvous port for it
   snd($port{$MASTER}, "seedme0")
      if $MASTER && $port{$MASTER} && node_is_up($MASTER);

   $MASTER = $_[0];

   if ($MASTER) {
      # Master selection may pick an up node for which start_node has not
      # run yet, in which case there is no rendezvous port to send to.
      # start_node is a no-op when the port already exists.
      start_node($MASTER);

      snd($port{$MASTER}, "seedme1")
         if $port{$MASTER};

      $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
   } else {
      $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node, cannot seed.");
   }
}
403    
# Node monitor callback: called with a node ID and a flag telling whether
# that node just came up (true) or went down (false).
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      ++$nodecnt;
      start_node($node);

      if (node_is_seed($node)) {
         # a seed always beats a non-seed
         my $switch = 1;

         if (node_is_seed($MASTER)) {
            my @live_seeds = up_seeds();

            # switching here with lower chance roughly hopefully still gives us
            # an equal selection.
            $switch = rand(@live_seeds) > 1;
         }

         set_master($node)
            if $switch;
      }
   }

   # select a new(?) master, if required
   unless ($MASTER && node_is_up($MASTER)) {
      my @live_seeds = up_seeds();

      if (@live_seeds) {
         set_master($live_seeds[rand @live_seeds]);
      } else {
         # select "last" non-seed node
         my @all_up  = up_nodes();
         my @ordered = sort @all_up;
         set_master($ordered[-1]);
      }
   }

   return if $is_up;

   # node went down
   --$nodecnt;
   more_seeding()
      unless $nodecnt;
   unreg_groups($node);

   # forget about the node
   delete $addr{$node};

   # ask our master for quick recovery
   snd($port{$MASTER}, find => $node)
      if $MASTER;
}
449    
# treat every node that is already up as if it had just come up ...
for my $node (up_nodes()) {
   mon_node($node, 1);
}

# ... and get told about all future up/down changes
mon_nodes(\&mon_node);
454    
455     =back
456    
457     =head1 SEE ALSO
458    
459     L<AnyEvent::MP>.
460    
461     =head1 AUTHOR
462    
463     Marc Lehmann <schmorp@schmorp.de>
464     http://home.schmorp.de/
465    
466     =cut
467    
468     1
469