ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.27
Committed: Wed Sep 9 21:05:15 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.26: +11 -0 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.15 This module maintains a fully-meshed network, if possible, and tries to
12     ensure that we are connected to at least one other node.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37 root 1.22 use AnyEvent::MP::Transport ();
38 root 1.1
39 root 1.20 use base "Exporter";
40    
41     our @EXPORT = qw(
42     grp_reg
43     grp_get
44     grp_mon
45     );
46    
47 root 1.1 our $VERSION = $AnyEvent::MP::VERSION;
48    
49 root 1.8 our %addr; # port ID => [address...] mapping
50    
51 root 1.4 our %port; # our rendezvous port on the other side
52     our %lreg; # local registry, name => [pid...]
53 root 1.20 our %lmon; # local registry monitoring name,pid => mon
54     our %greg; # global registry, name => pid => undef
55 root 1.22 our %gmon; # group monitoring, group => [$cb...]
56 root 1.4
57 root 1.8 our $nodecnt;
58    
59 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
60    
61 root 1.8 #############################################################################
62     # seednodes
63    
64 root 1.26 our $MASTER; # our current master (which we regularly query for net updates)
65    
66     our %SEEDME; # $node => $port
67 root 1.8 our @SEEDS;
68 root 1.22 our %SEEDS; # just to check whether a seed is a seed
69 root 1.14 our %SEED_CONNECT;
70 root 1.8 our $SEED_WATCHER;
71    
# Transport hooks that keep the seed bookkeeping up to date. %SEEDS maps a
# seed address to the node ID found behind it (undef while unknown),
# %SEED_CONNECT holds per-seed connect state. All hooks receive the
# transport object as first argument.

# HOOK_CONNECT: if the peer address is one of our seeds, set its connect
# state to 2.
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;
   my $peeraddr = $transport->{local_greeting}{peeraddr};

   exists $SEEDS{$peeraddr}
      or return;

   $SEED_CONNECT{$peeraddr} = 2;
};

# HOOK_GREETING: remember the node ID the peer claims, unless one is
# already recorded for this seed.
push @AnyEvent::MP::Transport::HOOK_GREETING, sub {
   # we rely on untrusted data here (the remote node name)
   # this is hopefully ok, as we override it on successful
   # connects, and this can at most be used for DOSing,
   # which is easy when you can do MITM.
   my ($transport) = @_;
   my $peeraddr = $transport->{local_greeting}{peeraddr};

   exists $SEEDS{$peeraddr}
      or return;

   $SEEDS{$peeraddr} ||= $transport->{remote_node};
};

# HOOK_CONNECTED: unconditionally record the remote node ID for this seed.
push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
   my ($transport) = @_;
   my $peeraddr = $transport->{local_greeting}{peeraddr};

   exists $SEEDS{$peeraddr}
      or return;

   $SEEDS{$peeraddr} = $transport->{remote_node};
};

# HOOK_DESTROY: forget the connect state for the peer address and drop
# seeds that turned out to be ourselves.
push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
   my ($transport) = @_;

   delete $SEED_CONNECT{ $transport->{local_greeting}{peeraddr} };

   # check if we contacted ourselves, so nuke this seed
   if (exists $transport->{seed} && $transport->{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
      # $AnyEvent::MP::Kernel::WARN->(0,"avoiding seed $_[0]{seed}\n");#d#
      delete $SEEDS{ $transport->{seed} };
   }
};
103    
# Start a connection attempt to the seed address $seed.
#
# Croaks when the address cannot be parsed by parse_hostport. Does nothing
# when %SEED_CONNECT already holds a true state for the seed, or when the
# node ID recorded for the seed in %SEEDS is up.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport($seed)
      or Carp::croak "$seed: unparsable seed address";

   return if $SEED_CONNECT{$seed};

   my $seed_node = $SEEDS{$seed};
   return if defined $seed_node && node_is_up($seed_node);

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   # ughhh
   # the callback replaces whatever mp_connect returned with the state 1
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect(
      $host, $port,
      seed => $seed,
      sub {
         $SEED_CONNECT{$seed} = 1;
      },
   );
}
123    
# Timer callback: re-arm $SEED_WATCHER as a one-shot timer and try to
# connect to one seed picked at random.
#
# The delay is connect_interval times the number of entries in the kernel's
# %NODE hash (or times 1 while $nodecnt is zero), minus a random fraction,
# but never less than one second. @SEEDS is the list of seeds still to be
# tried in this round; it is refilled from the keys of %SEEDS once empty.
sub more_seeding {
   my $scale = $nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1;
   my $delay = List::Util::max(
      1,
      $AnyEvent::MP::Kernel::CONFIG->{connect_interval} * $scale - rand(),
   );

   $SEED_WATCHER = AE::timer($delay, 0, \&more_seeding);

   @SEEDS = keys %SEEDS
      if !@SEEDS;
   return if !@SEEDS;

   # remove a random seed from the round and try it
   my ($seed) = splice @SEEDS, rand @SEEDS, 1;
   seed_connect($seed);
}
137    
# Register the given seed addresses. Each address becomes a key of %SEEDS
# with an undefined node ID (also resetting addresses already present).
# Creates the seed watcher timer if there is none, and schedules one
# more_seeding call per known seed, each after a random delay below 0.1s.
sub set_seeds(@) {
   $SEEDS{$_} = undef
      for @_;

   $SEED_WATCHER = AE::timer(5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding)
      unless $SEED_WATCHER;

   after(0.100 * rand(), \&more_seeding)
      for 1 .. keys %SEEDS;
}
147    
# Returns the node IDs stored in %SEEDS for which node_is_up is true
# (in scalar context: how many there are).
sub up_seeds() {
   return grep { node_is_up($_) } values %SEEDS;
}
151    
# Returns the %SEEDS entries whose (defined) node ID equals the given node
# ID - i.e. a true count in scalar context when the node is a known seed.
sub node_is_seed($) {
   my ($id) = @_;

   return grep { defined($_) && $_ eq $id } values %SEEDS;
}
155    
156 root 1.22 # returns all (up) seed nodes, or all nodes if no seednodes are up/known
# Returns the seed nodes that are up; when there are none, falls back to
# whatever up_nodes returns.
sub route_nodes {
   my @nodes = up_seeds();

   @nodes = up_nodes()
      if !@nodes;

   return @nodes;
}
162    
163 root 1.8 #############################################################################
164    
# Apply a membership change to the global registry entry of $group and
# notify the group's monitors.
#
#   $group - group name
#   $add   - array-ref of port IDs to add
#   $del   - array-ref of port IDs to remove (removal is done first)
#
# Every callback in $gmon{$group} is called with (current members, $add,
# $del), all as array-refs.
sub _change {
   my ($group, $add, $del) = @_;

   my $kv = $greg{$group} ||= {};

   delete @$kv{@$del};
   @$kv{@$add} = ();

   my $ports = [keys %$kv];

   # the "|| []" is deliberate: foreach would otherwise autovivify an
   # empty $gmon{$group} for every group that has no monitors
   $_->($ports, $add, $del)
      for @{ $gmon{$group} || [] };
}
176    
# Remove every port belonging to node $node from all global groups.
#
# A port belongs to the node when its ID is the node ID itself or starts
# with the node ID followed by "#". Monitors are notified (via _change)
# once per affected group.
sub unreg_groups($) {
   my ($node) = @_;

   my $qr = qr/^\Q$node\E(?:#|$)/;

   # iterate over a snapshot of the group names: monitor callbacks run
   # from inside _change and may touch %greg, which must not happen while
   # an each() iteration over it is in progress
   for my $group (keys %greg) {
      my $ports = $greg{$group}
         or next;

      # a fresh array per group, as the reference is handed to callbacks
      my @del = grep /$qr/, keys %$ports;

      _change($group, [], \@del)
         if @del;
   }
}
189    
# Merge a registry received from another node into the global registry.
#
#   $node - the node the registry came from (currently unused)
#   $lreg - hash-ref mapping group name => array-ref of port IDs
#
# Each group's ports are added via _change, which notifies the monitors.
sub set_groups($$) {
   my ($node, $lreg) = @_;

   for my $group (keys %$lreg) {
      _change($group, $lreg->{$group}, []);
   }
}
197    
198 root 1.20 =item $guard = grp_reg $group, $port
199 root 1.4
200     Register the given (local!) port in the named global group C<$group>.
201    
202     The port will be unregistered automatically when the port is destroyed.
203    
204     When not called in void context, then a guard object will be returned that
205     will also cause the name to be unregistered when destroyed.
206    
207     =cut
208    
209     # unregister local port
# Remove the local port $port from group $group: drop its monitor from
# %lmon, take it out of the local and global registries and send a "reg0"
# message for it to every rendezvous port in %port.
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};

   my $members = $lreg{$group} ||= [];
   @$members = grep { $_ ne $port } @$members;

   _change($group, [], [$port]);

   for my $peer (values %port) {
      snd($peer, reg0 => $group, $port);
   }
}
221    
222     # register local port
# Register the local port $port in the global group $group.
#
# Croaks when the port is not local or is already registered in the group.
# The port is monitored, so it gets unregistered when it is destroyed. A
# "reg1" message is sent to every rendezvous port in %port.
#
# Returns a guard that unregisters the port when destroyed, unless called
# in void context.
sub grp_reg($$) {
   my ($group, $port) = @_;

   port_is_local($port)
      or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";

   grep $_ eq $port, @{ $lreg{$group} }
      and Carp::croak "'$group': group already registered, cannot register a second time";

   $lmon{"$group\x00$port"} = mon($port, sub { unregister($port, $group) });
   push @{ $lreg{$group} }, $port;

   snd($_, reg1 => $group, $port)
      for values %port;

   _change($group, [$port], []);

   # "defined wantarray", not plain "wantarray": the latter is false in
   # scalar context, so "$guard = grp_reg ..." would never get a guard
   defined wantarray && AnyEvent::Util::guard { unregister($port, $group) }
}
242    
243 root 1.20 =item $ports = grp_get $group
244 root 1.15
245     Returns all the ports currently registered to the given group (as
246 root 1.20 read-only(!) array reference). When the group has no registered members,
247 root 1.15 return C<undef>.
248    
249     =cut
250    
# Returns an array-ref with the port IDs currently registered in the given
# group, or undef when the group has no members.
sub grp_get($) {
   # look the group up without dereferencing a missing entry: keys %{...}
   # on a nonexistent $greg{...} would autovivify an empty group
   my $members = $greg{$_[0]};

   my @ports = $members ? keys %$members : ();
   @ports ? \@ports : undef
}
255    
256     =item $guard = grp_mon $group, $callback->($ports, $add, $del)
257    
258     Installs a monitor on the given group. Each time there is a change it
259     will be called with the current group members as an arrayref as the
260     first argument. The second argument only contains ports added, the third
261     argument only ports removed.
262    
263     Unlike C<grp_get>, all three arguments will always be array-refs, even if
264     the array is empty. None of the arrays must be modified in any way.
265    
266     The first invocation will be with the first two arguments set to the
267     current members, as if all of them were just added, but only when the
268     group is actually non-empty.
269    
270     Optionally returns a guard object that uninstalls the watcher when it is
271     destroyed.
272    
273     =cut
274    
# Install the monitor callback $cb on group $grp.
#
# Installation is deferred via AnyEvent::MP::Kernel::delay. When it
# happens, the callback is appended to $gmon{$grp} and, if the group
# currently has members, called once with (members, members, []).
#
# Returns a guard that removes the callback when destroyed, unless called
# in void context.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   AnyEvent::MP::Kernel::delay(sub {
      # the guard was destroyed before we got to run
      return unless $cb;

      push @{ $gmon{$grp} }, $cb;

      my $ports = grp_get($grp)
         or return;
      $cb->($ports, $ports, []);
   });

   defined wantarray && AnyEvent::Util::guard {
      # "|| []": there is no entry yet when the guard is destroyed before
      # the delayed installation ran
      my @mon = grep $_ != $cb, @{ delete $gmon{$grp} || [] };
      $gmon{$grp} = \@mon if @mon;
      undef $cb;
   }
}
291    
# Set up our rendezvous port on node $node, unless we already have one or
# the node is ourselves.
#
# Spawns AnyEvent::MP::Global::connect on the remote node, records the
# resulting port in %port and monitors it: when the monitor fires, the
# node's group registrations and the %port entry are removed. Finally sends
# our listener addresses and - when non-empty - the known node addresses
# and our local registry.
sub start_node {
   my ($node) = @_;

   # already started, or it is us (do not connect to ourselves)
   return if exists $port{$node} || $node eq $NODE;

   # establish connection
   my $port = spawn($node, "AnyEvent::MP::Global::connect", 0, $NODE);
   $port{$node} = $port;

   mon($port, sub {
      unreg_groups($node);
      delete $port{$node};
   });

   snd($port, addr  => $AnyEvent::MP::Kernel::LISTENER);
   snd($port, nodes => \%addr) if %addr;
   snd($port, set   => \%lreg) if %lreg;
}
310 root 1.3
311 root 1.5 # other nodes connect via this
# Entry point spawned on this node by other nodes (see the spawn call in
# start_node), called with a protocol version and the ID of the connecting
# node. Runs in the context of the freshly spawned port ($SELF).
#
# Monitors the remote node (killing this port when the monitor fires) and
# installs handlers for the messages the remote side sends:
#
#   addr    - the remote node's listener addresses
#   nodes   - a hash of node ID => addresses the remote node knows about
#   find    - a request for the addresses of another node
#   set     - the remote node's complete local group registry
#   reg0    - a port was removed from a group
#   reg1    - a port was added to a group
#   seedme0 - stop sending network updates to the remote node
#   seedme1 - start sending network updates to the remote node
sub connect {
   my ($version, $node) = @_;

   # monitor them, silently die
   mon($node, psub {
      delete $SEEDME{$node};
      kil($SELF);
   });

   rcv($SELF,
      addr => sub {
         my $addresses = shift;
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
         $addr{$node} = $addresses;

         # tell every node that asked us for updates about $node. the
         # subscribers are the keys of %SEEDME (the values are undef), and
         # the loop variable must not shadow $node, whose addresses these are
         for my $subscriber (keys %SEEDME) {
            my $port = $port{$subscriber}
               or next;
            snd($port, nodes => +{ $node => $addresses });
         }
      },
      nodes => sub {
         my ($kv) = @_;

         use JSON::XS;#d#
         my $kv_txt = JSON::XS->new->encode ($kv);#d#
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

         # make each announced node known locally and connect to it
         while (my ($id, $addresses) = each %$kv) {
            my $peer = AnyEvent::MP::Kernel::add_node($id);
            $peer->connect (@$addresses);
            start_node($id);
         }
      },
      find => sub {
         my ($othernode) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
         snd($port{$node}, nodes => +{ $othernode => $addr{$othernode} })
            if $addr{$othernode};
      },
      set => sub {
         set_groups($node, shift);
      },
      reg0 => sub {
         _change($_[0], [], [$_[1]]);
      },
      reg1 => sub {
         _change($_[0], [$_[1]], []);
      },

      # some node asks us to provide network updates
      seedme0 => sub {
         $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to NOT seed it.");#d#
         delete $SEEDME{$node};
      },
      seedme1 => sub {
         $AnyEvent::MP::Kernel::WARN->(0, "$node asked us to seed it.");#d#
         # only the key matters, the value stays undefined
         $SEEDME{$node} = ();

         # for good measure
         snd($port{$node}, nodes => \%addr) if %addr;
      },
   );
}
376 root 1.1
# Node status callback, called with a node ID and a flag telling whether
# the node went up (true) or down (false).
#
# On "up" the node is counted and our rendezvous port on it is started.
# Then a master is (re)selected: a seed that is up if there is one,
# otherwise the last entry of the sorted up_nodes list. When the master
# changes, the old one is sent "seedme0" (if still up) and the new one
# "seedme1". On "down" the node's group registrations and addresses are
# dropped and the master is asked to find the node again.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      ++$nodecnt;
      start_node($node);
   }

   # now select a new(?) master
   my $master;
   my @live_seeds = up_seeds();

   if (@live_seeds) {
      # switching here with lower chance roughly hopefully still gives us
      # an equal selection.
      my $keep_current = node_is_up($MASTER)
                      && node_is_seed($MASTER)
                      && 1 < rand @live_seeds;

      $master = $keep_current
         ? $MASTER
         : $live_seeds[rand @live_seeds];
   }
   else {
      # select "last" non-seed node
      my @nodes  = up_nodes();
      my @sorted = sort @nodes;
      $master = $sorted[-1];
      #TODO maybe avoid listener-less nodes?
   }

   if ($MASTER ne $master) {
      snd($port{$MASTER}, "seedme0")
         if $MASTER && node_is_up($MASTER);

      $MASTER = $master;

      if ($MASTER) {
         snd($port{$MASTER}, "seedme1");
         $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
      }
      else {
         $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node.");
      }
   }

   if (!$is_up) {
      --$nodecnt;
      more_seeding() unless $nodecnt;
      unreg_groups($node);

      # forget about the node
      delete $addr{$node};

      # ask our master for quick recovery
      snd($port{$MASTER}, find => $node)
         if $MASTER;
   }
}
426    
# treat every node that is already up as if it had just come up
mon_node($_, 1)
   for up_nodes();

# hand mon_node to mon_nodes - presumably so that it gets called on later
# node status changes (mon_nodes is defined outside this file)
mon_nodes(\&mon_node);
431    
432     =back
433    
434     =head1 SEE ALSO
435    
436     L<AnyEvent::MP>.
437    
438     =head1 AUTHOR
439    
440     Marc Lehmann <schmorp@schmorp.de>
441     http://home.schmorp.de/
442    
443     =cut
444    
445     1
446