ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.23
Committed: Tue Sep 8 01:42:14 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.22: +1 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network, if possible, and tries to
12 ensure that we are connected to at least one other node.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37 use AnyEvent::MP::Transport ();
38
39 use base "Exporter";
40
# Functions exported into the caller's namespace by default.
our @EXPORT = qw(grp_reg grp_get grp_mon);

# This module carries the same version number as AnyEvent::MP itself.
our $VERSION = $AnyEvent::MP::VERSION;

# Shared module state.
our %addr; # node ID => [address...], as announced by that node
our %port; # node ID => our rendezvous port on that node
our %lreg; # local registry, group name => [port ID...]
our %lmon; # local registrations, "group\x00port" => port monitor
our %greg; # global registry, group name => { port ID => undef }
our %gmon; # group monitoring, group name => [callback...]

our $nodecnt; # incremented when a node comes up, decremented when it goes down

$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
60
61 #############################################################################
62 # seednodes
63
our @SEEDS;        # seed addresses still to be tried, refilled from the keys of %SEEDS
our %SEEDS;        # seed address => remote node ID once connected; also "is this a seed?"
our %SEED_CONNECT; # seed address => state of the current connection attempt
our $SEED_WATCHER; # timer that keeps calling more_seeding

# A transport starts connecting: remember that a connection to this seed is
# in progress.
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
    my ($transport) = @_;

    my $peer = $transport->{local_greeting}{peeraddr};
    return unless exists $SEEDS{$peer};

    $SEED_CONNECT{$peer} = 2;
};

# A transport is fully connected: remember which node lives behind the seed
# address.
push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
    my ($transport) = @_;

    my $peer = $transport->{local_greeting}{peeraddr};
    return unless exists $SEEDS{$peer};

    $SEEDS{$peer} = $transport->{remote_node};
};

# A transport goes away: forget the connection state so the seed can be
# tried again.
push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
    my ($transport) = @_;

    delete $SEED_CONNECT{ $transport->{local_greeting}{peeraddr} };
};
84
# Starts a connection attempt to the seed address $seed, unless one is
# already recorded in %SEED_CONNECT. Croaks when the address cannot be
# parsed into a host and a port.
sub seed_connect {
    my ($seed) = @_;

    my ($host, $port) = AnyEvent::Socket::parse_hostport($seed)
        or Carp::croak "$seed: unparsable seed address";

    # a connection (attempt) for this seed is already recorded
    return if $SEED_CONNECT{$seed};

    $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

    # ughhh
    $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect(
        $host, $port,
        seed => $seed,
        sub { $SEED_CONNECT{$seed} = 1 },
    );
}
103
# Re-arms the seed timer and tries to connect to one randomly chosen seed.
#
# The delay until the next attempt is the configured connect_interval,
# multiplied by the number of known nodes once at least one node is up,
# minus a random fraction of a second, but never less than one second.
sub more_seeding {
    my $scale = $nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1;

    my $delay = List::Util::max(
        1,
        $AnyEvent::MP::Kernel::CONFIG->{connect_interval} * $scale - rand(),
    );

    $SEED_WATCHER = AE::timer($delay, 0, \&more_seeding);

    # start a new round over all seeds once the current one is used up
    @SEEDS = keys %SEEDS
        unless @SEEDS;

    return unless @SEEDS;

    # remove a random seed from the pending list and try it
    my $seed = splice @SEEDS, rand(@SEEDS), 1;
    seed_connect($seed);
}
117
# Removes the given seed address from the set of known seeds and returns
# the value that was stored for it.
sub avoid_seed($) {
    my ($seed) = @_;

    return delete $SEEDS{$seed};
}
121
# Adds the given seed addresses to %SEEDS (resetting their stored value),
# makes sure the seed timer exists and schedules one early more_seeding
# call per known seed, each after a random delay below 0.1 seconds.
sub set_seeds(@) {
    my @seeds = @_;

    @SEEDS{@seeds} = ();

    $SEED_WATCHER ||= AE::timer(5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding);

    my $seed_count = keys %SEEDS;

    after(0.100 * rand(), \&more_seeding)
        for 1 .. $seed_count;
}
131
# returns all (up) seed nodes, or all nodes if no seednodes are up/known
sub _route_nodes {
    my @candidates = grep { node_is_up($_) } values %SEEDS;

    # fall back to every node that is up
    @candidates = up_nodes()
        unless @candidates;

    return @candidates;
}
138
139 #############################################################################
140
# Applies a membership change to the global group $group: the ports in the
# array-ref $del are removed, the ports in the array-ref $add are added, and
# then every monitor registered for the group is called with the new member
# list, $add and $del.
sub _change {
    my ($group, $add, $del) = @_;

    my $members = ($greg{$group} ||= {});

    delete @{$members}{ @$del };
    @{$members}{ @$add } = ();

    my $ports = [ keys %$members ];

    for (@{ $gmon{$group} }) {
        $_->($ports, $add, $del);
    }
}
152
# Removes every port that belongs to node $node from all global groups and
# reports the removal to the group monitors (via _change), one call per
# group that actually lost members.
sub unreg_groups($) {
    my ($node) = @_;

    # matches the node ID itself as well as any "node#..." port ID
    my $qr = qr/^\Q$node\E(?:#|$)/;

    # Iterate over a snapshot of the group names: _change runs monitor
    # callbacks, and a callback that adds a key to %greg while an each()
    # iteration over it is in progress would make that iteration unreliable.
    for my $group (keys %greg) {
        my $ports = $greg{$group}
            or next;

        # A fresh array for every group: the reference is handed to the
        # monitor callbacks, so it must not be overwritten when the next
        # group is processed.
        my @del = grep /$qr/, keys %$ports;

        _change($group, [], \@del)
            if @del;
    }
}
165
# Merges a complete registry ($lreg: group name => [port...]) received from
# a node into the global registry by adding all listed ports to their
# groups. The $node argument is accepted but not used here.
sub set_groups($$) {
    my ($node, $lreg) = @_;

    for my $group (keys %$lreg) {
        _change($group, $lreg->{$group}, []);
    }
}
173
174 =item $guard = grp_reg $group, $port
175
176 Register the given (local!) port in the named global group C<$group>.
177
178 The port will be unregistered automatically when the port is destroyed.
179
180 When not called in void context, then a guard object will be returned that
181 will also cause the name to be unregistered when destroyed.
182
183 =cut
184
# unregister local port
#
# Removes the local port $port from the group $group, updates the global
# registry and tells all connected nodes about it (reg0 message).
#
# This can be reached twice for the same registration - once from the port
# monitor installed by grp_reg and once from the guard grp_reg returns - so
# it does nothing when the port is not (or no longer) registered in the
# group. That avoids a second reg0 broadcast and a monitor notification for
# a removal that did not happen.
sub unregister {
    my ($port, $group) = @_;

    # cancel the port monitor installed by grp_reg, if it is still there
    delete $lmon{"$group\x00$port"};

    my $members = $lreg{$group}
        or return;

    my @remaining = grep $_ ne $port, @$members;

    # nothing was removed, so there is nothing to announce
    return if @remaining == @$members;

    @$members = @remaining;

    _change($group, [], [$port]);

    snd($_, reg0 => $group, $port)
        for values %port;
}
197
# register local port
#
# Registers the local port $port in the global group $group, announces it
# to all connected nodes (reg1 message) and monitors the port so that it is
# unregistered again when the port goes away.
#
# Croaks when $port is not a local port or is already registered in $group.
#
# Returns a guard that unregisters the port when destroyed, unless called
# in void context.
sub grp_reg($$) {
    my ($group, $port) = @_;

    port_is_local($port)
        or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";

    grep $_ eq $port, @{ $lreg{$group} }
        and Carp::croak "'$group': group already registered, cannot register a second time";

    $lmon{"$group\x00$port"} = mon($port, sub { unregister($port, $group) });
    push @{ $lreg{$group} }, $port;

    snd($_, reg1 => $group, $port)
        for values %port;

    _change($group, [$port], []);

    # wantarray is false (but defined) in scalar context, so testing its
    # truth would deny "$guard = grp_reg ..." callers their guard - only
    # void context (undef) must skip creating it.
    defined wantarray
        and AnyEvent::Util::guard { unregister($port, $group) }
}
218
219 =item $ports = grp_get $group
220
221 Returns all the ports currently registered to the given group (as
222 read-only(!) array reference). When the group has no registered members,
223 return C<undef>.
224
225 =cut
226
# Returns an array-ref with the ports currently registered in the given
# group, or undef when the group has no members.
sub grp_get($) {
    my ($group) = @_;

    my @members = keys %{ $greg{$group} };

    return @members ? \@members : undef;
}
231
232 =item $guard = grp_mon $group, $callback->($ports, $add, $del)
233
234 Installs a monitor on the given group. Each time there is a change it
235 will be called with the current group members as an arrayref as the
236 first argument. The second argument only contains ports added, the third
237 argument only ports removed.
238
239 Unlike C<grp_get>, all three arguments will always be array-refs, even if
240 the array is empty. The arrays must not be modified in any way.
241
242 The first invocation will be with the first two arguments set to the
243 current members, as if all of them were just added, but only when the
244 group is actually non-empty.
245
246 Optionally returns a guard object that uninstalls the watcher when it is
247 destroyed.
248
249 =cut
250
# Installs the monitor callback $cb on the group $grp.
#
# The registration itself is deferred through AnyEvent::MP::Kernel::delay;
# when it runs, the callback is added to the group's monitor list and, if
# the group currently has members, called once with the member list passed
# as both "all ports" and "added ports" and an empty "removed" list.
#
# Returns a guard that uninstalls the monitor when destroyed, unless called
# in void context.
sub grp_mon($$) {
    my ($grp, $cb) = @_;

    AnyEvent::MP::Kernel::delay sub {
        # the guard has already been destroyed - do not install anything
        return unless $cb;

        push @{ $gmon{$grp} }, $cb;
        $cb->(((grp_get($grp)) || return) x 2, []);
    };

    # note: the function is AnyEvent::Util::guard - the name must be spelled
    # with exactly one "::" separator before "guard".
    defined wantarray && AnyEvent::Util::guard {
        # the list may not exist (yet) when the deferred registration has
        # not run or no monitor is left for this group
        my @mon = grep $_ != $cb, @{ delete $gmon{$grp} || [] };
        $gmon{$grp} = \@mon if @mon;

        # tells a still pending deferred registration to do nothing
        undef $cb;
    }
}
267
# Sets up the global service link to the node $node, unless a link already
# exists or $node is this node itself: spawns the connect handler on the
# remote node, monitors the resulting port and sends it our addresses, the
# node addresses we know and our local group registry.
sub start_node {
    my ($node) = @_;

    # already linked, or it is us (do not connect to ourselves)
    return if exists($port{$node}) or $node eq $NODE;

    # establish connection
    my $remote = spawn($node, "AnyEvent::MP::Global::connect", 0, $NODE);
    $port{$node} = $remote;

    # when the remote port goes away, drop everything the node registered
    mon($remote, sub {
        unreg_groups($node);
        delete $port{$node};
    });

    snd($remote, addr => $AnyEvent::MP::Kernel::LISTENER);

    snd($remote, nodes => \%addr)
        if %addr;

    snd($remote, set => \%lreg)
        if %lreg;
}
286
# other nodes connect via this
#
# Spawned with ($version, $node), where $node is the ID of the node that
# wants to talk to us (start_node spawns "AnyEvent::MP::Global::connect"
# with exactly these arguments). Installs the handlers for the messages
# that node sends to the spawned port: addr, nodes, find, set, reg0 and
# reg1.
sub connect {
    my ($version, $node) = @_;

    # monitor them, silently die
    mon($node, psub { kil($SELF) });

    rcv($SELF,
        # the peer announces its own listening addresses
        addr => sub {
            my ($addresses) = @_;

            $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
            $addr{$node} = $addresses;

            return unless @$addresses;

            # to help listener-less nodes, we broadcast new addresses to them unconditionally
            #TODO: should be done by a node finding out about a listener-less one
            for my $other (values %AnyEvent::MP::Kernel::NODE) {
                next unless $other->{transport};

                # only nodes that told us an empty address list need help
                my $other_addr = $addr{ $other->{id} };
                next unless $other_addr && !@$other_addr;

                $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
                snd($port{ $other->{id} }, nodes => { $node => $addresses });
            }
        },

        # the peer tells us about nodes (and their addresses) it knows
        nodes => sub {
            my ($kv) = @_;

            # debugging aid: log what we were told, as JSON
            use JSON::XS;
            my $kv_txt = JSON::XS->new->encode($kv);
            $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");

            for my $id (keys %$kv) {
                my $peer = AnyEvent::MP::Kernel::add_node($id);
                $peer->connect(@{ $kv->{$id} });
                start_node($id);
            }
        },

        # the peer asks whether we know the addresses of another node
        find => sub {
            my ($othernode) = @_;

            $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");

            if ($addr{$othernode}) {
                snd($port{$node}, nodes => { $othernode => $addr{$othernode} });
            }
        },

        # the peer sends its complete local group registry
        set => sub {
            my ($lreg) = @_;

            set_groups($node, $lreg);
        },

        # a port was removed from a group
        reg0 => sub {
            my ($group, $port) = @_;

            _change($group, [], [$port]);
        },

        # a port was added to a group
        reg1 => sub {
            my ($group, $port) = @_;

            _change($group, [$port], []);
        },
    );
}
344
# Node monitor callback: called with a node ID and a flag that tells
# whether the node came up (true) or went down (false).
sub mon_node {
    my ($node, $is_up) = @_;

    if ($is_up) {
        ++$nodecnt;
        return start_node($node);
    }

    --$nodecnt;

    # no node left that is up - go looking for seeds again right away
    more_seeding()
        unless $nodecnt;

    unreg_groups($node);

    # forget about the node
    delete $addr{$node};

    # ask other nodes if they know the node
    for my $peer_port (values %port) {
        snd($peer_port, find => $node);
    }
}
364
# Treat every node that is already up as if it had just come up ...
mon_node($_, 1)
    for up_nodes();

# ... and get told about all further node state changes.
mon_nodes(\&mon_node);
369
370 =back
371
372 =head1 SEE ALSO
373
374 L<AnyEvent::MP>.
375
376 =head1 AUTHOR
377
378 Marc Lehmann <schmorp@schmorp.de>
379 http://home.schmorp.de/
380
381 =cut
382
383 1
384