ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.18
Committed: Sun Aug 30 17:08:16 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-0_95
Changes since 1.17: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.15 This module maintains a fully-meshed network, if possible, and tries to
12     ensure that we are connected to at least one other node.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31     use MIME::Base64 ();
32    
33 root 1.8 use AnyEvent ();
34 root 1.4 use AnyEvent::Util ();
35    
36 root 1.1 use AnyEvent::MP;
37     use AnyEvent::MP::Kernel;
38    
our $VERSION = $AnyEvent::MP::VERSION;

# Module-wide state, all of it package globals.
our (
   %addr, # node ID => [address...] mapping
   %port, # node ID => our rendezvous port on the other side
   %lreg, # local registry, name => [pid...]
   %lmon, # local registry monitoring, "name\0pid" => mon
   %greg, # global registry, name => [pid...]
);

our $nodecnt; # incremented/decremented by mon_node as nodes go up/down

$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
51    
52 root 1.8 #############################################################################
53     # seednodes
54    
our (
   @SEEDS,        # seed addresses, replaced by set_seeds, pruned by avoid_seed
   %SEED_CONNECT, # seed address => result of the pending mp_connect call
   $SEED_WATCHER, # timer created by set_seeds that invokes more_seeding
);
58    
# Start a connection attempt to the given seed address, unless an attempt
# for that very seed is already recorded in %SEED_CONNECT.
# Croaks when the seed cannot be split into host and port.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport($seed)
      or Carp::croak("$seed: unparsable seed address");

   # an earlier attempt is still recorded, do not start a second one
   return $SEED_CONNECT{$seed}
      if $SEED_CONNECT{$seed};

   $SEED_CONNECT{$seed} = AnyEvent::MP::Transport::mp_connect(
      $host, $port,
      seed => $seed,
      sub {
         # drop the bookkeeping entry and re-check a second later
         # whether more seeding is needed
         delete $SEED_CONNECT{$seed};
         after(1, \&more_seeding);
      },
   );
}
74    
# Connect to one randomly chosen seed, but only when no node is
# currently counted as up and at least one seed is configured.
sub more_seeding {
   return if $nodecnt || !@SEEDS;

   $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");

   my $pick = int rand @SEEDS;
   seed_connect($SEEDS[$pick]);
}
83    
# Remove every occurrence of the given seed address from @SEEDS.
sub avoid_seed($) {
   my ($unwanted) = @_;

   @SEEDS = grep { $_ ne $unwanted } @SEEDS;
}
87    
# Replace the seed list, make sure the periodic seed watcher exists and
# start a connection attempt to every seed.
sub set_seeds(@) {
   @SEEDS = @_;

   # the watcher is created at most once
   unless ($SEED_WATCHER) {
      $SEED_WATCHER = AE::timer(5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding);
   }

   foreach my $seed (@SEEDS) {
      seed_connect($seed);
   }
}
96    
97     #############################################################################
98    
# Drop all ports belonging to the given node from every global group.
# A port belongs to the node when it is the node ID itself or starts
# with "<node>#".
sub unreg_groups($) {
   my ($node) = @_;

   my $owned_by_node = qr/^\Q$node\E(?:#|$)/;

   # arrays are modified in place, so existing references stay valid
   foreach my $members (values %greg) {
      my @survivors = grep { $_ !~ $owned_by_node } @$members;
      @$members = @survivors;
   }
}
108    
# Append all ports of a registry (name => [pid...]) to the global
# registry. $node is part of the signature but is not used here.
sub set_groups($$) {
   my ($node, $lreg) = @_;

   for my $group (keys %$lreg) {
      push @{ $greg{$group} }, @{ $lreg->{$group} };
   }
}
116    
117     =item $guard = register $port, $group
118    
119     Register the given (local!) port in the named global group C<$group>.
120    
121     The port will be unregistered automatically when the port is destroyed.
122    
123     When not called in void context, then a guard object will be returned that
124     will also cause the name to be unregistered when destroyed.
125    
126     =cut
127    
# register port from any node: append it to the global group
sub _register {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   push @$members, $port;
}
134    
# unregister port from any node: remove every occurrence of it from
# the global group (the group array is modified in place)
sub _unregister {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   @$members = grep { $_ ne $port } @$members;
}
141    
# unregister local port: drop its monitor, remove it from the local and
# global registries and tell every rendezvous port about it (reg0)
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};

   my $local = $lreg{$group} ||= [];
   @$local = grep { $_ ne $port } @$local;

   _unregister($port, $group);

   foreach my $peer (values %port) {
      snd($peer, reg0 => $port, $group);
   }
}
154    
# register local port
#
# Adds $port to the group $group in the local and global registries,
# monitors the port so it is unregistered again when the monitor fires,
# and announces the registration to every rendezvous port (reg1).
#
# Croaks when $port is not a local port.
#
# Returns a guard that unregisters the port when destroyed, unless
# called in void context.
sub register($$) {
   my ($port, $group) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";

   $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
   push @{ $lreg{$group} }, $port;

   snd $_, reg1 => $port, $group
      for values %port;

   _register $port, $group;

   # wantarray is false (but defined) in scalar context, so a plain
   # truth test would hand out no guard for "$guard = register ...".
   # testing definedness covers both scalar and list context.
   defined(wantarray) && AnyEvent::Util::guard { unregister $port, $group }
}
172    
173 root 1.15 =item $ports = find $group
174    
175     Returns all the ports currently registered to the given group (as a
176     read-only array reference). When the group has no registered members,
177     returns C<undef>.
178    
179     =cut
180    
# Look up a group in the global registry. Returns the group's array
# reference when it has at least one member, undef otherwise.
sub find($) {
   # fetch the reference first: dereferencing a missing entry directly
   # only works while "strict refs" is off
   my $ports = $greg{$_[0]};

   $ports && @$ports
      ? $ports
      : undef
}
186    
# Spawn our rendezvous port on the given node (once per node, never on
# ourselves) and send it our listener addresses, the known node
# addresses and our local registry.
sub start_node {
   my ($node) = @_;

   # already started, or it is ourselves: nothing to do
   return if exists($port{$node}) || $node eq $NODE;

   # establish connection
   my $remote = spawn($node, "AnyEvent::MP::Global::connect", 0, $NODE);
   $port{$node} = $remote;

   # when the monitor fires, forget the node's groups and the port
   mon($remote, sub {
      unreg_groups($node);
      delete $port{$node};
   });

   snd($remote, addr => $AnyEvent::MP::Kernel::LISTENER);
   snd($remote, nodes => \%addr) if %addr;
   snd($remote, set => \%lreg) if %lreg;
}
205 root 1.3
# other nodes connect via this
#
# start_node spawns this function with (0, $NODE), so $node is the ID of
# the connecting node; $version is currently unused. It installs the
# handlers for the messages that node sends to $SELF:
#
#   addr  => \@addresses          the node's own addresses
#   nodes => { id => \@addresses } nodes the peer knows about
#   find  => $othernode           request for the addresses of a node
#   set   => \%registry           the peer's complete local registry
#   reg1/reg0 => $port, $group    single (un)registration
sub connect {
   my ($version, $node) = @_;

   # monitor them, silently die
   mon $node, psub { kil $SELF };

   rcv $SELF,
      addr => sub {
         my $addresses = shift;
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
         $addr{$node} = $addresses;

         # to help listener-less nodes, we broadcast new addresses to them unconditionally
         #TODO: should be done by a node finding out about a listener-less one
         if (@$addresses) {
            for my $other (values %AnyEvent::MP::Kernel::NODE) {
               next unless $other->{transport};

               # only nodes that told us they have no addresses at all
               my $known = $addr{$other->{id}};
               next unless $known && !@$known;

               # without a rendezvous port there is nowhere to send to
               my $rendezvous = $port{$other->{id}}
                  or next;

               $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
               snd $rendezvous, nodes => { $node => $addresses };
            }
         }
      },
      nodes => sub {
         my ($kv) = @_;

         # NOTE(review): the #d# lines look like debugging aids - confirm
         # whether the JSON::XS dependency is wanted here
         use JSON::XS;#d#
         my $kv_txt = JSON::XS->new->encode ($kv);#d#
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

         while (my ($id, $addresses) = each %$kv) {
            my $peer = AnyEvent::MP::Kernel::add_node $id;
            $peer->connect (@$addresses);
            start_node $id;
         }
      },
      find => sub {
         my ($othernode) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");

         my $known = $addr{$othernode}
            or return;

         # our rendezvous port on the asking node may not exist (anymore)
         my $rendezvous = $port{$node}
            or return;

         snd $rendezvous, nodes => { $othernode => $known };
      },
      set => sub {
         set_groups $node, shift;
      },
      reg1 => \&_register,
      reg0 => \&_unregister,
   ;
}
259 root 1.1
# Node up/down handler: keeps $nodecnt current, starts the global
# service on nodes that came up and cleans up after nodes that went down.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      ++$nodecnt;
      return start_node($node);
   }

   --$nodecnt;
   more_seeding() if !$nodecnt;
   unreg_groups($node);

   # forget about the node
   delete $addr{$node};

   # ask other nodes if they know the node
   foreach my $peer (values %port) {
      snd($peer, find => $node);
   }
}
279    
# treat every node returned by up_nodes as having just come up ...
mon_node($_, 1)
   foreach up_nodes();

# ... and register mon_node as the callback for mon_nodes
mon_nodes(\&mon_node);
284    
285     =back
286    
287     =head1 SEE ALSO
288    
289     L<AnyEvent::MP>.
290    
291     =head1 AUTHOR
292    
293     Marc Lehmann <schmorp@schmorp.de>
294     http://home.schmorp.de/
295    
296     =cut
297    
298     1
299