ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.12
Committed: Fri Aug 28 00:26:04 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.11: +3 -6 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8     # -OR-
9     aemp addservice AnyEvent::MP::Global::
10    
11     =head1 DESCRIPTION
12    
13     This module provides an assortment of network-global functions: group name
14     registration and non-local locks.
15    
16     It will also try to build and maintain a full mesh of all network nodes.
17    
18 root 1.3 While it isn't mandatory to run the global services, running it on one
19     node will automatically run it on all nodes.
20    
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31     use MIME::Base64 ();
32    
33 root 1.8 use AnyEvent ();
34 root 1.4 use AnyEvent::Util ();
35    
36 root 1.1 use AnyEvent::MP;
37     use AnyEvent::MP::Kernel;
38    
39     our $VERSION = $AnyEvent::MP::VERSION;
40    
41 root 1.8 our %addr; # port ID => [address...] mapping
42    
43 root 1.4 our %port; # our rendezvous port on the other side
44     our %lreg; # local registry, name => [pid...]
45     our %lmon; # local registry monitoring name,pid => mon
46     our %greg; # global registry, name => [pid...]
47    
48 root 1.8 our $nodecnt;
49    
50 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
51    
52 root 1.8 #############################################################################
53     # seednodes
54    
55     our @SEEDS;
56     our $SEED_WATCHER;
57    
# Start a transport connection to a single seed address.
#
# $seed is a "host:port" style string; croaks when it cannot be parsed.
# Returns whatever mp_connect returns.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport($seed)
      or Carp::croak "$seed: unparsable seed address";

   # ughhh
   # $tp has to be declared before the callback so the closure can see it;
   # once connected, the transport stores a reference to itself under
   # {keepalive}.
   my $tp;

   my $on_connect = sub {
      $tp = shift;
      $tp->{keepalive} = $tp;
   };

   $tp = AnyEvent::MP::Transport::mp_connect(
      $host, $port,
      release => \&more_seeding,
      $on_connect,
   );
}
73    
# Connect to one randomly chosen seed, but only when no node is currently
# counted as up and at least one seed has been configured.
sub more_seeding {
   return if $nodecnt || !@SEEDS;

   $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");

   my $pick = $SEEDS[rand @SEEDS];
   seed_connect($pick);
}
82    
# Replace the list of seed addresses and immediately try to connect to
# each of them. The first call also installs a repeating timer (first
# firing after 5 seconds, then every $MONITOR_TIMEOUT seconds) that calls
# more_seeding.
sub set_seeds(@) {
   @SEEDS = @_;

   unless ($SEED_WATCHER) {
      $SEED_WATCHER = AE::timer(5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding);
   }

   foreach my $seed (@SEEDS) {
      seed_connect($seed);
   }
}
91    
92     #############################################################################
93    
# Remove every port belonging to $node from all groups of the global
# registry. A port belongs to the node when its ID is the node ID itself
# or the node ID followed by "#". Groups are kept even when they become
# empty.
sub unreg_groups($) {
   my ($node) = @_;

   my $on_node = qr/^\Q$node\E(?:#|$)/;

   foreach my $members (values %greg) {
      @$members = grep { $_ !~ $on_node } @$members;
   }
}
103    
# Merge a complete registry (group name => [port...]) received from a peer
# into the global registry by appending its ports to each group.
# The first argument (the sending node) is accepted but not used.
sub set_groups($$) {
   my (undef, $groups) = @_;

   foreach my $name (keys %$groups) {
      push @{ $greg{$name} }, @{ $groups->{$name} };
   }
}
111    
112 root 1.7 =item $ports = find $group
113    
114     Returns all the ports currently registered to the given group (as
115     read-only array reference). When the group has no registered members,
116     return C<undef>.
117    
118     =cut
119    
# Look up the ports registered in the given group.
#
# Returns the array reference stored in the global registry (treat it as
# read-only), or undef when the group is unknown or has no members.
sub find($) {
   my $ports = $greg{$_[0]};

   # Only dereference an entry that actually exists: dereferencing the
   # undef of an unknown group works solely as a symbolic reference and
   # dies as soon as strict refs is in effect.
   $ports && @$ports
      ? $ports
      : undef
}
125    
126 root 1.4 =item $guard = register $port, $group
127    
128     Register the given (local!) port in the named global group C<$group>.
129    
130     The port will be unregistered automatically when the port is destroyed.
131    
132     When not called in void context, then a guard object will be returned that
133     will also cause the name to be unregistered when destroyed.
134    
135     =cut
136    
# register port from any node
# Appends $port to the member list of $group in the global registry,
# creating the list when the group is new.
sub _register {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   push @$members, $port;
}
143    
# unregister port from any node
# Drops every occurrence of $port from the member list of $group in the
# global registry; the (possibly empty) list itself stays in place.
sub _unregister {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   @$members = grep { $_ ne $port } @$members;
}
150    
# unregister local port
# Forgets the stored monitor for the (group, port) pair, removes the port
# from the local and the global registry and sends a reg0 message to
# every peer port in %port.
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};

   my $local = $lreg{$group} ||= [];
   @$local = grep { $_ ne $port } @$local;

   _unregister($port, $group);

   foreach my $peer (values %port) {
      snd($peer, reg0 => $port, $group);
   }
}
163    
# register local port
#
# Adds the local port $port to the group $group: a monitor that calls
# unregister is installed on the port, the port is recorded in the local
# registry, a reg1 message is sent to every peer port in %port, and the
# port is added to this node's copy of the global registry.
#
# Croaks when $port is not a local port.
#
# Returns a guard that unregisters the port when destroyed, unless called
# in void context.
sub register($$) {
   my ($port, $group) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";

   $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
   push @{ $lreg{$group} }, $port;

   snd $_, reg1 => $port, $group
      for values %port;

   _register $port, $group;

   # wantarray is undef only in void context; in scalar context it is
   # defined but false, so testing its truth would deny the guard to
   # callers doing "my $guard = register ...".
   defined wantarray && AnyEvent::Util::guard { unregister $port, $group }
}
181    
# Set up our rendezvous port on the node $node, unless one already exists
# or $node is this node itself. The new port is told our listener
# addresses, all node addresses we know and our local registry.
sub start_node {
   my ($node) = @_;

   # already tracked, or ourselves (do not connect to ourselves)
   return if exists($port{$node}) or $node eq $NODE;

   # establish connection
   my $port = spawn($node, "AnyEvent::MP::Global::connect", 0, $NODE);
   $port{$node} = $port;

   # when the remote port goes away, drop the node's group members and
   # forget the port
   mon($port, sub {
      unreg_groups($node);
      delete $port{$node};
   });

   snd($port, addr => $AnyEvent::MP::Kernel::LISTENER);
   snd($port, connect_nodes => \%addr) if %addr;
   snd($port, set => \%lreg) if %lreg;
}
200 root 1.3
# other nodes connect via this
#
# Body of the port that start_node spawns on the remote side. $version is
# the 0 passed by start_node (currently unused), $node is the ID of the
# node that spawned us. Installs the handlers for the messages that node
# sends to this port.
sub connect {
   my ($version, $node) = @_;

   # monitor them, silently die
   mon $node, psub { kil $SELF };

   rcv $SELF,
      # the peer tells us the addresses it can be reached at
      addr => sub {
         my ($addresses) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node told us it's addresses (@$addresses).");
         $addr{$node} = $addresses;

         # to help listener-less nodes, we broadcast new addresses to them unconditionally
         #TODO: should be done by a node finding out about a listener-less one
         if (@$addresses) {
            for my $other (values %AnyEvent::MP::NODE) {
               # only nodes that have a transport and that reported an
               # empty address list of their own
               if (
                  $other->{transport}
                  && $addr{$other->{id}}
                  && !@{ $addr{$other->{id}} }
               ) {
                  $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
                  snd $port{$other->{id}}, connect_nodes => { $node => $addresses };
               }
            }
         }
      },

      # the peer tells us about other nodes (node ID => [address...])
      connect_nodes => sub {
         my ($kv) = @_;

         use JSON::XS;#d#
         my $kv_txt = JSON::XS->new->encode ($kv);#d#
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

         for my $id (keys %$kv) {
            my $peer = AnyEvent::MP::Kernel::add_node($id);
            $peer->connect (@{ $kv->{$id} });
            start_node($id);
         }
      },

      # the peer lost a node and asks whether we know its addresses
      find_node => sub {
         my ($othernode) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
         snd $port{$node}, connect_nodes => { $othernode => $addr{$othernode} }
            if $addr{$othernode};
      },

      # the peer sends its complete local registry
      set => sub {
         my ($groups) = @_;
         set_groups($node, $groups);
      },

      # single registrations/unregistrations: (port, group)
      reg1 => \&_register,
      reg0 => \&_unregister,
   ;
}
254 root 1.1
# Node up/down handler: called with a node ID and a flag telling whether
# the node came up (true) or went down (false). Keeps $nodecnt in step
# with the number of nodes reported up.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      ++$nodecnt;
      start_node($node);
   } else {
      # reseed as soon as the last node is gone
      more_seeding() unless --$nodecnt;
      unreg_groups($node);

      # forget about the node
      delete $addr{$node};

      # ask other nodes if they know the node
      foreach my $peer (values %port) {
         snd($peer, find_node => $node);
      }
   }
   #warn "node<$node,$is_up>\n";#d#
}
274    
# Module start-up: handle every node returned by up_nodes as if it had
# just come up, then hand mon_node to mon_nodes.
# NOTE(review): presumably mon_nodes invokes the callback on later node
# up/down changes - confirm against AnyEvent::MP::Kernel.
foreach my $node (up_nodes) {
   mon_node($node, 1);
}

mon_nodes(\&mon_node);
279    
280     =back
281    
282     =head1 SEE ALSO
283    
284     L<AnyEvent::MP>.
285    
286     =head1 AUTHOR
287    
288     Marc Lehmann <schmorp@schmorp.de>
289     http://home.schmorp.de/
290    
291     =cut
292    
293     1
294