ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.48
Committed: Fri Mar 2 19:21:16 2012 UTC (12 years, 5 months ago) by root
Branch: MAIN
Changes since 1.47: +0 -5 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Global - some network-global services
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.42 This module maintains a fully-meshed network between global nodes and
12     tries to have connections with all nodes in the network.
13 root 1.1
14 root 1.15 It also manages named port groups - ports can register themselves in any
15     number of groups that will be available network-wide, which is great for
16     discovering services.
17 root 1.1
18 root 1.15 Running it on one node will automatically run it on all nodes, although,
19     at the moment, the global service is started by default anyways.
20 root 1.3
21 root 1.1 =head1 GLOBALS AND FUNCTIONS
22    
23     =over 4
24    
25     =cut
26    
27     package AnyEvent::MP::Global;
28    
29     use common::sense;
30     use Carp ();
31    
32 root 1.8 use AnyEvent ();
33 root 1.4 use AnyEvent::Util ();
34    
35 root 1.1 use AnyEvent::MP;
36     use AnyEvent::MP::Kernel;
37 root 1.22 use AnyEvent::MP::Transport ();
38 root 1.1
39 root 1.20 use base "Exporter";
40    
41     our @EXPORT = qw(
42     grp_reg
43     grp_get
44     grp_mon
45     );
46    
47 root 1.5 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
48    
49 root 1.8 #############################################################################
50 root 1.44 # node protocol parts for global nodes
51 root 1.8
52 root 1.44 {
53     package AnyEvent::MP::Kernel;
54 root 1.8
55 root 1.44 # TODO: this is ugly, maybe this should go into MP::Kernel or a separate module #d#
56 root 1.26
57 root 1.46 our %NODE;
58     our $NODE;
59     our $LISTENER;
60    
61 root 1.44 our $GLOBAL;
62 root 1.45 our $MASTER;
63 root 1.47 our $MASTER_MON;
64     our $MASTER_TIMER;
65 root 1.46
66 root 1.44 our %GLOBAL_SLAVE;
67 root 1.26
68 root 1.46 our %GLOBAL_DB; # global db
69     our %LOCAL_DBS; # local databases of other global nodes
70     our %LOCAL_DB; # this node database
71    
72     our $SRCNODE;
73     our %node_req;
74    
75 root 1.47 # only in global code
76     our %GLOBAL_MON; # monitors {family}{"" or key}
77    
78     sub other_globals() {
79     grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
80     }
81    
82 root 1.46 # broadcasts a message to all other global nodes
83     sub g_broadcast {
84     snd $_, @_
85     for other_globals;
86     }
87    
88     sub g_mon_check {
89     warn "g_mon_check<@_>\n";#d#
90 root 1.47
91     my %node = (
92     %{ $GLOBAL_MON{$_[1]}{$_[2]} },
93     %{ $GLOBAL_MON{$_[1]}{"" } },
94     %{ $GLOBAL_MON{"" }{"" } },
95     );
96    
97     snd $_ => g_chg1 => $_[1], $_[2], @_ > 2 ? $_[3] : ()
98     for keys %node;
99 root 1.46 }
100    
101     # add/replace a key in the database
102     sub g_add($$$$) {
103     $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} =
104     $GLOBAL_DB {$_[1]}{$_[2]} = $_[3];
105    
106     g_broadcast g_add => $_[1] => $_[2] => $_[3]
107     if exists $GLOBAL_SLAVE{$_[0]};
108    
109     warn "g_add<@_>\n";#d#
110     &g_mon_check;
111     }
112    
113     # delete a key from the database
114     sub g_del($$$) {
115     delete $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]};
116    
117     g_broadcast g_del => $_[1] => $_[2]
118     if exists $GLOBAL_SLAVE{$_[0]};
119    
120     delete $GLOBAL_DB{$_[1]}{$_[2]};
121    
122     # check if other node maybe still has the key, then we don't delete, but add
123     for (values %LOCAL_DBS) {
124     if (exists $_->{$_[1]}{$_[2]}) {
125     $GLOBAL_DB{$_[1]}{$_[2]} = $_->{$_[1]}{$_[2]};
126     last;
127     }
128     }
129    
130     warn "g_del<@_>\n";#d#
131     &g_mon_check;
132     }
133    
134     # delete all keys from a database
135     sub g_clr($) {
136     my ($node) = @_;
137    
138     my $db = $LOCAL_DBS{$node};
139     while (my ($f, $k) = each %$db) {
140     g_del $node, $f => $_
141     for keys %$k;
142     }
143    
144     delete $LOCAL_DBS{$node};
145     }
146    
147     # set the whole (node-local) database - previous value must be empty
148     sub g_set($$) {
149     my ($node, $db) = @_;
150    
151     while (my ($f, $k) = each %$db) {
152     g_add $node, $f => $_ => delete $k->{$_}
153     for keys %$k;
154     }
155     }
156    
157     # gather node databases from slaves
158    
159     # other node wants to make us the master
160     $node_req{g_slave} = sub {
161     my ($db) = @_;
162    
163     my $node = $SRCNODE->{id};
164     undef $GLOBAL_SLAVE{$node};
165     g_set $node, $db;
166     };
167    
168     $node_req{g_add} = sub {
169     &g_add ($SRCNODE->{id}, @_);
170     };
171    
172     $node_req{g_del} = sub {
173     &g_del ($SRCNODE->{id}, @_);
174     };
175    
176     $node_req{g_set} = sub {
177     g_set $SRCNODE->{id}, $_[0];
178     };
179    
180     $node_req{g_find} = sub {
181     my ($node) = @_;
182    
183     snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node};
184     };
185    
186 root 1.47 # monitoring
187    
188     sub g_slave_disconnect($) {
189     my ($node) = @_;
190    
191     g_clr $node;
192    
193     if (my $mon = delete $GLOBAL_SLAVE{$node}) {
194     while (my ($f, $fv) = each %$mon) {
195     delete $GLOBAL_MON{$f}{$_}
196     for keys %$fv;
197     }
198     }
199     }
200    
201     # g_mon0 family key - stop monitoring
202     $node_req{g_mon0} = sub {
203     delete $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}};
204     delete $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]};
205     };
206    
207     # g_mon1 family key - start monitoring
208     $node_req{g_mon1} = sub {
209     undef $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]};
210     undef $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}};
211     #d# generate lots of initial change requests, or one big?
212    
213     snd $SRCNODE->{id}, g_chg0 => $_[0], $_[1]
214     };
215    
216 root 1.46 #############################################################################
217 root 1.44 # switch to global mode
218 root 1.46
219 root 1.44 $GLOBAL = 1;
220     $MASTER = $NODE;
221 root 1.46 undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave)
222 root 1.47 $LOCAL_DBS{$NODE} = { %LOCAL_DB };
223    
224     # regularly try to connect to global nodes - maybe use seeding code?
225     $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
226     (add_node $_)->connect
227     for keys %{ $GLOBAL_DB{"'g"} };
228     };
229 root 1.22
230 root 1.47 # instantly connect to other global nodes when we learn of them
231     # so we don't have to wait for the timer.
232     #TODO
233     # $GLOBAL_MON{"'g"}{""}{""} = sub {
234     # (add_node $_[1])->connect;
235     # };
236    
237     # delete slaves on node-down
238 root 1.46 # clear slave db on node-down
239 root 1.47 $MASTER_MON = mon_nodes sub {
240     g_slave_disconnect $_[0] unless $_[1];
241 root 1.44 };
242 root 1.20
243 root 1.44 # tell everybody who connects that we are a global node
244     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
245     $_[0]{local_greeting}{global} = 1;
246     };
247 root 1.4
248 root 1.47 # connect from a global node
249     sub g_global_connect {
250     my ($node) = @_;
251    
252     # we need to set this currently, as to avoid race conditions
253     # because it takes a while until the other global node tells us it is global.
254    
255     undef $GLOBAL_DB{"'g"}{$node};
256     undef $LOCAL_DBS{$node}{"'g"}{$node};
257 root 1.46
258     # global nodes send all local databases, merged,
259     # as their local database to global nodes
260     my %db;
261    
262     for (values %LOCAL_DBS) {
263 root 1.47 while (my ($f, $fv) = each %$_) {
264     while (my ($k, $kv) = each %$fv) {
265     $db{$f}{$k} = $kv;
266 root 1.46 }
267     }
268     }
269    
270 root 1.47 snd $node => g_set => \%db;
271     }
272    
273     # send our database to every global node that connects
274     push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
275     return unless $_[0]{remote_greeting}{global};
276    
277     g_global_connect $_[0]{remote_node};
278     };
279    
280     # tell our master that we are global now
281     for (values %NODE) {
282     if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
283     snd $_->{id} => "g_global";
284     g_global_connect $_->{id};
285     }
286     }
287    
288     $node_req{g_global} = sub {
289     g_slave_disconnect $SRCNODE->{id};
290     $SRCNODE->{transport}{remote_greeting}{global} = 1;
291     g_global_connect $SRCNODE->{id};
292 root 1.44 };
293 root 1.4
294 root 1.46 # now add us to the set of global nodes
295     ldb_set "'g" => $NODE => undef;
296 root 1.4 }
297    
298 root 1.20 =item $guard = grp_reg $group, $port
299 root 1.4
300     Register the given (local!) port in the named global group C<$group>.
301    
302     The port will be unregistered automatically when the port is destroyed.
303    
304     When not called in void context, then a guard object will be returned that
305     will also cause the name to be unregistered when destroyed.
306    
307     =cut
308    
309     # register local port
310 root 1.20 sub grp_reg($$) {
311     my ($group, $port) = @_;
312 root 1.4
313     port_is_local $port
314 root 1.20 or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";
315    
316 root 1.44 defined wantarray && AnyEvent::Util::guard { unregister ($port, $group) }
317 root 1.4 }
318    
319 root 1.20 =item $ports = grp_get $group
320 root 1.15
321     Returns all the ports currently registered to the given group (as
322 root 1.20 read-only(!) array reference). When the group has no registered members,
323 root 1.15 return C<undef>.
324    
325     =cut
326    
327 root 1.20 sub grp_get($) {
328     }
329    
330     =item $guard = grp_mon $group, $callback->($ports, $add, $del)
331    
332     Installs a monitor on the given group. Each time there is a change it
333     will be called with the current group members as an arrayref as the
334     first argument. The second argument only contains ports added, the third
335     argument only ports removed.
336    
337     Unlike C<grp_get>, all three arguments will always be array-refs, even if
338     the array is empty. None of the arrays must be modified in any way.
339    
340     The first invocation will be with the first two arguments set to the
341     current members, as if all of them were just added, but only when the
342 root 1.39 group is actually non-empty.
343 root 1.20
344     Optionally returns a guard object that uninstalls the watcher when it is
345     destroyed.
346    
347     =cut
348    
349     sub grp_mon($$) {
350     my ($grp, $cb) = @_;
351 root 1.4 }
352 root 1.3
353 root 1.1 =back
354    
355     =head1 SEE ALSO
356    
357     L<AnyEvent::MP>.
358    
359     =head1 AUTHOR
360    
361     Marc Lehmann <schmorp@schmorp.de>
362     http://home.schmorp.de/
363    
364     =cut
365    
366     1
367