ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.14
Committed: Fri Aug 28 20:57:42 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.13: +10 -6 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module provides an assortment of network-global functions: group name
12 registration and non-local locks.
13
14 It will also try to build and maintain a full mesh of all network nodes.
15
16 While it isn't mandatory to run the global services, running them on one
17 node will automatically run them on all nodes.
18
19 =head1 GLOBALS AND FUNCTIONS
20
21 =over 4
22
23 =cut
24
25 package AnyEvent::MP::Global;
26
27 use common::sense;
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AnyEvent ();
32 use AnyEvent::Util ();
33
34 use AnyEvent::MP;
35 use AnyEvent::MP::Kernel;
36
37 our $VERSION = $AnyEvent::MP::VERSION;
38
39 our %addr; # port ID => [address...] mapping
40
41 our %port; # our rendezvous port on the other side
42 our %lreg; # local registry, name => [pid...]
43 our %lmon; # local registry monitoring name,pid => mon
44 our %greg; # global registry, name => [pid...]
45
46 our $nodecnt;
47
48 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
49
50 #############################################################################
51 # seednodes
52
53 our @SEEDS;
54 our %SEED_CONNECT;
55 our $SEED_WATCHER;
56
# Start a connection attempt to a single seed address.
#
# $seed must be a string that AnyEvent::Socket::parse_hostport can split
# into host and port; otherwise this croaks. The value returned by
# mp_connect is kept in %SEED_CONNECT under the seed string, and an
# already existing (true) entry is left untouched, so no second attempt
# is started for the same seed while one is recorded.
sub seed_connect {
   my ($seed) = @_;

   # an empty list from parse_hostport means the address could not be parsed
   my @hostport = AnyEvent::Socket::parse_hostport ($seed)
      or Carp::croak "$seed: unparsable seed address";
   my ($host, $port) = @hostport;

   # ughhh
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect (
      $host, $port,
      seed => $seed,
      sub {
         # forget this attempt and let more_seeding decide whether another
         # seed is needed (presumably called once the attempt is over -
         # TODO confirm against mp_connect)
         delete $SEED_CONNECT{$seed};
         after 1, \&more_seeding;
      },
   );
}
72
# Connect to one randomly chosen seed, but only when no node is currently
# counted as connected ($nodecnt is false) and at least one seed is
# configured in @SEEDS. Otherwise does nothing.
sub more_seeding {
   return if $nodecnt || !@SEEDS;

   $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");

   my $pick = $SEEDS[rand @SEEDS];
   seed_connect $pick;
}
81
# Remove every entry that is string-equal to the given seed from @SEEDS.
sub avoid_seed($) {
   my ($unwanted) = @_;

   @SEEDS = grep { $_ ne $unwanted } @SEEDS;
}
85
# Replace the seed list with the given addresses and try to connect to
# each of them.
#
# The first call also creates $SEED_WATCHER, a timer that calls
# more_seeding (initial delay 5, repeat interval
# $AnyEvent::MP::Kernel::MONITOR_TIMEOUT); later calls keep the existing
# timer.
sub set_seeds(@) {
   @SEEDS = @_;

   $SEED_WATCHER = AE::timer (5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding)
      unless $SEED_WATCHER;

   foreach my $seed (@SEEDS) {
      seed_connect $seed;
   }
}
94
95 #############################################################################
96
# Drop all ports belonging to $node from every group in %greg.
#
# A port belongs to the node when its ID is exactly the node ID or starts
# with the node ID followed by "#". Groups are edited in place and are
# kept even when they end up empty.
sub unreg_groups($) {
   my ($node) = @_;

   my $on_node = qr/^\Q$node\E(?:#|$)/;

   foreach my $members (values %greg) {
      @$members = grep { $_ !~ $on_node } @$members;
   }
}
106
# Merge a registry (name => [port...]) into %greg by appending each
# group's ports to the global group of the same name.
#
# The first argument (the node) is accepted but not used here.
sub set_groups($$) {
   my ($node, $lreg) = @_;

   foreach my $name (keys %$lreg) {
      push @{ $greg{$name} }, @{ $lreg->{$name} };
   }
}
114
115 =item $ports = find $group
116
117 Returns all the ports currently registered to the given group (as a
118 read-only array reference). When the group has no registered members,
119 returns C<undef>.
120
121 =cut
122
# Look up a group in %greg.
#
# Returns the stored array reference itself (not a copy) when the group
# has at least one member, and undef for unknown or empty groups.
sub find($) {
   my $members = $greg{$_[0]};

   $members && @$members
      ? $members
      : undef
}
128
129 =item $guard = register $port, $group
130
131 Register the given (local!) port in the named global group C<$group>.
132
133 The port will be unregistered automatically when the port is destroyed.
134
135 When not called in void context, a guard object will be returned that
136 will also cause the name to be unregistered when destroyed.
137
138 =cut
139
# register port from any node
#
# Appends $port to the member list of $group in %greg, creating the list
# when the group is not known yet. Also installed as the reg1 handler in
# connect.
sub _register {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   push @$members, $port;
}
146
# unregister port from any node
#
# Removes every occurrence of $port from the member list of $group in
# %greg. The (possibly empty) list stays in %greg. Also installed as the
# reg0 handler in connect.
sub _unregister {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   @$members = grep { $_ ne $port } @$members;
}
153
# unregister local port
#
# Undoes register for ($port, $group): forgets the monitor stored in
# %lmon, removes the port from the local registry %lreg and from the
# global registry %greg, then sends a reg0 message for it to every port
# in %port.
sub unregister {
   my ($port, $group) = @_;

   # same "$group\x00$port" key that register stored the monitor under
   delete $lmon{"$group\x00$port"};

   my $locals = $lreg{$group} ||= [];
   @$locals = grep { $_ ne $port } @$locals;

   _unregister $port, $group;

   foreach my $peer (values %port) {
      snd $peer, reg0 => $port, $group;
   }
}
166
# register local port
#
# Adds the local port $port to the group $group:
#  - installs a monitor on the port whose callback unregisters it again
#    (stored in %lmon under "$group\x00$port"),
#  - records the port in the local registry %lreg,
#  - sends a reg1 message for it to every port in %port,
#  - adds it to the global registry %greg.
#
# Croaks when port_is_local rejects $port.
#
# Returns, unless called in void context, an AnyEvent::Util guard that
# calls unregister for this port/group when destroyed.
sub register($$) {
   my ($port, $group) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";

   $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
   push @{ $lreg{$group} }, $port;

   snd $_, reg1 => $port, $group
      for values %port;

   _register $port, $group;

   # wantarray is false but *defined* in scalar context, so a plain truth
   # test would hand "my $guard = register ..." an empty string instead of
   # a guard; only void context (undef) may skip creating the guard.
   defined (wantarray) && AnyEvent::Util::guard { unregister $port, $group }
}
184
# Set up our rendezvous port on the given node, unless one is already
# recorded in %port or the node is ourselves ($NODE).
#
# The port is created by spawning AnyEvent::MP::Global::connect on the
# node with the arguments (0, $NODE). A monitor on that port removes the
# node's groups and our %port entry again when it fires. Afterwards our
# listener addresses are sent, followed by the known address table
# (%addr) and our local registry (%lreg) when they are non-empty.
sub start_node {
   my ($node) = @_;

   # already started, or do not connect to ourselves
   return if exists $port{$node} || $node eq $NODE;

   # establish connection
   my $rendezvous = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
   $port{$node} = $rendezvous;

   mon $rendezvous, sub {
      unreg_groups $node;
      delete $port{$node};
   };

   snd $rendezvous, addr => $AnyEvent::MP::Kernel::LISTENER;

   if (%addr) {
      snd $rendezvous, connect_nodes => \%addr;
   }

   if (%lreg) {
      snd $rendezvous, set => \%lreg;
   }
}
203
# other nodes connect via this
#
# This is the function start_node names in its spawn call, so it is
# called with the extra spawn arguments ($version, $node), where $node is
# the ID of the node that spawned us. It monitors that node and installs
# the message handlers on $SELF:
#
#   addr          - the peer's own addresses
#   connect_nodes - a { node id => [address...] } table of nodes to contact
#   find_node     - the peer asks whether we know addresses for a node
#   set           - the peer's complete local registry
#   reg1 / reg0   - a single port was registered / unregistered
sub connect {
   my ($version, $node) = @_;

   # monitor them, silently die
   mon $node, psub { kil $SELF };

   rcv $SELF,
      addr => sub {
         my ($addresses) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
         $addr{$node} = $addresses;

         # to help listener-less nodes, we broadcast new addresses to them unconditionally
         #TODO: should be done by a node finding out about a listener-less one
         if (@$addresses) {
            for my $other (values %AnyEvent::MP::NODE) {
               next unless $other->{transport};

               # only nodes that reported an empty address list need help
               my $known = $addr{$other->{id}};
               next unless $known && !@$known;

               $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
               snd $port{$other->{id}}, connect_nodes => { $node => $addresses };
            }
         }
      },
      connect_nodes => sub {
         my ($kv) = @_;

         use JSON::XS;#d#
         my $kv_txt = JSON::XS->new->encode ($kv);#d#
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

         for my $id (keys %$kv) {
            my $peer = AnyEvent::MP::Kernel::add_node ($id);
            $peer->connect (@{ $kv->{$id} });
            start_node $id;
         }
      },
      find_node => sub {
         my ($othernode) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");

         # answer only when we have addresses on record for that node
         if ($addr{$othernode}) {
            snd $port{$node}, connect_nodes => { $othernode => $addr{$othernode} };
         }
      },
      set => sub {
         my ($registry) = @_;

         set_groups $node, $registry;
      },
      reg1 => \&_register,
      reg0 => \&_unregister,
   ;
}
257
# Node up/down callback, called with the node ID and a flag.
#
# Up:   count the node and set up our rendezvous port on it.
# Down: uncount it, start seeding again when it was the last one, remove
#       its ports from all groups, forget its addresses and send a
#       find_node request for it to every port in %port.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      $nodecnt++;
      start_node $node;
   } else {
      $nodecnt--;
      more_seeding () if !$nodecnt;
      unreg_groups $node;

      # forget about the node
      delete $addr{$node};

      # ask other nodes if they know the node
      foreach my $peer (values %port) {
         snd $peer, find_node => $node;
      }
   }
}
277
# Treat every node returned by up_nodes as freshly up, then register
# mon_node with mon_nodes for subsequent changes.
for my $node (up_nodes) {
   mon_node $node, 1;
}

mon_nodes \&mon_node;
282
283 =back
284
285 =head1 SEE ALSO
286
287 L<AnyEvent::MP>.
288
289 =head1 AUTHOR
290
291 Marc Lehmann <schmorp@schmorp.de>
292 http://home.schmorp.de/
293
294 =cut
295
296 1
297