ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.60
Committed: Thu Mar 22 00:48:29 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.59: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - network backbone services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module is usually run (or started on) seed nodes and provides a
12 variety of services to connected nodes, such as the distributed database.
13
14 The global nodes form a fully-meshed network, that is, all global nodes
15 currently maintain connections to all other global nodes.
16
17 Loading this module (e.g. as a service) transforms the local node into a
18 global node. There are no user-servicable parts inside.
19
20 =cut
21
22 package AnyEvent::MP::Global;
23
24 use common::sense;
25 use Carp ();
26
27 use AnyEvent ();
28
29 use AnyEvent::MP;
30 use AnyEvent::MP::Kernel;
31
32 AE::log 7 => "starting global service.";
33
34 #############################################################################
35 # node protocol parts for global nodes
36
37 package AnyEvent::MP::Kernel;
38
39 # TODO: this is ugly (classical use vars vs. our),
40 # maybe this should go into MP::Kernel
41
42 our %NODE;
43 our $NODE;
44
45 our $GLOBAL;
46 our $MASTER;
47 our $MASTER_MON;
48 our $MASTER_TIMER;
49
50 our %GLOBAL_SLAVE;
51
52 our %GLOBAL_DB; # global db
53 our %LOCAL_DBS; # local databases of other global nodes
54 our %LOCAL_DB; # this node database
55
56 our $SRCNODE; # the origin node id
57 our %NODE_REQ;
58
59 # only in global code
60 our %GLOBAL_MON; # monitors {family}
61
62 sub other_globals() {
63 grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
64 }
65
66 # broadcasts a message to all other global nodes
67 sub g_broadcast {
68 snd $_, @_
69 for other_globals;
70 }
71
72 # add/replace/del inside a family in the database
73 # @$dle must not contain any key in %$set
74 sub g_upd {
75 my ($node, $family, $set, $del) = @_;
76
77 my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
78 my $gdb = $GLOBAL_DB {$family} ||= {};
79
80 # add/replace keys
81 while (my ($k, $v) = each %$set) {
82 $ldb->{$k} =
83 $gdb->{$k} = $v;
84 }
85
86 my @del; # actual deletes
87
88 # take care of deletes
89 keydel:
90 for my $k (@$del) {
91 delete $ldb->{$k};
92 delete $gdb->{$k};
93
94 # check if some other node still has the key, then we don't delete, but change
95 for (values %LOCAL_DBS) {
96 if (exists $_->{$family}{$k}) {
97 $set->{$k} = $gdb->{$k} = $_->{$family}{$k};
98
99 next keydel;
100 }
101 }
102
103 push @del, $k;
104 }
105
106 # family could be empty now
107 delete $GLOBAL_DB{$family} unless %$gdb;
108 delete $LOCAL_DBS{$node}{$family} unless %$ldb;
109
110 g_broadcast g_upd => $family, $set, \@del
111 if exists $GLOBAL_SLAVE{$node};
112
113 # tell subscribers we have changed the family
114 snd $_ => g_chg2 => $family, $set, \@del
115 for keys %{ $GLOBAL_MON{$family} };
116 }
117
118 # set the whole (node-local) database - previous value must be empty
119 sub g_set($$) {
120 my ($node, $db) = @_;
121
122 while (my ($f, $k) = each %$db) {
123 g_upd $node, $f, $k;
124 }
125 }
126
127 # delete all keys from a database
128 sub g_clr($) {
129 my ($node) = @_;
130
131 my $db = $LOCAL_DBS{$node};
132
133 while (my ($f, $k) = each %$db) {
134 g_upd $node, $f, undef, [keys %$k];
135 }
136
137 delete $LOCAL_DBS{$node};
138 }
139
140 # gather node databases from slaves
141
142 # other node wants to make us the master
143 $NODE_REQ{g_slave} = sub {
144 my ($db) = @_
145 or return; # empty g_slave is used to start global service
146
147 my $node = $SRCNODE;
148 undef $GLOBAL_SLAVE{$node};
149 g_set $node, $db;
150 };
151
152 $NODE_REQ{g_set} = sub {
153 g_set $SRCNODE, @_;
154 };
155
156 $NODE_REQ{g_upd} = sub {
157 g_upd $SRCNODE, @_;
158 };
159
160 $NODE_REQ{g_find} = sub {
161 my ($node) = @_;
162
163 snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node};
164 };
165
166 $NODE_REQ{g_db_family} = sub {
167 my ($family, $id) = @_;
168 snd $SRCNODE, g_reply => $id, $GLOBAL_DB{$family} || {};
169 };
170
171 $NODE_REQ{g_db_keys} = sub {
172 my ($family, $id) = @_;
173 snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ];
174 };
175
176 $NODE_REQ{g_db_values} = sub {
177 my ($family, $id) = @_;
178 snd $SRCNODE, g_reply => $id, [values %{ $GLOBAL_DB{$family} } ];
179 };
180
181 # monitoring
182
183 sub g_slave_disconnect($) {
184 my ($node) = @_;
185
186 g_clr $node;
187
188 if (my $mon = delete $GLOBAL_SLAVE{$node}) {
189 while (my ($f, $fv) = each %$mon) {
190 delete $GLOBAL_MON{$f}{$_}
191 for keys %$fv;
192
193 delete $GLOBAL_MON{$f}
194 unless %{ $GLOBAL_MON{$f} };
195 }
196 }
197 }
198
199 # g_mon0 family - stop monitoring
200 $NODE_REQ{g_mon0} = sub {
201 delete $GLOBAL_MON{$_[0]}{$SRCNODE};
202 delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} };
203
204 delete $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
205 };
206
207 # g_mon1 family key - start monitoring
208 $NODE_REQ{g_mon1} = sub {
209 undef $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
210 undef $GLOBAL_MON{$_[0]}{$SRCNODE};
211
212 snd $SRCNODE, g_chg1 => $_[0], $GLOBAL_DB{$_[0]};
213 };
214
215 #############################################################################
216 # switch to global mode
217
218 # regularly try to connect to global nodes - maybe use seeding code?
219 $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
220 (add_node $_)->connect
221 for keys %{ $GLOBAL_DB{"'g"} };
222 };
223
224 # instantly connect to other global nodes when we learn of them
225 # so we don't have to wait for the timer.
226 #TODO
227 # $GLOBAL_MON{"'g"}{""}{""} = sub {
228 # (add_node $_[1])->connect;
229 # };
230
231 # delete slaves on node-down
232 # clear slave db on node-down
233 $MASTER_MON = mon_nodes sub {
234 g_slave_disconnect $_[0] unless $_[1];
235 };
236
237 # tell everybody who connects that we are a global node
238 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
239 $_[0]{local_greeting}{global} = 1;
240 };
241
242 # connect from a global node
243 sub g_global_connect {
244 my ($node) = @_;
245
246 # we need to set this currently, as to avoid race conditions
247 # because it takes a while until the other global node tells us it is global.
248
249 undef $GLOBAL_DB{"'g"}{$node};
250 undef $LOCAL_DBS{$node}{"'g"}{$node};
251
252 # global nodes send all local databases, merged,
253 # as their local database to global nodes
254 my %db;
255
256 for (values %LOCAL_DBS) {
257 while (my ($f, $fv) = each %$_) {
258 while (my ($k, $kv) = each %$fv) {
259 $db{$f}{$k} = $kv;
260 }
261 }
262 }
263
264 snd $node => g_set => \%db;
265 }
266
267 # send our database to every global node that connects
268 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
269 return unless $_[0]{remote_greeting}{global};
270
271 g_global_connect $_[0]{remote_node};
272 };
273
274 # tell our master that we are global now
275 for (values %NODE) {
276 if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
277 snd $_->{id} => "g_global";
278 g_global_connect $_->{id};
279 }
280 }
281
282 $NODE_REQ{g_global} = sub {
283 g_slave_disconnect $SRCNODE;
284
285 my $node = $NODE{$SRCNODE};
286 $node->{transport}{remote_greeting}{global} = 1;
287
288 g_global_connect $SRCNODE;
289 };
290
291 # enable global mode
292 $GLOBAL = 1;
293
294 # global nodes are their own masters - this
295 # resends global requests and sets the local database.
296 master_set $NODE;
297
298 # now add us to the set of global nodes
299 db_set "'g" => $NODE => undef;
300
301 #############################################################################
302 # compatibility functions for aemp 1.0
303
304 #d#TODO#
305 #S AND FUNCTIONS $guard = grp_reg $group, $port Register the given (local!) port in the named global group $group. The port will be unregistered automatically when the port is destroyed. When not called in void context, then a guard object will be returned that will also cause the name to be unregistered when destroyed.
306 # $ports = grp_get $group Returns all the ports currently registered to the given group (as read-only(!) array reference). When the group has no registered members, return undef.
307 # $guard = grp_mon $group, $callback->($ports, $add, $del) Installs a monitor on the given group. Each time there is a change it will be called with the current group members as an arrayref as the first argument. The second argument only contains ports added, the third argument only ports removed. Unlike grp_get, all three arguments will always be array-refs, even if the array is empty. None of the arrays must be modified in any way. The first invocation will be with the first two arguments set to the current members, as if all of them were just added, but only when the group is actually non-empty. Optionally returns a guard object that uninstalls the watcher when it is destroyed.
308
309 =head1 SEE ALSO
310
311 L<AnyEvent::MP>.
312
313 =head1 AUTHOR
314
315 Marc Lehmann <schmorp@schmorp.de>
316 http://home.schmorp.de/
317
318 =cut
319
320 1
321