ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.14
Committed: Fri Aug 28 20:57:42 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.13: +10 -6 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11     This module provides an assortment of network-global functions: group name
12     registration and non-local locks.
13    
14     It will also try to build and maintain a full mesh of all network nodes.
15    
16 root 1.3 While it isn't mandatory to run the global services, running it on one
17     node will automatically run it on all nodes.
18    
19 root 1.1 =head1 GLOBALS AND FUNCTIONS
20    
21     =over 4
22    
23     =cut
24    
25     package AnyEvent::MP::Global;
26    
27     use common::sense;
28     use Carp ();
29     use MIME::Base64 ();
30    
31 root 1.8 use AnyEvent ();
32 root 1.4 use AnyEvent::Util ();
33    
34 root 1.1 use AnyEvent::MP;
35     use AnyEvent::MP::Kernel;
36    
37     our $VERSION = $AnyEvent::MP::VERSION;
38    
39 root 1.8 our %addr; # port ID => [address...] mapping
40    
41 root 1.4 our %port; # our rendezvous port on the other side
42     our %lreg; # local registry, name => [pid...]
43     our %lmon; # local rgeistry monitoring name,pid => mon
44     our %greg; # global regstry, name => [pid...]
45    
46 root 1.8 our $nodecnt;
47    
48 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
49    
50 root 1.8 #############################################################################
51     # seednodes
52    
53     our @SEEDS;
54 root 1.14 our %SEED_CONNECT;
55 root 1.8 our $SEED_WATCHER;
56    
57     sub seed_connect {
58     my ($seed) = @_;
59    
60     my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
61     or Carp::croak "$seed: unparsable seed address";
62    
63     # ughhh
64 root 1.14 $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port,
65     seed => $seed,
66 root 1.8 sub {
67 root 1.14 delete $SEED_CONNECT{$seed};
68     after 1, \&more_seeding;
69 root 1.8 },
70     ;
71     }
72    
73     sub more_seeding {
74     return if $nodecnt;
75     return unless @SEEDS;
76    
77     $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");
78    
79     seed_connect $SEEDS[rand @SEEDS];
80     }
81    
82 root 1.14 sub avoid_seed($) {
83     @SEEDS = grep $_ ne $_[0], @SEEDS;
84     }
85    
86 root 1.8 sub set_seeds(@) {
87     @SEEDS = @_;
88    
89     $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;
90    
91     seed_connect $_
92     for @SEEDS;
93     }
94    
95     #############################################################################
96    
97 root 1.4 sub unreg_groups($) {
98 root 1.8 my ($node) = @_;
99 root 1.4
100 root 1.8 my $qr = qr/^\Q$node\E(?:#|$)/;
101 root 1.4
102     for my $group (values %greg) {
103     @$group = grep $_ !~ $qr, @$group;
104     }
105     }
106    
107     sub set_groups($$) {
108 root 1.8 my ($node, $lreg) = @_;
109 root 1.5
110 root 1.6 while (my ($k, $v) = each %$lreg) {
111     push @{ $greg{$k} }, @$v;
112     }
113 root 1.4 }
114    
115 root 1.7 =item $ports = find $group
116    
117     Returns all the ports currently registered to the given group (as
118     read-only array reference). When the group has no registered members,
119     return C<undef>.
120    
121     =cut
122    
123     sub find($) {
124     @{ $greg{$_[0]} }
125     ? $greg{$_[0]}
126     : undef
127     }
128    
129 root 1.4 =item $guard = register $port, $group
130    
131     Register the given (local!) port in the named global group C<$group>.
132    
133     The port will be unregistered automatically when the port is destroyed.
134    
135     When not called in void context, then a guard object will be returned that
136     will also cause the name to be unregistered when destroyed.
137    
138     =cut
139    
140     # register port from any node
141     sub _register {
142     my ($port, $group) = @_;
143    
144     push @{ $greg{$group} }, $port;
145     }
146    
147 root 1.6 # unregister port from any node
148     sub _unregister {
149     my ($port, $group) = @_;
150    
151     @{ $greg{$group} } = grep $_ ne $port, @{ $greg{$group} };
152     }
153    
154 root 1.4 # unregister local port
155     sub unregister {
156     my ($port, $group) = @_;
157    
158     delete $lmon{"$group\x00$port"};
159     @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };
160    
161     _unregister $port, $group;
162    
163     snd $_, reg0 => $port, $group
164     for values %port;
165     }
166    
167     # register local port
168     sub register($$) {
169     my ($port, $group) = @_;
170    
171     port_is_local $port
172     or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";
173    
174     $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
175     push @{ $lreg{$group} }, $port;
176    
177     snd $_, reg1 => $port, $group
178     for values %port;
179    
180     _register $port, $group;
181    
182     wantarray && AnyEvent::Util::guard { unregister $port, $group }
183     }
184    
185     sub start_node {
186 root 1.8 my ($node) = @_;
187 root 1.4
188 root 1.8 return if exists $port{$node};
189     return if $node eq $NODE; # do not connect to ourselves
190 root 1.4
191     # establish connection
192 root 1.8 my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;
193    
194 root 1.4 mon $port, sub {
195 root 1.8 unreg_groups $node;
196     delete $port{$node};
197 root 1.4 };
198 root 1.8
199 root 1.10 snd $port, addr => $AnyEvent::MP::Kernel::LISTENER;
200 root 1.8 snd $port, connect_nodes => \%addr if %addr;
201     snd $port, set => \%lreg if %lreg;
202 root 1.4 }
203 root 1.3
204 root 1.5 # other nodes connect via this
205 root 1.3 sub connect {
206 root 1.8 my ($version, $node) = @_;
207 root 1.1
208 root 1.3 # monitor them, silently die
209 root 1.8 mon $node, psub { kil $SELF };
210 root 1.3
211 root 1.4 rcv $SELF,
212 root 1.8 addr => sub {
213     my $addresses = shift;
214 root 1.14 $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
215 root 1.8 $addr{$node} = $addresses;
216 root 1.9
217     # to help listener-less nodes, we broadcast new addresses to them unconditionally
218     #TODO: should be done by a node finding out about a listener-less one
219     if (@$addresses) {
220     for my $other (values %AnyEvent::MP::NODE) {
221     if ($other->{transport}) {
222 root 1.12 if ($addr{$other->{id}} && !@{ $addr{$other->{id}} }) {
223     $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
224     snd $port{$other->{id}}, connect_nodes => { $node => $addresses };
225 root 1.9 }
226     }
227     }
228     }
229 root 1.8 },
230 root 1.4 connect_nodes => sub {
231 root 1.8 my ($kv) = @_;
232    
233     use JSON::XS;#d#
234     my $kv_txt = JSON::XS->new->encode ($kv);#d#
235     $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#
236    
237     while (my ($id, $addresses) = each %$kv) {
238     my $node = AnyEvent::MP::Kernel::add_node $id;
239     $node->connect (@$addresses);
240     start_node $id;
241 root 1.4 }
242     },
243 root 1.8 find_node => sub {
244     my ($othernode) = @_;
245    
246     $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
247     snd $port{$node}, connect_nodes => { $othernode => $addr{$othernode} }
248     if $addr{$othernode};
249     },
250 root 1.4 set => sub {
251 root 1.8 set_groups $node, shift;
252 root 1.4 },
253     reg1 => \&_register,
254     reg0 => \&_unregister,
255     ;
256 root 1.3 }
257 root 1.1
258     sub mon_node {
259 root 1.8 my ($node, $is_up) = @_;
260 root 1.1
261     if ($is_up) {
262 root 1.8 ++$nodecnt;
263     start_node $node;
264 root 1.2 } else {
265 root 1.8 --$nodecnt;
266     more_seeding unless $nodecnt;
267     unreg_groups $node;
268    
269     # forget about the node
270     delete $addr{$node};
271     # ask other nodes if they know the node
272     snd $_, find_node => $node
273     for values %port;
274 root 1.1 }
275 root 1.8 #warn "node<$node,$is_up>\n";#d#
276 root 1.1 }
277    
278     mon_node $_, 1
279     for up_nodes;
280    
281     mon_nodes \&mon_node;
282    
283     =back
284    
285     =head1 SEE ALSO
286    
287     L<AnyEvent::MP>.
288    
289     =head1 AUTHOR
290    
291     Marc Lehmann <schmorp@schmorp.de>
292     http://home.schmorp.de/
293    
294     =cut
295    
296     1
297