ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.8
Committed: Thu Aug 27 21:29:37 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.7: +99 -22 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8 # -OR-
9 aemp addservice AnyEvent::MP::Global::
10
11 =head1 DESCRIPTION
12
13 This module provides an assortment of network-global functions: group name
14 registration and non-local locks.
15
16 It will also try to build and maintain a full mesh of all network nodes.
17
18 While it isn't mandatory to run the global services, running it on one
19 node will automatically run it on all nodes.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31 use MIME::Base64 ();
32
33 use AnyEvent ();
34 use AnyEvent::Util ();
35
36 use AnyEvent::MP;
37 use AnyEvent::MP::Kernel;
38
39 our $VERSION = $AnyEvent::MP::VERSION;
40
41 our %addr; # port ID => [address...] mapping
42
43 our %port; # our rendezvous port on the other side
44 our %lreg; # local registry, name => [pid...]
45 our %lmon; # local rgeistry monitoring name,pid => mon
46 our %greg; # global regstry, name => [pid...]
47
48 our $nodecnt;
49
50 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
51
52 #############################################################################
53 # seednodes
54
55 our @SEEDS;
56 our $SEED_WATCHER;
57
58 sub seed_connect {
59 my ($seed) = @_;
60
61 my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
62 or Carp::croak "$seed: unparsable seed address";
63
64 # ughhh
65 my $tp; $tp = AnyEvent::MP::Transport::mp_connect $host, $port,
66 release => \&more_seeding,
67 sub {
68 $tp = shift;
69 $tp->{keepalive} = $tp;
70 },
71 ;
72 }
73
74 sub more_seeding {
75 return if $nodecnt;
76 return unless @SEEDS;
77
78 $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");
79
80 seed_connect $SEEDS[rand @SEEDS];
81 }
82
83 sub set_seeds(@) {
84 @SEEDS = @_;
85
86 $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;
87
88 seed_connect $_
89 for @SEEDS;
90 }
91
92 #############################################################################
93
94 sub unreg_groups($) {
95 my ($node) = @_;
96
97 my $qr = qr/^\Q$node\E(?:#|$)/;
98
99 for my $group (values %greg) {
100 @$group = grep $_ !~ $qr, @$group;
101 }
102 }
103
104 sub set_groups($$) {
105 my ($node, $lreg) = @_;
106
107 while (my ($k, $v) = each %$lreg) {
108 push @{ $greg{$k} }, @$v;
109 }
110 }
111
112 =item $ports = find $group
113
114 Returns all the ports currently registered to the given group (as
115 read-only array reference). When the group has no registered members,
116 return C<undef>.
117
118 =cut
119
120 sub find($) {
121 @{ $greg{$_[0]} }
122 ? $greg{$_[0]}
123 : undef
124 }
125
126 =item $guard = register $port, $group
127
128 Register the given (local!) port in the named global group C<$group>.
129
130 The port will be unregistered automatically when the port is destroyed.
131
132 When not called in void context, then a guard object will be returned that
133 will also cause the name to be unregistered when destroyed.
134
135 =cut
136
137 # register port from any node
138 sub _register {
139 my ($port, $group) = @_;
140
141 push @{ $greg{$group} }, $port;
142 }
143
144 # unregister port from any node
145 sub _unregister {
146 my ($port, $group) = @_;
147
148 @{ $greg{$group} } = grep $_ ne $port, @{ $greg{$group} };
149 }
150
151 # unregister local port
152 sub unregister {
153 my ($port, $group) = @_;
154
155 delete $lmon{"$group\x00$port"};
156 @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
157
158 _unregister $port, $group;
159
160 snd $_, reg0 => $port, $group
161 for values %port;
162 }
163
164 # register local port
165 sub register($$) {
166 my ($port, $group) = @_;
167
168 port_is_local $port
169 or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";
170
171 $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
172 push @{ $lreg{$group} }, $port;
173
174 snd $_, reg1 => $port, $group
175 for values %port;
176
177 _register $port, $group;
178
179 wantarray && AnyEvent::Util::guard { unregister $port, $group }
180 }
181
182 sub start_node {
183 my ($node) = @_;
184
185 return if exists $port{$node};
186 return if $node eq $NODE; # do not connect to ourselves
187
188 # establish connection
189 my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
190
191 mon $port, sub {
192 unreg_groups $node;
193 delete $port{$node};
194 };
195
196 snd $port, addr => $AnyEvent::MP::Kernel::LISTENER if @$AnyEvent::MP::Kernel::LISTENER;
197 snd $port, connect_nodes => \%addr if %addr;
198 snd $port, set => \%lreg if %lreg;
199 }
200
201 # other nodes connect via this
202 sub connect {
203 my ($version, $node) = @_;
204
205 # monitor them, silently die
206 mon $node, psub { kil $SELF };
207
208 rcv $SELF,
209 addr => sub {
210 my $addresses = shift;
211 $AnyEvent::MP::Kernel::WARN->(9, "$node told us it's addresses (@$addresses).");
212 $addr{$node} = $addresses;
213 },
214 connect_nodes => sub {
215 my ($kv) = @_;
216
217 use JSON::XS;#d#
218 my $kv_txt = JSON::XS->new->encode ($kv);#d#
219 $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#
220
221 while (my ($id, $addresses) = each %$kv) {
222 my $node = AnyEvent::MP::Kernel::add_node $id;
223 $node->connect (@$addresses);
224 start_node $id;
225 }
226 },
227 find_node => sub {
228 my ($othernode) = @_;
229
230 $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
231 snd $port{$node}, connect_nodes => { $othernode => $addr{$othernode} }
232 if $addr{$othernode};
233 },
234 set => sub {
235 set_groups $node, shift;
236 },
237 reg1 => \&_register,
238 reg0 => \&_unregister,
239 ;
240 }
241
242 sub mon_node {
243 my ($node, $is_up) = @_;
244
245 if ($is_up) {
246 ++$nodecnt;
247 start_node $node;
248 } else {
249 --$nodecnt;
250 more_seeding unless $nodecnt;
251 unreg_groups $node;
252
253 # forget about the node
254 delete $addr{$node};
255 # ask other nodes if they know the node
256 snd $_, find_node => $node
257 for values %port;
258 }
259 #warn "node<$node,$is_up>\n";#d#
260 }
261
262 mon_node $_, 1
263 for up_nodes;
264
265 mon_nodes \&mon_node;
266
267 =back
268
269 =head1 SEE ALSO
270
271 L<AnyEvent::MP>.
272
273 =head1 AUTHOR
274
275 Marc Lehmann <schmorp@schmorp.de>
276 http://home.schmorp.de/
277
278 =cut
279
280 1
281