ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.21
Committed: Sun Sep 6 00:13:21 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.20: +0 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network, if possible, and tries to
12 ensure that we are connected to at least one other node.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33 use AnyEvent::Util ();
34
35 use AnyEvent::MP;
36 use AnyEvent::MP::Kernel;
37
38 use base "Exporter";
39
40 our @EXPORT = qw(
41 grp_reg
42 grp_get
43 grp_mon
44 );
45
46 our $VERSION = $AnyEvent::MP::VERSION;
47
48 our %addr; # node ID => [address...] mapping, as announced by that node
49
50 our %port; # node ID => our rendezvous port on the other side
51 our %lreg; # local registry, name => [pid...]
52 our %lmon; # local registry monitoring, "name\x00pid" => mon
53 our %greg; # global registry, name => pid => undef
54 our %gmon; # group monitoring, group => [$cb...]
55
56 our $nodecnt; # incremented/decremented by mon_node as nodes go up/down
57
58 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
59
60 #############################################################################
61 # seednodes
62
63 our @SEEDS;
64 our %SEED_CONNECT;
65 our $SEED_WATCHER;
66
# Start a connection attempt to a single seed address.
#
# $seed is parsed with AnyEvent::Socket::parse_hostport; an unparsable
# address croaks. While an entry for the seed exists in %SEED_CONNECT no
# second attempt is started for it.
sub seed_connect {
    my ($seed) = @_;

    my ($host, $port) = AnyEvent::Socket::parse_hostport($seed)
        or Carp::croak "$seed: unparsable seed address";

    # Callback handed to mp_connect (presumably run once the attempt is
    # over - confirm against AnyEvent::MP::Transport): forget the attempt
    # and re-check a second later whether more seeding is needed.
    my $on_done = sub {
        delete $SEED_CONNECT{$seed};
        after 1, \&more_seeding;
    };

    $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect(
        $host, $port,
        seed => $seed,
        $on_done,
    );
}
82
# Connect to one randomly chosen seed, but only when no node is currently
# counted as up ($nodecnt is zero) and at least one seed is configured.
sub more_seeding {
    return if $nodecnt || !@SEEDS;

    $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");

    my $choice = $SEEDS[rand @SEEDS];
    seed_connect($choice);
}
91
# Remove every occurrence of the given seed address from @SEEDS.
sub avoid_seed($) {
    my ($unwanted) = @_;

    @SEEDS = grep { $_ ne $unwanted } @SEEDS;
}
95
# Replace the list of seed addresses and start seeding.
#
# The periodic more_seeding timer is created on the first call only (first
# run after 5 seconds, then every $AnyEvent::MP::Kernel::MONITOR_TIMEOUT).
# In addition, a connection attempt to every seed is scheduled after a
# random delay below 0.1 seconds.
sub set_seeds(@) {
    @SEEDS = @_;

    $SEED_WATCHER ||= AE::timer(
        5,
        $AnyEvent::MP::Kernel::MONITOR_TIMEOUT,
        \&more_seeding,
    );

    foreach my $seed (@SEEDS) {
        my $jitter = 0.100 * rand;
        after $jitter, sub { seed_connect($seed) };
    }
}
105
106 #############################################################################
107
# Apply a membership change to the global registry and notify monitors.
#
# $group - group name
# $add   - array-ref of port IDs that joined the group
# $del   - array-ref of port IDs that left the group
#
# Removals are applied before additions. Every callback registered for the
# group in %gmon is then called with (current members, $add, $del).
sub _change {
    my ($group, $add, $del) = @_;

    my $members = $greg{$group} ||= {};

    delete $members->{$_} for @$del;
    $members->{$_} = undef for @$add;

    my $ports = [keys %$members];

    for my $cb (@{ $gmon{$group} }) {
        $cb->($ports, $add, $del);
    }
}
119
# Remove every port that belongs to the given node from all global groups.
#
# A port belongs to $node when its ID is either exactly the node ID or the
# node ID followed by "#". Groups that lose members are updated through
# _change, so group monitors get notified.
sub unreg_groups($) {
    my ($node) = @_;

    my $qr = qr/^\Q$node\E(?:#|$)/;

    # Iterate over a snapshot of the group names: _change invokes user
    # monitor callbacks, and those may add entries to %greg, which must not
    # happen in the middle of an each() iteration over the same hash.
    for my $group (keys %greg) {
        my $ports = $greg{$group}
            or next;

        # A fresh array per group, so a callback that keeps the reference
        # it was given does not see it overwritten by the next group.
        my @del = grep /$qr/, keys %$ports;

        _change($group, [], \@del)
            if @del;
    }
}
132
# Merge a registry received from another node (group name => array-ref of
# port IDs) into the global registry, one _change call per group.
# $node is accepted for symmetry with the caller but is not used here.
sub set_groups($$) {
    my ($node, $registry) = @_;

    for my $group (keys %$registry) {
        _change($group, $registry->{$group}, []);
    }
}
140
141 =item $guard = grp_reg $group, $port
142
143 Register the given (local!) port in the named global group C<$group>.
144
145 The port will be unregistered automatically when the port is destroyed.
146
147 When not called in void context, then a guard object will be returned that
148 will also cause the name to be unregistered when destroyed.
149
150 =cut
151
# unregister local port
#
# Drops the port monitor stored by grp_reg, removes the port from the local
# and the global registry and sends a "reg0" message to every port in %port.
sub unregister {
    my ($port, $group) = @_;

    delete $lmon{"$group\x00$port"};

    my @remaining = grep { $_ ne $port } @{ $lreg{$group} };
    @{ $lreg{$group} } = @remaining;

    _change($group, [], [$port]);

    for my $peer (values %port) {
        snd $peer, reg0 => $group, $port;
    }
}
164
# register local port
#
# Registers the local port $port in the global group $group: stores a
# monitor that unregisters the port when it dies, records it in the local
# registry, sends "reg1" to every port in %port and updates the global
# registry.
#
# Croaks when $port is not a local port or is already registered in $group.
#
# Returns a guard that unregisters the port when destroyed, unless called
# in void context.
sub grp_reg($$) {
    my ($group, $port) = @_;

    port_is_local($port)
        or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";

    grep $_ eq $port, @{ $lreg{$group} }
        and Carp::croak "'$group': group already registered, cannot register a second time";

    $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
    push @{ $lreg{$group} }, $port;

    snd $_, reg1 => $group, $port
        for values %port;

    _change($group, [$port], []);

    # wantarray is false-but-defined in scalar context, so it must be tested
    # with defined() for "$guard = grp_reg ..." to actually get its guard.
    defined wantarray && AnyEvent::Util::guard { unregister $port, $group }
}
185
186 =item $ports = grp_get $group
187
188 Returns all the ports currently registered to the given group (as
189 read-only(!) array reference). When the group has no registered members,
190 return C<undef>.
191
192 =cut
193
# Return an array-ref with the ports currently registered in the given
# group, or undef when the group has no members.
sub grp_get($) {
    # Look the group up first: keys %{ $greg{...} } on a missing entry would
    # autovivify an empty hash in %greg for every unknown group queried.
    my $members = $greg{$_[0]}
        or return undef;

    my @ports = keys %$members;
    @ports ? \@ports : undef
}
198
199 =item $guard = grp_mon $group, $callback->($ports, $add, $del)
200
201 Installs a monitor on the given group. Each time there is a change it
202 will be called with the current group members as an arrayref as the
203 first argument. The second argument only contains ports added, the third
204 argument only ports removed.
205
206 Unlike C<grp_get>, all three arguments will always be array-refs, even if
207 the array is empty. None of the arrays must be modified in any way.
208
209 The first invocation will be with the first two arguments set to the
210 current members, as if all of them were just added, but only when the
211 group is actually non-empty.
212
213 Optionally returns a guard object that uninstalls the watcher when it is
214 destroyed.
215
216 =cut
217
# Install the monitor callback $cb on group $grp.
#
# Registration is deferred through AnyEvent::MP::Kernel::delay. Once it
# runs, the callback is added to %gmon and, if the group currently has
# members, called once with (members, members, []).
#
# Unless called in void context, returns a guard that removes the callback
# again when destroyed; destroying it before the deferred registration has
# run cancels the registration.
sub grp_mon($$) {
    my ($grp, $cb) = @_;

    AnyEvent::MP::Kernel::delay sub {
        # the guard was already destroyed, nothing to install
        return unless $cb;

        push @{ $gmon{$grp} }, $cb;

        my $ports = grp_get($grp)
            or return;
        $cb->($ports, $ports, []);
    };

    defined wantarray && AnyEvent::Util::guard {
        # the group may have no monitor list (yet), so fall back to an
        # empty one instead of dereferencing undef
        my @mon = grep $_ != $cb, @{ delete $gmon{$grp} || [] };
        $gmon{$grp} = \@mon if @mon;
        undef $cb;
    }
}
234
# Set up our rendezvous port on the given node, unless one already exists
# or the node is ourselves.
#
# The port is spawned on $node running AnyEvent::MP::Global::connect and
# remembered in %port. When it dies, the node's ports are removed from all
# groups and the %port entry is dropped. The new port is then sent our
# listener addresses, the known node addresses and our local registry (the
# latter two only when non-empty).
sub start_node {
    my ($node) = @_;

    return if exists $port{$node};
    return if $node eq $NODE; # do not connect to ourselves

    # establish connection
    my $remote = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
    $port{$node} = $remote;

    mon $remote, sub {
        unreg_groups($node);
        delete $port{$node};
    };

    snd $remote, addr => $AnyEvent::MP::Kernel::LISTENER;

    if (%addr) {
        snd $remote, nodes => \%addr;
    }

    snd $remote, set => \%lreg if %lreg;
}
253
# other nodes connect via this
#
# Body of the port that start_node spawns: $version is the first spawn
# argument (start_node passes 0; it is not used here), $node is the node ID
# of the peer that spawned us. Installs the handlers for the messages the
# peer sends to this port.
sub connect {
    my ($version, $node) = @_;

    # monitor them, silently die
    mon $node, psub { kil $SELF };

    rcv $SELF,
        # the peer tells us the addresses it can be reached at
        addr => sub {
            my ($addresses) = @_;

            $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
            $addr{$node} = $addresses;

            # to help listener-less nodes, we broadcast new addresses to them unconditionally
            #TODO: should be done by a node finding out about a listener-less one
            return unless @$addresses;

            for my $other (values %AnyEvent::MP::Kernel::NODE) {
                next unless $other->{transport};

                # only nodes that announced an empty address list need help
                my $known = $addr{$other->{id}};
                next unless $known && !@$known;

                $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
                snd $port{$other->{id}}, nodes => { $node => $addresses };
            }
        },
        # the peer tells us about other nodes: node ID => [address...]
        nodes => sub {
            my ($kv) = @_;

            use JSON::XS;#d#
            my $kv_txt = JSON::XS->new->encode ($kv);#d#
            $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

            for my $id (keys %$kv) {
                my $addresses = $kv->{$id};

                my $peer = AnyEvent::MP::Kernel::add_node($id);
                $peer->connect (@$addresses);
                start_node($id);
            }
        },
        # the peer asks for the addresses of another node; answer if known
        find => sub {
            my ($othernode) = @_;

            $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");

            if ($addr{$othernode}) {
                snd $port{$node}, nodes => { $othernode => $addr{$othernode} };
            }
        },
        # the peer sends its complete local registry
        set => sub {
            my ($registry) = @_;

            set_groups($node, $registry);
        },
        # a single port left (reg0) or joined (reg1) a group
        reg0 => sub {
            my ($group, $port) = @_;

            _change($group, [], [$port]);
        },
        reg1 => sub {
            my ($group, $port) = @_;

            _change($group, [$port], []);
        },
    ;
}
311
# Node state change handler.
#
# $is_up true:  count the node and set up our rendezvous port on it.
# $is_up false: uncount the node, start seeding again when it was the last
#               one, drop its group registrations and its addresses, and
#               send a "find" request for it to every port in %port.
sub mon_node {
    my ($node, $is_up) = @_;

    if ($is_up) {
        ++$nodecnt;
        return start_node($node);
    }

    more_seeding() unless --$nodecnt;
    unreg_groups($node);

    # forget about the node
    delete $addr{$node};

    # ask other nodes if they know the node
    for my $peer (values %port) {
        snd $peer, find => $node;
    }
}
331
# Handle every node returned by up_nodes as if it had just come up ...
for my $node (up_nodes()) {
    mon_node($node, 1);
}

# ... and pass mon_node to mon_nodes (presumably so that it gets called on
# later node state changes - confirm against AnyEvent::MP::Kernel).
mon_nodes(\&mon_node);
336
337 =back
338
339 =head1 SEE ALSO
340
341 L<AnyEvent::MP>.
342
343 =head1 AUTHOR
344
345 Marc Lehmann <schmorp@schmorp.de>
346 http://home.schmorp.de/
347
348 =cut
349
350 1
351