ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.13
Committed: Fri Aug 28 16:37:30 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.12: +1 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module provides an assortment of network-global functions: group name
12 registration and non-local locks.
13
14 It will also try to build and maintain a full mesh of all network nodes.
15
16 While it isn't mandatory to run the global services, running it on one
17 node will automatically run it on all nodes.
18
19 =head1 GLOBALS AND FUNCTIONS
20
21 =over 4
22
23 =cut
24
25 package AnyEvent::MP::Global;
26
27 use common::sense;
28 use Carp ();
29 use MIME::Base64 ();
30
31 use AnyEvent ();
32 use AnyEvent::Util ();
33
34 use AnyEvent::MP;
35 use AnyEvent::MP::Kernel;
36
37 our $VERSION = $AnyEvent::MP::VERSION;
38
39 our %addr; # node ID => [address...] mapping
40
41 our %port; # our rendezvous port on the other side
42 our %lreg; # local registry, name => [pid...]
43 our %lmon; # local registry monitoring name,pid => mon
44 our %greg; # global registry, name => [pid...]
45
46 our $nodecnt;
47
48 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
49
50 #############################################################################
51 # seednodes
52
53 our @SEEDS;
54 our $SEED_WATCHER;
55
# Open a transport connection to a single seed address.
#
# $seed is a "host:port"-style string; it is handed to
# AnyEvent::Socket::parse_hostport and the call croaks when that yields
# nothing.  more_seeding is registered as the transport's release callback.
sub seed_connect {
    my ($seed) = @_;

    my @hostport = AnyEvent::Socket::parse_hostport($seed);
    @hostport
        or Carp::croak "$seed: unparsable seed address";
    my ($host, $port) = @hostport;

    # ughhh: the transport stores a reference to itself under {keepalive},
    # which deliberately keeps the object alive after this sub returns.
    my $transport;
    $transport = AnyEvent::MP::Transport::mp_connect(
        $host, $port,
        release => \&more_seeding,
        sub {
            $transport = shift;
            $transport->{keepalive} = $transport;
        },
    );
}
71
# Reconnect to a randomly chosen seed when no node is connected.
#
# NOTE: the unconditional return below (author's debug/TODO marker) currently
# disables this routine entirely; everything after it is unreachable.
sub more_seeding {
    return;#d##TODO#

    # nothing to do while nodes are connected or when no seeds are configured
    return if $nodecnt || !@SEEDS;

    $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");

    my $pick = $SEEDS[rand @SEEDS];
    seed_connect($pick);
}
81
# Replace the list of seed addresses and connect to each of them.
#
# The first call also installs a repeating timer (first fires after 5
# seconds, then every $AnyEvent::MP::Kernel::MONITOR_TIMEOUT) that invokes
# more_seeding.
sub set_seeds(@) {
    @SEEDS = @_;

    unless ($SEED_WATCHER) {
        $SEED_WATCHER = AE::timer(
            5,
            $AnyEvent::MP::Kernel::MONITOR_TIMEOUT,
            \&more_seeding,
        );
    }

    foreach my $seed (@SEEDS) {
        seed_connect($seed);
    }
}
90
91 #############################################################################
92
# Remove every port belonging to $node from all global groups.
#
# A registry entry is considered to belong to $node when it is exactly
# $node or starts with $node followed by "#".  Groups are edited in place
# and may end up empty; they are not deleted from %greg.
sub unreg_groups($) {
    my ($node) = @_;

    # \A and \z anchor to the real string boundaries.  The previous ^...$
    # form let "$" match before a trailing newline, so an ID such as
    # "$node\n" was wrongly treated as belonging to $node.
    my $owned_by_node = qr/\A\Q$node\E(?:#|\z)/;

    for my $members (values %greg) {
        @$members = grep { $_ !~ $owned_by_node } @$members;
    }
}
102
# Merge a registry received from a node into the global registry.
#
# $lreg maps group name => array ref of port IDs; every listed port is
# appended to the group of the same name in %greg.  $node is accepted for
# interface compatibility but is not used.
sub set_groups($$) {
    my ($node, $lreg) = @_;

    for my $group (keys %$lreg) {
        push @{ $greg{$group} }, @{ $lreg->{$group} };
    }
}
110
111 =item $ports = find $group
112
113 Returns all the ports currently registered to the given group (as a
114 read-only array reference). When the group has no registered members,
115 returns C<undef>.
116
117 =cut
118
# Look up the members of a global group.
#
# Returns the array reference stored in %greg for the group when it has at
# least one member, and undef otherwise (unknown group or empty group).
# The returned reference is the registry's own array, not a copy.
sub find($) {
    my $members = $greg{$_[0]};

    # Test the entry before dereferencing it: an unknown group has no array
    # at all, and dereferencing undef is fatal under "strict refs".
    $members && @$members
        ? $members
        : undef
}
124
125 =item $guard = register $port, $group
126
127 Register the given (local!) port in the named global group C<$group>.
128
129 The port will be unregistered automatically when the port is destroyed.
130
131 When not called in void context, then a guard object will be returned that
132 will also cause the name to be unregistered when destroyed.
133
134 =cut
135
136 # register port from any node
# Append $port to the global group $group, creating the group on demand.
# Also installed as the "reg1" message handler, so it may be called with
# ports of any node.
sub _register {
    my ($port, $group) = @_;

    my $members = $greg{$group} ||= [];
    push @$members, $port;
}
142
143 # unregister port from any node
# Drop every occurrence of $port from the global group $group.
# Also installed as the "reg0" message handler, so it may be called with
# ports of any node.  An unknown group ends up as an empty array.
sub _unregister {
    my ($port, $group) = @_;

    my $members = $greg{$group} ||= [];
    @$members = grep { $_ ne $port } @$members;
}
149
150 # unregister local port
# Unregister a local port from a group.
#
# Drops the stored monitor for the (group, port) pair, removes the port from
# the local and global registries and sends a "reg0" message to every
# rendezvous port in %port.
sub unregister {
    my ($port, $group) = @_;

    my $mon_key = join "\x00", $group, $port;
    delete $lmon{$mon_key};

    my $local = $lreg{$group} ||= [];
    @$local = grep { $_ ne $port } @$local;

    _unregister($port, $group);

    for my $peer (values %port) {
        snd $peer, reg0 => $port, $group;
    }
}
162
163 # register local port
# Register a local port in the named global group.
#
# Croaks unless port_is_local accepts $port.  The port is monitored so that
# it is unregistered again when the monitor callback fires, it is added to
# the local and global registries, and a "reg1" message is sent to every
# rendezvous port in %port.
#
# Returns a guard object that unregisters the port when destroyed, unless
# called in void context, in which case no guard is created.
sub register($$) {
    my ($port, $group) = @_;

    port_is_local $port
        or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";

    $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
    push @{ $lreg{$group} }, $port;

    snd $_, reg1 => $port, $group
        for values %port;

    _register $port, $group;

    # wantarray is undef only in void context.  Testing its truth instead
    # (as the code used to) is false in scalar context too, so
    # "$guard = register ..." never received a guard.
    return unless defined wantarray;

    AnyEvent::Util::guard { unregister $port, $group }
}
180
# Establish the global-service link to another node.
#
# Spawns AnyEvent::MP::Global::connect on $node, remembers the resulting
# port in %port, monitors it, and then sends our listener address plus,
# when non-empty, the known addresses and the local registry.
sub start_node {
    my ($node) = @_;

    # do not connect to ourselves, and only once per node
    return if $node eq $NODE || exists $port{$node};

    # establish connection
    my $remote = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
    $port{$node} = $remote;

    # when the monitor fires, forget the node's groups and its port
    mon $remote, sub {
        unreg_groups($node);
        delete $port{$node};
    };

    snd $remote, addr => $AnyEvent::MP::Kernel::LISTENER;

    if (%addr) {
        snd $remote, connect_nodes => \%addr;
    }

    if (%lreg) {
        snd $remote, set => \%lreg;
    }
}
199
200 # other nodes connect via this
# Entry point spawned by other nodes (see start_node): other nodes connect
# via this.
#
# $version is the protocol version sent by the peer (not inspected here) and
# $node is the peer's node ID.  Installs the handlers for the messages a
# peer may send to this port: addr, connect_nodes, find_node, set, reg1 and
# reg0.
sub connect {
    my ($version, $node) = @_;

    # monitor them, silently die
    mon $node, psub { kil $SELF };

    # addr: the peer announces its own listening addresses
    my $on_addr = sub {
        my ($addresses) = @_;

        $AnyEvent::MP::Kernel::WARN->(9, "$node told us it's addresses (@$addresses).");
        $addr{$node} = $addresses;

        # to help listener-less nodes, we broadcast new addresses to them unconditionally
        #TODO: should be done by a node finding out about a listener-less one
        if (@$addresses) {
            for my $peer (values %AnyEvent::MP::NODE) {
                next unless $peer->{transport};

                # only peers that announced an empty address list need help
                my $peer_addr = $addr{ $peer->{id} };
                next unless $peer_addr && !@$peer_addr;

                $AnyEvent::MP::Kernel::WARN->(9, "helping $peer->{id} to find $node.");
                snd $port{ $peer->{id} }, connect_nodes => { $node => $addresses };
            }
        }
    };

    # connect_nodes: the peer tells us about nodes (id => addresses) it knows
    my $on_connect_nodes = sub {
        my ($kv) = @_;

        use JSON::XS;#d#
        my $kv_txt = JSON::XS->new->encode ($kv);#d#
        $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

        for my $id (keys %$kv) {
            my $known = AnyEvent::MP::Kernel::add_node($id);
            $known->connect (@{ $kv->{$id} });
            start_node($id);
        }
    };

    # find_node: the peer asks for the addresses of another node
    my $on_find_node = sub {
        my ($othernode) = @_;

        $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");

        if ($addr{$othernode}) {
            snd $port{$node}, connect_nodes => { $othernode => $addr{$othernode} };
        }
    };

    # set: the peer sends its complete local registry
    my $on_set = sub {
        set_groups($node, $_[0]);
    };

    rcv $SELF,
        addr          => $on_addr,
        connect_nodes => $on_connect_nodes,
        find_node     => $on_find_node,
        set           => $on_set,
        reg1          => \&_register,
        reg0          => \&_unregister,
    ;
}
253
# Node up/down callback (registered via mon_nodes at the end of this file).
#
# On "up" the node counter is incremented and the global-service link to
# the node is started.  On "down" the counter is decremented, seeding is
# retried when no node is left, the node's group registrations and address
# are dropped, and all remaining peers are asked whether they know the node.
sub mon_node {
    my ($node, $is_up) = @_;

    if ($is_up) {
        ++$nodecnt;
        return start_node($node);
    }

    --$nodecnt;
    more_seeding() unless $nodecnt;
    unreg_groups($node);

    # forget about the node
    delete $addr{$node};

    # ask other nodes if they know the node
    for my $peer (values %port) {
        snd $peer, find_node => $node;
    }
    #warn "node<$node,$is_up>\n";#d#
}
273
# Report every node returned by up_nodes (presumably the nodes that are
# already up at load time; confirm against AnyEvent::MP::Kernel) as "up",
# then register mon_node for subsequent node status changes.
for my $node (up_nodes) {
    mon_node($node, 1);
}

mon_nodes(\&mon_node);
278
279 =back
280
281 =head1 SEE ALSO
282
283 L<AnyEvent::MP>.
284
285 =head1 AUTHOR
286
287 Marc Lehmann <schmorp@schmorp.de>
288 http://home.schmorp.de/
289
290 =cut
291
292 1
293