ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Global.pm
Revision: 1.55
Committed: Thu Mar 8 21:37:51 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.54: +15 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Global - some network-global services
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Global;
8
9 =head1 DESCRIPTION
10
11 This module maintains a fully-meshed network between global nodes and
12 tries to have connections with all nodes in the network.
13
14 It also manages named port groups - ports can register themselves in any
15 number of groups that will be available network-wide, which is great for
16 discovering services.
17
18 Running it on one node will automatically run it on all nodes, although,
19 at the moment, the global service is started by default anyways.
20
21 =head1 GLOBALS AND FUNCTIONS
22
23 =over 4
24
25 =cut
26
27 package AnyEvent::MP::Global;
28
29 use common::sense;
30 use Carp ();
31
32 use AnyEvent ();
33
34 use AnyEvent::MP;
35 use AnyEvent::MP::Kernel;
36
# emit a startup notice through the kernel's WARN callback
# (first argument 7 is presumably the verbosity level - TODO confirm)
$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");
38
39 #############################################################################
40 # node protocol parts for global nodes
41
42 package AnyEvent::MP::Kernel;
43
44 # TODO: this is ugly (classical use vars vs. our),
45 # maybe this should go into MP::Kernel
46
our %NODE; # node objects; the code below reads their {id} and {transport} fields
our $NODE; # id of the local node (compared against node ids, used as hash key)

our $GLOBAL; # set to 1 further down when this module switches to global mode
our $MASTER; # id of our master node; set to $NODE below (we are our own master)
our $MASTER_MON; # holds the return value of mon_nodes (presumably a guard - TODO confirm)
our $MASTER_TIMER; # holds the AE::timer watcher for the periodic reconnect

our %GLOBAL_SLAVE; # slave node id => undef, or { family => undef } for monitored families

our %GLOBAL_DB; # global db: family => key => value
our %LOCAL_DBS; # local databases of other global nodes: node id => family => key => value
our %LOCAL_DB; # this node database (sent to $MASTER via g_set at the end of this file)

our $SRCNODE; # node object of the request sender; handlers read $SRCNODE->{id}
our %NODE_REQ; # request name => handler sub

# only in global code
our %GLOBAL_MON; # monitors {family}: family => subscriber node id => undef
66
# returns the ids of all nodes registered in the "'g" family that are
# currently up, excluding this node itself
sub other_globals() {
   grep { $_ ne $NODE and node_is_up($_) }
      keys %{ $GLOBAL_DB{"'g"} }
}
70
# sends the given message (@_) to every other global node that is up
sub g_broadcast {
   for my $peer (other_globals) {
      snd $peer, @_;
   }
}
76
# add/replace a key in the database
#
# arguments:
#    $node   - id of the node whose local database owns the entry
#    $family - database family the key belongs to
#    $key    - key to add or replace
#    $value  - new value
#
# the entry is stored both in the node's local database and in the merged
# global database. changes that originate from one of our slaves are
# relayed to the other global nodes, and every subscriber of the family is
# notified with a g_chg2 message.
sub g_add($$$$) {
   my ($node, $family, $key, $value) = @_;

   $LOCAL_DBS{$node}{$family}{$key} =
   $GLOBAL_DB       {$family}{$key} = $value;

   g_broadcast g_add => $family, $key, $value
      if exists $GLOBAL_SLAVE{$node};

   # tell subscribers we have added or replaced a key.
   # the "|| {}" keeps keys() from autovivifying an empty subscriber hash
   # for families nobody monitors - g_mon0 and g_slave_disconnect prune
   # empty $GLOBAL_MON entries, so we must not create them here.
   snd $_ => g_chg2 => $family, $key, $value
      for keys %{ $GLOBAL_MON{$family} || {} };
}
89
# delete a key from the database
#
# arguments:
#    $node   - id of the node whose local database owned the entry
#    $family - database family the key belongs to
#    $key    - key to delete
#
# the key is removed from the node's local database and from the global
# database. if another node's local database still carries the same key,
# the global entry is restored from it and subscribers receive the
# surviving value; only otherwise are they told the key is gone.
sub g_del($$$) {
   my ($node, $family, $key) = @_;

   delete $LOCAL_DBS{$node}{$family}{$key};
   delete $LOCAL_DBS{$node}{$family} unless %{ $LOCAL_DBS{$node}{$family} };

   # deletions originating from one of our slaves are relayed to the other globals
   g_broadcast g_del => $family, $key
      if exists $GLOBAL_SLAVE{$node};

   delete $GLOBAL_DB{$family}{$key};
   delete $GLOBAL_DB{$family} unless %{ $GLOBAL_DB{$family} };

   # "|| {}" avoids autovivifying an empty subscriber hash for this family
   my @subscribers = keys %{ $GLOBAL_MON{$family} || {} };

   # check if other node maybe still has the key, then we don't delete, but add
   for my $db (values %LOCAL_DBS) {
      # test the family first, so the lookup does not autovivify empty
      # families in every local database (which would also insert keys
      # into the hash g_clr is iterating over with each)
      next unless $db->{$family} && exists $db->{$family}{$key};

      my $val = $GLOBAL_DB{$family}{$key} = $db->{$family}{$key};

      # tell subscribers we have added or replaced a key
      snd $_ => g_chg2 => $family, $key, $val
         for @subscribers;

      # the key lives on - do not fall through to the "gone" notification
      return;
   }

   # tell subscribers key is actually gone
   snd $_ => g_chg2 => $family, $key
      for @subscribers;
}
118
# set the whole (node-local) database - previous value must be empty
#
# walks $db (family => key => value) and feeds every entry to g_add on
# behalf of $node. each value is removed from $db while it is handed over.
sub g_set($$) {
   my ($node, $db) = @_;

   while (my ($family, $entries) = each %$db) {
      for my $key (keys %$entries) {
         g_add $node, $family, $key, delete $entries->{$key};
      }
   }
}
128
# delete all keys from a database
#
# runs g_del for every family/key pair in the local database of $node and
# finally drops the node's database itself.
sub g_clr($) {
   my ($node) = @_;

   my $db = $LOCAL_DBS{$node};

   # g_del removes a family once it is empty; that family is always the
   # one each() returned last, which is safe to delete during iteration
   while (my ($family, $entries) = each %$db) {
      for my $key (keys %$entries) {
         g_del $node, $family, $key;
      }
   }

   delete $LOCAL_DBS{$node};
}
141
142 # gather node databases from slaves
143
# other node wants to make us the master:
# register the sender as slave (key present, value undef) and load the
# database it sent along
$NODE_REQ{g_slave} = sub {
   my ($db) = @_;
   my $id = $SRCNODE->{id};

   $GLOBAL_SLAVE{$id} = undef;
   g_set $id, $db;
};
152
# g_add family key value - sender adds/replaces a key in its database
$NODE_REQ{g_add} = sub {
   my ($family, $key, $value) = @_;
   g_add $SRCNODE->{id}, $family, $key, $value;
};

# g_del family key - sender deletes a key from its database
$NODE_REQ{g_del} = sub {
   my ($family, $key) = @_;
   g_del $SRCNODE->{id}, $family, $key;
};

# g_set db - sender supplies its whole database
$NODE_REQ{g_set} = sub {
   my ($db) = @_;
   g_set $SRCNODE->{id}, $db;
};
164
# g_find node - reply to the sender with a g_found message carrying the
# node id and whatever the "'l" family stores for it
$NODE_REQ{g_find} = sub {
   my ($node) = @_;
   my $requester = $SRCNODE->{id};

   snd $requester, g_found => $node, $GLOBAL_DB{"'l"}{$node};
};
170
# g_db_family family id - reply with the whole family (key => value),
# or an empty hash when the family has no entries; $id is echoed back
$NODE_REQ{g_db_family} = sub {
   my ($family, $id) = @_;
   my $entries = $GLOBAL_DB{$family} || {};

   snd $SRCNODE->{id}, g_reply => $id, $entries;
};
175
# g_db_keys family id - reply with an array of all keys in the family;
# $id is echoed back in the g_reply message
$NODE_REQ{g_db_keys} = sub {
   my ($family, $id) = @_;

   # "|| {}" as in g_db_family: keys() on the bare element would
   # autovivify an empty family in %GLOBAL_DB for every unknown name a
   # remote node asks about
   snd $SRCNODE->{id}, g_reply => $id, [keys %{ $GLOBAL_DB{$family} || {} }];
};
180
# g_db_values family id - reply with an array of all values in the family;
# $id is echoed back in the g_reply message
$NODE_REQ{g_db_values} = sub {
   my ($family, $id) = @_;

   # "|| {}" as in g_db_family: values() on the bare element would
   # autovivify an empty family in %GLOBAL_DB for every unknown name a
   # remote node asks about
   snd $SRCNODE->{id}, g_reply => $id, [values %{ $GLOBAL_DB{$family} || {} }];
};
185
186 # monitoring
187
# forget everything about a slave node:
# clears its local database (notifying subscribers via g_del), removes it
# from the slave registry and cancels all family monitors it registered.
#
# $node - id of the node that went away
sub g_slave_disconnect($) {
   my ($node) = @_;

   g_clr $node;

   # g_mon1 records monitors as $GLOBAL_SLAVE{node}{family} = undef and
   # $GLOBAL_MON{family}{node} = undef, so the subscriber entry to remove
   # is keyed by the node id itself (iterating the keys of the undef
   # per-family value, as before, never removed anything)
   if (my $mon = delete $GLOBAL_SLAVE{$node}) {
      for my $family (keys %$mon) {
         next unless $GLOBAL_MON{$family};

         delete $GLOBAL_MON{$family}{$node};

         # drop the family once its last subscriber is gone
         delete $GLOBAL_MON{$family}
            unless %{ $GLOBAL_MON{$family} };
      }
   }
}
203
# g_mon0 family - stop monitoring
# removes the sender from the family's subscribers (dropping the family
# when no subscriber is left) and from the sender's own monitor list
$NODE_REQ{g_mon0} = sub {
   my ($family) = @_;
   my $id = $SRCNODE->{id};

   delete $GLOBAL_MON{$family}{$id};
   delete $GLOBAL_MON{$family}
      unless %{ $GLOBAL_MON{$family} };

   delete $GLOBAL_SLAVE{$id}{$family};
};
211
# g_mon1 family key - start monitoring
# records the subscription in both directions and answers with a g_chg1
# message carrying the current contents of the family
$NODE_REQ{g_mon1} = sub {
   my ($family) = @_;
   my $id = $SRCNODE->{id};

   $GLOBAL_SLAVE{$id}{$family} = undef;
   $GLOBAL_MON{$family}{$id} = undef;

   snd $id, g_chg1 => $family, $GLOBAL_DB{$family};
};
219
220 #############################################################################
221 # switch to global mode
222
$GLOBAL = 1;
$MASTER = $NODE;
$GLOBAL_SLAVE{$NODE} = undef; # we are our own master (and slave)

# regularly try to connect to global nodes - maybe use seeding code?
# (fires immediately, then every monitor_timeout seconds)
$MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
   for my $id (keys %{ $GLOBAL_DB{"'g"} }) {
      (add_node $id)->connect;
   }
};
232
233 # instantly connect to other global nodes when we learn of them
234 # so we don't have to wait for the timer.
235 #TODO
236 # $GLOBAL_MON{"'g"}{""}{""} = sub {
237 # (add_node $_[1])->connect;
238 # };
239
# delete slaves on node-down
# clear slave db on node-down
# (the callback gets the node id and a flag that is true when the node is up)
$MASTER_MON = mon_nodes sub {
   my ($node, $is_up) = @_;

   g_slave_disconnect $node unless $is_up;
};
245
# tell everybody who connects that we are a global node:
# the hook flags the local greeting of the transport it is handed
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   my ($transport) = @_;

   $transport->{local_greeting}{global} = 1;
};
250
# connect from a global node
#
# registers $node as a global node and sends it, as one g_set message,
# the merge of every local database we hold.
sub g_global_connect {
   my ($node) = @_;

   # we need to set this currently, as to avoid race conditions
   # because it takes a while until the other global node tells us it is global.
   $GLOBAL_DB{"'g"}{$node} = undef;
   $LOCAL_DBS{$node}{"'g"}{$node} = undef;

   # global nodes send all local databases, merged,
   # as their local database to global nodes
   my %merged;

   for my $local (values %LOCAL_DBS) {
      for my $family (keys %$local) {
         my $entries = $local->{$family};

         $merged{$family}{$_} = $entries->{$_}
            for keys %$entries;
      }
   }

   snd $node => g_set => \%merged;
}
275
# send our database to every global node that connects;
# transports whose remote greeting lacks the global flag are ignored
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   my ($transport) = @_;

   return unless $transport->{remote_greeting}{global};

   g_global_connect $transport->{remote_node};
};
282
# tell our master that we are global now:
# every node we already have a transport to, and that greeted us as
# global, gets a g_global message followed by our merged database
for my $node (values %NODE) {
   my $transport = $node->{transport}
      or next;

   next unless $transport->{remote_greeting}{global};

   snd $node->{id} => "g_global";
   g_global_connect $node->{id};
}
290
# g_global - the sender announces that it has become a global node:
# drop its slave state, flag its transport as global and send it our
# merged database
$NODE_REQ{g_global} = sub {
   my $id = $SRCNODE->{id};

   g_slave_disconnect $id;
   $SRCNODE->{transport}{remote_greeting}{global} = 1;
   g_global_connect $id;
};
296
# send the local db to the master - ourselves - to prime our global_db
snd $MASTER, g_set => \%LOCAL_DB;
# now add us to the set of global nodes
db_set "'g" => $NODE => undef;
301
302 =back
303
304 =head1 SEE ALSO
305
306 L<AnyEvent::MP>.
307
308 =head1 AUTHOR
309
310 Marc Lehmann <schmorp@schmorp.de>
311 http://home.schmorp.de/
312
313 =cut
314
315 1
316