ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.26
Committed: Wed Sep 9 01:47:01 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.25: +67 -18 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.15 This module maintains a fully-meshed network, if possible, and tries to
12     ensure that we are connected to at least one other node.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyway.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37 root 1.22 use AnyEvent::MP::Transport ();
38 root 1.1
39 root 1.20 use base "Exporter";
40    
our @EXPORT = qw(grp_reg grp_get grp_mon);

our $VERSION = $AnyEvent::MP::VERSION;

# module state shared by the functions below
our (
   %addr, # node ID => [address...] mapping, as told to us by that node
   %port, # node ID => our rendezvous port on the other side
   %lreg, # local registry, group name => [port ID...]
   %lmon, # local registry monitoring, "group\x00port" => mon
   %greg, # global registry, group name => { port ID => undef }
   %gmon, # group monitoring, group name => [$cb...]
);

our $nodecnt; # incremented/decremented by mon_node as nodes go up/down

$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
60    
61 root 1.8 #############################################################################
62     # seednodes
63    
our (
   $MASTER,       # our current master (which we regularly query for net updates)
   %SEEDME,       # node ID => undef, the set of nodes that asked us to seed them
   @SEEDS,        # seed addresses not yet tried in the current round
   %SEEDS,        # seed address => undef, or the remote node once connected
   %SEED_CONNECT, # seed address => pending connect, or a true marker (1/2)
   $SEED_WATCHER, # timer that runs more_seeding
);
71    
# transport hook: flag a known seed address in %SEED_CONNECT
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;

   my $peeraddr = $transport->{local_greeting}{peeraddr};
   return unless exists $SEEDS{$peeraddr};

   $SEED_CONNECT{$peeraddr} = 2;
};

# transport hook: remember which node answered at a known seed address
push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
   my ($transport) = @_;

   my $peeraddr = $transport->{local_greeting}{peeraddr};
   return unless exists $SEEDS{$peeraddr};

   $SEEDS{$peeraddr} = $transport->{remote_node};
};

# transport hook: forget the connect state recorded for the peer address
push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
   my ($transport) = @_;

   delete $SEED_CONNECT{ $transport->{local_greeting}{peeraddr} };

   # check if we contacted ourselves, so nuke this seed
   if (exists $transport->{seed} && $transport->{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
      # $AnyEvent::MP::Kernel::WARN->(0,"avoiding seed $transport->{seed}\n");#d#
      delete $SEEDS{ $transport->{seed} };
   }
};
93    
# Start connecting to the seed address $seed ("host:port"), unless
# %SEED_CONNECT already holds a true entry for it. Croaks when the address
# cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak ("$seed: unparsable seed address");

   return if $SEED_CONNECT{$seed};

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   # callback handed to mp_connect, replaces the stored connect by a marker
   my $mark_done = sub {
      $SEED_CONNECT{$seed} = 1;
   };

   # ughhh
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      seed => $seed,
      $mark_done,
   );
}
112    
# Re-arm $SEED_WATCHER for another run and try to connect to one randomly
# chosen seed that has not been tried in the current round.
sub more_seeding {
   # connect_interval, scaled by the number of known nodes once we have
   # contact to anybody, minus a little jitter, but at least one second
   my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval};
   my $scale    = $nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1;
   my $delay    = List::Util::max (1, $interval * $scale - rand);

   $SEED_WATCHER = AE::timer ($delay, 0, \&more_seeding);

   # start a new round when every seed has been tried
   @SEEDS = keys %SEEDS
      unless @SEEDS;
   @SEEDS
      or return;

   my $pick = int rand @SEEDS;
   seed_connect (splice @SEEDS, $pick, 1);
}
126    
# Add the given seed addresses to %SEEDS (resetting their value to undef),
# make sure the seed timer exists and schedule one more_seeding call per
# known seed within the next 0.1 seconds.
sub set_seeds(@) {
   $SEEDS{$_} = undef
      for @_;

   $SEED_WATCHER ||= AE::timer (5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding);

   my $seedcnt = keys %SEEDS;

   after (0.100 * rand, \&more_seeding)
      for 1 .. $seedcnt;
}
136    
# the values of %SEEDS for which node_is_up returns true
sub up_seeds() {
   grep { node_is_up ($_) } values %SEEDS
}
140    
# true (the number of matches) when the given node is a value in %SEEDS
sub node_is_seed($) {
   my ($node) = @_;

   grep { defined ($_) && $_ eq $node } values %SEEDS
}
144    
# returns all (up) seed nodes, or all up nodes if no seed nodes are up/known
sub route_nodes {
   my @route = up_seeds ();

   unless (@route) {
      @route = up_nodes ();
   }

   @route
}
151    
152 root 1.8 #############################################################################
153    
# Apply a membership change to the global registry entry of $group: first
# remove the ports in @$del, then add the ports in @$add, then call every
# monitor of the group with (current members, $add, $del).
sub _change {
   my ($group, $add, $del) = @_;

   my $members = $greg{$group} ||= {};

   delete $members->{$_} for @$del;
   $members->{$_} = undef for @$add;

   my @current = keys %$members;

   for my $cb (@{ $gmon{$group} }) {
      $cb->(\@current, $add, $del);
   }
}
165    
# Remove every port that belongs to $node from all global groups.
#
# A registered ID belongs to the node when it equals $node or starts with
# "$node#". Each affected group is updated through _change, so its monitors
# are notified with the removed ports in the "del" argument.
sub unreg_groups($) {
   my ($node) = @_;

   my $qr = qr/^\Q$node\E(?:#|$)/;

   # Iterate over a snapshot of the group names: _change runs user
   # callbacks, and those may add groups to %greg, which is not allowed
   # while an "each" iteration over the same hash is in progress.
   for my $group (keys %greg) {
      my $ports = $greg{$group}
         or next;

      # use a fresh array per group, so monitors may keep the reference
      # they are passed without seeing it overwritten by the next group
      my @del = grep /$qr/, keys %$ports
         or next;

      _change ($group, [], \@del);
   }
}
178    
# Merge a registry received from $node (group name => [port...]) into the
# global registry by adding every listed port to its group.
sub set_groups($$) {
   my ($node, $lreg) = @_;

   for my $group (keys %$lreg) {
      _change ($group, $lreg->{$group}, []);
   }
}
186    
187 root 1.20 =item $guard = grp_reg $group, $port
188 root 1.4
189     Register the given (local!) port in the named global group C<$group>.
190    
191     The port will be unregistered automatically when the port is destroyed.
192    
193     When not called in void context, then a guard object will be returned that
194     will also cause the name to be unregistered when destroyed.
195    
196     =cut
197    
# unregister local port
#
# Removes $port from the local group $group, updates the global registry
# and tells all connected nodes (reg0). This is called both from the port
# monitor and from the guard installed by grp_reg, so it has to cope with
# being called more than once for the same port: only the first call has
# any effect on the registries.
sub unregister {
   my ($port, $group) = @_;

   # forget the monitor object stored by grp_reg
   delete $lmon{"$group\x00$port"};

   # nothing (more) to do when the port is not registered locally
   my $members = $lreg{$group}
      or return;
   grep $_ eq $port, @$members
      or return;

   @$members = grep $_ ne $port, @$members;

   # do not keep (and later announce via "set") empty groups
   delete $lreg{$group}
      unless @$members;

   _change ($group, [], [$port]);

   snd ($_, reg0 => $group, $port)
      for values %port;
}
210    
# register local port
#
# Adds the local port $port to the group $group, announces it to all
# connected nodes (reg1) and to the local monitors, and monitors the port
# so it gets unregistered when the port goes away.
#
# Croaks when $port is not a local port or is already a member of $group.
#
# Returns a guard that unregisters the port when destroyed, unless called
# in void context.
sub grp_reg($$) {
   my ($group, $port) = @_;

   port_is_local ($port)
      or Carp::croak ("AnyEvent::MP::Global::grp_reg can only be called for local ports, caught");

   grep $_ eq $port, @{ $lreg{$group} }
      and Carp::croak ("'$group': port '$port' is already registered in this group, cannot register a second time");

   $lmon{"$group\x00$port"} = mon ($port, sub { unregister ($port, $group) });
   push @{ $lreg{$group} }, $port;

   snd ($_, reg1 => $group, $port)
      for values %port;

   _change ($group, [$port], []);

   # wantarray is undef only in void context, but false in scalar context,
   # so it must be tested with defined or "$guard = grp_reg ..." gets no guard
   defined wantarray
      && AnyEvent::Util::guard { unregister ($port, $group) }
}
231    
232 root 1.20 =item $ports = grp_get $group
233 root 1.15
234     Returns all the ports currently registered to the given group (as
235 root 1.20 read-only(!) array reference). When the group has no registered members,
236 root 1.15 returns C<undef>.
237    
238     =cut
239    
# Returns an array reference with the ports currently registered in the
# given group, or undef when the group has no members.
sub grp_get($) {
   # "keys %{ $greg{...} }" would autovivify an empty entry for every
   # unknown group that is queried, so fall back to an empty hash instead
   my @ports = keys %{ $greg{$_[0]} || {} };

   @ports ? \@ports : undef
}
244    
245     =item $guard = grp_mon $group, $callback->($ports, $add, $del)
246    
247     Installs a monitor on the given group. Each time there is a change it
248     will be called with the current group members as an arrayref as the
249     first argument. The second argument only contains ports added, the third
250     argument only ports removed.
251    
252     Unlike C<grp_get>, all three arguments will always be array-refs, even if
253     the array is empty. None of the arrays must be modified in any way.
254    
255     The first invocation will be with the first two arguments set to the
256     current members, as if all of them were just added, but only when the
257     group is actually non-empty.
258    
259     Optionally returns a guard object that uninstalls the watcher when it is
260     destroyed.
261    
262     =cut
263    
# Installs $cb as monitor on the group $grp. The registration itself is
# deferred via AnyEvent::MP::Kernel::delay; when the group is non-empty at
# that time, $cb is called once with the current members as both the
# "members" and the "added" argument and an empty "removed" array.
#
# Returns a guard that uninstalls the monitor when destroyed, unless called
# in void context.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   AnyEvent::MP::Kernel::delay (sub {
      # the guard has already been destroyed, so never register
      return unless $cb;

      push @{ $gmon{$grp} }, $cb;
      $cb->(((grp_get ($grp)) || return) x 2, []);
   });

   defined wantarray && AnyEvent::Util::guard {
      # the group has no monitor list when the guard is destroyed before
      # the delayed registration above has run
      my @mon = grep $_ != $cb, @{ delete $gmon{$grp} || [] };
      $gmon{$grp} = \@mon if @mon;
      undef $cb;
   }
}
280    
# Set up our rendezvous port on $node, unless that already exists or $node
# is this node, and send it our listener addresses, the node addresses we
# know and our local group registry.
sub start_node {
   my ($node) = @_;

   # already started, or ourselves (do not connect to ourselves)
   return if exists $port{$node} || $node eq $NODE;

   # establish connection
   my $port = spawn ($node, "AnyEvent::MP::Global::connect", 0, $NODE);
   $port{$node} = $port;

   # when the port goes away, drop everything that node had registered
   mon ($port, sub {
      unreg_groups ($node);
      delete $port{$node};
   });

   snd ($port, addr => $AnyEvent::MP::Kernel::LISTENER);
   snd ($port, nodes => \%addr) if %addr;
   snd ($port, set => \%lreg) if %lreg;
}
299 root 1.3
# other nodes connect via this
#
# start_node on the other side spawns this function with a version number
# and the ID of the connecting node. It installs the handlers for the
# messages that node sends to the port it runs on ($SELF):
#
#    addr    - the node tells us its listener addresses
#    nodes   - the node tells us addresses of (other) nodes
#    find    - the node asks for the addresses of some other node
#    set     - the node sends its complete local group registry
#    reg0    - the node unregistered a port from a group
#    reg1    - the node registered a port in a group
#    seedme0 - the node no longer wants network updates from us
#    seedme1 - the node wants network updates from us
sub connect {
   my ($version, $node) = @_;

   # monitor them, silently die
   mon $node, psub {
      delete $SEEDME{$node};
      kil $SELF;
   };

   rcv $SELF,
      addr => sub {
         my $addresses = shift;
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
         $addr{$node} = $addresses;

         # pass the addresses of $node on to everybody who asked us for
         # network updates. %SEEDME is used as a set (all values are
         # undef), so the interested nodes are its keys. The loop variable
         # must not be called $node, as the message is about the node that
         # just told us its addresses.
         for my $slave (keys %SEEDME) {
            my $port = $port{$slave}
               or next; # we have no port on that node (anymore)

            snd $port, nodes => { $node => $addresses };
         }
      },
      nodes => sub {
         my ($kv) = @_;

         use JSON::XS;#d#
         my $kv_txt = JSON::XS->new->encode ($kv);#d#
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

         # connect to every node we were told about and start talking to it
         while (my ($id, $addresses) = each %$kv) {
            my $node = AnyEvent::MP::Kernel::add_node $id;
            $node->connect (@$addresses);
            start_node $id;
         }
      },
      find => sub {
         my ($othernode) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
         snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
            if $addr{$othernode};
      },
      set => sub {
         set_groups $node, shift;
      },
      reg0 => sub {
         _change $_[0], [], [$_[1]];
      },
      reg1 => sub {
         _change $_[0], [$_[1]], [];
      },

      # some node asks us to provide network updates
      seedme0 => sub {
         $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to NOT seed it.");#d#
         delete $SEEDME{$node};
      },
      seedme1 => sub {
         $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to seed it.");#d#
         $SEEDME{$node} = ();

         # for good measure
         snd $port{$node}, nodes => \%addr if %addr;
      },
   ;
}
365 root 1.1
# Node monitor callback, called with a node ID and a flag telling whether
# the node went up or down. Keeps $nodecnt current, starts talking to new
# nodes, (re)selects our master and cleans up after nodes that went down.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      ++$nodecnt;
      start_node ($node);
   }

   # now select a new(?) master
   my $master;
   my @seeds = up_seeds ();

   if (@seeds) {
      # switching here with lower chance roughly hopefully still gives us
      # an equal selection.
      my $keep = node_is_up ($MASTER)
              && node_is_seed ($MASTER)
              && 1 < rand @seeds;

      $master = $keep ? $MASTER : $seeds[rand @seeds];
   } else {
      # select "last" non-seed node
      my @up = up_nodes ();
      $master = (sort @up)[-1];
      #TODO maybe avoid listener-less nodes?
   }

   if ($MASTER ne $master) {
      # tell the old master that we no longer need its updates
      snd ($port{$MASTER}, "seedme0")
         if $MASTER && node_is_up ($MASTER);

      $MASTER = $master;

      if ($MASTER) {
         snd ($port{$MASTER}, "seedme1");
         $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
      } else {
         $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node.");
      }
   }

   unless ($is_up) {
      --$nodecnt;
      more_seeding ()
         unless $nodecnt;
      unreg_groups ($node);

      # forget about the node
      delete $addr{$node};

      # ask our master for quick recovery
      snd ($port{$MASTER}, find => $node)
         if $MASTER;
   }
}
415    
# run mon_node once for every node that is up right now, then register it
# via mon_nodes
for my $node (up_nodes ()) {
   mon_node ($node, 1);
}

mon_nodes (\&mon_node);
420    
421     =back
422    
423     =head1 SEE ALSO
424    
425     L<AnyEvent::MP>.
426    
427     =head1 AUTHOR
428    
429     Marc Lehmann <schmorp@schmorp.de>
430     http://home.schmorp.de/
431    
432     =cut
433    
434     1
435