ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.21
Committed: Sun Sep 6 00:13:21 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.20: +0 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.15 This module maintains a fully-meshed network, if possible, and tries to
12     ensure that we are connected to at least one other node.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37    
38 root 1.20 use base "Exporter";
39    
40     our @EXPORT = qw(
41     grp_reg
42     grp_get
43     grp_mon
44     );
45    
46 root 1.1 our $VERSION = $AnyEvent::MP::VERSION;
47    
48 root 1.8 our %addr; # port ID => [address...] mapping
49    
50 root 1.4 our %port; # our rendezvous port on the other side
51     our %lreg; # local registry, name => [pid...]
52 root 1.20 our %lmon; # local registry monitoring name,pid => mon
53     our %greg; # global regstry, name => pid => undef
54     our %gmon; # group monitorign, group => [$cb...]
55 root 1.4
56 root 1.8 our $nodecnt;
57    
58 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
59    
60 root 1.8 #############################################################################
61     # seednodes
62    
63     our @SEEDS;
64 root 1.14 our %SEED_CONNECT;
65 root 1.8 our $SEED_WATCHER;
66    
67     sub seed_connect {
68     my ($seed) = @_;
69    
70     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
71     or Carp::croak "$seed: unparsable seed address";
72    
73     # ughhh
74 root 1.14 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port,
75     seed => $seed,
76 root 1.8 sub {
77 root 1.14 delete $SEED_CONNECT{$seed};
78     after 1, \&more_seeding;
79 root 1.8 },
80     ;
81     }
82    
83     sub more_seeding {
84     return if $nodecnt;
85     return unless @SEEDS;
86    
87     $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");
88    
89     seed_connect $SEEDS[rand @SEEDS];
90     }
91    
92 root 1.14 sub avoid_seed($) {
93     @SEEDS = grep $_ ne $_[0], @SEEDS;
94     }
95    
96 root 1.8 sub set_seeds(@) {
97     @SEEDS = @_;
98    
99     $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;
100    
101 root 1.19 for my $seed (@SEEDS) {
102     after 0.100 * rand, sub { seed_connect $seed };
103     }
104 root 1.8 }
105    
106     #############################################################################
107    
108 root 1.20 sub _change {
109     my ($group, $add, $del) = @_;
110    
111     my $kv = $greg{$group} ||= {};
112    
113     delete @$kv{@$del};
114     @$kv{@$add} = ();
115    
116     my $ports = [keys %$kv];
117     $_->($ports, $add, $del) for @{ $gmon{$group} };
118     }
119    
120 root 1.4 sub unreg_groups($) {
121 root 1.8 my ($node) = @_;
122 root 1.4
123 root 1.8 my $qr = qr/^\Q$node\E(?:#|$)/;
124 root 1.20 my @del;
125 root 1.4
126 root 1.20 while (my ($group, $ports) = each %greg) {
127     @del = grep /$qr/, keys %$ports;
128     _change $group, [], \@del
129     if @del;
130 root 1.4 }
131     }
132    
133     sub set_groups($$) {
134 root 1.8 my ($node, $lreg) = @_;
135 root 1.5
136 root 1.6 while (my ($k, $v) = each %$lreg) {
137 root 1.20 _change $k, $v, [];
138 root 1.6 }
139 root 1.4 }
140    
141 root 1.20 =item $guard = grp_reg $group, $port
142 root 1.4
143     Register the given (local!) port in the named global group C<$group>.
144    
145     The port will be unregistered automatically when the port is destroyed.
146    
147     When not called in void context, then a guard object will be returned that
148     will also cause the name to be unregistered when destroyed.
149    
150     =cut
151    
152     # unregister local port
153     sub unregister {
154     my ($port, $group) = @_;
155    
156     delete $lmon{"$group\x00$port"};
157     @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
158    
159 root 1.20 _change $group, [], [$port];
160 root 1.4
161 root 1.20 snd $_, reg0 => $group, $port
162 root 1.4 for values %port;
163     }
164    
165     # register local port
166 root 1.20 sub grp_reg($$) {
167     my ($group, $port) = @_;
168 root 1.4
169     port_is_local $port
170 root 1.20 or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";
171    
172     grep $_ eq $port, @{ $lreg{$group} }
173     and Carp::croak "'$group': group already registered, cannot register a second time";
174 root 1.4
175     $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
176     push @{ $lreg{$group} }, $port;
177    
178 root 1.20 snd $_, reg1 => $group, $port
179 root 1.4 for values %port;
180    
181 root 1.20 _change $group, [$port], [];
182 root 1.4
183     wantarray && AnyEvent::Util::guard { unregister $port, $group }
184     }
185    
186 root 1.20 =item $ports = grp_get $group
187 root 1.15
188     Returns all the ports currently registered to the given group (as
189 root 1.20 read-only(!) array reference). When the group has no registered members,
190 root 1.15 return C<undef>.
191    
192     =cut
193    
194 root 1.20 sub grp_get($) {
195     my @ports = keys %{ $greg{$_[0]} };
196     @ports ? \@ports : undef
197     }
198    
199     =item $guard = grp_mon $group, $callback->($ports, $add, $del)
200    
201     Installs a monitor on the given group. Each time there is a change it
202     will be called with the current group members as an arrayref as the
203     first argument. The second argument only contains ports added, the third
204     argument only ports removed.
205    
206     Unlike C<grp_get>, all three arguments will always be array-refs, even if
207     the array is empty. None of the arrays must be modified in any way.
208    
209     The first invocation will be with the first two arguments set to the
210     current members, as if all of them were just added, but only when the
211     group is atcually non-empty.
212    
213     Optionally returns a guard object that uninstalls the watcher when it is
214     destroyed.
215    
216     =cut
217    
218     sub grp_mon($$) {
219     my ($grp, $cb) = @_;
220    
221     AnyEvent::MP::Kernel::delay sub {
222     return unless $cb;
223    
224     push @{ $gmon{$grp} }, $cb;
225     $cb->(((grp_get $grp) || return) x 2, []);
226     };
227    
228     defined wantarray && AnyEvent::Util::::guard {
229     my @mon = grep $_ != $cb, @{ delete $gmon{$grp} };
230     $gmon{$grp} = \@mon if @mon;
231     undef $cb;
232     }
233 root 1.15 }
234    
235 root 1.4 sub start_node {
236 root 1.8 my ($node) = @_;
237 root 1.4
238 root 1.8 return if exists $port{$node};
239     return if $node eq $NODE; # do not connect to ourselves
240 root 1.4
241     # establish connection
242 root 1.8 my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
243    
244 root 1.4 mon $port, sub {
245 root 1.8 unreg_groups $node;
246     delete $port{$node};
247 root 1.4 };
248 root 1.8
249 root 1.10 snd $port, addr => $AnyEvent::MP::Kernel::LISTENER;
250 root 1.17 snd $port, nodes => \%addr if %addr;
251 root 1.8 snd $port, set => \%lreg if %lreg;
252 root 1.4 }
253 root 1.3
254 root 1.5 # other nodes connect via this
255 root 1.3 sub connect {
256 root 1.8 my ($version, $node) = @_;
257 root 1.1
258 root 1.3 # monitor them, silently die
259 root 1.8 mon $node, psub { kil $SELF };
260 root 1.3
261 root 1.4 rcv $SELF,
262 root 1.8 addr => sub {
263     my $addresses = shift;
264 root 1.14 $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
265 root 1.8 $addr{$node} = $addresses;
266 root 1.9
267     # to help listener-less nodes, we broadcast new addresses to them unconditionally
268     #TODO: should be done by a node finding out about a listener-less one
269     if (@$addresses) {
270 root 1.18 for my $other (values %AnyEvent::MP::Kernel::NODE) {
271 root 1.9 if ($other->{transport}) {
272 root 1.12 if ($addr{$other->{id}} && !@{ $addr{$other->{id}} }) {
273     $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
274 root 1.17 snd $port{$other->{id}}, nodes => { $node => $addresses };
275 root 1.9 }
276     }
277     }
278     }
279 root 1.8 },
280 root 1.17 nodes => sub {
281 root 1.8 my ($kv) = @_;
282    
283     use JSON::XS;#d#
284     my $kv_txt = JSON::XS->new->encode ($kv);#d#
285     $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#
286    
287     while (my ($id, $addresses) = each %$kv) {
288     my $node = AnyEvent::MP::Kernel::add_node $id;
289     $node->connect (@$addresses);
290     start_node $id;
291 root 1.4 }
292     },
293 root 1.16 find => sub {
294 root 1.8 my ($othernode) = @_;
295    
296     $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
297 root 1.17 snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
298 root 1.8 if $addr{$othernode};
299     },
300 root 1.4 set => sub {
301 root 1.8 set_groups $node, shift;
302 root 1.4 },
303 root 1.20 reg0 => sub {
304     _change $_[0], [], [$_[1]];
305     },
306     reg1 => sub {
307     _change $_[0], [$_[1]], [];
308     },
309 root 1.4 ;
310 root 1.3 }
311 root 1.1
312     sub mon_node {
313 root 1.8 my ($node, $is_up) = @_;
314 root 1.1
315     if ($is_up) {
316 root 1.8 ++$nodecnt;
317     start_node $node;
318 root 1.2 } else {
319 root 1.8 --$nodecnt;
320     more_seeding unless $nodecnt;
321     unreg_groups $node;
322    
323     # forget about the node
324     delete $addr{$node};
325     # ask other nodes if they know the node
326 root 1.16 snd $_, find => $node
327 root 1.8 for values %port;
328 root 1.1 }
329 root 1.8 #warn "node<$node,$is_up>\n";#d#
330 root 1.1 }
331    
332     mon_node $_, 1
333     for up_nodes;
334    
335     mon_nodes \&mon_node;
336    
337     =back
338    
339     =head1 SEE ALSO
340    
341     L<AnyEvent::MP>.
342    
343     =head1 AUTHOR
344    
345     Marc Lehmann <schmorp@schmorp.de>
346     http://home.schmorp.de/
347    
348     =cut
349    
350     1
351