ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.19
Committed: Wed Sep 2 13:05:29 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-1_0
Changes since 1.18: +3 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.15 This module maintains a fully-meshed network, if possible, and tries to
12     ensure that we are connected to at least one other node.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31     use MIME::Base64 ();
32    
33 root 1.8 use AnyEvent ();
34 root 1.4 use AnyEvent::Util ();
35    
36 root 1.1 use AnyEvent::MP;
37     use AnyEvent::MP::Kernel;
38    
39     our $VERSION = $AnyEvent::MP::VERSION;
40    
41 root 1.8 our %addr; # port ID => [address...] mapping
42    
43 root 1.4 our %port; # our rendezvous port on the other side
44     our %lreg; # local registry, name => [pid...]
45     our %lmon; # local registry monitoring name,pid => mon
46     our %greg; # global registry, name => [pid...]
47    
48 root 1.8 our $nodecnt;
49    
50 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
51    
52 root 1.8 #############################################################################
53     # seednodes
54    
55     our @SEEDS;
56 root 1.14 our %SEED_CONNECT;
57 root 1.8 our $SEED_WATCHER;
58    
# Start a connection attempt to a single seed address.
#
# $seed is a host/port string understood by AnyEvent::Socket::parse_hostport;
# the sub croaks when it cannot be parsed.
#
# The value returned by mp_connect is kept in %SEED_CONNECT under the seed
# address, so calling this again while that slot is still filled does not
# start a second attempt. The callback handed to mp_connect clears the slot
# and schedules more_seeding via "after 1".
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport($seed)
      or Carp::croak "$seed: unparsable seed address";

   # the stored value doubles as the "attempt already running" marker
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect(
      $host, $port,
      seed => $seed,
      sub {
         delete $SEED_CONNECT{$seed};
         after(1, \&more_seeding);
      },
   );
}
74    
# Try to get back into the network: when the node counter is zero and seed
# addresses are configured, pick one seed at random and connect to it.
sub more_seeding {
   # nothing to do while some node is counted as up, or without any seeds
   return if $nodecnt || !@SEEDS;

   $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");

   my $pick = $SEEDS[rand @SEEDS];
   seed_connect($pick);
}
83    
# Remove every occurrence of the given seed address from @SEEDS
# (plain string comparison).
sub avoid_seed($) {
   my ($unwanted) = @_;

   @SEEDS = grep { $_ ne $unwanted } @SEEDS;
}
87    
# Replace the list of seed addresses and start contacting them.
#
# Also makes sure the periodic seeding timer exists: it is created only on
# the first call and invokes more_seeding (first after 5 seconds, then with
# $AnyEvent::MP::Kernel::MONITOR_TIMEOUT as the repeat interval).
sub set_seeds(@) {
   @SEEDS = @_;

   $SEED_WATCHER ||= AE::timer(5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding);

   # contact every seed, each after its own small random delay (below 0.1s);
   # a lexical loop variable is needed so each closure sees its own seed
   for my $seed (@SEEDS) {
      my $delay = 0.100 * rand();
      after($delay, sub { seed_connect($seed) });
   }
}
97    
98     #############################################################################
99    
# Remove all ports belonging to the given node from every global group.
#
# A port ID is considered to belong to $node when it is the node ID itself
# or starts with the node ID followed by "#". Groups are filtered in place;
# groups that become empty stay in %greg as empty arrays.
sub unreg_groups($) {
   my ($node) = @_;

   my $owned_by_node = qr/^\Q$node\E(?:#|$)/;

   for my $members (values %greg) {
      @$members = grep { $_ !~ $owned_by_node } @$members;
   }
}
109    
# Merge a registry received from another node into the global registry.
#
# $registry maps group names to array references of port IDs; every listed
# port is appended to the group of the same name in %greg. The first
# argument (the sending node) is accepted but not used here.
sub set_groups($$) {
   my ($node, $registry) = @_;

   for my $group (keys %$registry) {
      push @{ $greg{$group} }, @{ $registry->{$group} };
   }
}
117    
118     =item $guard = register $port, $group
119    
120     Register the given (local!) port in the named global group C<$group>.
121    
122     The port will be unregistered automatically when the port is destroyed.
123    
124     When not called in void context, then a guard object will be returned that
125     will also cause the name to be unregistered when destroyed.
126    
127     =cut
128    
# Add a port (local or remote) to a group in the global registry.
# No duplicate check is made; the group is created when it does not exist.
sub _register {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   push @$members, $port;
}
135    
# Remove a port (local or remote) from a group in the global registry.
# All occurrences of the port are dropped; the group entry itself is kept
# (and created as an empty array when it did not exist).
sub _unregister {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   @$members = grep { $_ ne $port } @$members;
}
142    
# Unregister a local port from a group.
#
# Drops the stored monitor entry for the (group, port) pair, removes the port
# from the local and the global registry, and sends a "reg0" message to
# every port in %port so the other side can remove it as well.
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};

   my $local = $lreg{$group} ||= [];
   @$local = grep { $_ ne $port } @$local;

   _unregister($port, $group);

   for my $peer (values %port) {
      snd($peer, reg0 => $port, $group);
   }
}
155    
# Register a local port in the named global group.
#
# Arguments: the port ID (must be local, otherwise the sub croaks) and the
# group name.
#
# The port is monitored, and the monitor callback unregisters it again. The
# registration is recorded in the local and the global registry and sent as
# a "reg1" message to every port in %port.
#
# Returns a guard object that unregisters the port when destroyed, unless
# called in void context, in which case nothing is created.
sub register($$) {
   my ($port, $group) = @_;

   Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught"
      unless port_is_local($port);

   # the value returned by mon is kept so that unregister can drop it again
   $lmon{"$group\x00$port"} = mon($port, sub { unregister($port, $group) });
   push @{ $lreg{$group} }, $port;

   for my $peer (values %port) {
      snd($peer, reg1 => $port, $group);
   }

   _register($port, $group);

   # wantarray is undef only in void context. Testing its truth value would
   # also skip the guard in scalar context ("$guard = register ..."), which
   # is exactly the documented way to call this function.
   return unless defined wantarray;

   AnyEvent::Util::guard { unregister($port, $group) }
}
173    
174 root 1.15 =item $ports = find $group
175    
176     Returns all the ports currently registered to the given group (as
177     read-only array reference). When the group has no registered members,
178     return C<undef>.
179    
180     =cut
181    
# Look up the members of a global group.
#
# Returns the array reference stored in %greg for the group (callers must
# treat it as read-only, it is the registry's own array), or undef when the
# group is unknown or currently has no members.
sub find($) {
   my $members = $greg{$_[0]};

   # check the slot before dereferencing: an unknown group has no array at
   # all, and dereferencing undef is fatal under "strict refs"
   $members && @$members
      ? $members
      : undef
}
187    
# Set up the global service link to another node.
#
# Spawns AnyEvent::MP::Global::connect on $node (passing 0 and our own node
# ID), remembers the resulting port in %port and sends it our listener
# addresses plus, when non-empty, the known node addresses and our local
# registry. Does nothing when a port for that node already exists or when
# $node is this node.
sub start_node {
   my ($node) = @_;

   return if exists $port{$node};
   return if $node eq $NODE; # do not connect to ourselves

   # establish connection
   my $remote = spawn($node, "AnyEvent::MP::Global::connect", 0, $NODE);
   $port{$node} = $remote;

   # monitor callback: forget the node's group members and its port
   mon($remote, sub {
      unreg_groups($node);
      delete $port{$node};
   });

   snd($remote, addr  => $AnyEvent::MP::Kernel::LISTENER);
   snd($remote, nodes => \%addr) if %addr;
   snd($remote, set   => \%lreg) if %lreg;
}
206 root 1.3
# Entry point that other nodes spawn on this node (see start_node, which
# spawns "AnyEvent::MP::Global::connect" with a version and a node ID).
#
# Installs the message handlers of the spawned port ($SELF):
#   addr  - the remote node tells us its listening addresses
#   nodes - the remote node tells us about other nodes (id => addresses)
#   find  - the remote node asks whether we know a node's addresses
#   set   - the remote node sends its complete local registry
#   reg1 / reg0 - a single port was registered / unregistered
sub connect {
   my ($version, $node) = @_; # $version is not used here

   # monitor them, silently die
   mon $node, psub { kil $SELF };

   my $on_addr = sub {
      my ($addresses) = @_;

      $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
      $addr{$node} = $addresses;

      # to help listener-less nodes, we broadcast new addresses to them unconditionally
      #TODO: should be done by a node finding out about a listener-less one
      return unless @$addresses;

      for my $other (values %AnyEvent::MP::Kernel::NODE) {
         next unless $other->{transport};

         # only nodes that reported an empty address list need help
         my $known = $addr{$other->{id}};
         next unless $known && !@$known;

         $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
         snd($port{$other->{id}}, nodes => { $node => $addresses });
      }
   };

   my $on_nodes = sub {
      my ($kv) = @_;

      use JSON::XS;#d#
      my $kv_txt = JSON::XS->new->encode ($kv);#d#
      $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

      # make every announced node known, try its addresses and link up
      while (my ($id, $addresses) = each %$kv) {
         my $peer = AnyEvent::MP::Kernel::add_node($id);
         $peer->connect(@$addresses);
         start_node($id);
      }
   };

   my $on_find = sub {
      my ($othernode) = @_;

      $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");

      # answer only when we actually have addresses on record
      my $known = $addr{$othernode}
         or return;

      snd($port{$node}, nodes => { $othernode => $known });
   };

   my $on_set = sub {
      set_groups($node, $_[0]);
   };

   rcv($SELF,
      addr  => $on_addr,
      nodes => $on_nodes,
      find  => $on_find,
      set   => $on_set,
      reg1  => \&_register,
      reg0  => \&_unregister,
   );
}
260 root 1.1
# Node status callback: called with a node ID and a flag that is true when
# the node came up and false when it went down.
#
# Up:   count the node and set up the global service link to it.
# Down: uncount it, start seeding again when no node is left, drop its
#       group registrations and its addresses, and ask all remaining peers
#       (every port in %port) whether they know the node.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      $nodecnt++;
      start_node($node);
   } else {
      $nodecnt--;
      more_seeding() if !$nodecnt;
      unreg_groups($node);

      # forget about the node
      delete $addr{$node};

      # ask other nodes if they know the node
      for my $peer (values %port) {
         snd($peer, find => $node);
      }
   }
   #warn "node<$node,$is_up>\n";#d#
}
280    
# treat every node returned by up_nodes as having just come up ...
mon_node($_, 1)
   for up_nodes();

# ... and hand mon_node to mon_nodes (presumably so it is called on later
# node status changes - confirm against AnyEvent::MP::Kernel)
mon_nodes(\&mon_node);
285    
286     =back
287    
288     =head1 SEE ALSO
289    
290     L<AnyEvent::MP>.
291    
292     =head1 AUTHOR
293    
294     Marc Lehmann <schmorp@schmorp.de>
295     http://home.schmorp.de/
296    
297     =cut
298    
299     1
300