ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.13
Committed: Fri Aug 28 16:37:30 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.12: +1 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11     This module provides an assortment of network-global functions: group name
12     registration and non-local locks.
13    
14     It will also try to build and maintain a full mesh of all network nodes.
15    
16 root 1.3 While it isn't mandatory to run the global services, running it on one
17     node will automatically run it on all nodes.
18    
19 root 1.1 =head1 GLOBALS AND FUNCTIONS
20    
21     =over 4
22    
23     =cut
24    
25     package AnyEvent::MP::Global;
26    
27     use common::sense;
28     use Carp ();
29     use MIME::Base64 ();
30    
31 root 1.8 use AnyEvent ();
32 root 1.4 use AnyEvent::Util ();
33    
34 root 1.1 use AnyEvent::MP;
35     use AnyEvent::MP::Kernel;
36    
37     our $VERSION = $AnyEvent::MP::VERSION;
38    
39 root 1.8 our %addr; # port ID => [address...] mapping
40    
41 root 1.4 our %port; # our rendezvous port on the other side
42     our %lreg; # local registry, name => [pid...]
43     our %lmon; # local registry monitoring name,pid => mon
44     our %greg; # global registry, name => [pid...]
45    
46 root 1.8 our $nodecnt;
47    
48 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
49    
50 root 1.8 #############################################################################
51     # seednodes
52    
53     our @SEEDS;
54     our $SEED_WATCHER;
55    
# Start a transport connection to one seed node.
#
# $seed is the seed address string. It is split into host and port with
# AnyEvent::Socket::parse_hostport; the sub croaks when that yields nothing.
# more_seeding is handed to the connect call as its "release" argument.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport($seed)
      or Carp::croak "$seed: unparsable seed address";

   # The callback stores the transport inside itself under {keepalive},
   # presumably so the object stays alive without an outside owner.
   my $transport;
   $transport = AnyEvent::MP::Transport::mp_connect(
      $host, $port,
      release => \&more_seeding,
      sub {
         $transport = shift;
         $transport->{keepalive} = $transport;
      },
   );
}
71    
# Connect to one randomly picked seed when no node is connected.
#
# NOTE: the unconditional return below (marked as a debugging/TODO leftover)
# currently disables this sub entirely; the code after it is unreachable
# until that line is removed.
sub more_seeding {
   return; #d##TODO#

   # nothing to do while nodes are connected or when no seeds are known
   return if $nodecnt || !@SEEDS;

   $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");

   seed_connect($SEEDS[rand @SEEDS]);
}
81    
# Replace the list of seed addresses and connect to every one of them.
#
# On the first call a repeating timer is installed that invokes more_seeding
# (first after 5 seconds, then every $AnyEvent::MP::Kernel::MONITOR_TIMEOUT).
sub set_seeds(@) {
   @SEEDS = @_;

   # the timer is created only once and then reused
   unless ($SEED_WATCHER) {
      $SEED_WATCHER = AE::timer(5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding);
   }

   foreach my $seed (@SEEDS) {
      seed_connect($seed);
   }
}
90    
91     #############################################################################
92    
# Remove every port belonging to $node from all groups in the global
# registry (%greg).
#
# A port belongs to the node when its ID is the node ID itself or the node
# ID followed by "#". Groups that become empty are kept as empty arrays.
sub unreg_groups($) {
   my ($node) = @_;

   my $node_re = qr/^\Q$node\E(?:#|$)/;

   foreach my $members (values %greg) {
      @$members = grep { $_ !~ $node_re } @$members;
   }
}
102    
# Merge a registry received from another node into the global registry.
#
# $lreg maps group names to array references of port IDs; all of them are
# appended to the matching %greg entries. The first argument (the node) is
# accepted but not used.
sub set_groups($$) {
   my ($node, $lreg) = @_;

   foreach my $group (keys %$lreg) {
      push @{ $greg{$group} }, @{ $lreg->{$group} };
   }
}
110    
111 root 1.7 =item $ports = find $group
112    
113     Returns all the ports currently registered to the given group (as
114     read-only array reference). When the group has no registered members,
115     returns C<undef>.
116    
117     =cut
118    
# Look up the ports registered in a group.
#
# Takes the group name and returns the array reference stored in %greg for
# that group when it has at least one member, otherwise undef.
#
# The entry is checked before it is dereferenced, so asking for a group that
# was never registered no longer dereferences an undefined value (which only
# worked as a symbolic reference and would die under "strict refs").
sub find($) {
   my $members = $greg{$_[0]};

   $members && @$members
      ? $members
      : undef
}
124    
125 root 1.4 =item $guard = register $port, $group
126    
127     Register the given (local!) port in the named global group C<$group>.
128    
129     The port will be unregistered automatically when the port is destroyed.
130    
131     When not called in void context, then a guard object will be returned that
132     will also cause the name to be unregistered when destroyed.
133    
134     =cut
135    
# Record a port (from any node) as a member of a group in the global
# registry. Also installed as the "reg1" message handler.
sub _register {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   push @$members, $port;
}
142    
# Drop a port (from any node) from a group in the global registry. Also
# installed as the "reg0" message handler.
#
# All entries equal to $port are removed; the group entry itself stays
# behind as a (possibly empty) array.
sub _unregister {
   my ($port, $group) = @_;

   my $members = $greg{$group} ||= [];
   @$members = grep { $_ ne $port } @$members;
}
149    
# Remove a local port from a group.
#
# Drops the monitor stored for this group/port pair, removes the port from
# the local and the global registry, and sends a "reg0" message to every
# port in %port.
sub unregister {
   my ($port, $group) = @_;

   delete $lmon{"$group\x00$port"};

   my $locals = $lreg{$group} ||= [];
   @$locals = grep { $_ ne $port } @$locals;

   _unregister($port, $group);

   foreach my $peer (values %port) {
      snd($peer, reg0 => $port, $group);
   }
}
162    
# Register a local port in the named global group.
#
# Arguments: the port ID and the group name. Croaks when the port is not a
# local port.
#
# The port is monitored so that it is unregistered automatically when the
# monitor callback fires. The registration is recorded in the local registry,
# sent as a "reg1" message to every port in %port, and added to the global
# registry.
#
# Returns a guard object that unregisters the port when destroyed, unless
# called in void context.
sub register($$) {
   my ($port, $group) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::register can only be called for local ports, caught";

   $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
   push @{ $lreg{$group} }, $port;

   snd $_, reg1 => $port, $group
      for values %port;

   _register $port, $group;

   # wantarray is false (but defined) in scalar context, so testing its
   # truth would hand out a guard in list context only; "defined" covers
   # every non-void context, as documented.
   defined wantarray && AnyEvent::Util::guard { unregister $port, $group }
}
180    
# Set up the rendezvous port on another node.
#
# Does nothing when a port for $node is already recorded or when $node is
# this node. Otherwise it spawns AnyEvent::MP::Global::connect on the node,
# remembers the resulting port in %port, monitors it, and sends our listener
# addresses plus (when non-empty) the known addresses and the local registry.
sub start_node {
   my ($node) = @_;

   return if exists($port{$node}) || $node eq $NODE; # do not connect to ourselves

   # establish connection
   my $remote = spawn($node, "AnyEvent::MP::Global::connect", 0, $NODE);
   $port{$node} = $remote;

   # when the remote port goes away, forget its groups and the port itself
   mon($remote, sub {
      unreg_groups($node);
      delete $port{$node};
   });

   snd($remote, addr => $AnyEvent::MP::Kernel::LISTENER);
   snd($remote, connect_nodes => \%addr) if %addr;
   snd($remote, set => \%lreg) if %lreg;
}
199 root 1.3
# Entry point that other nodes spawn on this node (see start_node).
#
# Arguments: a version value (unused here) and the ID of the spawning node.
# The new port ($SELF) kills itself when the monitor on that node fires, and
# it handles these messages:
#
#   addr          - the node's own addresses; stored in %addr
#   connect_nodes - a hash of node ID => addresses to connect to
#   find_node     - request to report the addresses of another node
#   set           - a complete registry from the node (see set_groups)
#   reg1 / reg0   - register / unregister a single port in a group
sub connect {
   my ($version, $node) = @_;

   # monitor them, silently die
   mon($node, psub { kil $SELF });

   my $on_addr = sub {
      my ($addresses) = @_;

      $AnyEvent::MP::Kernel::WARN->(9, "$node told us it's addresses (@$addresses).");
      $addr{$node} = $addresses;

      # to help listener-less nodes, we broadcast new addresses to them unconditionally
      #TODO: should be done by a node finding out about a listener-less one
      return unless @$addresses;

      foreach my $other (values %AnyEvent::MP::NODE) {
         next unless $other->{transport};

         # only nodes that reported an empty address list get help
         my $other_addr = $addr{$other->{id}};
         next unless $other_addr && !@$other_addr;

         $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
         snd($port{$other->{id}}, connect_nodes => { $node => $addresses });
      }
   };

   my $on_connect_nodes = sub {
      my ($kv) = @_;

      use JSON::XS;#d#
      my $kv_txt = JSON::XS->new->encode ($kv);#d#
      $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

      foreach my $id (keys %$kv) {
         my $peer = AnyEvent::MP::Kernel::add_node($id);
         $peer->connect (@{ $kv->{$id} });
         start_node($id);
      }
   };

   my $on_find_node = sub {
      my ($othernode) = @_;

      $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");

      # answer only when we actually know addresses for that node
      if ($addr{$othernode}) {
         snd($port{$node}, connect_nodes => { $othernode => $addr{$othernode} });
      }
   };

   my $on_set = sub {
      set_groups($node, shift);
   };

   rcv($SELF,
      addr          => $on_addr,
      connect_nodes => $on_connect_nodes,
      find_node     => $on_find_node,
      set           => $on_set,
      reg1          => \&_register,
      reg0          => \&_unregister,
   );
}
253 root 1.1
# Node up/down callback (installed via mon_nodes).
#
# When a node comes up, the node counter is incremented and start_node is
# run for it. When a node goes down, the counter is decremented, seeding is
# retried if no node is left, the node's group registrations and stored
# addresses are dropped, and every port in %port is sent a "find_node"
# request for it.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      ++$nodecnt;
      start_node($node);
      return;
   }

   --$nodecnt;
   more_seeding() unless $nodecnt;
   unreg_groups($node);

   # forget about the node
   delete $addr{$node};

   # ask other nodes if they know the node
   foreach my $peer (values %port) {
      snd($peer, find_node => $node);
   }
}
273    
# Treat every node reported by up_nodes as having just come up, then
# subscribe mon_node to further node up/down notifications.
foreach my $node (up_nodes()) {
   mon_node($node, 1);
}

mon_nodes(\&mon_node);
278    
279     =back
280    
281     =head1 SEE ALSO
282    
283     L<AnyEvent::MP>.
284    
285     =head1 AUTHOR
286    
287     Marc Lehmann <schmorp@schmorp.de>
288     http://home.schmorp.de/
289    
290     =cut
291    
292     1
293