ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.20
Committed: Sat Sep 5 21:16:59 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.19: +87 -35 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.15 This module maintains a fully-meshed network, if possible, and tries to
12     ensure that we are connected to at least one other node.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31     use MIME::Base64 ();
32    
33 root 1.8 use AnyEvent ();
34 root 1.4 use AnyEvent::Util ();
35    
36 root 1.1 use AnyEvent::MP;
37     use AnyEvent::MP::Kernel;
38    
39 root 1.20 use base "Exporter";
40    
# Functions exported into every package that uses this module.
our @EXPORT = qw(grp_reg grp_get grp_mon);

# This module carries the same version number as AnyEvent::MP.
our $VERSION = $AnyEvent::MP::VERSION;

our %addr;    # node ID => [address...], as announced via "addr" messages
our %port;    # node ID => the port we spawned on that node (see start_node)
our %lreg;    # local registry:  group name => [port ID...]
our %lmon;    # local registry monitoring: "group\x00port" => value returned by mon
our %greg;    # global registry: group name => { port ID => undef }
our %gmon;    # group monitoring: group name => [$cb...]

our $nodecnt; # incremented/decremented by mon_node as nodes go up/down

$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");

#############################################################################
# seed nodes

our @SEEDS;        # seed addresses, set by set_seeds, pruned by avoid_seed
our %SEED_CONNECT; # seed address => value returned by mp_connect while pending
our $SEED_WATCHER; # timer created by set_seeds that re-runs more_seeding
67    
# Start a connection attempt to the seed node with address $seed.
#
# Croaks when $seed cannot be parsed into host and port. The value returned
# by mp_connect is remembered in %SEED_CONNECT, so a second call for the
# same seed does nothing while that entry is still set.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak ("$seed: unparsable seed address");

   # callback handed to mp_connect: forget this attempt and have another
   # look at seeding one second later
   # NOTE(review): presumably invoked when the connection is released - confirm
   my $release = sub {
      delete $SEED_CONNECT{$seed};
      after (1, \&more_seeding);
   };

   # ughhh
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      seed => $seed,
      $release,
   );
}
83    
# Try to connect to one randomly chosen seed node.
#
# Does nothing while $nodecnt is non-zero or when no seeds are configured.
sub more_seeding {
   return if $nodecnt || !@SEEDS;

   $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");

   my $candidate = $SEEDS[rand @SEEDS];
   seed_connect ($candidate);
}
92    
# Remove every occurrence of the given seed address from @SEEDS.
sub avoid_seed($) {
   my ($unwanted) = @_;

   @SEEDS = grep { $_ ne $unwanted } @SEEDS;
}
96    
# Replace the list of seed addresses and start contacting them.
#
# The first call also creates the $SEED_WATCHER timer, which runs
# more_seeding after 5 seconds and then every
# $AnyEvent::MP::Kernel::MONITOR_TIMEOUT seconds.
sub set_seeds(@) {
   @SEEDS = @_;

   $SEED_WATCHER ||= AE::timer (5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding);

   # contact every seed, each after its own random delay below 0.1s
   foreach my $seed (@SEEDS) {
      my $delay = 0.100 * rand;
      after ($delay, sub { seed_connect ($seed) });
   }
}
106    
107     #############################################################################
108    
# Apply a membership change to $group in the global registry and notify
# the monitors installed for that group.
#
#   $group - group name
#   $add   - arrayref of port IDs to add
#   $del   - arrayref of port IDs to remove
#
# Removals are applied before additions. Every callback in $gmon{$group} is
# then called as $cb->($ports, $add, $del), where $ports is a new arrayref
# holding all current members.
sub _change {
   my ($group, $add, $del) = @_;

   my $members = ($greg{$group} ||= {});

   delete $members->{$_} for @$del;
   $members->{$_} = undef for @$add;

   my $current = [keys %$members];

   foreach my $monitor (@{ $gmon{$group} }) {
      $monitor->($current, $add, $del);
   }
}
120    
# Remove every port that belongs to $node from all global groups.
#
# A port ID belongs to $node when it is the node ID itself, or the node ID
# followed by "#". The monitors of each affected group are notified through
# _change.
sub unreg_groups($) {
   my ($node) = @_;

   my $qr = qr/^\Q$node\E(?:#|$)/;

   # Walk a snapshot of the group names instead of using each(): _change
   # runs user-supplied monitor callbacks, and a callback that registers a
   # port in a new group would insert into %greg while it is being iterated.
   for my $group (keys %greg) {
      my $ports = $greg{$group}
         or next;

      # a new array per group, so a callback that keeps the reference it was
      # given never sees it overwritten with the next group's removals
      my @del = grep { /$qr/ } keys %$ports;

      _change ($group, [], \@del)
         if @del;
   }
}
133    
# Add a complete registry, as received in a "set" message, to the global
# registry.
#
#   $node - node the registry came from (not used by this function)
#   $lreg - hashref, group name => arrayref of port IDs
#
# Every group is passed to _change as a pure addition.
sub set_groups($$) {
   my ($node, $lreg) = @_;

   while (my ($group, $ports) = each %$lreg) {
      _change ($group, $ports, []);
   }
}
141    
142 root 1.20 =item $guard = grp_reg $group, $port
143 root 1.4
144     Register the given (local!) port in the named global group C<$group>.
145    
146     The port will be unregistered automatically when the port is destroyed.
147    
148     When not called in void context, then a guard object will be returned that
149     will also cause the name to be unregistered when destroyed.
150    
151     =cut
152    
# Unregister the local port $port from the group $group.
#
# Note the argument order: port first, group second. Drops the monitor
# stored by grp_reg, removes the port from the local and the global registry
# and sends a "reg0" message to every port in %port.
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};
   @{ $lreg{$group} } = grep { $_ ne $port } @{ $lreg{$group} };

   _change ($group, [], [$port]);

   foreach my $peer (values %port) {
      snd ($peer, reg0 => $group, $port);
   }
}
165    
# Register the local port $port in the global group $group.
#
# Croaks when $port is not a local port, or when it is already registered
# in $group. The port is monitored, so it is unregistered again when the
# monitor callback fires. A "reg1" message is sent to every port in %port.
#
# Returns a guard that unregisters the port when destroyed, unless called in
# void context.
sub grp_reg($$) {
   my ($group, $port) = @_;

   port_is_local ($port)
      or Carp::croak ("AnyEvent::MP::Global::grp_reg can only be called for local ports, caught");

   if (grep { $_ eq $port } @{ $lreg{$group} }) {
      Carp::croak ("'$group': group already registered, cannot register a second time");
   }

   $lmon{"$group\x00$port"} = mon ($port, sub { unregister ($port, $group) });
   push @{ $lreg{$group} }, $port;

   foreach my $peer (values %port) {
      snd ($peer, reg1 => $group, $port);
   }

   _change ($group, [$port], []);

   # wantarray alone is false in scalar context, which would deny the guard
   # to "$guard = grp_reg ..." callers; only void context (undef) must skip it
   (defined wantarray) && AnyEvent::Util::guard { unregister ($port, $group) }
}
186    
187 root 1.20 =item $ports = grp_get $group
188 root 1.15
189     Returns all the ports currently registered to the given group (as
190 root 1.20 read-only(!) array reference). When the group has no registered members,
191 root 1.15 return C<undef>.
192    
193     =cut
194    
# Return an arrayref with all port IDs currently registered in the given
# group, or undef when the group has no members (or is unknown).
sub grp_get($) {
   my ($group) = @_;

   # Check for the entry before dereferencing it: keys %{ $greg{$group} }
   # would autovivify an empty hash in %greg for every unknown group name
   # that is merely queried.
   my $members = $greg{$group}
      or return undef;

   my @ports = keys %$members;
   @ports ? \@ports : undef
}
199    
200     =item $guard = grp_mon $group, $callback->($ports, $add, $del)
201    
202     Installs a monitor on the given group. Each time there is a change it
203     will be called with the current group members as an arrayref as the
204     first argument. The second argument only contains ports added, the third
205     argument only ports removed.
206    
207     Unlike C<grp_get>, all three arguments will always be array-refs, even if
208     the array is empty. The arrays must not be modified in any way.
209    
210     The first invocation will be with the first two arguments set to the
211     current members, as if all of them were just added, but only when the
212     group is actually non-empty.
213    
214     Optionally returns a guard object that uninstalls the watcher when it is
215     destroyed.
216    
217     =cut
218    
# Install the monitor callback $cb on the group $grp.
#
# The registration is deferred through AnyEvent::MP::Kernel::delay. Once it
# runs, $cb is added to $gmon{$grp} and, if the group has members, called
# once as $cb->($ports, $ports, []), reporting all of them as added.
#
# Returns a guard that removes the callback when destroyed, unless called in
# void context.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   AnyEvent::MP::Kernel::delay (sub {
      # the guard below clears $cb, so a monitor cancelled before this
      # deferred code runs is never installed
      return unless $cb;

      push @{ $gmon{$grp} }, $cb;

      my $ports = grp_get ($grp)
         or return;
      $cb->($ports, $ports, []);
   });

   # was "AnyEvent::Util::::guard" - the doubled "::" is not the name of
   # the guard function in AnyEvent::Util
   (defined wantarray) && AnyEvent::Util::guard {
      # the entry may be absent when the guard is destroyed before the
      # deferred registration above has run
      my @mon = grep { $_ != $cb } @{ (delete $gmon{$grp}) || [] };
      $gmon{$grp} = \@mon if @mon;
      undef $cb;
   }
}
235    
# Set up the global-service link to the node $node.
#
# Does nothing when a port for that node already exists in %port or when
# $node is the local node. Otherwise spawns AnyEvent::MP::Global::connect on
# the remote node, monitors the resulting port and sends it our listener
# addresses, the known node addresses and the local registry.
sub start_node {
   my ($node) = @_;

   return if exists $port{$node};
   return if $node eq $NODE; # do not connect to ourselves

   # establish connection
   my $port = spawn ($node, "AnyEvent::MP::Global::connect", 0, $NODE);
   $port{$node} = $port;

   # when the monitor fires, drop the node's ports and forget the link
   mon ($port, sub {
      unreg_groups ($node);
      delete $port{$node};
   });

   snd ($port, addr  => $AnyEvent::MP::Kernel::LISTENER);
   snd ($port, nodes => \%addr) if %addr;
   snd ($port, set   => \%lreg) if %lreg;
}
254 root 1.3
# other nodes connect via this
#
# start_node spawns this function with the arguments (0, $NODE), so
# $version is 0 and $node is the ID of the node that spawned us. It installs
# the message handlers of the global service on $SELF:
#
#   addr  => \@addresses        the peer's own addresses
#   nodes => { id => [addr] }   nodes (and addresses) the peer knows about
#   find  => $othernode         request for the addresses of another node
#   set   => \%lreg             the peer's complete local registry
#   reg0  => $group, $port      a port left a group
#   reg1  => $group, $port      a port joined a group
sub connect {
   my ($version, $node) = @_;

   # monitor them, silently die
   mon ($node, psub { kil ($SELF) });

   rcv ($SELF,
      addr => sub {
         my $addresses = shift;

         $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
         $addr{$node} = $addresses;

         # to help listener-less nodes, we broadcast new addresses to them unconditionally
         #TODO: should be done by a node finding out about a listener-less one
         if (@$addresses) {
            for my $other (values %AnyEvent::MP::Kernel::NODE) {
               next unless $other->{transport};

               # only nodes that announced an empty address list need help
               my $known = $addr{$other->{id}};
               next unless $known && !@$known;

               $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
               snd ($port{$other->{id}}, nodes => { $node => $addresses });
            }
         }
      },
      nodes => sub {
         my ($kv) = @_;

         # lines marked #d# are the author's debugging aids
         use JSON::XS;#d#
         my $kv_txt = JSON::XS->new->encode ($kv);#d#
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

         while (my ($id, $addresses) = each %$kv) {
            my $peer = AnyEvent::MP::Kernel::add_node ($id);
            $peer->connect (@$addresses);
            start_node ($id);
         }
      },
      find => sub {
         my ($othernode) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");

         # answer only when we actually have addresses for that node
         snd ($port{$node}, nodes => { $othernode => $addr{$othernode} })
            if $addr{$othernode};
      },
      set => sub {
         set_groups ($node, shift);
      },
      reg0 => sub {
         my ($group, $port) = @_;
         _change ($group, [], [$port]);
      },
      reg1 => sub {
         my ($group, $port) = @_;
         _change ($group, [$port], []);
      },
   );
}
312 root 1.1
# Node state callback, called as mon_node ($node, $is_up).
#
# Node up:   count it and set up the global-service link via start_node.
# Node down: uncount it, re-seed when no node is left, drop its ports from
#            all groups, forget its addresses and ask all remaining peers
#            whether they know the node.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      ++$nodecnt;
      start_node ($node);
   } else {
      --$nodecnt;
      $nodecnt or more_seeding ();
      unreg_groups ($node);

      # forget about the node
      delete $addr{$node};

      # ask other nodes if they know the node
      foreach my $peer (values %port) {
         snd ($peer, find => $node);
      }
   }
   #warn "node<$node,$is_up>\n";#d#
}
332    
# Treat every node returned by up_nodes as if it had just come up.
# NOTE(review): presumably these are the nodes already connected at load time - confirm
for my $node (up_nodes ()) {
   mon_node ($node, 1);
}

# Hand mon_node to mon_nodes.
# NOTE(review): presumably it is then called on every later node state change - confirm
mon_nodes (\&mon_node);
337    
338     =back
339    
340     =head1 SEE ALSO
341    
342     L<AnyEvent::MP>.
343    
344     =head1 AUTHOR
345    
346     Marc Lehmann <schmorp@schmorp.de>
347     http://home.schmorp.de/
348    
349     =cut
350    
351     1
352