ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.22
Committed: Tue Sep 8 01:38:16 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.21: +43 -8 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.15 This module maintains a fully-meshed network, if possible, and tries to
12     ensure that we are connected to at least one other node.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37 root 1.22 use AnyEvent::MP::Transport ();
38 root 1.1
39 root 1.20 use base "Exporter";
40    
41     our @EXPORT = qw(
42     grp_reg
43     grp_get
44     grp_mon
45     );
46    
47 root 1.1 our $VERSION = $AnyEvent::MP::VERSION;
48    
49 root 1.8 our %addr; # port ID => [address...] mapping
50    
51 root 1.4 our %port; # our rendezvous port on the other side
52     our %lreg; # local registry, name => [pid...]
53 root 1.20 our %lmon; # local registry monitoring name,pid => mon
54     our %greg; # global regstry, name => pid => undef
55 root 1.22 our %gmon; # group monitoring, group => [$cb...]
56 root 1.4
57 root 1.8 our $nodecnt;
58    
59 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
60    
61 root 1.8 #############################################################################
62     # seednodes
63    
64     our @SEEDS;
65 root 1.22 our %SEEDS; # just to check whether a seed is a seed
66 root 1.14 our %SEED_CONNECT;
67 root 1.8 our $SEED_WATCHER;
68    
69 root 1.22 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
70     my $peer = $_[0]{local_greeting}{peeraddr};
71     return unless exists $SEEDS{$peer};
72     $SEED_CONNECT{$peer} = 2;
73     };
74    
75     push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
76     my $peer = $_[0]{local_greeting}{peeraddr};
77     return unless exists $SEEDS{$peer};
78     $SEEDS{$peer} = $_[0]{remote_node};
79     };
80    
81     push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
82     delete $SEED_CONNECT{$_[0]{local_greeting}{peeraddr}};
83     };
84    
85 root 1.8 sub seed_connect {
86     my ($seed) = @_;
87    
88     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
89     or Carp::croak "$seed: unparsable seed address";
90    
91 root 1.22 return if $SEED_CONNECT{$seed};
92    
93     $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
94    
95 root 1.8 # ughhh
96 root 1.14 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port,
97     seed => $seed,
98 root 1.8 sub {
99 root 1.22 $SEED_CONNECT{$seed} = 1;
100 root 1.8 },
101     ;
102     }
103    
104     sub more_seeding {
105 root 1.22 my $int = List::Util::max 1,
106     $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
107     * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
108     - rand;
109    
110     $SEED_WATCHER = AE::timer $int, 0, \&more_seeding;
111    
112     @SEEDS = keys %SEEDS unless @SEEDS;
113 root 1.8 return unless @SEEDS;
114    
115     $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");
116    
117 root 1.22 seed_connect splice @SEEDS, rand @SEEDS, 1;
118 root 1.8 }
119    
120 root 1.14 sub avoid_seed($) {
121     @SEEDS = grep $_ ne $_[0], @SEEDS;
122     }
123    
124 root 1.8 sub set_seeds(@) {
125 root 1.22 @SEEDS{@_} = ();
126 root 1.8
127     $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;
128    
129 root 1.22 for (1 .. keys %SEEDS) {
130     after 0.100 * rand, \&more_seeding;
131 root 1.19 }
132 root 1.8 }
133    
134 root 1.22 # returns all (up) seed nodes, or all nodes if no seednodes are up/known
135     sub _route_nodes {
136     my @seeds = grep node_is_up $_, values %SEEDS;
137     @seeds = up_nodes unless @seeds;
138     @seeds
139     }
140    
141 root 1.8 #############################################################################
142    
143 root 1.20 sub _change {
144     my ($group, $add, $del) = @_;
145    
146     my $kv = $greg{$group} ||= {};
147    
148     delete @$kv{@$del};
149     @$kv{@$add} = ();
150    
151     my $ports = [keys %$kv];
152     $_->($ports, $add, $del) for @{ $gmon{$group} };
153     }
154    
155 root 1.4 sub unreg_groups($) {
156 root 1.8 my ($node) = @_;
157 root 1.4
158 root 1.8 my $qr = qr/^\Q$node\E(?:#|$)/;
159 root 1.20 my @del;
160 root 1.4
161 root 1.20 while (my ($group, $ports) = each %greg) {
162     @del = grep /$qr/, keys %$ports;
163     _change $group, [], \@del
164     if @del;
165 root 1.4 }
166     }
167    
168     sub set_groups($$) {
169 root 1.8 my ($node, $lreg) = @_;
170 root 1.5
171 root 1.6 while (my ($k, $v) = each %$lreg) {
172 root 1.20 _change $k, $v, [];
173 root 1.6 }
174 root 1.4 }
175    
176 root 1.20 =item $guard = grp_reg $group, $port
177 root 1.4
178     Register the given (local!) port in the named global group C<$group>.
179    
180     The port will be unregistered automatically when the port is destroyed.
181    
182     When not called in void context, then a guard object will be returned that
183     will also cause the name to be unregistered when destroyed.
184    
185     =cut
186    
187     # unregister local port
188     sub unregister {
189     my ($port, $group) = @_;
190    
191     delete $lmon{"$group\x00$port"};
192     @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
193    
194 root 1.20 _change $group, [], [$port];
195 root 1.4
196 root 1.20 snd $_, reg0 => $group, $port
197 root 1.4 for values %port;
198     }
199    
200     # register local port
201 root 1.20 sub grp_reg($$) {
202     my ($group, $port) = @_;
203 root 1.4
204     port_is_local $port
205 root 1.20 or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";
206    
207     grep $_ eq $port, @{ $lreg{$group} }
208     and Carp::croak "'$group': group already registered, cannot register a second time";
209 root 1.4
210     $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
211     push @{ $lreg{$group} }, $port;
212    
213 root 1.20 snd $_, reg1 => $group, $port
214 root 1.4 for values %port;
215    
216 root 1.20 _change $group, [$port], [];
217 root 1.4
218     wantarray && AnyEvent::Util::guard { unregister $port, $group }
219     }
220    
221 root 1.20 =item $ports = grp_get $group
222 root 1.15
223     Returns all the ports currently registered to the given group (as
224 root 1.20 read-only(!) array reference). When the group has no registered members,
225 root 1.15 return C<undef>.
226    
227     =cut
228    
229 root 1.20 sub grp_get($) {
230     my @ports = keys %{ $greg{$_[0]} };
231     @ports ? \@ports : undef
232     }
233    
234     =item $guard = grp_mon $group, $callback->($ports, $add, $del)
235    
236     Installs a monitor on the given group. Each time there is a change it
237     will be called with the current group members as an arrayref as the
238     first argument. The second argument only contains ports added, the third
239     argument only ports removed.
240    
241     Unlike C<grp_get>, all three arguments will always be array-refs, even if
242     the array is empty. None of the arrays must be modified in any way.
243    
244     The first invocation will be with the first two arguments set to the
245     current members, as if all of them were just added, but only when the
246     group is atcually non-empty.
247    
248     Optionally returns a guard object that uninstalls the watcher when it is
249     destroyed.
250    
251     =cut
252    
253     sub grp_mon($$) {
254     my ($grp, $cb) = @_;
255    
256     AnyEvent::MP::Kernel::delay sub {
257     return unless $cb;
258    
259     push @{ $gmon{$grp} }, $cb;
260     $cb->(((grp_get $grp) || return) x 2, []);
261     };
262    
263     defined wantarray && AnyEvent::Util::::guard {
264     my @mon = grep $_ != $cb, @{ delete $gmon{$grp} };
265     $gmon{$grp} = \@mon if @mon;
266     undef $cb;
267     }
268 root 1.15 }
269    
270 root 1.4 sub start_node {
271 root 1.8 my ($node) = @_;
272 root 1.4
273 root 1.8 return if exists $port{$node};
274     return if $node eq $NODE; # do not connect to ourselves
275 root 1.4
276     # establish connection
277 root 1.8 my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
278    
279 root 1.4 mon $port, sub {
280 root 1.8 unreg_groups $node;
281     delete $port{$node};
282 root 1.4 };
283 root 1.8
284 root 1.10 snd $port, addr => $AnyEvent::MP::Kernel::LISTENER;
285 root 1.17 snd $port, nodes => \%addr if %addr;
286 root 1.8 snd $port, set => \%lreg if %lreg;
287 root 1.4 }
288 root 1.3
289 root 1.5 # other nodes connect via this
290 root 1.3 sub connect {
291 root 1.8 my ($version, $node) = @_;
292 root 1.1
293 root 1.3 # monitor them, silently die
294 root 1.8 mon $node, psub { kil $SELF };
295 root 1.3
296 root 1.4 rcv $SELF,
297 root 1.8 addr => sub {
298     my $addresses = shift;
299 root 1.14 $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
300 root 1.8 $addr{$node} = $addresses;
301 root 1.9
302     # to help listener-less nodes, we broadcast new addresses to them unconditionally
303     #TODO: should be done by a node finding out about a listener-less one
304     if (@$addresses) {
305 root 1.18 for my $other (values %AnyEvent::MP::Kernel::NODE) {
306 root 1.9 if ($other->{transport}) {
307 root 1.12 if ($addr{$other->{id}} && !@{ $addr{$other->{id}} }) {
308     $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
309 root 1.17 snd $port{$other->{id}}, nodes => { $node => $addresses };
310 root 1.9 }
311     }
312     }
313     }
314 root 1.8 },
315 root 1.17 nodes => sub {
316 root 1.8 my ($kv) = @_;
317    
318     use JSON::XS;#d#
319     my $kv_txt = JSON::XS->new->encode ($kv);#d#
320     $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#
321    
322     while (my ($id, $addresses) = each %$kv) {
323     my $node = AnyEvent::MP::Kernel::add_node $id;
324     $node->connect (@$addresses);
325     start_node $id;
326 root 1.4 }
327     },
328 root 1.16 find => sub {
329 root 1.8 my ($othernode) = @_;
330    
331     $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
332 root 1.17 snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
333 root 1.8 if $addr{$othernode};
334     },
335 root 1.4 set => sub {
336 root 1.8 set_groups $node, shift;
337 root 1.4 },
338 root 1.20 reg0 => sub {
339     _change $_[0], [], [$_[1]];
340     },
341     reg1 => sub {
342     _change $_[0], [$_[1]], [];
343     },
344 root 1.4 ;
345 root 1.3 }
346 root 1.1
347     sub mon_node {
348 root 1.8 my ($node, $is_up) = @_;
349 root 1.1
350     if ($is_up) {
351 root 1.8 ++$nodecnt;
352     start_node $node;
353 root 1.2 } else {
354 root 1.8 --$nodecnt;
355     more_seeding unless $nodecnt;
356     unreg_groups $node;
357    
358     # forget about the node
359     delete $addr{$node};
360     # ask other nodes if they know the node
361 root 1.16 snd $_, find => $node
362 root 1.8 for values %port;
363 root 1.1 }
364 root 1.8 #warn "node<$node,$is_up>\n";#d#
365 root 1.1 }
366    
367     mon_node $_, 1
368     for up_nodes;
369    
370     mon_nodes \&mon_node;
371    
372     =back
373    
374     =head1 SEE ALSO
375    
376     L<AnyEvent::MP>.
377    
378     =head1 AUTHOR
379    
380     Marc Lehmann <schmorp@schmorp.de>
381     http://home.schmorp.de/
382    
383     =cut
384    
385     1
386