… | |
… | |
45 | our %NODE; |
45 | our %NODE; |
46 | our $NODE; |
46 | our $NODE; |
47 | |
47 | |
48 | our $GLOBAL; |
48 | our $GLOBAL; |
49 | our $MASTER; |
49 | our $MASTER; |
50 | our $MASTER_MON; |
|
|
51 | our $MASTER_TIMER; |
50 | our $MASTER_TIMER; |
52 | |
51 | |
53 | our %GLOBAL_SLAVE; |
52 | our %GLOBAL_SLAVE; |
54 | |
53 | |
55 | our %GLOBAL_DB; # global db |
54 | our %GLOBAL_DB; # global db |
… | |
… | |
67 | snd $_, @_ |
66 | snd $_, @_ |
68 | for grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} } |
67 | for grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} } |
69 | } |
68 | } |
70 | |
69 | |
71 | # add/replace/del inside a family in the database |
70 | # add/replace/del inside a family in the database |
72 | # @$dle must not contain any key in %$set |
71 | # @$del must not contain any key in %$set |
73 | sub g_upd { |
72 | sub g_upd { |
74 | my ($node, $family, $set, $del) = @_; |
73 | my ($node, $family, $set, $del) = @_; |
75 | |
74 | |
76 | my $ldb = $LOCAL_DBS{$node}{$family} ||= {}; |
75 | my $ldb = $LOCAL_DBS{$node}{$family} ||= {}; |
77 | my $gdb = $GLOBAL_DB {$family} ||= {}; |
76 | my $gdb = $GLOBAL_DB {$family} ||= {}; |
… | |
… | |
146 | my $node = $SRCNODE; |
145 | my $node = $SRCNODE; |
147 | undef $GLOBAL_SLAVE{$node}; |
146 | undef $GLOBAL_SLAVE{$node}; |
148 | g_set $node, $db; |
147 | g_set $node, $db; |
149 | }; |
148 | }; |
150 | |
149 | |
|
|
150 | # other node (global and slave) sends us their database |
151 | $NODE_REQ{g_set} = sub { |
151 | $NODE_REQ{g_set} = sub { |
152 | &g_set ($SRCNODE, @_); |
152 | &g_set ($SRCNODE, @_); |
153 | }; |
153 | }; |
154 | |
154 | |
|
|
155 | # other node (global and slave) sends us a family update |
155 | $NODE_REQ{g_upd} = sub { |
156 | $NODE_REQ{g_upd} = sub { |
156 | &g_upd ($SRCNODE, @_); |
157 | &g_upd ($SRCNODE, @_); |
157 | }; |
158 | }; |
158 | |
159 | |
|
|
160 | # slave node wants to know the listeners of a node |
159 | $NODE_REQ{g_find} = sub { |
161 | $NODE_REQ{g_find} = sub { |
160 | my ($node) = @_; |
162 | my ($node) = @_; |
161 | |
163 | |
162 | snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node}; |
164 | snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node}; |
163 | }; |
165 | }; |
… | |
… | |
212 | }; |
214 | }; |
213 | |
215 | |
214 | ############################################################################# |
216 | ############################################################################# |
215 | # switch to global mode |
217 | # switch to global mode |
216 | |
218 | |
217 | # regularly try to connect to global nodes - maybe use seeding code? |
219 | # maintain conenctions to all global nodes that we know of |
218 | $MASTER_TIMER = AE::timer $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub { |
|
|
219 | (add_node $_)->connect |
|
|
220 | for keys %{ $GLOBAL_DB{"'g"} } |
|
|
221 | }; |
|
|
222 | |
|
|
223 | # instantly connect to other global nodes when we learn of them |
|
|
224 | # so we don't have to wait for the timer. |
|
|
225 | db_mon "'g" => sub { |
220 | db_mon "'g" => sub { |
226 | (add_node $_)->connect |
221 | keepalive_add $_ for @{ $_[1] }; |
227 | for @{ $_[1] }; |
222 | keepalive_del $_ for @{ $_[3] }; |
228 | }; |
223 | }; |
229 | |
224 | |
230 | # delete slaves on node-down |
225 | # delete slaves on node-down |
231 | # clear slave db on node-down |
226 | # clear slave db on node-down |
232 | $MASTER_MON = mon_nodes sub { |
227 | mon_nodes sub { |
233 | g_slave_disconnect $_[0] unless $_[1]; |
228 | g_slave_disconnect $_[0] unless $_[1]; |
234 | }; |
229 | }; |
235 | |
230 | |
236 | # tell everybody who connects that we are a global node |
231 | # tell everybody who connects that we are a global node |
237 | push @AnyEvent::MP::Transport::HOOK_GREET, sub { |
232 | push @AnyEvent::MP::Transport::HOOK_GREET, sub { |
… | |
… | |
241 | # connect from a global node |
236 | # connect from a global node |
242 | sub g_global_connect { |
237 | sub g_global_connect { |
243 | my ($node) = @_; |
238 | my ($node) = @_; |
244 | |
239 | |
245 | # we need to set this currently, as to avoid race conditions |
240 | # we need to set this currently, as to avoid race conditions |
246 | # because it takes a while until the other global node tells us it is global. |
241 | # because it takes a while until the other global node tells us it is global, |
|
|
242 | # but we need to send updates even before that. |
247 | |
243 | |
248 | undef $GLOBAL_DB{"'g"}{$node}; |
244 | undef $GLOBAL_DB {"'g"}{$node}; |
249 | undef $LOCAL_DBS{$node}{"'g"}{$node}; |
245 | undef $LOCAL_DBS{$node}{"'g"}{$node}; |
250 | |
246 | |
251 | # global nodes send all local databases, merged, |
247 | # global nodes send all local databases, merged, |
252 | # as their local database to global nodes |
248 | # as their local database to other global nodes |
253 | my %db; |
249 | my %db; |
254 | |
250 | |
255 | for (values %LOCAL_DBS) { |
251 | for (values %LOCAL_DBS) { |
256 | while (my ($f, $fv) = each %$_) { |
252 | while (my ($f, $fv) = each %$_) { |
257 | while (my ($k, $kv) = each %$fv) { |
253 | while (my ($k, $kv) = each %$fv) { |
… | |
… | |
268 | return unless $_[0]{remote_greeting}{global}; |
264 | return unless $_[0]{remote_greeting}{global}; |
269 | |
265 | |
270 | g_global_connect $_[0]{remote_node}; |
266 | g_global_connect $_[0]{remote_node}; |
271 | }; |
267 | }; |
272 | |
268 | |
273 | # tell our master that we are global now |
269 | # tell other global nodes that we are global now |
|
|
270 | # TODO: there is probably a race when two conencted nodes beocme global at the same time |
|
|
271 | # very ugly. |
274 | for (values %NODE) { |
272 | for (values %NODE) { |
275 | if ($_->{transport} && $_->{transport}{remote_greeting}{global}) { |
273 | if ($_->{transport} && $_->{transport}{remote_greeting}{global}) { |
276 | snd $_->{id} => "g_global"; |
274 | snd $_->{id} => "g_global"; |
277 | g_global_connect $_->{id}; |
275 | g_global_connect $_->{id}; |
278 | } |
276 | } |