ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.42
Committed: Sun Feb 26 11:12:54 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.41: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network between global nodes and
12 tries to have connections with all nodes in the network.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37 use AnyEvent::MP::Transport ();
38
39 use base "Exporter";
40
41 our @EXPORT = qw(
42 grp_reg
43 grp_get
44 grp_mon
45 );
46
47 our $GLOBAL_VERSION = 0;
48
49 our %ON_SETUP; # take note: not public
50
51 our %addr; # port ID => [address...] mapping
52
53 our %port; # our rendezvous port on the other side
54 our %lreg; # local registry, name => [pid...]
55 our %lmon; # local registry monitoring name,pid => mon
56 our %greg; # global regstry, name => pid => undef
57 our %gmon; # group monitoring, group => [$cb...]
58
59 our $nodecnt;
60
61 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
62
63 #############################################################################
64 # seednodes
65
66 our $MASTER; # our current master (which we regularly query for net updates)
67 our %SEEDME;
68
69 sub up_seeds() {
70 grep node_is_up $_, keys %AnyEvent::MP::Kernel::NODE_SEED
71 }
72
73 sub node_is_seed($) {
74 grep $_ eq $_[0], grep defined, keys %AnyEvent::MP::Kernel::NODE_SEED
75 }
76
77 # returns all (up) seed nodes, or all nodes if no seednodes are up/known
78 sub route_nodes {
79 my @seeds = up_seeds;
80 @seeds = up_nodes unless @seeds;
81 @seeds
82 }
83
84 #############################################################################
85
86 sub _change {
87 my ($group, $add, $del) = @_;
88
89 my $kv = $greg{$group} ||= {};
90
91 delete @$kv{@$del};
92 @$kv{@$add} = ();
93
94 my $ports = [keys %$kv];
95 $_->($ports, $add, $del) for @{ $gmon{$group} };
96 }
97
98 sub unreg_groups($) {
99 my ($node) = @_;
100
101 my $qr = qr/^\Q$node\E(?:#|$)/;
102 my @del;
103
104 while (my ($group, $ports) = each %greg) {
105 @del = grep /$qr/, keys %$ports;
106 _change $group, [], \@del
107 if @del;
108 }
109 }
110
111 sub set_groups($$) {
112 my ($node, $lreg) = @_;
113
114 while (my ($k, $v) = each %$lreg) {
115 _change $k, $v, [];
116 }
117 }
118
119 =item $guard = grp_reg $group, $port
120
121 Register the given (local!) port in the named global group C<$group>.
122
123 The port will be unregistered automatically when the port is destroyed.
124
125 When not called in void context, then a guard object will be returned that
126 will also cause the name to be unregistered when destroyed.
127
128 =cut
129
130 # unregister local port
131 sub unregister {
132 my ($port, $group) = @_;
133
134 delete $lmon{"$group\x00$port"};
135 @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
136
137 _change $group, [], [$port];
138
139 snd $_, reg0 => $group, $port
140 for values %port;
141 }
142
143 # register local port
144 sub grp_reg($$) {
145 my ($group, $port) = @_;
146
147 port_is_local $port
148 or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";
149
150 grep $_ eq $port, @{ $lreg{$group} }
151 and Carp::croak "'$group': group already registered, cannot register a second time";
152
153 $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
154 push @{ $lreg{$group} }, $port;
155
156 snd $_, reg1 => $group, $port
157 for values %port;
158
159 _change $group, [$port], [];
160
161 defined wantarray && AnyEvent::Util::guard { unregister $port, $group }
162 }
163
164 =item $ports = grp_get $group
165
166 Returns all the ports currently registered to the given group (as
167 read-only(!) array reference). When the group has no registered members,
168 return C<undef>.
169
170 =cut
171
172 sub grp_get($) {
173 my @ports = keys %{ $greg{$_[0]} };
174 @ports ? \@ports : undef
175 }
176
177 =item $guard = grp_mon $group, $callback->($ports, $add, $del)
178
179 Installs a monitor on the given group. Each time there is a change it
180 will be called with the current group members as an arrayref as the
181 first argument. The second argument only contains ports added, the third
182 argument only ports removed.
183
184 Unlike C<grp_get>, all three arguments will always be array-refs, even if
185 the array is empty. None of the arrays must be modified in any way.
186
187 The first invocation will be with the first two arguments set to the
188 current members, as if all of them were just added, but only when the
189 group is actually non-empty.
190
191 Optionally returns a guard object that uninstalls the watcher when it is
192 destroyed.
193
194 =cut
195
196 sub grp_mon($$) {
197 my ($grp, $cb) = @_;
198
199 AnyEvent::MP::Kernel::delay sub {
200 return unless $cb;
201
202 push @{ $gmon{$grp} }, $cb;
203 $cb->(((grp_get $grp) || return) x 2, []);
204 };
205
206 defined wantarray && AnyEvent::Util::guard {
207 my @mon = grep $_ != $cb, @{ delete $gmon{$grp} };
208 $gmon{$grp} = \@mon if @mon;
209 undef $cb;
210 }
211 }
212
213 sub start_node {
214 my ($node) = @_;
215
216 return if exists $port{$node};
217 return if $node eq $NODE; # do not connect to ourselves
218
219 # establish connection
220 my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", $GLOBAL_VERSION, $NODE;
221
222 mon $port, sub {
223 unreg_groups $node;
224 delete $port{$node};
225 };
226
227 snd $port, addr => $AnyEvent::MP::Kernel::LISTENER;
228 snd $port, nodes => \%addr if %addr;
229 snd $port, set => \%lreg if %lreg;
230 snd $port, "setup"; # tell the other side that we are in business now
231 }
232
233 # other nodes connect via this
234 sub connect {
235 my ($version, $node) = @_;
236
237 (int $version) == (int $GLOBAL_VERSION)
238 or die "node version mismatch (requested $version; we have $GLOBAL_VERSION)";
239
240 # monitor them, silently die
241 mon $node, psub {
242 delete $SEEDME{$node};
243 kil $SELF;
244 };
245
246 rcv $SELF,
247 setup => sub {
248 $_->($node) for values %ON_SETUP;
249 },
250 addr => sub {
251 my $addresses = shift;
252 $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
253 $addr{$node} = $addresses;
254
255 # delay broadcast by a random amount, to avoid nodes connecting to each other
256 # at the same time.
257 after 2 + rand, sub {
258 for my $slave (keys %SEEDME) {
259 snd $port{$slave} || next, nodes => { $node => $addresses };
260 }
261 };
262 },
263 nodes => sub {
264 my ($kv) = @_;
265
266 use JSON::XS;#d#
267 my $kv_txt = JSON::XS->new->encode ($kv);#d#
268 $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#
269
270 while (my ($id, $addresses) = each %$kv) {
271 my $node = AnyEvent::MP::Kernel::add_node $id;
272 $node->connect (@$addresses);
273 start_node $id;
274 }
275 },
276 set => sub {
277 set_groups $node, shift;
278 },
279 find => sub {
280 my ($othernode) = @_;
281
282 $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
283 snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
284 if $addr{$othernode};
285 },
286 reg0 => sub {
287 _change $_[0], [], [$_[1]];
288 },
289 reg1 => sub {
290 _change $_[0], [$_[1]], [];
291 },
292
293 # some node asks us to provide network updates
294 seedme0 => sub {
295 $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to NOT seed it.");#d#
296 delete $SEEDME{$node};
297 },
298 seedme1 => sub {
299 $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to seed it.");#d#
300 $SEEDME{$node} = ();
301
302 # for good measure
303 snd $port{$node}, nodes => \%addr if %addr;
304 },
305 ;
306 }
307
308 sub set_master($) {
309 return if $MASTER eq $_[0];
310
311 snd $port{$MASTER}, "seedme0"
312 if $MASTER && node_is_up $MASTER;
313
314 $MASTER = $_[0];
315
316 if ($MASTER) {
317 snd $port{$MASTER}, "seedme1";
318 $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
319 } else {
320 $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node, cannot seed.");
321 }
322 }
323
324 sub mon_node {
325 my ($node, $is_up) = @_;
326
327 if ($is_up) {
328 ++$nodecnt;
329 start_node $node;
330
331 if (node_is_seed $node) {
332 if (node_is_seed $MASTER) {
333 my @SEEDS = up_seeds;
334
335 # switching here with lower chance roughly hopefully still gives us
336 # an equal selection.
337 set_master $node
338 if 1 < rand @SEEDS;
339 } else {
340 # a seed always beats a non-seed
341 set_master $node;
342 }
343 }
344 }
345
346 # select a new(?) master, if required
347 unless ($MASTER and node_is_up $MASTER) {
348 if (my @SEEDS = up_seeds) {
349 set_master $SEEDS[rand @SEEDS];
350 } else {
351 # select "last" non-seed node
352 set_master +(sort +up_nodes)[-1];
353 }
354 }
355
356 unless ($is_up) {
357 --$nodecnt;
358 #d#more_seeding unless $nodecnt;
359 unreg_groups $node;
360
361 # forget about the node
362 delete $addr{$node};
363
364 # ask our master for quick recovery
365 snd $port{$MASTER}, find => $node
366 if $MASTER;
367 }
368 }
369
370 mon_node $_, 1
371 for up_nodes;
372
373 mon_nodes \&mon_node;
374
375 =back
376
377 =head1 SEE ALSO
378
379 L<AnyEvent::MP>.
380
381 =head1 AUTHOR
382
383 Marc Lehmann <schmorp@schmorp.de>
384 http://home.schmorp.de/
385
386 =cut
387
388 1
389