ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.43
Committed: Tue Feb 28 18:37:24 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.42: +3 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network between global nodes and
12 tries to have connections with all nodes in the network.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37 use AnyEvent::MP::Transport ();
38
39 use base "Exporter";
40
# functions exported into the caller's namespace by default
our @EXPORT = qw(grp_reg grp_get grp_mon);

# protocol version; connect refuses peers whose integer part differs
our $GLOBAL_VERSION = 0;

# take note: not public
# callbacks, each invoked with the node ID when that node sends "setup"
our %ON_SETUP;

# node ID => [address...] mapping, as announced by the nodes themselves
our %addr;

our %port;   # node ID => our rendezvous port on the other side
our %lreg;   # local registry, group name => [port ID...]
our %lmon;   # local registry monitoring, "group\0port" => value returned by mon
our %greg;   # global registry, group name => { port ID => undef }
our %gmon;   # group monitoring, group name => [$cb...]

# number of nodes currently up, maintained by mon_node
our $nodecnt;

$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
62
63 #############################################################################
64 # seednodes
65
# our current master, i.e. the node we regularly query for net updates
our $MASTER;

# set of node IDs (keys only) that asked us to seed them with net updates
our %SEEDME;
68
# Returns the configured seed nodes that are currently up (in scalar
# context: how many of them there are).
sub up_seeds() {
   my @configured = keys %AnyEvent::MP::Kernel::NODE_SEED;

   grep { node_is_up $_ } @configured
}
72
# Checks whether the given node ID is one of the configured seed nodes.
# Returns the matching IDs (in scalar context: their count), so the result
# is true exactly when the node is a seed.
sub node_is_seed($) {
   my @seed_ids = grep { defined $_ } keys %AnyEvent::MP::Kernel::NODE_SEED;

   grep { $_ eq $_[0] } @seed_ids
}
76
# Returns the nodes to route through: every seed node that is up or, when
# no seed node is up/known, every node that is up.
sub route_nodes {
   my @route = up_seeds;

   # fall back to all up nodes when there is no usable seed
   @route = up_nodes
      if !@route;

   @route
}
83
84 #############################################################################
85
# Applies a membership change to a group and notifies its monitors.
#
#   $name    - group name
#   $added   - arrayref of port IDs to add to the group
#   $removed - arrayref of port IDs to remove from the group
#
# Removals are applied before additions, so a port listed in both ends up
# as a member. Every callback in $gmon{$name} is then called with
# (current members, $added, $removed), all as array references.
sub _change {
   my ($name, $added, $removed) = @_;

   # the group entry is created on first use
   my $members = ($greg{$name} ||= {});

   delete $members->{$_} for @$removed;
   $members->{$_} = undef for @$added;

   my @current = keys %$members;

   for my $watcher (@{ $gmon{$name} }) {
      $watcher->(\@current, $added, $removed);
   }
}
97
# Removes every port belonging to the given node from all global groups.
#
#   $node - node ID; a port belongs to it when its ID is either the node ID
#           itself or starts with the node ID followed by "#".
#
# Group monitors are notified (via _change) once per affected group.
sub unreg_groups($) {
   my ($node) = @_;

   my $qr = qr/^\Q$node\E(?:#|$)/;

   # Iterate over a snapshot of the group names instead of using "each":
   # _change runs user callbacks, and those may add groups to %greg, which
   # must not happen while an "each" iteration is in progress.
   for my $group (keys %greg) {
      my $ports = $greg{$group}
         or next;

      # a fresh array per group, so callbacks that keep the reference
      # do not see it change when the next group is processed
      my @del = grep /$qr/, keys %$ports;

      _change $group, [], \@del
         if @del;
   }
}
110
# Merges a registry received from another node into the global registry.
#
#   $node - ID of the node that sent the registry (currently unused)
#   $lreg - hashref, group name => [port ID...]
#
# Every listed port is added to its group via _change; nothing is removed.
sub set_groups($$) {
   my ($node, $lreg) = @_;

   for my $group (keys %$lreg) {
      _change $group, $lreg->{$group}, [];
   }
}
118
119 =item $guard = grp_reg $group, $port
120
121 Register the given (local!) port in the named global group C<$group>.
122
123 The port will be unregistered automatically when the port is destroyed.
124
When not called in void context, a guard object is returned that, when
destroyed, also unregisters the port from the group.
127
128 =cut
129
# unregister local port
#
# Undoes grp_reg for the given port/group pair: forgets the monitor stored
# by grp_reg, removes the port from the local and global registries and
# tells every rendezvous port in %port about the removal ("reg0").
# Note the argument order: port first, group second.
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};

   # filter in place, so the array stored in %lreg stays the same one
   my @remaining = grep { $_ ne $port } @{ $lreg{$group} };
   @{ $lreg{$group} } = @remaining;

   _change $group, [], [$port];

   for my $peer (values %port) {
      snd $peer, reg0 => $group, $port;
   }
}
142
# register local port
#
# Croaks when $port is not a local port, or when it is already registered
# in $group. Otherwise it monitors the port (so it gets unregistered when
# the port goes away), records it in the local registry, announces it to
# every rendezvous port in %port ("reg1") and updates the global registry.
# In non-void context a guard is returned that unregisters the port.
sub grp_reg($$) {
   my ($group, $port) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";

   for my $registered (@{ $lreg{$group} }) {
      Carp::croak "'$group': group already registered, cannot register a second time"
         if $registered eq $port;
   }

   $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
   push @{ $lreg{$group} }, $port;

   for my $peer (values %port) {
      snd $peer, reg1 => $group, $port;
   }

   _change $group, [$port], [];

   defined wantarray && AnyEvent::Util::guard { unregister $port, $group }
}
163
164 =item $ports = grp_get $group
165
Returns all the ports currently registered to the given group (as a
read-only(!) array reference). When the group has no registered members,
returns C<undef>.
169
170 =cut
171
# Returns an array reference with the port IDs currently registered in
# the given group, or undef when the group is unknown or has no members.
sub grp_get($) {
   # Look the group up before dereferencing: "keys %{ $greg{$name} }" would
   # autovivify an empty entry in %greg for every unknown group queried.
   my $members = $greg{$_[0]}
      or return undef; # explicit undef: list-context callers get a single undef

   my @ports = keys %$members;
   @ports ? \@ports : undef
}
176
177 =item $guard = grp_mon $group, $callback->($ports, $add, $del)
178
179 Installs a monitor on the given group. Each time there is a change it
180 will be called with the current group members as an arrayref as the
181 first argument. The second argument only contains ports added, the third
182 argument only ports removed.
183
184 Unlike C<grp_get>, all three arguments will always be array-refs, even if
185 the array is empty. None of the arrays must be modified in any way.
186
187 The first invocation will be with the first two arguments set to the
188 current members, as if all of them were just added, but only when the
189 group is actually non-empty.
190
191 Optionally returns a guard object that uninstalls the watcher when it is
192 destroyed.
193
194 =cut
195
# Installs $cb as a monitor for group $grp.
#
# Installation is deferred via AnyEvent::MP::Kernel::delay. Once it runs,
# the callback is added to $gmon{$grp} and, if the group currently has
# members, called once with (members, members, []).
#
# In non-void context a guard is returned. Destroying it removes the
# callback from the group and clears $cb, so an installation that has not
# run yet becomes a no-op.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   AnyEvent::MP::Kernel::delay sub {
      # already cancelled by the guard
      $cb or return;

      push @{ $gmon{$grp} }, $cb;

      # initial notification, only for non-empty groups
      my $members = grp_get $grp
         or return;

      $cb->($members, $members, []);
   };

   defined wantarray && AnyEvent::Util::guard {
      my @keep = grep { $_ != $cb } @{ delete $gmon{$grp} };

      # only keep an entry for the group while monitors remain
      $gmon{$grp} = \@keep
         if @keep;

      undef $cb;
   }
}
212
# Sets up the global-service link to another node, unless one exists
# already or the node is ourselves.
#
# Spawns AnyEvent::MP::Global::connect on the remote node, remembers the
# resulting rendezvous port in %port, and sends it our listener addresses,
# the known node addresses, our local registry and a final "setup".
sub start_node {
   my ($node) = @_;

   # do not connect twice, and do not connect to ourselves
   return if $node eq $NODE || exists $port{$node};

   # establish connection
   my $rendezvous = spawn $node, "AnyEvent::MP::Global::connect", $GLOBAL_VERSION, $NODE;
   $port{$node} = $rendezvous;

   # when the remote port goes away, drop the node's ports and our link
   mon $rendezvous, sub {
      unreg_groups $node;
      delete $port{$node};
   };

   snd $rendezvous, addr => $AnyEvent::MP::Kernel::LISTENER;

   if (%addr) {
      snd $rendezvous, nodes => \%addr;
   }

   if (%lreg) {
      snd $rendezvous, set => \%lreg;
   }

   # tell the other side that we are in business now
   snd $rendezvous, "setup";
}
232
# other nodes connect via this
#
# Spawned on this node by the start_node of a remote node.
#
#   $version - the remote $GLOBAL_VERSION; dies unless its integer part
#              equals ours
#   $node    - ID of the connecting node
#
# Monitors the connecting node (killing this port when it goes away) and
# installs the handlers for the messages the remote side sends:
#
#   setup           - run all %ON_SETUP callbacks with the node ID
#   addr            - remember the node's addresses, relay them to %SEEDME
#   nodes           - learn about other nodes and start links to them
#   set             - merge the remote registry into the global registry
#   find            - send back the addresses of another node, if known
#   reg0/reg1       - remove/add a single port from/to a group
#   seedme0/seedme1 - stop/start sending network updates to the node
sub connect {
   my ($version, $node) = @_;

   (int $version) == (int $GLOBAL_VERSION)
      or die "node version mismatch (requested $version; we have $GLOBAL_VERSION)";

   # monitor them, silently die
   mon $node, psub {
      delete $SEEDME{$node};
      kil $SELF;
   };

   rcv $SELF,
      setup => sub {
         $_->($node) for values %ON_SETUP;
      },
      addr => sub {
         my $addresses = shift;
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
         $addr{$node} = $addresses;

         # delay broadcast by a random amount, to avoid nodes connecting to each other
         # at the same time.
         after 2 + rand, sub {
            for my $slave (keys %SEEDME) {
               snd $port{$slave} || next, nodes => { $node => $addresses };
            }
         };
      },
      nodes => sub {
         my ($kv) = @_;

         use JSON::XS;#d#
         my $kv_txt = JSON::XS->new->encode ($kv);#d#
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

         while (my ($id, $addresses) = each %$kv) {
            my $peer = AnyEvent::MP::Kernel::add_node $id;
            # NOTE(review): connecting with the announced addresses was
            # disabled in the original and is left disabled - confirm
            # whether it should be restored.
            #$peer->connect (@$addresses);

            # A leftover debugging "die" used to sit here: it killed this
            # port on the first entry and made the call below unreachable.
            start_node $id;
         }
      },
      set => sub {
         set_groups $node, shift;
      },
      find => sub {
         my ($othernode) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
         snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
            if $addr{$othernode};
      },
      reg0 => sub {
         _change $_[0], [], [$_[1]];
      },
      reg1 => sub {
         _change $_[0], [$_[1]], [];
      },

      # some node asks us to provide network updates
      seedme0 => sub {
         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to NOT seed it.");#d#
         delete $SEEDME{$node};
      },
      seedme1 => sub {
         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to seed it.");#d#
         $SEEDME{$node} = ();

         # for good measure
         snd $port{$node}, nodes => \%addr if %addr;
      },
   ;
}
309
# Switches to a new master node (the argument may be undef/false when no
# node is available).
#
# Does nothing when the given node already is the master. Otherwise the
# old master, if still up, is sent "seedme0", and the new master, if any,
# is sent "seedme1".
sub set_master($) {
   my ($candidate) = @_;

   return if $MASTER eq $candidate;

   # tell the previous master to stop seeding us
   if ($MASTER && node_is_up $MASTER) {
      snd $port{$MASTER}, "seedme0";
   }

   $MASTER = $candidate;

   unless ($MASTER) {
      $AnyEvent::MP::Kernel::WARN->(1, "no contact to any other node, cannot seed.");
      return;
   }

   snd $port{$MASTER}, "seedme1";
   $AnyEvent::MP::Kernel::WARN->(7, "selected new master: $MASTER.");
}
325
# Node monitoring callback, called with a node ID and a flag telling
# whether the node came up (true) or went down (false).
#
# On "up" it counts the node, starts a link to it and possibly makes it
# the master. In both cases it then makes sure there is a master that is
# up. On "down" it uncounts the node, drops its ports and addresses, and
# asks the master to find it again.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      ++$nodecnt;
      start_node $node;

      if (node_is_seed $node) {
         # a seed always beats a non-seed
         my $switch = 1;

         if (node_is_seed $MASTER) {
            my @SEEDS = up_seeds;

            # switching here with lower chance roughly hopefully still gives us
            # an equal selection.
            # NOTE(review): this switches with probability (n-1)/n for n up
            # seeds, which looks like a higher rather than a lower chance -
            # confirm whether "1 > rand @SEEDS" was intended.
            $switch = 1 < rand @SEEDS;
         }

         set_master $node
            if $switch;
      }
   }

   # select a new(?) master, if required
   if (!($MASTER and node_is_up $MASTER)) {
      my @SEEDS = up_seeds;

      if (@SEEDS) {
         set_master $SEEDS[rand @SEEDS];
      } else {
         # select "last" non-seed node (undef when no node is up)
         my $last = (sort +up_nodes)[-1];
         set_master $last;
      }
   }

   if (!$is_up) {
      --$nodecnt;
      #d#more_seeding unless $nodecnt;
      unreg_groups $node;

      # forget about the node
      delete $addr{$node};

      # ask our master for quick recovery
      if ($MASTER) {
         snd $port{$MASTER}, find => $node;
      }
   }
}
371
# treat every node that is already up as if it had just come up ...
for my $node (up_nodes) {
   mon_node $node, 1;
}

# ... and get notified about all later up/down changes
mon_nodes \&mon_node;
376
377 =back
378
379 =head1 SEE ALSO
380
381 L<AnyEvent::MP>.
382
383 =head1 AUTHOR
384
385 Marc Lehmann <schmorp@schmorp.de>
386 http://home.schmorp.de/
387
388 =cut
389
390 1
391