ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.16
Committed: Sun Aug 30 13:22:46 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.15: +6 -6 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network, if possible, and tries to
12 ensure that we are connected to at least one other node.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31 use MIME::Base64 ();
32
33 use AnyEvent ();
34 use AnyEvent::Util ();
35
36 use AnyEvent::MP;
37 use AnyEvent::MP::Kernel;
38
39 our $VERSION = $AnyEvent::MP::VERSION;
40
41 our %addr; # node ID => [address...] mapping
42
43 our %port; # our rendezvous port on the other side
44 our %lreg; # local registry, name => [pid...]
45 our %lmon; # local registry monitoring, "name\x00pid" => mon
46 our %greg; # global registry, name => [pid...]
47
48 our $nodecnt;
49
50 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
51
52 #############################################################################
53 # seednodes
54
55 our @SEEDS;
56 our %SEED_CONNECT;
57 our $SEED_WATCHER;
58
# Start a connection attempt to a single seed address, unless one for the
# very same seed is already pending in %SEED_CONNECT.
#
# $seed is handed to AnyEvent::Socket::parse_hostport; if that yields
# nothing, we croak with "<seed>: unparsable seed address".
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport($seed)
      or Carp::croak "$seed: unparsable seed address";

   # callback handed to mp_connect: forget the pending attempt and have
   # more_seeding re-check the situation via "after 1"
   my $on_done = sub {
      delete $SEED_CONNECT{$seed};
      after(1, \&more_seeding);
   };

   # ughhh - only start a new attempt when none is stored for this seed
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect(
      $host, $port,
      seed => $seed,
      $on_done,
   );
}
74
# Try one randomly chosen seed, but only while $nodecnt is zero and there
# is at least one seed configured.
sub more_seeding {
   return if $nodecnt || !@SEEDS;

   $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");

   my $pick = int rand @SEEDS;
   seed_connect($SEEDS[$pick]);
}
83
# Remove every occurrence of the given seed address from @SEEDS.
sub avoid_seed($) {
   my ($unwanted) = @_;

   @SEEDS = grep { $_ ne $unwanted } @SEEDS;
}
87
# Replace the seed list with the given addresses, make sure the periodic
# more_seeding timer exists and start a connection attempt to every seed.
sub set_seeds(@) {
   @SEEDS = @_;

   # created only once; later calls reuse the existing watcher
   $SEED_WATCHER ||= AE::timer(5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding);

   for my $seed (@SEEDS) {
      seed_connect($seed);
   }
}
96
97 #############################################################################
98
# Drop all ports belonging to $node from every group in the global
# registry. The (possibly empty) group arrays themselves are kept.
sub unreg_groups($) {
   my ($node) = @_;

   # matches the node ID itself as well as anything of the form "node#..."
   my $on_node = qr/^\Q$node\E(?:#|$)/;

   for my $members (values %greg) {
      my @kept = grep { !/$on_node/ } @$members;
      @$members = @kept;
   }
}
108
# Merge a registry (group name => [port...]) into the global registry by
# appending its ports to the respective groups.
#
# $node is accepted for the benefit of the caller but not used here.
sub set_groups($$) {
   my ($node, $groups) = @_;

   for my $name (keys %$groups) {
      push @{ $greg{$name} }, @{ $groups->{$name} };
   }
}
116
117 =item $guard = register $port, $group
118
119 Register the given (local!) port in the named global group C<$group>.
120
121 The port will be unregistered automatically when the port is destroyed.
122
123 When not called in void context, then a guard object will be returned that
124 will also cause the name to be unregistered when destroyed.
125
126 =cut
127
# register port from any node
#
# Appends $port to the global registry entry of $group, creating the entry
# when it does not exist yet. Also installed as the "reg1" message handler.
sub _register {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   push @$members, $port;
}
134
# unregister port from any node
#
# Removes every occurrence of $port from the global registry entry of
# $group. Also installed as the "reg0" message handler.
sub _unregister {
   my ($port, $group) = @_;

   # an entry is created for unknown groups, it simply stays empty
   my $members = $greg{$group} ||= [];
   @$members = grep { $_ ne $port } @$members;
}
141
# unregister local port
#
# Drops the monitor stored for this group/port pair, removes the port from
# the local and the global registry and sends a "reg0" message to every
# port in %port.
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};

   my $members = $lreg{$group} ||= [];
   @$members = grep { $_ ne $port } @$members;

   _unregister($port, $group);

   for my $peer (values %port) {
      snd($peer, reg0 => $port, $group);
   }
}
154
# register local port
#
# Registers the local port $port in the group $group: the port is
# monitored (so it gets unregistered via the monitor callback), recorded
# in the local and the global registry, and a "reg1" message is sent to
# every port in %port.
#
# Croaks when $port is not a local port.
#
# Returns a guard object that unregisters the port when destroyed, unless
# called in void context.
sub register($$) {
   my ($port, $group) = @_;

   port_is_local($port)
      or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";

   $lmon{"$group\x00$port"} = mon($port, sub { unregister($port, $group) });
   push @{ $lreg{$group} }, $port;

   snd($_, reg1 => $port, $group)
      for values %port;

   _register($port, $group);

   # wantarray is false (but defined) in scalar context, so testing it for
   # truth would deny "$guard = register ..." its guard - only void
   # context (undef) may skip creating it.
   defined wantarray
      && AnyEvent::Util::guard { unregister($port, $group) }
}
172
173 =item $ports = find $group
174
175 Returns all the ports currently registered to the given group (as a
176 read-only array reference). When the group has no registered members,
177 returns C<undef>.
178
179 =cut
180
# Look up a group in the global registry: yields the array reference
# stored for it when it has at least one member, undef otherwise.
sub find($) {
   my $members = $greg{$_[0]};

   $members && @$members
      ? $members
      : undef
}
186
# Set up our rendezvous port on $node, unless we already have one or
# $node is ourselves, and send it our initial state.
sub start_node {
   my ($node) = @_;

   # already started, or the node is us - do not connect to ourselves
   return if exists($port{$node}) || $node eq $NODE;

   # establish connection: the arguments 0 and $NODE end up as
   # ($version, $node) in AnyEvent::MP::Global::connect
   my $remote = spawn($node, "AnyEvent::MP::Global::connect", 0, $NODE);
   $port{$node} = $remote;

   # when the monitor fires, forget the port and all groups of that node
   mon($remote, sub {
      unreg_groups($node);
      delete $port{$node};
   });

   # tell the other side our listeners, then all addresses and local
   # registrations we know about (if any)
   snd($remote, addr    => $AnyEvent::MP::Kernel::LISTENER);
   snd($remote, connect => \%addr) if %addr;
   snd($remote, set     => \%lreg) if %lreg;
}
205
# other nodes connect via this
#
# start_node spawns this function by name with the arguments
# ($version, $node), where $node is the ID of the spawning node. It
# monitors that node and installs the handlers for the messages it may
# send to $SELF. $version is accepted but not looked at.
sub connect {
   my ($version, $node) = @_;

   # monitor them, silently die
   mon($node, psub { kil($SELF) });

   # addr: the peer announces the addresses it can be reached at
   my $on_addr = sub {
      my ($addresses) = @_;

      $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
      $addr{$node} = $addresses;

      # to help listener-less nodes, we broadcast new addresses to them unconditionally
      #TODO: should be done by a node finding out about a listener-less one
      if (@$addresses) {
         for my $other (values %AnyEvent::MP::NODE) {
            next unless $other->{transport};

            # only nodes that told us an empty address list are helped
            my $known = $addr{ $other->{id} };
            next unless $known && !@$known;

            $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
            snd($port{ $other->{id} }, connect => { $node => $addresses });
         }
      }
   };

   # connect: the peer tells us about nodes (id => [address...]) it knows
   my $on_connect = sub {
      my ($kv) = @_;

      use JSON::XS;#d#
      my $kv_txt = JSON::XS->new->encode ($kv);#d#
      $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

      for my $id (keys %$kv) {
         my $peer = AnyEvent::MP::Kernel::add_node($id);
         $peer->connect(@{ $kv->{$id} });
         start_node($id);
      }
   };

   # find: the peer asks for the addresses of another node; we only
   # answer when we have something stored for that node
   my $on_find = sub {
      my ($othernode) = @_;

      $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");

      my $known = $addr{$othernode};
      snd($port{$node}, connect => { $othernode => $known })
         if $known;
   };

   # set: the peer sends a whole registry (group => [port...]) at once
   my $on_set = sub {
      my ($groups) = @_;

      set_groups($node, $groups);
   };

   rcv($SELF,
      addr    => $on_addr,
      connect => $on_connect,
      find    => $on_find,
      set     => $on_set,
      reg1    => \&_register,
      reg0    => \&_unregister,
   );
}
259
# Node up/down callback, called with ($node, $is_up): keeps $nodecnt
# current, starts the global service on nodes that came up and cleans up
# after nodes that went down.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      $nodecnt += 1;
      start_node($node);
   } else {
      $nodecnt -= 1;

      # that was the last connected node, so go back to the seeds
      more_seeding() if !$nodecnt;

      unreg_groups($node);

      # forget about the node
      delete $addr{$node};

      # ask other nodes if they know the node
      for my $peer (values %port) {
         snd($peer, find => $node);
      }
   }
}
279
# treat every node returned by up_nodes as if it had just come up
for my $node (up_nodes) {
   mon_node($node, 1);
}

# and have mon_node called via mon_nodes from now on
mon_nodes(\&mon_node);
284
285 =back
286
287 =head1 SEE ALSO
288
289 L<AnyEvent::MP>.
290
291 =head1 AUTHOR
292
293 Marc Lehmann <schmorp@schmorp.de>
294 http://home.schmorp.de/
295
296 =cut
297
298 1
299