ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.56
Committed: Thu Mar 8 21:59:01 2012 UTC (12 years, 3 months ago) by root
Branch: MAIN
Changes since 1.55: +7 -6 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network between global nodes and
12 tries to have connections with all nodes in the network.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33
34 use AnyEvent::MP;
35 use AnyEvent::MP::Kernel;
36
37 $AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
38
39 #############################################################################
40 # node protocol parts for global nodes
41
42 package AnyEvent::MP::Kernel;
43
44 # TODO: this is ugly (classical use vars vs. our),
45 # maybe this should go into MP::Kernel
46
47 our %NODE;
48 our $NODE;
49
50 our $GLOBAL;
51 our $MASTER;
52 our $MASTER_MON;
53 our $MASTER_TIMER;
54
55 our %GLOBAL_SLAVE;
56
57 our %GLOBAL_DB; # global db
58 our %LOCAL_DBS; # local databases of other global nodes
59 our %LOCAL_DB; # this node database
60
61 our $SRCNODE;
62 our %NODE_REQ;
63
64 # only in global code
65 our %GLOBAL_MON; # monitors {family}
66
67 sub other_globals() {
68 grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
69 }
70
71 # broadcasts a message to all other global nodes
72 sub g_broadcast {
73 snd $_, @_
74 for other_globals;
75 }
76
77 # add/replace a key in the database
78 sub g_add($$$$) {
79 $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} =
80 $GLOBAL_DB {$_[1]}{$_[2]} = $_[3];
81
82 g_broadcast g_add => $_[1] => $_[2] => $_[3]
83 if exists $GLOBAL_SLAVE{$_[0]};
84
85 # tell subscribers we have added or replaced a key
86 snd $_ => g_chg2 => $_[1], $_[2], $_[3]
87 for keys %{ $GLOBAL_MON{$_[1]} };
88 }
89
90 # delete a key from the database
91 sub g_del($$$) {
92 delete $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]};
93 delete $LOCAL_DBS{$_[0]}{$_[1]} unless %{ $LOCAL_DBS{$_[0]}{$_[1]} };
94
95 g_broadcast g_del => $_[1] => $_[2]
96 if exists $GLOBAL_SLAVE{$_[0]};
97
98 delete $GLOBAL_DB{$_[1]}{$_[2]};
99 delete $GLOBAL_DB{$_[1]} unless %{ $GLOBAL_DB{$_[1]} }; # could be moved below for
100
101 # check if other node maybe still has the key, then we don't delete, but add
102 for (values %LOCAL_DBS) {
103 if (exists $_->{$_[1]}{$_[2]}) {
104 my $val = $GLOBAL_DB{$_[1]}{$_[2]} = $_->{$_[1]}{$_[2]};
105
106 # tell subscribers we have added or replaced a key
107 snd $_ => g_chg2 => $_[1], $_[2], $val
108 for keys %{ $GLOBAL_MON{$_[1]} };
109
110 last;
111 }
112 }
113
114 # tell subscribers key is actually gone
115 snd $_ => g_chg2 => $_[1], $_[2]
116 for keys %{ $GLOBAL_MON{$_[1]} };
117 }
118
119 # set the whole (node-local) database - previous value must be empty
120 sub g_set($$) {
121 my ($node, $db) = @_;
122
123 while (my ($f, $k) = each %$db) {
124 g_add $node, $f => $_ => delete $k->{$_}
125 for keys %$k;
126 }
127 }
128
129 # delete all keys from a database
130 sub g_clr($) {
131 my ($node) = @_;
132
133 my $db = $LOCAL_DBS{$node};
134 while (my ($f, $k) = each %$db) {
135 g_del $node, $f => $_
136 for keys %$k;
137 }
138
139 delete $LOCAL_DBS{$node};
140 }
141
142 # gather node databases from slaves
143
144 # other node wants to make us the master
145 $NODE_REQ{g_slave} = sub {
146 my ($db) = @_;
147
148 my $node = $SRCNODE->{id};
149 undef $GLOBAL_SLAVE{$node};
150 g_set $node, $db;
151 };
152
153 $NODE_REQ{g_add} = sub {
154 &g_add ($SRCNODE->{id}, @_);
155 };
156
157 $NODE_REQ{g_del} = sub {
158 &g_del ($SRCNODE->{id}, @_);
159 };
160
161 $NODE_REQ{g_set} = sub {
162 g_set $SRCNODE->{id}, $_[0];
163 };
164
165 $NODE_REQ{g_find} = sub {
166 my ($node) = @_;
167
168 snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node};
169 };
170
171 $NODE_REQ{g_db_family} = sub {
172 my ($family, $id) = @_;
173 snd $SRCNODE->{id}, g_reply => $id, $GLOBAL_DB{$family} || {};
174 };
175
176 $NODE_REQ{g_db_keys} = sub {
177 my ($family, $id) = @_;
178 snd $SRCNODE->{id}, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ];
179 };
180
181 $NODE_REQ{g_db_values} = sub {
182 my ($family, $id) = @_;
183 snd $SRCNODE->{id}, g_reply => $id, [values %{ $GLOBAL_DB{$family} } ];
184 };
185
186 # monitoring
187
188 sub g_slave_disconnect($) {
189 my ($node) = @_;
190
191 g_clr $node;
192
193 if (my $mon = delete $GLOBAL_SLAVE{$node}) {
194 while (my ($f, $fv) = each %$mon) {
195 delete $GLOBAL_MON{$f}{$_}
196 for keys %$fv;
197
198 delete $GLOBAL_MON{$f}
199 unless %{ $GLOBAL_MON{$f} };
200 }
201 }
202 }
203
204 # g_mon0 family - stop monitoring
205 $NODE_REQ{g_mon0} = sub {
206 delete $GLOBAL_MON{$_[0]}{$SRCNODE->{id}};
207 delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} };
208
209 delete $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]};
210 };
211
212 # g_mon1 family key - start monitoring
213 $NODE_REQ{g_mon1} = sub {
214 undef $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]};
215 undef $GLOBAL_MON{$_[0]}{$SRCNODE->{id}};
216
217 snd $SRCNODE->{id}, g_chg1 => $_[0], $GLOBAL_DB{$_[0]};
218 };
219
220 #############################################################################
221 # switch to global mode
222
223 # regularly try to connect to global nodes - maybe use seeding code?
224 $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
225 (add_node $_)->connect
226 for keys %{ $GLOBAL_DB{"'g"} };
227 };
228
229 # instantly connect to other global nodes when we learn of them
230 # so we don't have to wait for the timer.
231 #TODO
232 # $GLOBAL_MON{"'g"}{""}{""} = sub {
233 # (add_node $_[1])->connect;
234 # };
235
236 # delete slaves on node-down
237 # clear slave db on node-down
238 $MASTER_MON = mon_nodes sub {
239 g_slave_disconnect $_[0] unless $_[1];
240 };
241
242 # tell everybody who connects that we are a global node
243 push @AnyEvent::MP::Transport::HOOK_GREET, sub {
244 $_[0]{local_greeting}{global} = 1;
245 };
246
247 # connect from a global node
248 sub g_global_connect {
249 my ($node) = @_;
250
251 # we need to set this currently, as to avoid race conditions
252 # because it takes a while until the other global node tells us it is global.
253
254 undef $GLOBAL_DB{"'g"}{$node};
255 undef $LOCAL_DBS{$node}{"'g"}{$node};
256
257 # global nodes send all local databases, merged,
258 # as their local database to global nodes
259 my %db;
260
261 for (values %LOCAL_DBS) {
262 while (my ($f, $fv) = each %$_) {
263 while (my ($k, $kv) = each %$fv) {
264 $db{$f}{$k} = $kv;
265 }
266 }
267 }
268
269 snd $node => g_set => \%db;
270 }
271
272 # send our database to every global node that connects
273 push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
274 return unless $_[0]{remote_greeting}{global};
275
276 g_global_connect $_[0]{remote_node};
277 };
278
279 # tell our master that we are global now
280 for (values %NODE) {
281 if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
282 snd $_->{id} => "g_global";
283 g_global_connect $_->{id};
284 }
285 }
286
287 $NODE_REQ{g_global} = sub {
288 g_slave_disconnect $SRCNODE->{id};
289 $SRCNODE->{transport}{remote_greeting}{global} = 1;
290 g_global_connect $SRCNODE->{id};
291 };
292
293 # enable global mode
294 $GLOBAL = 1;
295
296 # global nodes are their own masters - this
297 # resends global requests and sets the local database.
298 master_set $NODE;
299
300 # now add us to the set of global nodes
301 db_set "'g" => $NODE => undef;
302
303 =back
304
305 =head1 SEE ALSO
306
307 L<AnyEvent::MP>.
308
309 =head1 AUTHOR
310
311 Marc Lehmann <schmorp@schmorp.de>
312 http://home.schmorp.de/
313
314 =cut
315
316 1
317