… | |
… | |
50 | our $MASTER_TIMER; |
50 | our $MASTER_TIMER; |
51 | |
51 | |
52 | our %GLOBAL_SLAVE; |
52 | our %GLOBAL_SLAVE; |
53 | |
53 | |
54 | our %GLOBAL_DB; # global db |
54 | our %GLOBAL_DB; # global db |
55 | our %LOCAL_DBS; # local databases of other global nodes |
55 | our %LOCAL_DBS; # local databases of other nodes (global and slave) |
56 | our %LOCAL_DB; # this node database |
56 | our %LOCAL_DB; # this node database |
57 | |
57 | |
58 | our $SRCNODE; # the origin node id |
58 | our $SRCNODE; # the origin node id |
59 | our %NODE_REQ; |
59 | our %NODE_REQ; |
60 | |
60 | |
… | |
… | |
84 | my @del; # actual deletes |
84 | my @del; # actual deletes |
85 | |
85 | |
86 | # take care of deletes |
86 | # take care of deletes |
87 | keydel: |
87 | keydel: |
88 | for my $k (@$del) { |
88 | for my $k (@$del) { |
|
|
89 | next unless exists $ldb->{$k}; |
|
|
90 | |
89 | delete $ldb->{$k}; |
91 | delete $ldb->{$k}; |
90 | delete $gdb->{$k}; |
92 | delete $gdb->{$k}; |
91 | |
93 | |
92 | # check if some other node still has the key, then we don't delete, but change |
94 | # check if some other node still has the key, then we don't delete, but change |
93 | for (values %LOCAL_DBS) { |
95 | for (values %LOCAL_DBS) { |
… | |
… | |
103 | |
105 | |
104 | # family could be empty now |
106 | # family could be empty now |
105 | delete $GLOBAL_DB{$family} unless %$gdb; |
107 | delete $GLOBAL_DB{$family} unless %$gdb; |
106 | delete $LOCAL_DBS{$node}{$family} unless %$ldb; |
108 | delete $LOCAL_DBS{$node}{$family} unless %$ldb; |
107 | |
109 | |
|
|
110 | return unless %$set || @del; |
|
|
111 | |
108 | g_broadcast g_upd => $family, $set, \@del |
112 | g_broadcast g_upd => $family, $set, \@del |
109 | if exists $GLOBAL_SLAVE{$node}; |
113 | if exists $GLOBAL_SLAVE{$node}; |
110 | |
114 | |
111 | # tell subscribers we have changed the family |
115 | # tell subscribers we have changed the family |
112 | snd $_ => g_chg2 => $family, $set, \@del |
116 | snd $_ => g_chg2 => $family, $set, \@del |
… | |
… | |
182 | # monitoring |
186 | # monitoring |
183 | |
187 | |
184 | sub g_disconnect($) { |
188 | sub g_disconnect($) { |
185 | my ($node) = @_; |
189 | my ($node) = @_; |
186 | |
190 | |
|
|
191 | db_del "'g" => $node; |
187 | g_clr $node; |
192 | g_clr $node; |
188 | |
193 | |
189 | if (my $mon = delete $GLOBAL_SLAVE{$node}) { |
194 | if (my $mon = delete $GLOBAL_SLAVE{$node}) { |
190 | while (my ($f, $fv) = each %$mon) { |
195 | while (my ($f, $fv) = each %$mon) { |
191 | delete $GLOBAL_MON{$f}{$_} |
196 | delete $GLOBAL_MON{$f}{$_} |
… | |
… | |
214 | }; |
219 | }; |
215 | |
220 | |
216 | ############################################################################# |
221 | ############################################################################# |
217 | # switch to global mode |
222 | # switch to global mode |
218 | |
223 | |
219 | # maintain conenctions to all global nodes that we know of |
|
|
220 | db_mon "'g" => sub { |
|
|
221 | keepalive_add $_ for @{ $_[1] }; |
|
|
222 | keepalive_del $_ for @{ $_[3] }; |
|
|
223 | }; |
|
|
224 | |
|
|
225 | # delete data from other nodes on node-down |
224 | # delete data from other nodes on node-down |
226 | mon_nodes sub { |
225 | mon_nodes sub { |
227 | g_disconnect $_[0] unless $_[1]; |
226 | g_disconnect $_[0] unless $_[1]; |
228 | }; |
227 | }; |
229 | |
228 | |
… | |
… | |
234 | |
233 | |
235 | # connect from a global node |
234 | # connect from a global node |
236 | sub g_global_connect { |
235 | sub g_global_connect { |
237 | my ($node) = @_; |
236 | my ($node) = @_; |
238 | |
237 | |
239 | # we need to set this currently, as to avoid race conditions |
238 | # each node puts the set of connected global nodes into |
240 | # because it takes a while until the other global node tells us it is global, |
239 | # 'g - this causes a big duplication and mergefest, but |
241 | # but we need to send updates even before that. |
240 | # is the easiest way to ensure global nodes have a list |
|
|
241 | # of all other global nodes. |
242 | |
242 | |
243 | undef $GLOBAL_DB {"'g"}{$node}; |
243 | db_set "'g" => $node; |
244 | undef $LOCAL_DBS{$node}{"'g"}{$node}; |
|
|
245 | |
244 | |
246 | # global nodes send all local databases, merged, |
245 | # global nodes send all local databases of their slaves, merged, |
247 | # as their local database to other global nodes |
246 | # as their local database to other global nodes |
248 | my %db; |
247 | my %db; |
249 | |
248 | |
250 | for (values %LOCAL_DBS) { |
249 | while (my ($k, $v) = each %LOCAL_DBS) { |
|
|
250 | next unless exists $GLOBAL_SLAVE{$k}; |
|
|
251 | |
251 | while (my ($f, $fv) = each %$_) { |
252 | while (my ($f, $fv) = each %$_) { |
252 | while (my ($k, $kv) = each %$fv) { |
253 | while (my ($k, $kv) = each %$fv) { |
253 | $db{$f}{$k} = $kv; |
254 | $db{$f}{$k} = $kv; |
254 | } |
255 | } |
255 | } |
256 | } |
… | |
… | |
289 | |
290 | |
290 | # global nodes are their own masters - this |
291 | # global nodes are their own masters - this |
291 | # resends global requests and sets the local database. |
292 | # resends global requests and sets the local database. |
292 | master_set $NODE; |
293 | master_set $NODE; |
293 | |
294 | |
|
|
295 | # from here on we should be able to act normally |
|
|
296 | |
294 | # now add us to the set of global nodes |
297 | # now add us to the set of global nodes |
295 | db_set "'g" => $NODE => undef; |
298 | db_set "'g" => $NODE; |
|
|
299 | |
|
|
300 | # maintain connections to all global nodes that we know of |
|
|
301 | db_mon "'g" => sub { |
|
|
302 | keepalive_add $_ for @{ $_[1] }; |
|
|
303 | keepalive_del $_ for @{ $_[3] }; |
|
|
304 | }; |
296 | |
305 | |
297 | ############################################################################# |
306 | ############################################################################# |
298 | # compatibility functions for aemp 1.0 |
307 | # compatibility functions for aemp 1.0 |
299 | |
308 | |
300 | package AnyEvent::MP::Global; |
309 | package AnyEvent::MP::Global; |