ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.25
Committed: Tue Sep 8 13:46:25 2009 UTC (14 years, 8 months ago) by root
Branch: MAIN
Changes since 1.24: +3 -3 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network, if possible, and tries to
12 ensure that we are connected to at least one other node.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37 use AnyEvent::MP::Transport ();
38
39 use base "Exporter";
40
41 our @EXPORT = qw(
42 grp_reg
43 grp_get
44 grp_mon
45 );
46
47 our $VERSION = $AnyEvent::MP::VERSION;
48
49 our %addr; # port ID => [address...] mapping
50
51 our %port; # our rendezvous port on the other side
52 our %lreg; # local registry, name => [pid...]
53 our %lmon; # local registry monitoring name,pid => mon
54 our %greg; # global regstry, name => pid => undef
55 our %gmon; # group monitoring, group => [$cb...]
56
57 our $nodecnt;
58
59 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
60
61 #############################################################################
62 # seednodes
63
64 our @SEEDS;
65 our %SEEDS; # just to check whether a seed is a seed
66 our %SEED_CONNECT;
67 our $SEED_WATCHER;
68
69 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
70 my $peer = $_[0]{local_greeting}{peeraddr};
71 return unless exists $SEEDS{$peer};
72 $SEED_CONNECT{$peer} = 2;
73 };
74
75 push @AnyEvent::MP::Transport::HOOK_CONNECTED, sub {
76 my $peer = $_[0]{local_greeting}{peeraddr};
77 return unless exists $SEEDS{$peer};
78 $SEEDS{$peer} = $_[0]{remote_node};
79 };
80
81 push @AnyEvent::MP::Transport::HOOK_DESTROY, sub {
82 delete $SEED_CONNECT{$_[0]{local_greeting}{peeraddr}};
83
84 # check if we contacted ourselves, so nuke this seed
85 if (exists $_[0]{seed} && $_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
86 # $AnyEvent::MP::Kernel::WARN->(0,"avoiding seed $_[0]{seed}\n");#d#
87 delete $SEEDS{$_[0]{seed}};
88 }
89 };
90
91 sub seed_connect {
92 my ($seed) = @_;
93
94 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
95 or Carp::croak "$seed: unparsable seed address";
96
97 return if $SEED_CONNECT{$seed};
98
99 $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");
100
101 # ughhh
102 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port,
103 seed => $seed,
104 sub {
105 $SEED_CONNECT{$seed} = 1;
106 },
107 ;
108 }
109
110 sub more_seeding {
111 my $int = List::Util::max 1,
112 $AnyEvent::MP::Kernel::CONFIG->{connect_interval}
113 * ($nodecnt ? keys %AnyEvent::MP::Kernel::NODE : 1)
114 - rand;
115
116 $SEED_WATCHER = AE::timer $int, 0, \&more_seeding;
117
118 @SEEDS = keys %SEEDS unless @SEEDS;
119 return unless @SEEDS;
120
121 seed_connect splice @SEEDS, rand @SEEDS, 1;
122 }
123
124 sub set_seeds(@) {
125 @SEEDS{@_} = ();
126
127 $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;
128
129 for (1 .. keys %SEEDS) {
130 after 0.100 * rand, \&more_seeding;
131 }
132 }
133
134 # returns all (up) seed nodes, or all nodes if no seednodes are up/known
135 sub _route_nodes {
136 my @seeds = grep node_is_up $_, values %SEEDS;
137 @seeds = up_nodes unless @seeds;
138 @seeds
139 }
140
141 #############################################################################
142
143 sub _change {
144 my ($group, $add, $del) = @_;
145
146 my $kv = $greg{$group} ||= {};
147
148 delete @$kv{@$del};
149 @$kv{@$add} = ();
150
151 my $ports = [keys %$kv];
152 $_->($ports, $add, $del) for @{ $gmon{$group} };
153 }
154
155 sub unreg_groups($) {
156 my ($node) = @_;
157
158 my $qr = qr/^\Q$node\E(?:#|$)/;
159 my @del;
160
161 while (my ($group, $ports) = each %greg) {
162 @del = grep /$qr/, keys %$ports;
163 _change $group, [], \@del
164 if @del;
165 }
166 }
167
168 sub set_groups($$) {
169 my ($node, $lreg) = @_;
170
171 while (my ($k, $v) = each %$lreg) {
172 _change $k, $v, [];
173 }
174 }
175
176 =item $guard = grp_reg $group, $port
177
178 Register the given (local!) port in the named global group C<$group>.
179
180 The port will be unregistered automatically when the port is destroyed.
181
182 When not called in void context, then a guard object will be returned that
183 will also cause the name to be unregistered when destroyed.
184
185 =cut
186
187 # unregister local port
188 sub unregister {
189 my ($port, $group) = @_;
190
191 delete $lmon{"$group\x00$port"};
192 @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
193
194 _change $group, [], [$port];
195
196 snd $_, reg0 => $group, $port
197 for values %port;
198 }
199
200 # register local port
201 sub grp_reg($$) {
202 my ($group, $port) = @_;
203
204 port_is_local $port
205 or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";
206
207 grep $_ eq $port, @{ $lreg{$group} }
208 and Carp::croak "'$group': group already registered, cannot register a second time";
209
210 $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
211 push @{ $lreg{$group} }, $port;
212
213 snd $_, reg1 => $group, $port
214 for values %port;
215
216 _change $group, [$port], [];
217
218 wantarray && AnyEvent::Util::guard { unregister $port, $group }
219 }
220
221 =item $ports = grp_get $group
222
223 Returns all the ports currently registered to the given group (as
224 read-only(!) array reference). When the group has no registered members,
225 return C<undef>.
226
227 =cut
228
229 sub grp_get($) {
230 my @ports = keys %{ $greg{$_[0]} };
231 @ports ? \@ports : undef
232 }
233
234 =item $guard = grp_mon $group, $callback->($ports, $add, $del)
235
236 Installs a monitor on the given group. Each time there is a change it
237 will be called with the current group members as an arrayref as the
238 first argument. The second argument only contains ports added, the third
239 argument only ports removed.
240
241 Unlike C<grp_get>, all three arguments will always be array-refs, even if
242 the array is empty. None of the arrays must be modified in any way.
243
244 The first invocation will be with the first two arguments set to the
245 current members, as if all of them were just added, but only when the
246 group is atcually non-empty.
247
248 Optionally returns a guard object that uninstalls the watcher when it is
249 destroyed.
250
251 =cut
252
253 sub grp_mon($$) {
254 my ($grp, $cb) = @_;
255
256 AnyEvent::MP::Kernel::delay sub {
257 return unless $cb;
258
259 push @{ $gmon{$grp} }, $cb;
260 $cb->(((grp_get $grp) || return) x 2, []);
261 };
262
263 defined wantarray && AnyEvent::Util::::guard {
264 my @mon = grep $_ != $cb, @{ delete $gmon{$grp} };
265 $gmon{$grp} = \@mon if @mon;
266 undef $cb;
267 }
268 }
269
270 sub start_node {
271 my ($node) = @_;
272
273 return if exists $port{$node};
274 return if $node eq $NODE; # do not connect to ourselves
275
276 # establish connection
277 my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
278
279 mon $port, sub {
280 unreg_groups $node;
281 delete $port{$node};
282 };
283
284 snd $port, addr => $AnyEvent::MP::Kernel::LISTENER;
285 snd $port, nodes => \%addr if %addr;
286 snd $port, set => \%lreg if %lreg;
287 }
288
289 # other nodes connect via this
290 sub connect {
291 my ($version, $node) = @_;
292
293 # monitor them, silently die
294 mon $node, psub { kil $SELF };
295
296 rcv $SELF,
297 addr => sub {
298 my $addresses = shift;
299 $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
300 $addr{$node} = $addresses;
301
302 # to help listener-less nodes, we broadcast new addresses to them unconditionally
303 #TODO: should be done by a node finding out about a listener-less one
304 if (@$addresses) {
305 for my $other (values %AnyEvent::MP::Kernel::NODE) {
306 if ($other->{transport}) {
307 if ($addr{$other->{id}} && !@{ $addr{$other->{id}} }) {
308 $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
309 snd $port{$other->{id}}, nodes => { $node => $addresses };
310 }
311 }
312 }
313 }
314 },
315 nodes => sub {
316 my ($kv) = @_;
317
318 use JSON::XS;#d#
319 my $kv_txt = JSON::XS->new->encode ($kv);#d#
320 $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#
321
322 while (my ($id, $addresses) = each %$kv) {
323 my $node = AnyEvent::MP::Kernel::add_node $id;
324 $node->connect (@$addresses);
325 start_node $id;
326 }
327 },
328 find => sub {
329 my ($othernode) = @_;
330
331 $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
332 snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
333 if $addr{$othernode};
334 },
335 set => sub {
336 set_groups $node, shift;
337 },
338 reg0 => sub {
339 _change $_[0], [], [$_[1]];
340 },
341 reg1 => sub {
342 _change $_[0], [$_[1]], [];
343 },
344 ;
345 }
346
347 sub mon_node {
348 my ($node, $is_up) = @_;
349
350 if ($is_up) {
351 ++$nodecnt;
352 start_node $node;
353 } else {
354 --$nodecnt;
355 more_seeding unless $nodecnt;
356 unreg_groups $node;
357
358 # forget about the node
359 delete $addr{$node};
360 # ask other nodes if they know the node
361 snd $_, find => $node
362 for grep $_, map $port{$_}, _route_nodes;
363 }
364 #warn "node<$node,$is_up>\n";#d#
365 }
366
367 mon_node $_, 1
368 for up_nodes;
369
370 mon_nodes \&mon_node;
371
372 =back
373
374 =head1 SEE ALSO
375
376 L<AnyEvent::MP>.
377
378 =head1 AUTHOR
379
380 Marc Lehmann <schmorp@schmorp.de>
381 http://home.schmorp.de/
382
383 =cut
384
385 1
386