ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.12
Committed: Fri Aug 28 00:26:04 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.11: +3 -6 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8 # -OR-
9 aemp addservice AnyEvent::MP::Global::
10
11 =head1 DESCRIPTION
12
13 This module provides an assortment of network-global functions: group name
14 registration and non-local locks.
15
16 It will also try to build and maintain a full mesh of all network nodes.
17
18 While it isn't mandatory to run the global services, running it on one
19 node will automatically run it on all nodes.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31 use MIME::Base64 ();
32
33 use AnyEvent ();
34 use AnyEvent::Util ();
35
36 use AnyEvent::MP;
37 use AnyEvent::MP::Kernel;
38
39 our $VERSION = $AnyEvent::MP::VERSION;
40
41 our %addr; # port ID => [address...] mapping
42
43 our %port; # our rendezvous port on the other side
44 our %lreg; # local registry, name => [pid...]
45 our %lmon; # local rgeistry monitoring name,pid => mon
46 our %greg; # global regstry, name => [pid...]
47
48 our $nodecnt;
49
50 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
51
52 #############################################################################
53 # seednodes
54
55 our @SEEDS;
56 our $SEED_WATCHER;
57
58 sub seed_connect {
59 my ($seed) = @_;
60
61 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
62 or Carp::croak "$seed: unparsable seed address";
63
64 # ughhh
65 my $tp; $tp = AnyEvent::MP::Transport::mp_connect $host, $port,
66 release => \&more_seeding,
67 sub {
68 $tp = shift;
69 $tp->{keepalive} = $tp;
70 },
71 ;
72 }
73
74 sub more_seeding {
75 return if $nodecnt;
76 return unless @SEEDS;
77
78 $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");
79
80 seed_connect $SEEDS[rand @SEEDS];
81 }
82
83 sub set_seeds(@) {
84 @SEEDS = @_;
85
86 $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;
87
88 seed_connect $_
89 for @SEEDS;
90 }
91
92 #############################################################################
93
94 sub unreg_groups($) {
95 my ($node) = @_;
96
97 my $qr = qr/^\Q$node\E(?:#|$)/;
98
99 for my $group (values %greg) {
100 @$group = grep $_ !~ $qr, @$group;
101 }
102 }
103
104 sub set_groups($$) {
105 my ($node, $lreg) = @_;
106
107 while (my ($k, $v) = each %$lreg) {
108 push @{ $greg{$k} }, @$v;
109 }
110 }
111
112 =item $ports = find $group
113
114 Returns all the ports currently registered to the given group (as
115 read-only array reference). When the group has no registered members,
116 return C<undef>.
117
118 =cut
119
120 sub find($) {
121 @{ $greg{$_[0]} }
122 ? $greg{$_[0]}
123 : undef
124 }
125
126 =item $guard = register $port, $group
127
128 Register the given (local!) port in the named global group C<$group>.
129
130 The port will be unregistered automatically when the port is destroyed.
131
132 When not called in void context, then a guard object will be returned that
133 will also cause the name to be unregistered when destroyed.
134
135 =cut
136
137 # register port from any node
138 sub _register {
139 my ($port, $group) = @_;
140
141 push @{ $greg{$group} }, $port;
142 }
143
144 # unregister port from any node
145 sub _unregister {
146 my ($port, $group) = @_;
147
148 @{ $greg{$group} } = grep $_ ne $port, @{ $greg{$group} };
149 }
150
151 # unregister local port
152 sub unregister {
153 my ($port, $group) = @_;
154
155 delete $lmon{"$group\x00$port"};
156 @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
157
158 _unregister $port, $group;
159
160 snd $_, reg0 => $port, $group
161 for values %port;
162 }
163
164 # register local port
165 sub register($$) {
166 my ($port, $group) = @_;
167
168 port_is_local $port
169 or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";
170
171 $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
172 push @{ $lreg{$group} }, $port;
173
174 snd $_, reg1 => $port, $group
175 for values %port;
176
177 _register $port, $group;
178
179 wantarray && AnyEvent::Util::guard { unregister $port, $group }
180 }
181
182 sub start_node {
183 my ($node) = @_;
184
185 return if exists $port{$node};
186 return if $node eq $NODE; # do not connect to ourselves
187
188 # establish connection
189 my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
190
191 mon $port, sub {
192 unreg_groups $node;
193 delete $port{$node};
194 };
195
196 snd $port, addr => $AnyEvent::MP::Kernel::LISTENER;
197 snd $port, connect_nodes => \%addr if %addr;
198 snd $port, set => \%lreg if %lreg;
199 }
200
201 # other nodes connect via this
202 sub connect {
203 my ($version, $node) = @_;
204
205 # monitor them, silently die
206 mon $node, psub { kil $SELF };
207
208 rcv $SELF,
209 addr => sub {
210 my $addresses = shift;
211 $AnyEvent::MP::Kernel::WARN->(9, "$node told us it's addresses (@$addresses).");
212 $addr{$node} = $addresses;
213
214 # to help listener-less nodes, we broadcast new addresses to them unconditionally
215 #TODO: should be done by a node finding out about a listener-less one
216 if (@$addresses) {
217 for my $other (values %AnyEvent::MP::NODE) {
218 if ($other->{transport}) {
219 if ($addr{$other->{id}} && !@{ $addr{$other->{id}} }) {
220 $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
221 snd $port{$other->{id}}, connect_nodes => { $node => $addresses };
222 }
223 }
224 }
225 }
226 },
227 connect_nodes => sub {
228 my ($kv) = @_;
229
230 use JSON::XS;#d#
231 my $kv_txt = JSON::XS->new->encode ($kv);#d#
232 $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#
233
234 while (my ($id, $addresses) = each %$kv) {
235 my $node = AnyEvent::MP::Kernel::add_node $id;
236 $node->connect (@$addresses);
237 start_node $id;
238 }
239 },
240 find_node => sub {
241 my ($othernode) = @_;
242
243 $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
244 snd $port{$node}, connect_nodes => { $othernode => $addr{$othernode} }
245 if $addr{$othernode};
246 },
247 set => sub {
248 set_groups $node, shift;
249 },
250 reg1 => \&_register,
251 reg0 => \&_unregister,
252 ;
253 }
254
255 sub mon_node {
256 my ($node, $is_up) = @_;
257
258 if ($is_up) {
259 ++$nodecnt;
260 start_node $node;
261 } else {
262 --$nodecnt;
263 more_seeding unless $nodecnt;
264 unreg_groups $node;
265
266 # forget about the node
267 delete $addr{$node};
268 # ask other nodes if they know the node
269 snd $_, find_node => $node
270 for values %port;
271 }
272 #warn "node<$node,$is_up>\n";#d#
273 }
274
275 mon_node $_, 1
276 for up_nodes;
277
278 mon_nodes \&mon_node;
279
280 =back
281
282 =head1 SEE ALSO
283
284 L<AnyEvent::MP>.
285
286 =head1 AUTHOR
287
288 Marc Lehmann <schmorp@schmorp.de>
289 http://home.schmorp.de/
290
291 =cut
292
293 1
294