ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.20
Committed: Sat Sep 5 21:16:59 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.19: +87 -35 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network, if possible, and tries to
12 ensure that we are connected to at least one other node.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31 use MIME::Base64 ();
32
33 use AnyEvent ();
34 use AnyEvent::Util ();
35
36 use AnyEvent::MP;
37 use AnyEvent::MP::Kernel;
38
39 use base "Exporter";
40
41 our @EXPORT = qw(
42 grp_reg
43 grp_get
44 grp_mon
45 );
46
47 our $VERSION = $AnyEvent::MP::VERSION;
48
49 our %addr; # node ID => [address...] mapping
50
51 our %port; # node ID => our rendezvous port on the other side
52 our %lreg; # local registry, name => [pid...]
53 our %lmon; # local registry monitoring, "name\x00pid" => value returned by mon
54 our %greg; # global registry, name => pid => undef
55 our %gmon; # group monitoring, group => [$cb...]
56
57 our $nodecnt; # incremented/decremented by mon_node as nodes go up/down
58
59 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
60
61 #############################################################################
62 # seednodes
63
64 our @SEEDS; # seed addresses, as last passed to set_seeds (minus avoid_seed removals)
65 our %SEED_CONNECT; # seed address => value returned by mp_connect, deleted again by its callback
66 our $SEED_WATCHER; # AE::timer that periodically calls more_seeding
67
# Start a connection attempt to the seed address $seed, unless one is
# already recorded for it in %SEED_CONNECT.
#
# Croaks when $seed cannot be parsed by AnyEvent::Socket::parse_hostport.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";

   # callback handed to mp_connect: forget the attempt and re-check one
   # second later whether more seeding is required
   # NOTE(review): presumably run when the attempt/transport is finished -
   # confirm against AnyEvent::MP::Transport
   my $on_done = sub {
      delete $SEED_CONNECT{$seed};
      after 1, \&more_seeding;
   };

   # ughhh
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      seed => $seed,
      $on_done,
   );
}
83
# Try to connect to one randomly chosen seed, but only while $nodecnt is
# false (no nodes counted as up) and at least one seed is configured.
sub more_seeding {
   return if $nodecnt || !@SEEDS;

   $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");

   my $pick = int rand @SEEDS;
   seed_connect ($SEEDS[$pick]);
}
92
# Remove every entry equal to the given seed address from @SEEDS.
sub avoid_seed($) {
   my ($unwanted) = @_;

   @SEEDS = grep { $_ ne $unwanted } @SEEDS;
}
96
# Replace the list of seed addresses with the given ones, make sure the
# periodic re-seeding timer exists and schedule a connection attempt to
# every seed.
sub set_seeds(@) {
   @SEEDS = @_;

   # created only once: first fires after 5 seconds, then repeats every
   # $AnyEvent::MP::Kernel::MONITOR_TIMEOUT seconds
   $SEED_WATCHER ||= AE::timer (5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding);

   # spread the initial attempts over a random delay below 0.1 seconds
   foreach my $seed (@SEEDS) {
      my $delay = 0.100 * rand;
      after $delay, sub { seed_connect ($seed) };
   }
}
106
107 #############################################################################
108
# Apply a membership change to the global registry entry of $group: the
# ports in @$del are removed, the ports in @$add are added, and afterwards
# every callback stored in $gmon{$group} is called with the resulting
# member list followed by the $add and $del array references.
sub _change {
   my ($group, $add, $del) = @_;

   # the registry entry is created on first use
   my $members = $greg{$group} ||= {};

   delete $members->{$_} for @$del;
   $members->{$_} = undef for @$add;

   my $ports = [keys %$members];

   for my $monitor (@{ $gmon{$group} }) {
      $monitor->($ports, $add, $del);
   }
}
120
# Remove all ports of node $node from every group in the global registry.
#
# A port belongs to $node when its ID is exactly $node or starts with
# "$node#". Each affected group is updated via _change, which also
# notifies the monitors of that group.
sub unreg_groups($) {
   my ($node) = @_;

   my $qr = qr/^\Q$node\E(?:#|$)/;

   # Walk a snapshot of the group names instead of using each(): _change
   # runs monitor callbacks, and a callback that adds a group to %greg or
   # dies would otherwise corrupt the hash iterator or leave it half-way
   # through for the next caller.
   for my $group (keys %greg) {
      my $ports = $greg{$group}
         or next;

      # a fresh array per group, as the reference is handed to callbacks
      # which must not see it change on the next iteration
      my @del = grep /$qr/, keys %$ports;

      _change ($group, [], \@del)
         if @del;
   }
}
133
# Add all group memberships contained in the given hash (group name =>
# array reference of ports) to the global registry, one _change call per
# group. The node argument is accepted but not used.
sub set_groups($$) {
   my (undef, $groups) = @_;

   for my $group (keys %$groups) {
      _change ($group, $groups->{$group}, []);
   }
}
141
142 =item $guard = grp_reg $group, $port
143
144 Register the given (local!) port in the named global group C<$group>.
145
146 The port will be unregistered automatically when the port is destroyed.
147
148 When not called in void context, then a guard object will be returned that
149 will also cause the name to be unregistered when destroyed.
150
151 =cut
152
# unregister local port
#
# Removes the local port $port from group $group: drops the monitor entry,
# removes the port from the local registry, updates the global registry
# (notifying group monitors) and sends a reg0 message to every port in
# %port.
#
# Calling it again for a registration that is already gone is a no-op.
sub unregister {
   my ($port, $group) = @_;

   my $key = "$group\x00$port";

   # grp_reg wires this sub to both the port monitor and the returned
   # guard, so it can run twice for one registration. Without this check
   # the second run would report a removal that already happened and
   # broadcast reg0 once more.
   return unless exists $lmon{$key};
   delete $lmon{$key};

   my @left = grep $_ ne $port, @{ $lreg{$group} || [] };

   if (@left) {
      @{ $lreg{$group} } = @left;
   } else {
      # do not keep (and later announce via "set") empty groups
      delete $lreg{$group};
   }

   _change ($group, [], [$port]);

   snd $_, reg0 => $group, $port
      for values %port;
}
165
# register local port
#
# Registers the local port $port in group $group: installs a monitor that
# unregisters the port again, records it in the local registry, sends a
# reg1 message to every port in %port and updates the global registry.
#
# Croaks when $port is not a local port or is already registered in
# $group. Returns a guard that unregisters the port when destroyed, unless
# called in void context.
sub grp_reg($$) {
   my ($group, $port) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";

   grep $_ eq $port, @{ $lreg{$group} }
      and Carp::croak "'$group': group already registered, cannot register a second time";

   $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
   push @{ $lreg{$group} }, $port;

   snd $_, reg1 => $group, $port
      for values %port;

   _change $group, [$port], [];

   # wantarray is false (but defined) in scalar context, so testing its
   # truth would not create a guard for "$guard = grp_reg ..."; only void
   # context (undef) must skip the guard
   defined wantarray && AnyEvent::Util::guard { unregister $port, $group }
}
186
187 =item $ports = grp_get $group
188
189 Returns all the ports currently registered to the given group (as
190 read-only(!) array reference). When the group has no registered members,
191 return C<undef>.
192
193 =cut
194
# Return an array reference with the ports currently registered in the
# given group, or undef when the group is unknown or has no members.
sub grp_get($) {
   my ($group) = @_;

   # Look the entry up before dereferencing it: keys %{ $greg{$group} }
   # on a missing element would autovivify an empty hash, adding a key to
   # %greg for every group that is merely queried.
   # "return undef" (not a bare return) keeps the single undef result in
   # list context that callers got before.
   my $members = $greg{$group}
      or return undef;

   my @ports = keys %$members;
   @ports ? \@ports : undef
}
199
200 =item $guard = grp_mon $group, $callback->($ports, $add, $del)
201
202 Installs a monitor on the given group. Each time there is a change it
203 will be called with the current group members as an arrayref as the
204 first argument. The second argument only contains ports added, the third
205 argument only ports removed.
206
207 Unlike C<grp_get>, all three arguments will always be array-refs, even if
208 the array is empty. None of the arrays must be modified in any way.
209
210 The first invocation will be with the first two arguments set to the
211 current members, as if all of them were just added, but only when the
212 group is actually non-empty.
213
214 Optionally returns a guard object that uninstalls the watcher when it is
215 destroyed.
216
217 =cut
218
# Install the monitor callback $cb on group $grp.
#
# The callback is added to $gmon{$grp} from a delayed sub (via
# AnyEvent::MP::Kernel::delay) and, if the group currently has members,
# called once with the member list as both first and second argument and
# an empty array reference as third.
#
# Unless called in void context, returns a guard that removes the callback
# again when destroyed.
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   AnyEvent::MP::Kernel::delay sub {
      # the guard was already destroyed, nothing to install
      return unless $cb;

      push @{ $gmon{$grp} }, $cb;
      $cb->(((grp_get $grp) || return) x 2, []);
   };

   defined wantarray && AnyEvent::Util::guard {
      # $gmon{$grp} may not exist, e.g. when the guard is destroyed before
      # the delayed sub above has run. Dereferencing undef would die here
      # and skip the "undef $cb" below, so the cancelled callback would
      # still get installed afterwards.
      my @mon = grep $_ != $cb, @{ (delete $gmon{$grp}) || [] };
      $gmon{$grp} = \@mon if @mon;
      undef $cb;
   }
}
235
# Set up the rendezvous port on node $node, unless we already have one or
# $node is this node: spawn AnyEvent::MP::Global::connect there, monitor
# the resulting port and send it our listener addresses, the node
# addresses we know and our local group registrations.
sub start_node {
   my ($node) = @_;

   # already started, or ourselves (do not connect to ourselves)
   return if (exists $port{$node}) || $node eq $NODE;

   # establish connection
   my $remote = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
   $port{$node} = $remote;

   # when the remote port goes away, drop the node's group members and
   # forget the port
   mon $remote, sub {
      unreg_groups $node;
      delete $port{$node};
   };

   snd $remote, addr => $AnyEvent::MP::Kernel::LISTENER;

   # the remaining messages are only sent when there is something to tell
   %addr and snd $remote, nodes => \%addr;
   %lreg and snd $remote, set => \%lreg;
}
254
# other nodes connect via this
#
# start_node spawns this sub with a version number (currently 0, not used
# here) and the ID of the spawning node. It monitors that node and
# installs the message handlers (addr, nodes, find, set, reg0, reg1) on
# $SELF.
sub connect {
   my ($version, $node) = @_;

   # monitor them, silently die
   mon $node, psub { kil $SELF };

   # addr: the peer tells us its own addresses
   my $on_addr = sub {
      my ($addresses) = @_;

      $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
      $addr{$node} = $addresses;

      # to help listener-less nodes, we broadcast new addresses to them unconditionally
      #TODO: should be done by a node finding out about a listener-less one
      if (@$addresses) {
         for my $other (values %AnyEvent::MP::Kernel::NODE) {
            next unless $other->{transport};

            # only nodes that told us an empty address list need help
            my $known = $addr{$other->{id}};
            next unless $known && !@$known;

            $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
            snd $port{$other->{id}}, nodes => { $node => $addresses };
         }
      }
   };

   # nodes: the peer tells us about other nodes (node ID => addresses)
   my $on_nodes = sub {
      my ($kv) = @_;

      use JSON::XS;#d#
      my $kv_txt = JSON::XS->new->encode ($kv);#d#
      $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

      for my $id (keys %$kv) {
         my $addresses = $kv->{$id};

         my $peer = AnyEvent::MP::Kernel::add_node ($id);
         $peer->connect (@$addresses);
         start_node $id;
      }
   };

   # find: the peer asks for the addresses of another node
   my $on_find = sub {
      my ($othernode) = @_;

      $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");

      if ($addr{$othernode}) {
         snd $port{$node}, nodes => { $othernode => $addr{$othernode} };
      }
   };

   # set: the peer sends its complete local group registry
   my $on_set = sub {
      my ($groups) = @_;

      set_groups $node, $groups;
   };

   # reg0: a single port left a group
   my $on_reg0 = sub {
      my ($group, $port) = @_;

      _change $group, [], [$port];
   };

   # reg1: a single port joined a group
   my $on_reg1 = sub {
      my ($group, $port) = @_;

      _change $group, [$port], [];
   };

   rcv $SELF,
      addr  => $on_addr,
      nodes => $on_nodes,
      find  => $on_find,
      set   => $on_set,
      reg0  => $on_reg0,
      reg1  => $on_reg1,
   ;
}
312
# Node up/down handler: called with a node ID and a flag telling whether
# the node came up (true) or went down (false). Keeps $nodecnt current and
# either starts the global service connection to the node or cleans up
# after it.
sub mon_node {
   my ($node, $is_up) = @_;

   if (!$is_up) {
      --$nodecnt;

      # the last node is gone, fall back to the seed nodes
      $nodecnt or more_seeding ();

      unreg_groups ($node);

      # forget about the node
      delete $addr{$node};

      # ask other nodes if they know the node
      for my $remote (values %port) {
         snd $remote, find => $node;
      }
   } else {
      ++$nodecnt;
      start_node ($node);
   }
   #warn "node<$node,$is_up>\n";#d#
}
332
# treat every node reported by up_nodes as having just come up, ...
for my $node (up_nodes ()) {
   mon_node $node, 1;
}

# ... then have mon_node called for later changes
mon_nodes \&mon_node;
337
338 =back
339
340 =head1 SEE ALSO
341
342 L<AnyEvent::MP>.
343
344 =head1 AUTHOR
345
346 Marc Lehmann <schmorp@schmorp.de>
347 http://home.schmorp.de/
348
349 =cut
350
351 1
352