ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.34
Committed: Thu Nov 5 01:12:41 2009 UTC (14 years, 7 months ago) by root
Branch: MAIN
CVS Tags: rel-1_23
Changes since 1.33: +2 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.15 This module maintains a fully-meshed network, if possible, and tries to
12     ensure that we are connected to at least one other node.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37 root 1.22 use AnyEvent::MP::Transport ();
38 root 1.1
39 root 1.20 use base "Exporter";
40    
41     our @EXPORT = qw(
42     grp_reg
43     grp_get
44     grp_mon
45     );
46    
47 root 1.31 our $GLOBAL_VERSION = 0;
48    
49 root 1.8 our %addr; # port ID => [address...] mapping
50    
51 root 1.4 our %port; # our rendezvous port on the other side
52     our %lreg; # local registry, name => [pid...]
53 root 1.20 our %lmon; # local registry monitoring name,pid => mon
54     our %greg; # global regstry, name => pid => undef
55 root 1.22 our %gmon; # group monitoring, group => [$cb...]
56 root 1.4
57 root 1.8 our $nodecnt;
58    
59 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
60    
61 root 1.8 #############################################################################
62     # seednodes
63    
64 root 1.26 our $MASTER; # our current master (which we regularly query for net updates)
65    
66     our %SEEDME; # $node => $port
67 root 1.8 our @SEEDS;
68 root 1.22 our %SEEDS; # just to check whether a seed is a seed
69 root 1.14 our %SEED_CONNECT;
70 root 1.8 our $SEED_WATCHER;
71    
72 root 1.22 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
73     my $peer = $_[0]{local_greeting}{peeraddr};
74     return unless exists $SEEDS{$peer};
75     $SEED_CONNECT{$peer} = 2;
76     };
77    
78 root 1.27 push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
79     # we rely on untrusted data here (the remote node name)
80     # this is hopefully ok, as we override it on successful
81     # connects, and this can at most be used for DOSing,
82     # which is easy when you can do MITM.
83     my $peer = $_[0]{local_greeting}{peeraddr};
84     return unless exists $SEEDS{$peer};
85     $SEEDS{$peer} ||= $_[0]{remote_node};
86     };
87    
88 root 1.22 push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
89     my $peer = $_[0]{local_greeting}{peeraddr};
90     return unless exists $SEEDS{$peer};
91     $SEEDS{$peer} = $_[0]{remote_node};
92     };
93    
94     push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
95     delete $SEED_CONNECT{$_[0]{local_greeting}{peeraddr}};
96 root 1.24
97     # check if we contacted ourselves, so nuke this seed
98 root 1.25 if (exists $_[0]{seed} && $_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
99     # $AnyEvent::MP::Kernel::WARN->(0,"avoiding seed $_[0]{seed}\n");#d#
100     delete $SEEDS{$_[0]{seed}};
101 root 1.24 }
102 root 1.22 };
103    
104 root 1.8 sub seed_connect {
105     my ($seed) = @_;
106    
107     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
108     or Carp::croak "$seed: unparsable seed address";
109    
110 root 1.22 return if $SEED_CONNECT{$seed};
111 root 1.27 return if defined $SEEDS{$seed} && node_is_up $SEEDS{$seed};
112 root 1.22
113     $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
114    
115 root 1.8 # ughhh
116 root 1.14 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port,
117     seed => $seed,
118 root 1.8 sub {
119 root 1.22 $SEED_CONNECT{$seed} = 1;
120 root 1.8 },
121     ;
122     }
123    
124     sub more_seeding {
125 root 1.22 my $int = List::Util::max 1,
126     $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
127     * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
128     - rand;
129    
130     $SEED_WATCHER = AE::timer $int, 0, \&more_seeding;
131    
132     @SEEDS = keys %SEEDS unless @SEEDS;
133 root 1.8 return unless @SEEDS;
134    
135 root 1.22 seed_connect splice @SEEDS, rand @SEEDS, 1;
136 root 1.8 }
137    
138     sub set_seeds(@) {
139 root 1.22 @SEEDS{@_} = ();
140 root 1.8
141     $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;
142    
143 root 1.29 after 0.100 * rand, \&more_seeding
144     for 1 .. keys %SEEDS;
145 root 1.8 }
146    
147 root 1.26 sub up_seeds() {
148     grep node_is_up $_, values %SEEDS
149     }
150    
151     sub node_is_seed($) {
152     grep $_ eq $_[0], grep defined, values %SEEDS
153     }
154    
155 root 1.22 # returns all (up) seed nodes, or all nodes if no seednodes are up/known
156 root 1.26 sub route_nodes {
157     my @seeds = up_seeds;
158 root 1.22 @seeds = up_nodes unless @seeds;
159     @seeds
160     }
161    
162 root 1.8 #############################################################################
163    
164 root 1.20 sub _change {
165     my ($group, $add, $del) = @_;
166    
167     my $kv = $greg{$group} ||= {};
168    
169     delete @$kv{@$del};
170     @$kv{@$add} = ();
171    
172     my $ports = [keys %$kv];
173     $_->($ports, $add, $del) for @{ $gmon{$group} };
174     }
175    
176 root 1.4 sub unreg_groups($) {
177 root 1.8 my ($node) = @_;
178 root 1.4
179 root 1.8 my $qr = qr/^\Q$node\E(?:#|$)/;
180 root 1.20 my @del;
181 root 1.4
182 root 1.20 while (my ($group, $ports) = each %greg) {
183     @del = grep /$qr/, keys %$ports;
184     _change $group, [], \@del
185     if @del;
186 root 1.4 }
187     }
188    
189     sub set_groups($$) {
190 root 1.8 my ($node, $lreg) = @_;
191 root 1.5
192 root 1.6 while (my ($k, $v) = each %$lreg) {
193 root 1.20 _change $k, $v, [];
194 root 1.6 }
195 root 1.4 }
196    
197 root 1.20 =item $guard = grp_reg $group, $port
198 root 1.4
199     Register the given (local!) port in the named global group C<$group>.
200    
201     The port will be unregistered automatically when the port is destroyed.
202    
203     When not called in void context, then a guard object will be returned that
204     will also cause the name to be unregistered when destroyed.
205    
206     =cut
207    
208     # unregister local port
209     sub unregister {
210     my ($port, $group) = @_;
211    
212     delete $lmon{"$group\x00$port"};
213     @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
214    
215 root 1.20 _change $group, [], [$port];
216 root 1.4
217 root 1.20 snd $_, reg0 => $group, $port
218 root 1.4 for values %port;
219     }
220    
221     # register local port
222 root 1.20 sub grp_reg($$) {
223     my ($group, $port) = @_;
224 root 1.4
225     port_is_local $port
226 root 1.20 or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";
227    
228     grep $_ eq $port, @{ $lreg{$group} }
229     and Carp::croak "'$group': group already registered, cannot register a second time";
230 root 1.4
231     $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
232     push @{ $lreg{$group} }, $port;
233    
234 root 1.20 snd $_, reg1 => $group, $port
235 root 1.4 for values %port;
236    
237 root 1.20 _change $group, [$port], [];
238 root 1.4
239 elmex 1.33 defined wantarray && AnyEvent::Util::guard { unregister $port, $group }
240 root 1.4 }
241    
242 root 1.20 =item $ports = grp_get $group
243 root 1.15
244     Returns all the ports currently registered to the given group (as
245 root 1.20 read-only(!) array reference). When the group has no registered members,
246 root 1.15 return C<undef>.
247    
248     =cut
249    
250 root 1.20 sub grp_get($) {
251     my @ports = keys %{ $greg{$_[0]} };
252     @ports ? \@ports : undef
253     }
254    
255     =item $guard = grp_mon $group, $callback->($ports, $add, $del)
256    
257     Installs a monitor on the given group. Each time there is a change it
258     will be called with the current group members as an arrayref as the
259     first argument. The second argument only contains ports added, the third
260     argument only ports removed.
261    
262     Unlike C<grp_get>, all three arguments will always be array-refs, even if
263     the array is empty. None of the arrays must be modified in any way.
264    
265     The first invocation will be with the first two arguments set to the
266     current members, as if all of them were just added, but only when the
267     group is atcually non-empty.
268    
269     Optionally returns a guard object that uninstalls the watcher when it is
270     destroyed.
271    
272     =cut
273    
274     sub grp_mon($$) {
275     my ($grp, $cb) = @_;
276    
277     AnyEvent::MP::Kernel::delay sub {
278     return unless $cb;
279    
280     push @{ $gmon{$grp} }, $cb;
281     $cb->(((grp_get $grp) || return) x 2, []);
282     };
283    
284 root 1.30 defined wantarray && AnyEvent::Util::guard {
285 root 1.20 my @mon = grep $_ != $cb, @{ delete $gmon{$grp} };
286     $gmon{$grp} = \@mon if @mon;
287     undef $cb;
288     }
289 root 1.15 }
290    
291 root 1.4 sub start_node {
292 root 1.8 my ($node) = @_;
293 root 1.4
294 root 1.8 return if exists $port{$node};
295     return if $node eq $NODE; # do not connect to ourselves
296 root 1.4
297     # establish connection
298 root 1.31 my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", $GLOBAL_VERSION, $NODE;
299 root 1.8
300 root 1.4 mon $port, sub {
301 root 1.8 unreg_groups $node;
302     delete $port{$node};
303 root 1.4 };
304 root 1.8
305 root 1.10 snd $port, addr => $AnyEvent::MP::Kernel::LISTENER;
306 root 1.17 snd $port, nodes => \%addr if %addr;
307 root 1.8 snd $port, set => \%lreg if %lreg;
308 root 1.4 }
309 root 1.3
310 root 1.5 # other nodes connect via this
311 root 1.3 sub connect {
312 root 1.8 my ($version, $node) = @_;
313 root 1.1
314 root 1.31 (int $version) == (int $GLOBAL_VERSION)
315     or die "node version mismatch (requested $version; we have $GLOBAL_VERSION)";
316    
317 root 1.3 # monitor them, silently die
318 root 1.26 mon $node, psub {
319     delete $SEEDME{$node};
320     kil $SELF;
321     };
322 root 1.3
323 root 1.4 rcv $SELF,
324 root 1.8 addr => sub {
325     my $addresses = shift;
326 root 1.14 $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
327 root 1.8 $addr{$node} = $addresses;
328 root 1.9
329 root 1.34 for my $slave (keys %SEEDME) {
330     snd $port{$slave}, nodes => { $node => $addresses };
331 root 1.9 }
332 root 1.8 },
333 root 1.17 nodes => sub {
334 root 1.8 my ($kv) = @_;
335    
336     use JSON::XS;#d#
337     my $kv_txt = JSON::XS->new->encode ($kv);#d#
338     $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#
339    
340     while (my ($id, $addresses) = each %$kv) {
341     my $node = AnyEvent::MP::Kernel::add_node $id;
342     $node->connect (@$addresses);
343     start_node $id;
344 root 1.4 }
345     },
346 root 1.16 find => sub {
347 root 1.8 my ($othernode) = @_;
348    
349     $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
350 root 1.17 snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
351 root 1.8 if $addr{$othernode};
352     },
353 root 1.4 set => sub {
354 root 1.8 set_groups $node, shift;
355 root 1.4 },
356 root 1.20 reg0 => sub {
357     _change $_[0], [], [$_[1]];
358     },
359     reg1 => sub {
360     _change $_[0], [$_[1]], [];
361     },
362 root 1.26
363     # some node asks us to provide network updates
364     seedme0 => sub {
365     $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to NOT seed it.");#d#
366     delete $SEEDME{$node};
367     },
368     seedme1 => sub {
369     $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to seed it.");#d#
370     $SEEDME{$node} = ();
371    
372     # for good measure
373     snd $port{$node}, nodes => \%addr if %addr;
374     },
375 root 1.4 ;
376 root 1.3 }
377 root 1.1
378 root 1.29 sub set_master($) {
379     return if $MASTER eq $_[0];
380    
381     snd $port{$MASTER}, "seedme0"
382     if $MASTER && node_is_up $MASTER;
383    
384     $MASTER = $_[0];
385    
386     if ($MASTER) {
387     snd $port{$MASTER}, "seedme1";
388     $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
389     } else {
390     $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node.");
391     }
392     }
393    
394 root 1.1 sub mon_node {
395 root 1.8 my ($node, $is_up) = @_;
396 root 1.1
397     if ($is_up) {
398 root 1.8 ++$nodecnt;
399     start_node $node;
400 root 1.26
401 root 1.29 if (node_is_seed $node) {
402     if (node_is_seed $MASTER) {
403     my @SEEDS = up_seeds;
404    
405     # switching here with lower chance roughly hopefully still gives us
406     # an equal selection.
407     set_master $node
408     if 1 < rand @SEEDS;
409     } else {
410     # a seed always beats a non-seed
411     set_master $node;
412     }
413     }
414 root 1.26 }
415    
416 root 1.29 # select a new(?) master, if required
417     unless ($MASTER and node_is_up $MASTER) {
418     if (my @SEEDS = up_seeds) {
419     set_master $SEEDS[rand @SEEDS];
420 root 1.26 } else {
421 root 1.29 # select "last" non-seed node
422     set_master +(sort +up_nodes)[-1];
423 root 1.26 }
424     }
425    
426     unless ($is_up) {
427 root 1.8 --$nodecnt;
428     more_seeding unless $nodecnt;
429     unreg_groups $node;
430    
431     # forget about the node
432     delete $addr{$node};
433 root 1.26
434     # ask our master for quick recovery
435     snd $port{$MASTER}, find => $node
436     if $MASTER;
437 root 1.1 }
438     }
439    
440     mon_node $_, 1
441     for up_nodes;
442    
443     mon_nodes \&mon_node;
444    
445     =back
446    
447     =head1 SEE ALSO
448    
449     L<AnyEvent::MP>.
450    
451     =head1 AUTHOR
452    
453     Marc Lehmann <schmorp@schmorp.de>
454     http://home.schmorp.de/
455    
456     =cut
457    
458     1
459