ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.19
Committed: Wed Sep 2 13:05:29 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-1_0
Changes since 1.18: +3 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network, if possible, and tries to
12 ensure that we are connected to at least one other node.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31 use MIME::Base64 ();
32
33 use AnyEvent ();
34 use AnyEvent::Util ();
35
36 use AnyEvent::MP;
37 use AnyEvent::MP::Kernel;
38
39 our $VERSION = $AnyEvent::MP::VERSION;
40
41 our %addr; # node ID => [address...] mapping
42
43 our %port; # node ID => our rendezvous port on that node
44 our %lreg; # local registry, group name => [port ID...]
45 our %lmon; # local registry monitoring, "group\0port" => mon
46 our %greg; # global registry, group name => [port ID...]
47
48 our $nodecnt; # number of nodes currently up, maintained by mon_node
49
50 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
51
52 #############################################################################
53 # seednodes
54
55 our @SEEDS; # seed addresses, replaced by set_seeds, pruned by avoid_seed
56 our %SEED_CONNECT; # seed address => result of mp_connect, deleted again by its callback
57 our $SEED_WATCHER; # AE::timer created by set_seeds that calls more_seeding
58
# Start connecting to the seed address $seed, unless a connection attempt
# for that very seed is already recorded in %SEED_CONNECT.
#
# $seed must be something AnyEvent::Socket::parse_hostport can split into
# host and port, otherwise this croaks.
sub seed_connect {
   my ($seed) = @_;

   my @hostport = AnyEvent::Socket::parse_hostport($seed);
   @hostport
      or Carp::croak "$seed: unparsable seed address";

   my ($host, $port) = @hostport;

   # callback handed to mp_connect: forget the attempt and try seeding again
   # a bit later (presumably it runs once the attempt is over - confirm
   # against AnyEvent::MP::Transport)
   my $done = sub {
      delete $SEED_CONNECT{$seed};
      after 1, \&more_seeding;
   };

   # remember the connect object so the same seed is not dialled twice
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect(
      $host, $port,
      seed => $seed,
      $done,
   );
}
74
# Try one randomly chosen seed, but only while no node is connected and
# at least one seed is known.
sub more_seeding {
   return if $nodecnt || !@SEEDS;

   $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");

   my $pick = $SEEDS[rand @SEEDS];

   seed_connect($pick);
}
83
# Remove every occurrence of the given seed address from @SEEDS.
sub avoid_seed($) {
   my ($unwanted) = @_;

   @SEEDS = grep { $_ ne $unwanted } @SEEDS;
}
87
# Replace the seed list with the given addresses, make sure the seeding
# timer exists, and schedule a first connection attempt to every seed.
sub set_seeds(@) {
   @SEEDS = @_;

   # created only once; later calls keep the existing timer
   $SEED_WATCHER ||= AE::timer(5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding);

   # spread the initial attempts over a short random delay
   foreach my $addr (@SEEDS) {
      after 0.100 * rand(), sub { seed_connect($addr) };
   }
}
97
98 #############################################################################
99
# Remove everything belonging to node $node from all global groups.
# Groups that become empty stay in %greg as empty arrays.
sub unreg_groups($) {
   my ($node) = @_;

   # matches the node ID itself and any ID of the form "node#..."
   my $owned_by_node = qr/^\Q$node\E(?:#|$)/;

   foreach my $members (values %greg) {
      my @kept = grep { $_ !~ $owned_by_node } @$members;
      @$members = @kept;
   }
}
109
# Merge a registry received from a node (group name => [port ID...]) into
# the global registry by appending its ports to the matching groups.
# $node is accepted for symmetry with unreg_groups but is not used here.
sub set_groups($$) {
   my ($node, $registry) = @_;

   foreach my $group (keys %$registry) {
      push @{ $greg{$group} }, @{ $registry->{$group} };
   }
}
117
118 =item $guard = register $port, $group
119
120 Register the given (local!) port in the named global group C<$group>.
121
122 The port will be unregistered automatically when the port is destroyed.
123
124 When not called in void context, then a guard object will be returned that
125 will also cause the name to be unregistered when destroyed.
126
127 =cut
128
# register port from any node
#
# Appends $port to the global group $group, creating the group on demand.
# Also installed as the "reg1" message handler in connect.
sub _register {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];

   push @$members, $port;
}
135
# unregister port from any node
#
# Removes every occurrence of $port from the global group $group. The group
# entry itself stays (and is created empty if it did not exist).
# Also installed as the "reg0" message handler in connect.
sub _unregister {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];

   @$members = grep { $_ ne $port } @$members;
}
142
# unregister local port
#
# Undoes register: forgets the monitor stored for this (group, port) pair,
# removes the port from the local and the global registry and tells every
# node we hold a rendezvous port on (reg0 message).
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};

   my $local = $lreg{$group} ||= [];
   @$local = grep { $_ ne $port } @$local;

   _unregister($port, $group);

   foreach my $peer (values %port) {
      snd($peer, reg0 => $port, $group);
   }
}
155
# register local port
#
# Registers the local port $port in the global group $group:
#  - croaks unless port_is_local accepts the port,
#  - monitors the port so it gets unregistered again via the monitor callback,
#  - records it in the local and global registry,
#  - announces it (reg1 message) to every node we hold a rendezvous port on.
#
# Returns a guard that unregisters the port when destroyed, unless called in
# void context, in which case no guard is created.
sub register($$) {
   my ($port, $group) = @_;

   port_is_local($port)
      or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";

   # keep the monitor so that unregister can drop it again
   $lmon{"$group\x00$port"} = mon($port, sub { unregister($port, $group) });
   push @{ $lreg{$group} }, $port;

   snd($_, reg1 => $port, $group)
      for values %port;

   _register($port, $group);

   # "defined wantarray" is true in scalar AND list context; a plain
   # "wantarray" is false in scalar context, so "$guard = register ..."
   # would never have received its guard.
   defined wantarray && AnyEvent::Util::guard { unregister($port, $group) }
}
173
=item $ports = find $group

Returns all the ports currently registered to the given group (as
read-only array reference). When the group has no registered members,
returns C<undef>.

=cut

sub find($) {
   my ($group) = @_;

   # Fetch the entry before dereferencing it: taking @{ ... } of a group
   # that was never registered dereferences undef, which is fatal under
   # strict refs. A plain lookup also never creates the entry.
   my $ports = $greg{$group};

   $ports && @$ports
      ? $ports
      : undef
}
187
# Set up our rendezvous port on node $node, unless we already have one there
# or $node is this node itself, then send it our initial state.
sub start_node {
   my ($node) = @_;

   return if exists $port{$node};
   return if $node eq $NODE; # do not connect to ourselves

   # establish connection: runs AnyEvent::MP::Global::connect (0, $NODE) over there
   my $remote = spawn($node, "AnyEvent::MP::Global::connect", 0, $NODE);
   $port{$node} = $remote;

   # once the remote port is gone, drop the node's groups and forget the port
   mon($remote, sub {
      unreg_groups($node);
      delete $port{$node};
   });

   # initial state: our listener addresses, plus known nodes and local
   # registrations when there are any
   snd($remote, addr => $AnyEvent::MP::Kernel::LISTENER);
   snd($remote, nodes => \%addr) if %addr;
   snd($remote, set => \%lreg) if %lreg;
}
206
# other nodes connect via this
#
# Init function of the port that start_node spawns here on behalf of $node
# (start_node passes 0 and its own node ID). $version is received but not
# used. Installs the handlers for the messages the peer sends us.
sub connect {
   my ($version, $node) = @_;

   # monitor them, silently die
   mon($node, psub { kil $SELF });

   # addr: the peer tells us the addresses it listens on
   my $on_addr = sub {
      my ($addresses) = @_;

      $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
      $addr{$node} = $addresses;

      # to help listener-less nodes, we broadcast new addresses to them unconditionally
      #TODO: should be done by a node finding out about a listener-less one
      if (@$addresses) {
         for my $other (values %AnyEvent::MP::Kernel::NODE) {
            next unless $other->{transport};

            # only nodes that reported an empty address list need help
            my $theirs = $addr{$other->{id}};
            next unless $theirs && !@$theirs;

            $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
            snd($port{$other->{id}}, nodes => { $node => $addresses });
         }
      }
   };

   # nodes: the peer tells us about other nodes (node ID => addresses)
   my $on_nodes = sub {
      my ($kv) = @_;

      # debugging aid (#d#): log what we were told, as JSON
      use JSON::XS;
      my $kv_txt = JSON::XS->new->encode ($kv);
      $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");

      for my $id (keys %$kv) {
         my $peer = AnyEvent::MP::Kernel::add_node($id);
         $peer->connect (@{ $kv->{$id} });
         start_node($id);
      }
   };

   # find: the peer asks for the addresses of another node; answer if known
   my $on_find = sub {
      my ($othernode) = @_;

      $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");

      my $known = $addr{$othernode}
         or return;

      snd($port{$node}, nodes => { $othernode => $known });
   };

   # set: the peer sends its complete local registry
   my $on_set = sub {
      my ($registry) = @_;

      set_groups($node, $registry);
   };

   rcv($SELF,
      addr  => $on_addr,
      nodes => $on_nodes,
      find  => $on_find,
      set   => $on_set,
      reg1  => \&_register,
      reg0  => \&_unregister,
   );
}
260
# Node up/down callback: keeps $nodecnt current, starts the global service
# on nodes that come up and cleans up after nodes that go down.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      $nodecnt++;
      return start_node($node);
   }

   $nodecnt--;

   # lost the last node: go back to the seeds
   more_seeding() unless $nodecnt;

   unreg_groups($node);

   # forget about the node
   delete $addr{$node};

   # ask other nodes if they know the node
   foreach my $peer (values %port) {
      snd($peer, find => $node);
   }
}
280
# treat every node currently reported by up_nodes as having just come up
foreach my $node (up_nodes()) {
   mon_node($node, 1);
}

# and register mon_node as the node up/down callback
mon_nodes(\&mon_node);
285
286 =back
287
288 =head1 SEE ALSO
289
290 L<AnyEvent::MP>.
291
292 =head1 AUTHOR
293
294 Marc Lehmann <schmorp@schmorp.de>
295 http://home.schmorp.de/
296
297 =cut
298
299 1
300