ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.57
Committed: Fri Mar 9 17:05:26 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.56: +46 -47 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network between global nodes and
12 tries to have connections with all nodes in the network.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33
34 use AnyEvent::MP;
35 use AnyEvent::MP::Kernel;
36
37 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
38
39 #############################################################################
40 # node protocol parts for global nodes
41
42 package AnyEvent::MP::Kernel;
43
44 # TODO: this is ugly (classical use vars vs. our),
45 # maybe this should go into MP::Kernel
46
47 our %NODE;
48 our $NODE;
49
50 our $GLOBAL;
51 our $MASTER;
52 our $MASTER_MON;
53 our $MASTER_TIMER;
54
55 our %GLOBAL_SLAVE;
56
57 our %GLOBAL_DB; # global db
58 our %LOCAL_DBS; # local databases of other global nodes
59 our %LOCAL_DB; # this node database
60
61 our $SRCNODE;
62 our %NODE_REQ;
63
64 # only in global code
65 our %GLOBAL_MON; # monitors {family}
66
67 sub other_globals() {
68 grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
69 }
70
71 # broadcasts a message to all other global nodes
72 sub g_broadcast {
73 snd $_, @_
74 for other_globals;
75 }
76
77 # add/replace/del inside a family in the database
78 # @$dle must not contain any key in %$set
79 sub g_upd {
80 my ($node, $family, $set, $del) = @_;
81
82 my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
83 my $gdb = $GLOBAL_DB {$family} ||= {};
84
85 # add/replace keys
86 while (my ($k, $v) = each %$set) {
87 $ldb->{$k} =
88 $gdb->{$k} = $v;
89 }
90
91 my @del; # actual deletes
92
93 # take care of deletes
94 keydel:
95 for my $k (@$del) {
96 delete $ldb->{$k};
97 delete $gdb->{$k};
98
99 # check if some other node still has the key, then we don't delete, but change
100 for (values %LOCAL_DBS) {
101 if (exists $_->{$family}{$k}) {
102 $set->{$k} = $gdb->{$k} = $_->{$family}{$k};
103
104 next keydel;
105 }
106 }
107
108 push @del, $k;
109 }
110
111 # family could be empty now
112 delete $GLOBAL_DB{$family} unless %$gdb;
113 delete $LOCAL_DBS{$node}{$family} unless %$ldb;
114
115 g_broadcast g_upd => $family, $set, \@del
116 if exists $GLOBAL_SLAVE{$node};
117
118 # tell subscribers we have changed the family
119 snd $_ => g_chg2 => $family, $set, \@del
120 for keys %{ $GLOBAL_MON{$family} };
121 }
122
123 # set the whole (node-local) database - previous value must be empty
124 sub g_set($$) {
125 my ($node, $db) = @_;
126
127 while (my ($f, $k) = each %$db) {
128 g_upd $node, $f, $k;
129 }
130 }
131
132 # delete all keys from a database
133 sub g_clr($) {
134 my ($node) = @_;
135
136 my $db = $LOCAL_DBS{$node};
137
138 while (my ($f, $k) = each %$db) {
139 g_upd $node, $f, undef, [keys %$k];
140 }
141
142 delete $LOCAL_DBS{$node};
143 }
144
145 # gather node databases from slaves
146
147 # other node wants to make us the master
148 $NODE_REQ{g_slave} = sub {
149 my ($db) = @_;
150
151 my $node = $SRCNODE->{id};
152 undef $GLOBAL_SLAVE{$node};
153 g_set $node, $db;
154 };
155
156 $NODE_REQ{g_set} = sub {
157 g_set $SRCNODE->{id}, @_;
158 };
159
160 $NODE_REQ{g_upd} = sub {
161 g_upd $SRCNODE->{id}, @_;
162 };
163
164 $NODE_REQ{g_find} = sub {
165 my ($node) = @_;
166
167 snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node};
168 };
169
170 $NODE_REQ{g_db_family} = sub {
171 my ($family, $id) = @_;
172 snd $SRCNODE->{id}, g_reply => $id, $GLOBAL_DB{$family} || {};
173 };
174
175 $NODE_REQ{g_db_keys} = sub {
176 my ($family, $id) = @_;
177 snd $SRCNODE->{id}, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ];
178 };
179
180 $NODE_REQ{g_db_values} = sub {
181 my ($family, $id) = @_;
182 snd $SRCNODE->{id}, g_reply => $id, [values %{ $GLOBAL_DB{$family} } ];
183 };
184
185 # monitoring
186
187 sub g_slave_disconnect($) {
188 my ($node) = @_;
189
190 g_clr $node;
191
192 if (my $mon = delete $GLOBAL_SLAVE{$node}) {
193 while (my ($f, $fv) = each %$mon) {
194 delete $GLOBAL_MON{$f}{$_}
195 for keys %$fv;
196
197 delete $GLOBAL_MON{$f}
198 unless %{ $GLOBAL_MON{$f} };
199 }
200 }
201 }
202
203 # g_mon0 family - stop monitoring
204 $NODE_REQ{g_mon0} = sub {
205 delete $GLOBAL_MON{$_[0]}{$SRCNODE->{id}};
206 delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} };
207
208 delete $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]};
209 };
210
211 # g_mon1 family key - start monitoring
212 $NODE_REQ{g_mon1} = sub {
213 undef $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]};
214 undef $GLOBAL_MON{$_[0]}{$SRCNODE->{id}};
215
216 snd $SRCNODE->{id}, g_chg1 => $_[0], $GLOBAL_DB{$_[0]};
217 };
218
219 #############################################################################
220 # switch to global mode
221
222 # regularly try to connect to global nodes - maybe use seeding code?
223 $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
224 (add_node $_)->connect
225 for keys %{ $GLOBAL_DB{"'g"} };
226 };
227
228 # instantly connect to other global nodes when we learn of them
229 # so we don't have to wait for the timer.
230 #TODO
231 # $GLOBAL_MON{"'g"}{""}{""} = sub {
232 # (add_node $_[1])->connect;
233 # };
234
235 # delete slaves on node-down
236 # clear slave db on node-down
237 $MASTER_MON = mon_nodes sub {
238 g_slave_disconnect $_[0] unless $_[1];
239 };
240
241 # tell everybody who connects that we are a global node
242 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
243 $_[0]{local_greeting}{global} = 1;
244 };
245
246 # connect from a global node
247 sub g_global_connect {
248 my ($node) = @_;
249
250 # we need to set this currently, as to avoid race conditions
251 # because it takes a while until the other global node tells us it is global.
252
253 undef $GLOBAL_DB{"'g"}{$node};
254 undef $LOCAL_DBS{$node}{"'g"}{$node};
255
256 # global nodes send all local databases, merged,
257 # as their local database to global nodes
258 my %db;
259
260 for (values %LOCAL_DBS) {
261 while (my ($f, $fv) = each %$_) {
262 while (my ($k, $kv) = each %$fv) {
263 $db{$f}{$k} = $kv;
264 }
265 }
266 }
267
268 snd $node => g_set => \%db;
269 }
270
271 # send our database to every global node that connects
272 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
273 return unless $_[0]{remote_greeting}{global};
274
275 g_global_connect $_[0]{remote_node};
276 };
277
278 # tell our master that we are global now
279 for (values %NODE) {
280 if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
281 snd $_->{id} => "g_global";
282 g_global_connect $_->{id};
283 }
284 }
285
286 $NODE_REQ{g_global} = sub {
287 g_slave_disconnect $SRCNODE->{id};
288 $SRCNODE->{transport}{remote_greeting}{global} = 1;
289 g_global_connect $SRCNODE->{id};
290 };
291
292 # enable global mode
293 $GLOBAL = 1;
294
295 # global nodes are their own masters - this
296 # resends global requests and sets the local database.
297 master_set $NODE;
298
299 # now add us to the set of global nodes
300 db_set "'g" => $NODE => undef;
301
302 =back
303
304 =head1 SEE ALSO
305
306 L<AnyEvent::MP>.
307
308 =head1 AUTHOR
309
310 Marc Lehmann <schmorp@schmorp.de>
311 http://home.schmorp.de/
312
313 =cut
314
315 1
316