ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.60
Committed: Thu Mar 22 00:48:29 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.59: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.58 AnyEvent::MP::Global - network backbone services
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Global;
8    
9     =head1 DESCRIPTION
10    
11 root 1.58 This module is usually run (or started on) seed nodes and provides a
12     variety of services to connected nodes, such as the distributed database.
13 root 1.1
14 root 1.58 The global nodes form a fully-meshed network, that is, all global nodes
15     currently maintain connections to all other global nodes.
16 root 1.1
17 root 1.58 Loading this module (e.g. as a service) transforms the local node into a
18     global node. There are no user-serviceable parts inside.
19 root 1.1
20     =cut
21    
22     package AnyEvent::MP::Global;
23    
24     use common::sense;
25     use Carp ();
26    
27 root 1.8 use AnyEvent ();
28 root 1.4
29 root 1.1 use AnyEvent::MP;
30     use AnyEvent::MP::Kernel;
31 root 1.20
32 root 1.60 AE::log 7 => "starting global service.";
33 root 1.5
34 root 1.8 #############################################################################
35 root 1.44 # node protocol parts for global nodes
36 root 1.8
37 root 1.49 package AnyEvent::MP::Kernel;
38 root 1.8
39 root 1.49 # TODO: this is ugly (classical use vars vs. our),
40     # maybe this should go into MP::Kernel
41 root 1.26
# Package-level state. These are declared with "our" inside
# AnyEvent::MP::Kernel (see the package statement above), so they alias
# that package's globals rather than creating new ones.
42 root 1.49 our %NODE;
43     our $NODE;
44 root 1.46
# $GLOBAL is set to 1 near the end of this file when global mode is enabled.
# $MASTER_MON and $MASTER_TIMER hold the node monitor and the reconnect
# timer created further below.
45 root 1.49 our $GLOBAL;
46     our $MASTER;
47     our $MASTER_MON;
48     our $MASTER_TIMER;
49 root 1.46
# keyed by node id; an entry is created by the g_slave and g_mon1 handlers
50 root 1.49 our %GLOBAL_SLAVE;
51 root 1.26
# %GLOBAL_DB is indexed {family}{key}; %LOCAL_DBS is indexed
# {node}{family}{key} (see g_upd below).
52 root 1.49 our %GLOBAL_DB; # global db
53     our %LOCAL_DBS; # local databases of other global nodes
54     our %LOCAL_DB; # this node database
55 root 1.46
56 root 1.59 our $SRCNODE; # the origin node id
# request name => handler sub; the handlers below read the sender from $SRCNODE
57 root 1.49 our %NODE_REQ;
58 root 1.46
59 root 1.49 # only in global code
# indexed {family}{node id}; values are undef, only key presence matters
60 root 1.51 our %GLOBAL_MON; # monitors {family}
61 root 1.47
# Returns the ids of all global nodes other than this one: the keys of the
# "'g" family in %GLOBAL_DB, minus $NODE, keeping only those for which
# node_is_up returns true. Takes no arguments (empty prototype).
62 root 1.49 sub other_globals() {
63     grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
64     }
65 root 1.47
66 root 1.49 # broadcasts a message to all other global nodes
# The argument list is passed unchanged to snd, once per node returned by
# other_globals; nothing is sent when that list is empty.
67     sub g_broadcast {
68     snd $_, @_
69     for other_globals;
70     }
71 root 1.46
72 root 1.57 # add/replace/del inside a family in the database
73     # @$del must not contain any key in %$set
# g_upd $node, $family, \%set, \@del
#
# Applies one update from $node to a single database family:
#   $set - hashref of key => value pairs to add or replace
#   $del - arrayref of keys to delete
# Both the per-node copy ($LOCAL_DBS{$node}{$family}) and the merged
# $GLOBAL_DB{$family} are updated. The resulting change is then forwarded
# to the other global nodes (only when $node has an entry in %GLOBAL_SLAVE)
# and to every node listed in $GLOBAL_MON{$family}.
74     sub g_upd {
75     my ($node, $family, $set, $del) = @_;
76    
# ||= creates the family hashes on first use; empty ones are removed again below
77     my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
78     my $gdb = $GLOBAL_DB {$family} ||= {};
79    
80     # add/replace keys
81     while (my ($k, $v) = each %$set) {
82     $ldb->{$k} =
83     $gdb->{$k} = $v;
84     }
85    
86     my @del; # actual deletes
87    
88     # take care of deletes
89     keydel:
90     for my $k (@$del) {
91     delete $ldb->{$k};
92     delete $gdb->{$k};
93    
94     # check if some other node still has the key, then we don't delete, but change
# the surviving value is put back into the global db and added to $set, so
# it is reported as a change; "next keydel" skips the push onto @del
95     for (values %LOCAL_DBS) {
96     if (exists $_->{$family}{$k}) {
97     $set->{$k} = $gdb->{$k} = $_->{$family}{$k};
98 root 1.46
99 root 1.57 next keydel;
100     }
101     }
102 root 1.46
103 root 1.57 push @del, $k;
104     }
105 root 1.51
106 root 1.57 # family could be empty now
107     delete $GLOBAL_DB{$family} unless %$gdb;
108     delete $LOCAL_DBS{$node}{$family} unless %$ldb;
109 root 1.51
# forwarded only for nodes registered in %GLOBAL_SLAVE; note that @del (the
# keys really removed) is sent, not the caller's $del
110 root 1.57 g_broadcast g_upd => $family, $set, \@del
111     if exists $GLOBAL_SLAVE{$node};
112 root 1.46
113 root 1.57 # tell subscribers we have changed the family
114     snd $_ => g_chg2 => $family, $set, \@del
115     for keys %{ $GLOBAL_MON{$family} };
116 root 1.51 }
117    
118     # set the whole (node-local) database - previous value must be empty
# g_set $node, \%db
# $db maps family => hashref of key/value pairs. Each family is handed to
# g_upd as a pure "set" (no delete list is passed).
119     sub g_set($$) {
120     my ($node, $db) = @_;
121    
# here $f is the family name and $k is that family's key/value hashref
122     while (my ($f, $k) = each %$db) {
123 root 1.57 g_upd $node, $f, $k;
124 root 1.51 }
125 root 1.49 }
126 root 1.46
127 root 1.49 # delete all keys from a database
# g_clr $node
# For every family in $LOCAL_DBS{$node}, calls g_upd with an undef set and
# all of the family's keys as the delete list, then removes the node's
# entry from %LOCAL_DBS.
128     sub g_clr($) {
129     my ($node) = @_;
130    
131     my $db = $LOCAL_DBS{$node};
132 root 1.57
# here $f is the family name and $k is that family's key/value hashref
133 root 1.49 while (my ($f, $k) = each %$db) {
134 root 1.57 g_upd $node, $f, undef, [keys %$k];
135 root 1.46 }
136    
137 root 1.49 delete $LOCAL_DBS{$node};
138     }
139    
140     # gather node databases from slaves
141 root 1.46
142 root 1.49 # other node wants to make us the master
# Handler for the "g_slave" request. The single argument is the sender's
# database (family => { key => value }). With no arguments the handler
# returns immediately. Otherwise the sender ($SRCNODE) gets an entry in
# %GLOBAL_SLAVE and its database is loaded via g_set.
143     $NODE_REQ{g_slave} = sub {
# the list assignment yields the number of arguments in scalar context,
# so "or return" fires only when @_ is empty
144 root 1.58 my ($db) = @_
145     or return; # empty g_slave is used to start global service
146 root 1.46
147 root 1.59 my $node = $SRCNODE;
# creates the key with an undef value; g_upd only tests "exists"
148 root 1.49 undef $GLOBAL_SLAVE{$node};
149     g_set $node, $db;
150     };
151 root 1.46
# "g_set" request: load the sender's database; arguments are passed
# straight through to g_set with $SRCNODE as the node.
152 root 1.57 $NODE_REQ{g_set} = sub {
153 root 1.59 g_set $SRCNODE, @_;
154 root 1.49 };
155 root 1.46
# "g_upd" request: apply an incremental update from the sender; arguments
# (family, set, del) are passed straight through to g_upd.
156 root 1.57 $NODE_REQ{g_upd} = sub {
157 root 1.59 g_upd $SRCNODE, @_;
158 root 1.49 };
159 root 1.46
# "g_find" request: reply to the sender with "g_found", the requested node
# id and that node's entry in the "'l" family of the global db.
# NOTE(review): the meaning of the "'l" family is not visible here - confirm.
160 root 1.49 $NODE_REQ{g_find} = sub {
161     my ($node) = @_;
162 root 1.46
163 root 1.59 snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node};
164 root 1.49 };
165 root 1.46
# The three handlers below answer database queries. Each takes
# ($family, $id) and replies to the sender with "g_reply", $id and the
# requested data from $GLOBAL_DB{$family}.

# whole family as a hashref (an empty hashref if the family does not exist)
166 root 1.55 $NODE_REQ{g_db_family} = sub {
167     my ($family, $id) = @_;
168 root 1.59 snd $SRCNODE, g_reply => $id, $GLOBAL_DB{$family} || {};
169 root 1.55 };
170    
# arrayref of the family's keys
171     $NODE_REQ{g_db_keys} = sub {
172     my ($family, $id) = @_;
173 root 1.59 snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ];
174 root 1.55 };
175    
# arrayref of the family's values
176     $NODE_REQ{g_db_values} = sub {
177     my ($family, $id) = @_;
178 root 1.59 snd $SRCNODE, g_reply => $id, [values %{ $GLOBAL_DB{$family} } ];
179 root 1.55 };
180    
181 root 1.49 # monitoring
182 root 1.47
# g_slave_disconnect $node
# Drops everything recorded for $node: its database (via g_clr), its entry
# in %GLOBAL_SLAVE, and the monitor registrations reachable through that
# entry. Families left without monitors are removed from %GLOBAL_MON.
183 root 1.49 sub g_slave_disconnect($) {
184     my ($node) = @_;
185 root 1.47
186 root 1.49 g_clr $node;
187 root 1.47
# $mon is only true if the node registered at least one monitor (g_mon1);
# a slave entry created by g_slave alone is undef and skips this block
188 root 1.49 if (my $mon = delete $GLOBAL_SLAVE{$node}) {
189     while (my ($f, $fv) = each %$mon) {
# NOTE(review): g_mon1 stores undef as $GLOBAL_SLAVE{$node}{$family}, so
# $fv looks like it is undef here and "keys %$fv" yields nothing - confirm
# whether $GLOBAL_MON{$f}{$node} is actually removed on disconnect.
190     delete $GLOBAL_MON{$f}{$_}
191     for keys %$fv;
192 root 1.51
193     delete $GLOBAL_MON{$f}
194     unless %{ $GLOBAL_MON{$f} };
195 root 1.47 }
196     }
197 root 1.49 }
198    
199 root 1.51 # g_mon0 family - stop monitoring
# $_[0] is the family. Removes the sender from the family's monitor set,
# drops the family from %GLOBAL_MON if no monitors remain, and removes the
# family from the sender's record in %GLOBAL_SLAVE.
200 root 1.49 $NODE_REQ{g_mon0} = sub {
201 root 1.59 delete $GLOBAL_MON{$_[0]}{$SRCNODE};
202 root 1.51 delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} };
203    
204 root 1.59 delete $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
205 root 1.49 };
206    
207     # g_mon1 family key - start monitoring
# Only $_[0] (the family) is used by this handler. The sender is recorded
# in both %GLOBAL_SLAVE and %GLOBAL_MON (keys with undef values), then it
# is sent "g_chg1" with the family and its current global db contents.
208     $NODE_REQ{g_mon1} = sub {
209 root 1.59 undef $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
210     undef $GLOBAL_MON{$_[0]}{$SRCNODE};
211 root 1.49
212 root 1.59 snd $SRCNODE, g_chg1 => $_[0], $GLOBAL_DB{$_[0]};
213 root 1.49 };
214 root 1.47
215 root 1.49 #############################################################################
216     # switch to global mode
217    
218     # regularly try to connect to global nodes - maybe use seeding code?
# The timer fires immediately (after 0) and then repeats at an interval of
# $CONFIG->{monitor_timeout}; each run calls add_node(...)->connect for
# every key in the "'g" family.
219     $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
220     (add_node $_)->connect
221     for keys %{ $GLOBAL_DB{"'g"} };
222     };
223    
224     # instantly connect to other global nodes when we learn of them
225     # so we don't have to wait for the timer.
226     #TODO
227 root 1.47 # $GLOBAL_MON{"'g"}{""}{""} = sub {
228     # (add_node $_[1])->connect;
229     # };
230    
231 root 1.49 # delete slaves on node-down
232     # clear slave db on node-down
# the callback gets the node id in $_[0]; cleanup runs when $_[1] is false
# (presumably an "is up" flag - confirm against mon_nodes)
233     $MASTER_MON = mon_nodes sub {
234     g_slave_disconnect $_[0] unless $_[1];
235     };
236    
237     # tell everybody who connects that we are a global node
# $_[0] is the object passed to the greet hook; its local_greeting hash
# gets global => 1
238     push @AnyEvent::MP::Transport::HOOK_GREET, sub {
239     $_[0]{local_greeting}{global} = 1;
240     };
241    
242     # connect from a global node
# g_global_connect $node
# Records $node as a member of the "'g" family (in both the global db and
# the node's own local db), then sends it a "g_set" message containing all
# entries of %LOCAL_DBS merged into a single family => key => value hash.
243     sub g_global_connect {
244     my ($node) = @_;
245    
246     # we need to set this currently, as to avoid race conditions
247     # because it takes a while until the other global node tells us it is global.
248    
249     undef $GLOBAL_DB{"'g"}{$node};
250     undef $LOCAL_DBS{$node}{"'g"}{$node};
251    
252     # global nodes send all local databases, merged,
253     # as their local database to global nodes
254     my %db;
255    
# if several nodes hold the same family/key, whichever is visited last
# wins; hash iteration order decides which one that is
256     for (values %LOCAL_DBS) {
257     while (my ($f, $fv) = each %$_) {
258     while (my ($k, $kv) = each %$fv) {
259     $db{$f}{$k} = $kv;
260 root 1.46 }
261     }
262 root 1.47 }
263    
264 root 1.49 snd $node => g_set => \%db;
265 root 1.4 }
266    
267 root 1.49 # send our database to every global node that connects
# the hook does nothing unless the peer's greeting carried the "global" flag
268     push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
269     return unless $_[0]{remote_greeting}{global};
270    
271     g_global_connect $_[0]{remote_node};
272     };
273    
274     # tell our master that we are global now
# Runs once at load time over every entry in %NODE that has a transport
# whose remote greeting is flagged global: each gets a "g_global" message
# followed by our merged database.
275     for (values %NODE) {
276     if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
277     snd $_->{id} => "g_global";
278     g_global_connect $_->{id};
279     }
280 root 1.20 }
281    
# "g_global" request: the sender announces that it is now a global node.
# Its slave state is discarded, its transport greeting is flagged as
# global, and our merged database is sent to it.
282 root 1.49 $NODE_REQ{g_global} = sub {
283 root 1.59 g_slave_disconnect $SRCNODE;
284    
285     my $node = $NODE{$SRCNODE};
286     $node->{transport}{remote_greeting}{global} = 1;
287    
288     g_global_connect $SRCNODE;
289 root 1.49 };
290 root 1.20
291 root 1.56 # enable global mode
292     $GLOBAL = 1;
293    
294     # global nodes are their own masters - this
295     # resends global requests and sets the local database.
296     master_set $NODE;
297    
298 root 1.49 # now add us to the set of global nodes
# the value is undef; only the key ($NODE) in the "'g" family matters
299     db_set "'g" => $NODE => undef;
300 root 1.3
301 root 1.58 #############################################################################
302     # compatibility functions for aemp 1.0
303    
304     #d#TODO#
305     #S AND FUNCTIONS $guard = grp_reg $group, $port Register the given (local!) port in the named global group $group. The port will be unregistered automatically when the port is destroyed. When not called in void context, then a guard object will be returned that will also cause the name to be unregistered when destroyed.
306     # $ports = grp_get $group Returns all the ports currently registered to the given group (as read-only(!) array reference). When the group has no registered members, return undef.
307     # $guard = grp_mon $group, $callback->($ports, $add, $del) Installs a monitor on the given group. Each time there is a change it will be called with the current group members as an arrayref as the first argument. The second argument only contains ports added, the third argument only ports removed. Unlike grp_get, all three arguments will always be array-refs, even if the array is empty. None of the arrays must be modified in any way. The first invocation will be with the first two arguments set to the current members, as if all of them were just added, but only when the group is actually non-empty. Optionally returns a guard object that uninstalls the watcher when it is destroyed.
308 root 1.1
309     =head1 SEE ALSO
310    
311     L<AnyEvent::MP>.
312    
313     =head1 AUTHOR
314    
315     Marc Lehmann <schmorp@schmorp.de>
316     http://home.schmorp.de/
317    
318     =cut
319    
320     1
321