ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.35
Committed: Sat Nov 7 02:36:31 2009 UTC (14 years, 6 months ago) by root
Branch: MAIN
Changes since 1.34: +9 -3 lines
Log Message:
bleh

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.15 This module maintains a fully-meshed network, if possible, and tries to
12     ensure that we are connected to at least one other node.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37 root 1.22 use AnyEvent::MP::Transport ();
38 root 1.1
39 root 1.20 use base "Exporter";
40    
41     our @EXPORT = qw(
42     grp_reg
43     grp_get
44     grp_mon
45     );
46    
47 root 1.31 our $GLOBAL_VERSION = 0;
48    
49 root 1.35 our %ON_SETUP; # take note: not public
50    
51 root 1.8 our %addr; # port ID => [address...] mapping
52    
53 root 1.4 our %port; # our rendezvous port on the other side
54     our %lreg; # local registry, name => [pid...]
55 root 1.20 our %lmon; # local registry monitoring name,pid => mon
56     our %greg; # global registry, name => pid => undef
57 root 1.22 our %gmon; # group monitoring, group => [$cb...]
58 root 1.4
59 root 1.8 our $nodecnt;
60    
61 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
62    
63 root 1.8 #############################################################################
64     # seednodes
65    
66 root 1.26 our $MASTER; # our current master (which we regularly query for net updates)
67    
68     our %SEEDME; # $node => $port
69 root 1.8 our @SEEDS;
70 root 1.22 our %SEEDS; # just to check whether a seed is a seed
71 root 1.14 our %SEED_CONNECT;
72 root 1.8 our $SEED_WATCHER;
73    
# Transport hooks that keep the seed bookkeeping up to date. Each hook
# receives the transport object as its first argument and looks at the
# address we connected to ({local_greeting}{peeraddr}); addresses that
# are not keys of %SEEDS are ignored by the first three hooks.

# on connect: mark the seed address with 2 in %SEED_CONNECT
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;

   my $addr = $transport->{local_greeting}{peeraddr};
   return unless exists $SEEDS{$addr};

   $SEED_CONNECT{$addr} = 2;
};

# on greeting: remember the (still unverified) node name for the seed
push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
   my ($transport) = @_;

   # we rely on untrusted data here (the remote node name)
   # this is hopefully ok, as we override it on successful
   # connects, and this can at most be used for DOSing,
   # which is easy when you can do MITM.
   my $addr = $transport->{local_greeting}{peeraddr};
   return unless exists $SEEDS{$addr};

   # only fill in a name when none is recorded yet
   $SEEDS{$addr} ||= $transport->{remote_node};
};

# on connected: unconditionally record the node name for the seed
push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
   my ($transport) = @_;

   my $addr = $transport->{local_greeting}{peeraddr};
   return unless exists $SEEDS{$addr};

   $SEEDS{$addr} = $transport->{remote_node};
};

# on destroy: forget the connection state for this address
push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
   my ($transport) = @_;

   delete $SEED_CONNECT{ $transport->{local_greeting}{peeraddr} };

   # check if we contacted ourselves, so nuke this seed
   if (exists $transport->{seed} && $transport->{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
      # $AnyEvent::MP::Kernel::WARN->(0,"avoiding seed $_[0]{seed}\n");#d#
      delete $SEEDS{ $transport->{seed} };
   }
};
105    
# Try to connect to the seed address $seed.
#
# Croaks when the address cannot be parsed. Does nothing when
# %SEED_CONNECT already holds a true value for the address, or when the
# node recorded for this seed in %SEEDS is up.
sub seed_connect {
   my ($seed) = @_;

   # parse_hostport returns an empty list on failure, so the count is 0
   my @hostport = AnyEvent::Socket::parse_hostport($seed)
      or Carp::croak "$seed: unparsable seed address";
   my ($host, $port) = @hostport;

   return
      if $SEED_CONNECT{$seed}
         || (defined $SEEDS{$seed} && node_is_up($SEEDS{$seed}));

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   # ughhh
   # the slot first holds whatever mp_connect returns; the callback
   # passed to mp_connect later overwrites it with 1
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect(
      $host, $port,
      seed => $seed,
      sub { $SEED_CONNECT{$seed} = 1 },
   );
}
125    
# Timer callback: re-arm $SEED_WATCHER and try one more seed.
#
# The delay is connect_interval, multiplied by the number of entries in
# %AnyEvent::MP::Kernel::NODE once $nodecnt is non-zero, minus a random
# fraction of a second, and never less than one second.
sub more_seeding {
   my $scale = $nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1;

   my $delay = List::Util::max(
      1,
      $AnyEvent::MP::Kernel::CONFIG->{connect_interval} * $scale - rand
   );

   $SEED_WATCHER = AE::timer($delay, 0, \&more_seeding);

   # @SEEDS is the work list; refill it from %SEEDS once it is used up
   @SEEDS = keys %SEEDS
      unless @SEEDS;
   @SEEDS
      or return;

   # take a random address off the work list and try it
   my $seed = splice @SEEDS, rand @SEEDS, 1;
   seed_connect($seed);
}
139    
# Register the given seed addresses and start seeding.
#
# Each address becomes a key of %SEEDS with an undefined value. The
# seed watcher is created unless one exists, and more_seeding is
# scheduled once per known seed with a random delay below 0.1 seconds.
sub set_seeds(@) {
   @SEEDS{@_} = ();

   $SEED_WATCHER ||= AE::timer(5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding);

   for (1 .. keys %SEEDS) {
      after 0.100 * rand(), \&more_seeding;
   }
}
148    
# Returns those values of %SEEDS (the node recorded per seed address)
# for which node_is_up is true; in scalar context, their count.
sub up_seeds() {
   grep { node_is_up($_) } values %SEEDS
}
152    
# Returns the defined values of %SEEDS that are string-equal to the
# given node; in scalar/boolean context this is the number of matches.
sub node_is_seed($) {
   my ($candidate) = @_;

   grep { defined($_) && $_ eq $candidate } values %SEEDS
}
156    
# returns all (up) seed nodes, or all nodes if no seednodes are up/known
sub route_nodes {
   my @routes = up_seeds();

   # fall back to every node that is up when no seed is
   @routes = up_nodes()
      unless @routes;

   @routes
}
163    
164 root 1.8 #############################################################################
165    
# Apply a membership change to the global registry entry for $group and
# notify the group's monitors.
#
#   $group - group name
#   $add   - arrayref of port IDs to add
#   $del   - arrayref of port IDs to remove
#
# Removals are applied before additions. Every callback in
# $gmon{$group} is then called with (all current ports, $add, $del).
sub _change {
   my ($group, $add, $del) = @_;

   # the group entry is created on first use
   my $members = $greg{$group} ||= {};

   delete @{$members}{ @$del };
   @{$members}{ @$add } = ();

   my @current = keys %$members;

   for my $watcher (@{ $gmon{$group} }) {
      $watcher->(\@current, $add, $del);
   }
}
177    
# Remove every port belonging to $node from all global groups.
#
# A port belongs to $node when its ID is exactly $node or starts with
# "$node#". Each affected group is updated through _change, so the
# group's monitors are notified of the removed ports.
sub unreg_groups($) {
   my ($node) = @_;

   my $qr = qr/^\Q$node\E(?:#|$)/;

   while (my ($group, $ports) = each %greg) {
      # use a fresh array per group: the reference is handed to the
      # monitor callbacks, which must not see it change when the next
      # group is processed
      my @del = grep /$qr/, keys %$ports;

      _change($group, [], \@del)
         if @del;
   }
}
190    
# Merge a registry received from another node into the global registry.
#
#   $node - the node the registry came from (currently not used here)
#   $lreg - hashref mapping group name => arrayref of port IDs
#
# Every listed port is added to its group via _change; nothing is removed.
sub set_groups($$) {
   my ($node, $lreg) = @_;

   for my $group (keys %$lreg) {
      _change($group, $lreg->{$group}, []);
   }
}
198    
199 root 1.20 =item $guard = grp_reg $group, $port
200 root 1.4
201     Register the given (local!) port in the named global group C<$group>.
202    
203     The port will be unregistered automatically when the port is destroyed.
204    
205     When not called in void context, then a guard object will be returned that
206     will also cause the name to be unregistered when destroyed.
207    
208     =cut
209    
# unregister local port
#
# Drops the port monitor stored in %lmon, removes $port from the local
# and global registry of $group, and sends a "reg0" message for it to
# every port in %port.
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};

   # filter the local member list in place
   my $members = $lreg{$group} ||= [];
   @$members = grep { $_ ne $port } @$members;

   _change($group, [], [$port]);

   for my $peer (values %port) {
      snd($peer, reg0 => $group, $port);
   }
}
222    
# register local port
#
# Croaks when $port is not local or is already registered in $group.
# Otherwise the port is monitored (so it is unregistered when the
# monitor fires), added to the local registry, announced with a "reg1"
# message to every port in %port, and added to the global registry.
#
# In non-void context a guard is returned that unregisters the port
# when destroyed.
sub grp_reg($$) {
   my ($group, $port) = @_;

   Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught"
      unless port_is_local($port);

   Carp::croak "'$group': group already registered, cannot register a second time"
      if grep { $_ eq $port } @{ $lreg{$group} };

   # the monitor is keyed by group and port, separated by a NUL byte
   $lmon{"$group\x00$port"} = mon($port, sub { unregister($port, $group) });
   push @{ $lreg{$group} }, $port;

   for my $peer (values %port) {
      snd($peer, reg1 => $group, $port);
   }

   _change($group, [$port], []);

   defined wantarray && AnyEvent::Util::guard { unregister($port, $group) }
}
243    
244 root 1.20 =item $ports = grp_get $group
245 root 1.15
246     Returns all the ports currently registered to the given group (as
247 root 1.20 read-only(!) array reference). When the group has no registered members,
248 root 1.15 return C<undef>.
249    
250     =cut
251    
# Returns an arrayref with the port IDs currently registered in the
# given group, or undef when the group has no members.
sub grp_get($) {
   my ($group) = @_;

   my @members = keys %{ $greg{$group} };

   @members ? \@members : undef
}
256    
257     =item $guard = grp_mon $group, $callback->($ports, $add, $del)
258    
259     Installs a monitor on the given group. Each time there is a change it
260     will be called with the current group members as an arrayref as the
261     first argument. The second argument only contains ports added, the third
262     argument only ports removed.
263    
264     Unlike C<grp_get>, all three arguments will always be array-refs, even if
265     the array is empty. None of the arrays must be modified in any way.
266    
267     The first invocation will be with the first two arguments set to the
268     current members, as if all of them were just added, but only when the
269     group is actually non-empty.
270    
271     Optionally returns a guard object that uninstalls the watcher when it is
272     destroyed.
273    
274     =cut
275    
# Install $cb as a monitor on group $grp.
#
# Registration is deferred via AnyEvent::MP::Kernel::delay. Once it
# runs, $cb is added to $gmon{$grp} and, if the group is non-empty,
# called once with the current members as both the "all" and the
# "added" list and an empty "removed" list.
#
# In non-void context a guard is returned. Destroying it removes $cb
# from the group's monitor list and also cancels a registration that
# has not run yet.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   AnyEvent::MP::Kernel::delay(sub {
      # the guard was destroyed before we got here
      return unless $cb;

      push @{ $gmon{$grp} }, $cb;
      $cb->((grp_get($grp) || return) x 2, []);
   });

   defined wantarray && AnyEvent::Util::guard {
      # $gmon{$grp} may not exist (e.g. the guard is destroyed before the
      # delayed registration ran); fall back to an empty list instead of
      # dereferencing undef, which dies under strict refs
      my @mon = grep $_ != $cb, @{ (delete $gmon{$grp}) || [] };
      $gmon{$grp} = \@mon if @mon;
      undef $cb;
   }
}
292    
# Set up our rendezvous port on $node, unless we already have one or
# $node is ourselves.
#
# Spawns AnyEvent::MP::Global::connect on the remote node, remembers the
# resulting port in %port, and sends it our listener addresses, the
# known node addresses, our local registry and finally "setup".
sub start_node {
   my ($node) = @_;

   # already started, or do not connect to ourselves
   return
      if exists($port{$node}) || $node eq $NODE;

   # establish connection
   my $remote = spawn($node, "AnyEvent::MP::Global::connect", $GLOBAL_VERSION, $NODE);
   $port{$node} = $remote;

   # when the remote port goes away, drop everything it registered
   mon($remote, sub {
      unreg_groups($node);
      delete $port{$node};
   });

   snd($remote, addr => $AnyEvent::MP::Kernel::LISTENER);
   snd($remote, nodes => \%addr) if %addr;
   snd($remote, set => \%lreg) if %lreg;
   snd($remote, "setup"); # tell the other side that we are in business now
}
312 root 1.3
# other nodes connect via this
#
# This is the function start_node spawns on the remote side. $version is
# the caller's $GLOBAL_VERSION and $node the caller's node ID. Dies when
# the integer parts of the two versions differ. Otherwise the calling
# node is monitored and the message handlers are installed on $SELF.
sub connect {
   my ($version, $node) = @_;

   int($version) == int($GLOBAL_VERSION)
      or die "node version mismatch (requested $version; we have $GLOBAL_VERSION)";

   # monitor them, silently die
   mon($node, psub {
      delete $SEEDME{$node};
      kil($SELF);
   });

   rcv($SELF,
      # the other side has sent its initial state: run the setup hooks
      setup => sub {
         for my $hook (values %ON_SETUP) {
            $hook->($node);
         }
      },

      # the other side tells us its own listening addresses
      addr => sub {
         my ($addresses) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
         $addr{$node} = $addresses;

         # pass the news on to every node that asked us to seed it
         for my $slave (keys %SEEDME) {
            snd($port{$slave}, nodes => { $node => $addresses });
         }
      },

      # the other side tells us about nodes (id => addresses) it knows
      nodes => sub {
         my ($known) = @_;

         use JSON::XS;#d#
         my $known_txt = JSON::XS->new->encode ($known);#d#
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $known_txt.");#d#

         for my $id (keys %$known) {
            my $addresses = $known->{$id};

            my $peer = AnyEvent::MP::Kernel::add_node($id);
            $peer->connect (@$addresses);
            start_node($id);
         }
      },

      # the other side sends its local registry
      set => sub {
         my ($groups) = @_;

         set_groups($node, $groups);
      },

      # the other side asks for the addresses of another node
      find => sub {
         my ($othernode) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
         snd($port{$node}, nodes => { $othernode => $addr{$othernode} })
            if $addr{$othernode};
      },

      # a single port was unregistered from (reg0) or registered in
      # (reg1) a group
      reg0 => sub {
         my ($group, $port) = @_;

         _change($group, [], [$port]);
      },
      reg1 => sub {
         my ($group, $port) = @_;

         _change($group, [$port], []);
      },

      # some node asks us to provide network updates
      seedme0 => sub {
         $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to NOT seed it.");#d#
         delete $SEEDME{$node};
      },
      seedme1 => sub {
         $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to seed it.");#d#
         $SEEDME{$node} = undef;

         # for good measure
         snd($port{$node}, nodes => \%addr) if %addr;
      },
   );
}
383 root 1.1
# Make the given node our master, i.e. the node we ask for network
# updates. Does nothing when it already is the master.
#
# The previous master, if any and still up, is sent "seedme0"; the new
# one is sent "seedme1". A false argument means there is no master, which
# is logged.
sub set_master($) {
   my ($candidate) = @_;

   return if $MASTER eq $candidate;

   # sign off from the old master
   snd($port{$MASTER}, "seedme0")
      if $MASTER && node_is_up($MASTER);

   $MASTER = $candidate;

   if (!$MASTER) {
      $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node.");
   } else {
      snd($port{$MASTER}, "seedme1");
      $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
   }
}
399    
# Node monitor callback: called with a node ID and a flag that is true
# when the node came up and false when it went down.
#
# Keeps $nodecnt current, starts our rendezvous port on new nodes,
# (re)selects the master and cleans up after nodes that went away.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      ++$nodecnt;
      start_node($node);

      if (node_is_seed($node)) {
         # a seed always beats a non-seed
         my $switch = 1;

         if (node_is_seed($MASTER)) {
            my @up = up_seeds();

            # switching here with lower chance roughly hopefully still gives us
            # an equal selection.
            $switch = 1 < rand @up;
         }

         set_master($node)
            if $switch;
      }
   }

   # select a new(?) master, if required
   if (!$MASTER || !node_is_up($MASTER)) {
      my @up = up_seeds();

      if (@up) {
         set_master($up[rand @up]);
      } else {
         # select "last" non-seed node
         my @all  = up_nodes();
         my $last = (sort @all)[-1];
         set_master($last);
      }
   }

   if (!$is_up) {
      --$nodecnt;

      # nobody left: start looking for seeds again
      more_seeding()
         unless $nodecnt;

      unreg_groups($node);

      # forget about the node
      delete $addr{$node};

      # ask our master for quick recovery
      snd($port{$MASTER}, find => $node)
         if $MASTER;
   }
}
445    
# treat every node that is already up as if it had just come up ...
for my $node (up_nodes()) {
   mon_node($node, 1);
}

# ... and register mon_node as the node monitor callback
mon_nodes(\&mon_node);
450    
451     =back
452    
453     =head1 SEE ALSO
454    
455     L<AnyEvent::MP>.
456    
457     =head1 AUTHOR
458    
459     Marc Lehmann <schmorp@schmorp.de>
460     http://home.schmorp.de/
461    
462     =cut
463    
464     1
465