ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.31
Committed: Sat Sep 12 04:46:21 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
CVS Tags: rel-1_2
Changes since 1.30: +6 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.15 This module maintains a fully-meshed network, if possible, and tries to
12     ensure that we are connected to at least one other node.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37 root 1.22 use AnyEvent::MP::Transport ();
38 root 1.1
39 root 1.20 use base "Exporter";
40    
41     our @EXPORT = qw(
42     grp_reg
43     grp_get
44     grp_mon
45     );
46    
# the module version simply follows the AnyEvent::MP version
our $VERSION = $AnyEvent::MP::VERSION;

# version of the global protocol; "connect" compares its integer part with
# the version sent by the connecting node
our $GLOBAL_VERSION = 0;

our (
   %addr, # node => [address...] mapping, as announced via "addr" messages
   %port, # node => our rendezvous port on the other side
   %lreg, # local registry, name => [pid...]
   %lmon, # local registry monitoring, "name\x00pid" => mon
   %greg, # global registry, name => pid => undef
   %gmon, # group monitoring, group => [$cb...]
);

# counted up in mon_node when a node comes up, counted down when it goes down
our $nodecnt;

$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
62    
63 root 1.8 #############################################################################
64     # seednodes
65    
# our current master (which we regularly query for net updates)
our $MASTER;

our (
   %SEEDME,       # nodes that sent us "seedme1", node => undef
   @SEEDS,        # seed addresses not yet tried in the current round (see more_seeding)
   %SEEDS,        # seed address => node name behind it (undef while unknown)
   %SEED_CONNECT, # seed address => true while a connect is pending/established
   $SEED_WATCHER, # the timer driving more_seeding
);

# a transport connected: if the peer address is one of our seeds, mark it
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;
   my $peeraddr = $transport->{local_greeting}{peeraddr};

   exists $SEEDS{$peeraddr}
      or return;

   $SEED_CONNECT{$peeraddr} = 2;
};

push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
   my ($transport) = @_;

   # we rely on untrusted data here (the remote node name)
   # this is hopefully ok, as we override it on successful
   # connects, and this can at most be used for DOSing,
   # which is easy when you can do MITM.
   my $peeraddr = $transport->{local_greeting}{peeraddr};

   exists $SEEDS{$peeraddr}
      or return;

   # only remember the name when we do not have one yet
   $SEEDS{$peeraddr} ||= $transport->{remote_node};
};

# on a successful connect, the node name always replaces what we had
push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
   my ($transport) = @_;
   my $peeraddr = $transport->{local_greeting}{peeraddr};

   exists $SEEDS{$peeraddr}
      or return;

   $SEEDS{$peeraddr} = $transport->{remote_node};
};

# a transport went away: forget the connection state for its peer address
push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
   my ($transport) = @_;

   delete $SEED_CONNECT{ $transport->{local_greeting}{peeraddr} };

   # check if we contacted ourselves, so nuke this seed
   if (exists $transport->{seed} && $transport->{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
      # $AnyEvent::MP::Kernel::WARN->(0,"avoiding seed $_[0]{seed}\n");#d#
      delete $SEEDS{ $transport->{seed} };
   }
};
105    
# Try to connect to the seed address $seed. Croaks when the address cannot
# be parsed. Does nothing when %SEED_CONNECT already has a true entry for
# this seed, or when the node name recorded for it in %SEEDS is up.
sub seed_connect {
   my ($seed) = @_;

   my @hostport = AnyEvent::Socket::parse_hostport ($seed);
   @hostport
      or Carp::croak "$seed: unparsable seed address";
   my ($host, $service) = @hostport;

   my $known_node = $SEEDS{$seed};

   return if $SEED_CONNECT{$seed}
          || (defined $known_node && node_is_up $known_node);

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   # ughhh
   # the entry holds whatever mp_connect returns; the callback passed as
   # last argument overwrites it with 1 when it is invoked
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $service,
      seed => $seed,
      sub {
         $SEED_CONNECT{$seed} = 1;
      },
   );
}
125    
# Timer callback: re-arm $SEED_WATCHER and try to connect to one randomly
# chosen seed. @SEEDS is the list of seeds not yet tried in this round; it
# is refilled from %SEEDS whenever it runs empty.
sub more_seeding {
   # scale the configured interval by the number of known nodes once at
   # least one node is up, subtract some jitter, but never go below 1
   my $factor   = $nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1;
   my $interval = List::Util::max (
      1,
      $AnyEvent::MP::Kernel::CONFIG->{connect_interval} * $factor - rand,
   );

   $SEED_WATCHER = AE::timer $interval, 0, \&more_seeding;

   @SEEDS
      or @SEEDS = keys %SEEDS;
   @SEEDS
      or return;

   # remove a random seed from the round and try it
   my ($seed) = splice @SEEDS, rand @SEEDS, 1;
   seed_connect $seed;
}
139    
# Add the given seed addresses to %SEEDS and start seeding.
#
# %SEEDS maps a seed address to the node name learned for it by the
# transport hooks. Only addresses that are not known yet are entered (with
# an undefined node name) - an already known seed keeps its node name, as
# that name is only (re-)learned when a new connection is made, and
# forgetting it here would make up_seeds/node_is_seed miss a seed we are
# still connected to.
sub set_seeds(@) {
   exists $SEEDS{$_} or $SEEDS{$_} = undef
      for @_;

   $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;

   # schedule one slightly randomised more_seeding call per known seed
   after 0.100 * rand, \&more_seeding
      for 1 .. keys %SEEDS;
}
148    
# returns the node names recorded in %SEEDS for which node_is_up is true
sub up_seeds() {
   grep { node_is_up $_ } values %SEEDS
}
152    
# true if the given node name is recorded as the node behind one of our
# seed addresses (seeds without a known node name are skipped)
sub node_is_seed($) {
   grep { $_ eq $_[0] }
      grep { defined }
         values %SEEDS
}
156    
# returns all (up) seed nodes, or all nodes if no seednodes are up/known
sub route_nodes {
   my @route = up_seeds;
   return @route if @route;

   # no seed is up, fall back to every node that is up
   @route = up_nodes;
   @route
}
163    
164 root 1.8 #############################################################################
165    
# Apply a membership change to the global registry entry of $group: the
# ports in @$del are removed first, then the ports in @$add are entered.
# Afterwards every monitor callback registered for the group is called
# with (current members, $add, $del).
sub _change {
   my ($group, $add, $del) = @_;

   my $members = $greg{$group} ||= {};

   delete $members->{$_} for @$del;
   $members->{$_} = undef for @$add;

   # all callbacks share one snapshot of the member list
   my @current = keys %$members;

   for my $cb (@{ $gmon{$group} }) {
      $cb->(\@current, $add, $del);
   }
}
177    
# Remove all ports belonging to $node from every group in the global
# registry, notifying group monitors (via _change) for each group that
# actually lost members.
sub unreg_groups($) {
   my ($node) = @_;

   # matches port IDs that are either $node itself or start with "$node#"
   my $qr = qr/^\Q$node\E(?:#|$)/;

   # iterate over a snapshot of the group names: _change invokes monitor
   # callbacks, which may register ports in other groups and thereby
   # insert into %greg - doing that while iterating with "each" leaves
   # the iteration in an unspecified state.
   for my $group (keys %greg) {
      my $ports = $greg{$group}
         or next;

      # a fresh array per group, so that a callback that keeps its $del
      # argument does not see it being overwritten by the next group
      my @del = grep /$qr/, keys %$ports;

      _change $group, [], \@del
         if @del;
   }
}
190    
# Enter a complete registry (group => [port...]) received from another
# node into the global registry. The node argument is accepted but not
# needed for that.
sub set_groups($$) {
   my (undef, $groups) = @_;

   for my $group (keys %$groups) {
      _change $group, $groups->{$group}, [];
   }
}
198    
199 root 1.20 =item $guard = grp_reg $group, $port
200 root 1.4
201     Register the given (local!) port in the named global group C<$group>.
202    
203     The port will be unregistered automatically when the port is destroyed.
204    
205     When not called in void context, then a guard object will be returned that
206     will also cause the name to be unregistered when destroyed.
207    
208     =cut
209    
# unregister local port: drops the port monitor, removes the port from the
# local and the global registry and tells all rendezvous ports about it
sub unregister {
   my ($port, $group) = @_;

   my $key = join "\x00", $group, $port;
   delete $lmon{$key};

   my $members = $lreg{$group} ||= [];
   @$members = grep { $_ ne $port } @$members;

   _change $group, [], [$port];

   for my $peer (values %port) {
      snd $peer, reg0 => $group, $port;
   }
}
222    
# register local port
#
# Croaks if $port is not a local port or is already registered in $group.
# Otherwise the port is monitored (so it gets unregistered when the monitor
# callback fires), entered into the local and global registry, and
# announced to all rendezvous ports.
#
# Returns a guard that unregisters the port when destroyed, unless called
# in void context.
sub grp_reg($$) {
   my ($group, $port) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";

   grep $_ eq $port, @{ $lreg{$group} }
      and Carp::croak "'$group': group already registered, cannot register a second time";

   $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
   push @{ $lreg{$group} }, $port;

   snd $_, reg1 => $group, $port
      for values %port;

   _change $group, [$port], [];

   # wantarray is false (but defined) in scalar context, so testing its
   # truth would deny "my $guard = grp_reg ..." its guard - only void
   # context (undef) must skip the guard, same as in grp_mon.
   defined wantarray && AnyEvent::Util::guard { unregister $port, $group }
}
243    
244 root 1.20 =item $ports = grp_get $group
245 root 1.15
246     Returns all the ports currently registered to the given group (as
247 root 1.20 read-only(!) array reference). When the group has no registered members,
248 root 1.15 returns C<undef>.
249    
250     =cut
251    
# returns an array reference with the ports currently in the global
# registry for the given group, or undef if there are none
sub grp_get($) {
   my ($group) = @_;

   my @members = keys %{ $greg{$group} };

   return undef
      unless @members;

   \@members
}
256    
257     =item $guard = grp_mon $group, $callback->($ports, $add, $del)
258    
259     Installs a monitor on the given group. Each time there is a change it
260     will be called with the current group members as an arrayref as the
261     first argument. The second argument only contains ports added, the third
262     argument only ports removed.
263    
264     Unlike C<grp_get>, all three arguments will always be array-refs, even if
265     the array is empty. None of the arrays must be modified in any way.
266    
267     The first invocation will be with the first two arguments set to the
268     current members, as if all of them were just added, but only when the
269     group is actually non-empty.
270    
271     Optionally returns a guard object that uninstalls the watcher when it is
272     destroyed.
273    
274     =cut
275    
# Install $cb as monitor for group $grp.
#
# The registration itself is deferred via AnyEvent::MP::Kernel::delay; at
# that point the callback is added to the group's monitor list and, if the
# group has members, called once with the member list as both "current"
# and "added" ports.
#
# Returns a guard that removes the callback again when destroyed, unless
# called in void context.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   AnyEvent::MP::Kernel::delay sub {
      # the guard was already destroyed, nothing to install
      return unless $cb;

      push @{ $gmon{$grp} }, $cb;
      $cb->(((grp_get $grp) || return) x 2, []);
   };

   defined wantarray && AnyEvent::Util::guard {
      # there might be no monitor list (yet), e.g. when the guard is
      # destroyed before the delayed registration above has run, so do
      # not dereference the result of delete blindly
      my @mon = grep $_ != $cb, @{ (delete $gmon{$grp}) || [] };
      $gmon{$grp} = \@mon if @mon;

      # also keeps a still-pending registration from installing the callback
      undef $cb;
   }
}
292    
# Make sure we have a rendezvous port on $node: spawn the "connect" service
# there (unless we already did, or $node is ourselves) and send it our
# listener addresses, the addresses we know and our local registry.
sub start_node {
   my ($node) = @_;

   # already started, or it is us - do not connect to ourselves
   return if exists $port{$node} || $node eq $NODE;

   # establish connection
   my $peer = spawn $node, "AnyEvent::MP::Global::connect", $GLOBAL_VERSION, $NODE;
   $port{$node} = $peer;

   # when the monitor fires, forget the node's groups and our port on it
   mon $peer, sub {
      unreg_groups $node;
      delete $port{$node};
   };

   snd $peer, addr => $AnyEvent::MP::Kernel::LISTENER;

   if (%addr) {
      snd $peer, nodes => \%addr;
   }

   snd $peer, set => \%lreg if %lreg;
}
311 root 1.3
# other nodes connect via this
#
# start_node on the node $node spawns this function here, passing its
# protocol version and its node ID. Dies on a version mismatch (only the
# integer parts are compared). Otherwise $node is monitored and the message
# handlers of the protocol are installed on $SELF.
sub connect {
   my ($version, $node) = @_;

   (int $version) == (int $GLOBAL_VERSION)
      or die "node version mismatch (requested $version; we have $GLOBAL_VERSION)";

   # monitor them, silently die
   mon $node, psub {
      delete $SEEDME{$node};
      kil $SELF;
   };

   rcv $SELF,
      # the node tells us the addresses it can be reached at
      addr => sub {
         my $addresses = shift;
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
         $addr{$node} = $addresses;

         # pass the news on to every node that asked us to seed it. the
         # loop variable must not be called $node, as the message has to
         # name the node that announced the addresses, not the recipient.
         for my $slave (keys %SEEDME) {
            my $port = $port{$slave}
               or next; # no rendezvous port on that node (anymore)

            snd $port, nodes => { $node => $addresses };
         }
      },
      # the node tells us about other nodes (node ID => addresses)
      nodes => sub {
         my ($kv) = @_;

         use JSON::XS;#d#
         my $kv_txt = JSON::XS->new->encode ($kv);#d#
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

         while (my ($id, $addresses) = each %$kv) {
            my $peer = AnyEvent::MP::Kernel::add_node $id;
            $peer->connect (@$addresses);
            start_node $id;
         }
      },
      # the node asks for the addresses of another node, answer if we know them
      find => sub {
         my ($othernode) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
         snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
            if $addr{$othernode};
      },
      # the node sends us a complete registry (group => [port...])
      set => sub {
         set_groups $node, shift;
      },
      # a single port was removed from (reg0) or added to (reg1) a group
      reg0 => sub {
         _change $_[0], [], [$_[1]];
      },
      reg1 => sub {
         _change $_[0], [$_[1]], [];
      },

      # some node asks us to provide network updates
      seedme0 => sub {
         $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to NOT seed it.");#d#
         delete $SEEDME{$node};
      },
      seedme1 => sub {
         $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to seed it.");#d#
         $SEEDME{$node} = ();

         # for good measure
         snd $port{$node}, nodes => \%addr if %addr;
      },
   ;
}
380 root 1.1
# Switch to a new master node (a false value means "no master"). The old
# master, if still up, is told to stop seeding us ("seedme0"), the new one
# is asked to seed us ("seedme1").
sub set_master($) {
   my ($new_master) = @_;

   # nothing changes
   return if $MASTER eq $new_master;

   if ($MASTER and node_is_up $MASTER) {
      snd $port{$MASTER}, "seedme0";
   }

   $MASTER = $new_master;

   if (!$MASTER) {
      $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node.");
   } else {
      snd $port{$MASTER}, "seedme1";
      $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
   }
}
396    
# Node monitor: called with a node ID and a flag telling whether the node
# came up (true) or went down (false). Keeps $nodecnt, our rendezvous
# ports, the master selection and the global registry up to date.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      ++$nodecnt;
      start_node $node;

      if (node_is_seed $node) {
         # a seed always beats a non-seed
         my $switch = 1;

         if (node_is_seed $MASTER) {
            my @candidates = up_seeds;

            # switching here with lower chance roughly hopefully still gives us
            # an equal selection.
            $switch = 1 < rand @candidates;
         }

         set_master $node
            if $switch;
      }
   }

   # select a new(?) master, if required
   unless ($MASTER and node_is_up $MASTER) {
      my @candidates = up_seeds;

      if (@candidates) {
         set_master $candidates[rand @candidates];
      } else {
         # select "last" non-seed node
         set_master +(sort +up_nodes)[-1];
      }
   }

   if (!$is_up) {
      --$nodecnt;

      # lost the last node, so try the seeds again right away
      more_seeding
         unless $nodecnt;

      unreg_groups $node;

      # forget about the node
      delete $addr{$node};

      # ask our master for quick recovery
      if ($MASTER) {
         snd $port{$MASTER}, find => $node;
      }
   }
}
442    
# treat every node that is up right now as if it had just come up ...
for my $node (up_nodes) {
   mon_node $node, 1;
}

# ... and hand mon_node to mon_nodes
# (presumably so it is called on later up/down changes - see AnyEvent::MP::Kernel)
mon_nodes \&mon_node;
447    
448     =back
449    
450     =head1 SEE ALSO
451    
452     L<AnyEvent::MP>.
453    
454     =head1 AUTHOR
455    
456     Marc Lehmann <schmorp@schmorp.de>
457     http://home.schmorp.de/
458    
459     =cut
460    
461     1
462