ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.23
Committed: Tue Sep 8 01:42:14 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.22: +1 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.15 This module maintains a fully-meshed network, if possible, and tries to
12     ensure that we are connected to at least one other node.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37 root 1.22 use AnyEvent::MP::Transport ();
38 root 1.1
39 root 1.20 use base "Exporter";
40    
41     our @EXPORT = qw(
42     grp_reg
43     grp_get
44     grp_mon
45     );
46    
47 root 1.1 our $VERSION = $AnyEvent::MP::VERSION;
48    
49 root 1.8 our %addr; # node ID => [address...] mapping, as announced by that node
50    
51 root 1.4 our %port; # node ID => our rendezvous port on the other side
52     our %lreg; # local registry, name => [pid...]
53 root 1.20 our %lmon; # local registry monitoring, "name\x00pid" => mon
54     our %greg; # global registry, name => pid => undef
55 root 1.22 our %gmon; # group monitoring, group => [$cb...]
56 root 1.4
57 root 1.8 our $nodecnt; # incremented/decremented by mon_node on node up/down
58    
59 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
60    
61 root 1.8 #############################################################################
62     # seednodes
63    
our @SEEDS;        # work list of seed addresses, refilled from %SEEDS by more_seeding
our %SEEDS;        # seed address => undef, or the remote node once connected
our %SEED_CONNECT; # seed address => connection state/guard for that seed
our $SEED_WATCHER; # timer that re-arms more_seeding

# A transport is being set up: if its peer address is one of our seeds,
# flag the seed so seed_connect will not start a second attempt.
# NOTE(review): exact hook timing is defined by AnyEvent::MP::Transport - confirm.
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;
   my $peeraddr = $transport->{local_greeting}{peeraddr};

   exists $SEEDS{$peeraddr}
      or return;

   $SEED_CONNECT{$peeraddr} = 2;
};

# The transport is fully connected: remember which node answers at this seed.
push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
   my ($transport) = @_;
   my $peeraddr = $transport->{local_greeting}{peeraddr};

   exists $SEEDS{$peeraddr}
      or return;

   $SEEDS{$peeraddr} = $transport->{remote_node};
};

# The transport went away: the seed may be tried again.
push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
   my ($transport) = @_;

   delete $SEED_CONNECT{ $transport->{local_greeting}{peeraddr} };
};
84    
# Start a connection attempt to the seed address $seed ("host:port"),
# unless one is already pending or established for it.
# Croaks when the address cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my @hostport = AnyEvent::Socket::parse_hostport ($seed);
   @hostport
      or Carp::croak "$seed: unparsable seed address";
   my ($host, $port) = @hostport;

   # an entry in %SEED_CONNECT means this seed is already being handled
   return if $SEED_CONNECT{$seed};

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   # ughhh
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      seed => $seed,
      sub {
         $SEED_CONNECT{$seed} = 1;
      },
   );
}
103    
# Re-arm the seed timer and try to connect to one randomly chosen seed.
# The delay is connect_interval scaled by the number of known nodes (once
# any node is up), minus a random jitter below one, and never less than 1.
sub more_seeding {
   my $scale    = $nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1;
   my $interval = $AnyEvent::MP::Kernel::CONFIG->{connect_interval} * $scale - rand;
   my $delay    = List::Util::max (1, $interval);

   $SEED_WATCHER = AE::timer ($delay, 0, \&more_seeding);

   # start a new round over all configured seeds once the work list is empty
   @SEEDS = keys %SEEDS
      unless @SEEDS;

   @SEEDS
      or return;

   # take a random seed off the work list and try it
   my $pick = splice @SEEDS, rand @SEEDS, 1;
   seed_connect ($pick);
}
117    
# Remove the given seed address from the set of configured seeds.
# Returns whatever value was stored for it in %SEEDS.
sub avoid_seed($) {
   my ($seed) = @_;

   delete $SEEDS{$seed};
}
121    
# Add the given addresses to the seed set (resetting the stored value of
# any seed that was already present), make sure the seed timer exists, and
# schedule one slightly randomised more_seeding run per configured seed.
sub set_seeds(@) {
   $SEEDS{$_} = undef
      for @_;

   $SEED_WATCHER ||= AE::timer (5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding);

   my $seed_count = keys %SEEDS;

   for (1 .. $seed_count) {
      after (0.100 * rand (), \&more_seeding);
   }
}
131    
# Returns the seed nodes that are currently up; when none of them is up
# (or known), falls back to the list of all nodes that are up.
sub _route_nodes {
   my @candidates = grep { node_is_up ($_) } values %SEEDS;

   unless (@candidates) {
      @candidates = up_nodes;
   }

   return @candidates;
}
138    
139 root 1.8 #############################################################################
140    
# Apply a membership change to the global registry of $group: remove the
# ports listed in @$del, add the ports listed in @$add, then call every
# monitor of the group with (current members, $add, $del).
sub _change {
   my ($group, $add, $del) = @_;

   my $members = $greg{$group} ||= {};

   # removals first, so a port listed in both ends up registered
   delete $members->{$_}
      for @$del;
   $members->{$_} = undef
      for @$add;

   # one shared snapshot of the member list for all monitors
   my @current = keys %$members;

   $_->(\@current, $add, $del)
      for @{ $gmon{$group} };
}
152    
# Remove every port belonging to node $node from all global groups.
#
# A port belongs to the node when its ID is either exactly the node ID or
# the node ID followed by "#". Each affected group is updated through
# _change, so group monitors are notified about the removed ports.
sub unreg_groups($) {
   my ($node) = @_;

   my $qr = qr/^\Q$node\E(?:#|$)/;

   # Iterate over a snapshot of the group names: _change invokes monitor
   # callbacks, which may themselves touch %greg, and changing a hash while
   # walking it with each() is not safe.
   for my $group (keys %greg) {
      my $ports = $greg{$group}
         or next;

      # fresh array per group, so a monitor that keeps the reference it was
      # given does not see it overwritten while later groups are processed
      my @del = grep /$qr/, keys %$ports;

      _change ($group, [], \@del)
         if @del;
   }
}
165    
# Merge a remote node's registry ($lreg: group name => [port...]) into the
# global registry by adding all listed ports to their groups.
# $node is accepted for symmetry with unreg_groups but is not used here.
sub set_groups($$) {
   my ($node, $lreg) = @_;

   for my $group (keys %$lreg) {
      _change ($group, $lreg->{$group}, []);
   }
}
173    
174 root 1.20 =item $guard = grp_reg $group, $port
175 root 1.4
176     Register the given (local!) port in the named global group C<$group>.
177    
178     The port will be unregistered automatically when the port is destroyed.
179    
180     When not called in void context, then a guard object will be returned that
181     will also cause the name to be unregistered when destroyed.
182    
183     =cut
184    
# Unregister the local port $port from group $group: drop its monitor,
# remove it from the local and global registries and tell all peers.
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};

   my $locals = $lreg{$group} ||= [];
   @$locals = grep { $_ ne $port } @$locals;

   _change ($group, [], [$port]);

   # reg0 asks the other side to remove the port from the group
   for my $peer (values %port) {
      snd ($peer, reg0 => $group, $port);
   }
}
197    
# Register the local port $port in the global group $group.
#
# The registration is announced to all peers (reg1) and applied to our own
# copy of the global registry. A monitor on the port unregisters it again
# when the port goes away.
#
# Croaks if $port is not a local port or is already registered in $group.
#
# Returns a guard that unregisters the port when destroyed, unless called
# in void context.
sub grp_reg($$) {
   my ($group, $port) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";

   grep $_ eq $port, @{ $lreg{$group} }
      and Carp::croak "'$group': group already registered, cannot register a second time";

   $lmon{"$group\x00$port"} = mon $port, sub { unregister ($port, $group) };
   push @{ $lreg{$group} }, $port;

   snd $_, reg1 => $group, $port
      for values %port;

   _change ($group, [$port], []);

   # "defined wantarray" is true in scalar and list context alike; plain
   # "wantarray" is false in scalar context, which would deny the guard to
   # the documented "$guard = grp_reg ..." usage.
   defined wantarray && AnyEvent::Util::guard { unregister ($port, $group) }
}
218    
219 root 1.20 =item $ports = grp_get $group
220 root 1.15
221     Returns all the ports currently registered to the given group (as
222 root 1.20 read-only(!) array reference). When the group has no registered members,
223 root 1.15 return C<undef>.
224    
225     =cut
226    
# Returns an array reference with the ports currently registered in the
# given group, or undef when the group has no members.
sub grp_get($) {
   my ($group) = @_;

   my @members = keys %{ $greg{$group} };

   return @members ? \@members : undef;
}
231    
232     =item $guard = grp_mon $group, $callback->($ports, $add, $del)
233    
234     Installs a monitor on the given group. Each time there is a change it
235     will be called with the current group members as an arrayref as the
236     first argument. The second argument only contains ports added, the third
237     argument only ports removed.
238    
239     Unlike C<grp_get>, all three arguments will always be array-refs, even if
240     the array is empty. None of the arrays must be modified in any way.
241    
242     The first invocation will be with the first two arguments set to the
243     current members, as if all of them were just added, but only when the
244     group is actually non-empty.
245    
246     Optionally returns a guard object that uninstalls the watcher when it is
247     destroyed.
248    
249     =cut
250    
# Install $cb as a monitor on group $grp.
#
# Installation is deferred via AnyEvent::MP::Kernel::delay. Once installed,
# the callback is called immediately with ($ports, $ports, []) if the group
# currently has members, and later by _change on every change.
#
# Returns a guard that removes the monitor when destroyed, unless called in
# void context.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   AnyEvent::MP::Kernel::delay (sub {
      # the guard was destroyed before we got to install the monitor
      return unless $cb;

      push @{ $gmon{$grp} }, $cb;

      # initial notification: all current members count as "just added"
      my $ports = grp_get ($grp)
         or return;
      $cb->($ports, $ports, []);
   });

   defined wantarray && AnyEvent::Util::guard {
      # the group may have no monitor list yet if the guard fires before
      # the delayed installation above has run
      my @mon = grep $_ != $cb, @{ delete ($gmon{$grp}) || [] };
      $gmon{$grp} = \@mon if @mon;
      undef $cb;
   }
}
267    
# Set up the global service link to node $node, unless one already exists
# or $node is this node: spawn our rendezvous port on the remote side, then
# send it our listener addresses, the node addresses we know and our local
# group registry.
sub start_node {
   my ($node) = @_;

   # already linked, or it is ourselves (do not connect to ourselves)
   return if exists $port{$node} || $node eq $NODE;

   # establish connection
   my $rendezvous = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
   $port{$node} = $rendezvous;

   # when the remote port dies, forget the node's ports and the link itself
   mon $rendezvous, sub {
      unreg_groups ($node);
      delete $port{$node};
   };

   snd $rendezvous, addr => $AnyEvent::MP::Kernel::LISTENER;

   if (%addr) {
      snd $rendezvous, nodes => \%addr;
   }

   snd $rendezvous, set => \%lreg
      if %lreg;
}
286 root 1.3
# Other nodes connect via this: it is the init function of the rendezvous
# port that start_node spawns on this node. $node is the ID of the node
# that spawned us; $version is passed along but not inspected here.
sub connect {
   my ($version, $node) = @_;

   # monitor them, silently die
   mon $node, psub { kil $SELF };

   # "addr": the peer tells us the addresses it listens on
   my $on_addr = sub {
      my ($addresses) = @_;

      $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
      $addr{$node} = $addresses;

      # to help listener-less nodes, we broadcast new addresses to them unconditionally
      #TODO: should be done by a node finding out about a listener-less one
      if (@$addresses) {
         for my $peer (values %AnyEvent::MP::Kernel::NODE) {
            next unless $peer->{transport};

            # only peers that announced an empty address list need help
            my $peer_addresses = $addr{ $peer->{id} };
            next unless $peer_addresses && !@$peer_addresses;

            $AnyEvent::MP::Kernel::WARN->(9, "helping $peer->{id} to find $node.");
            snd $port{ $peer->{id} }, nodes => { $node => $addresses };
         }
      }
   };

   # "nodes": the peer tells us about other nodes and their addresses
   my $on_nodes = sub {
      my ($kv) = @_;

      use JSON::XS;#d#
      my $kv_txt = JSON::XS->new->encode ($kv);#d#
      $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

      for my $id (keys %$kv) {
         my $known = AnyEvent::MP::Kernel::add_node ($id);
         $known->connect (@{ $kv->{$id} });
         start_node ($id);
      }
   };

   # "find": the peer asks whether we know the addresses of another node
   my $on_find = sub {
      my ($othernode) = @_;

      $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");

      if ($addr{$othernode}) {
         snd $port{$node}, nodes => { $othernode => $addr{$othernode} };
      }
   };

   rcv $SELF,
      addr  => $on_addr,
      nodes => $on_nodes,
      find  => $on_find,
      # "set": the peer sends its complete local group registry
      set   => sub {
         my ($lreg) = @_;
         set_groups ($node, $lreg);
      },
      # "reg0"/"reg1": a single port left/joined a group
      reg0  => sub {
         my ($group, $port) = @_;
         _change ($group, [], [$port]);
      },
      reg1  => sub {
         my ($group, $port) = @_;
         _change ($group, [$port], []);
      },
   ;
}
344 root 1.1
# Node monitor callback: called with a node ID and a flag telling whether
# the node came up (true) or went down (false).
sub mon_node {
   my ($node, $is_up) = @_;

   if (!$is_up) {
      --$nodecnt;

      # lost the last node: try the seeds again right away
      more_seeding ()
         unless $nodecnt;

      unreg_groups ($node);

      # forget about the node
      delete $addr{$node};

      # ask other nodes if they know the node
      for my $peer (values %port) {
         snd $peer, find => $node;
      }
   } else {
      ++$nodecnt;
      start_node ($node);
   }
   #warn "node<$node,$is_up>\n";#d#
}
364    
# treat every node that is already up as if it had just come up ...
for (up_nodes) {
   mon_node ($_, 1);
}

# ... and get notified about all later node state changes
mon_nodes \&mon_node;
369    
370     =back
371    
372     =head1 SEE ALSO
373    
374     L<AnyEvent::MP>.
375    
376     =head1 AUTHOR
377    
378     Marc Lehmann <schmorp@schmorp.de>
379     http://home.schmorp.de/
380    
381     =cut
382    
383     1
384