… | |
… | |
58 | our $NODE; |
58 | our $NODE; |
59 | our $LISTENER; |
59 | our $LISTENER; |
60 | |
60 | |
61 | our $GLOBAL; |
61 | our $GLOBAL; |
62 | our $MASTER; |
62 | our $MASTER; |
|
|
63 | our $MASTER_MON; |
|
|
64 | our $MASTER_TIMER; |
63 | |
65 | |
64 | our %GLOBAL_SLAVE; |
66 | our %GLOBAL_SLAVE; |
65 | our $GLOBAL_MON; |
|
|
66 | |
67 | |
67 | our %GLOBAL_DB; # global db |
68 | our %GLOBAL_DB; # global db |
68 | our %LOCAL_DBS; # local databases of other global nodes |
69 | our %LOCAL_DBS; # local databases of other global nodes |
69 | our %LOCAL_DB; # this node database |
70 | our %LOCAL_DB; # this node database |
70 | |
71 | |
71 | our $SRCNODE; |
72 | our $SRCNODE; |
72 | our %node_req; |
73 | our %node_req; |
|
|
74 | |
|
|
75 | # only in global code |
|
|
76 | our %GLOBAL_MON; # monitors {family}{"" or key} |
|
|
77 | |
|
|
78 | sub other_globals() { |
|
|
79 | grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} } |
|
|
80 | } |
73 | |
81 | |
74 | # broadcasts a message to all other global nodes |
82 | # broadcasts a message to all other global nodes |
75 | sub g_broadcast { |
83 | sub g_broadcast { |
76 | snd $_, @_ |
84 | snd $_, @_ |
77 | for other_globals; |
85 | for other_globals; |
78 | } |
86 | } |
79 | |
87 | |
80 | sub g_mon_check { |
88 | sub g_mon_check { |
81 | warn "g_mon_check<@_>\n";#d# |
89 | warn "g_mon_check<@_>\n";#d# |
82 | use Data::Dump; ddx \%GLOBAL_DB;#d# |
90 | |
|
|
91 | my %node = ( |
|
|
92 | %{ $GLOBAL_MON{$_[1]}{$_[2]} }, |
|
|
93 | %{ $GLOBAL_MON{$_[1]}{"" } }, |
|
|
94 | %{ $GLOBAL_MON{"" }{"" } }, |
|
|
95 | ); |
|
|
96 | |
|
|
97 | snd $_ => g_chg1 => $_[1], $_[2], @_ > 2 ? $_[3] : () |
|
|
98 | for keys %node; |
83 | } |
99 | } |
84 | |
100 | |
85 | # add/replace a key in the database |
101 | # add/replace a key in the database |
86 | sub g_add($$$$) { |
102 | sub g_add($$$$) { |
87 | $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} = |
103 | $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} = |
… | |
… | |
165 | my ($node) = @_; |
181 | my ($node) = @_; |
166 | |
182 | |
167 | snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node}; |
183 | snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node}; |
168 | }; |
184 | }; |
169 | |
185 | |
|
|
186 | # monitoring |
|
|
187 | |
|
|
188 | sub g_slave_disconnect($) { |
|
|
189 | my ($node) = @_; |
|
|
190 | |
|
|
191 | g_clr $node; |
|
|
192 | |
|
|
193 | if (my $mon = delete $GLOBAL_SLAVE{$node}) { |
|
|
194 | while (my ($f, $fv) = each %$mon) { |
|
|
195 | delete $GLOBAL_MON{$f}{$_} |
|
|
196 | for keys %$fv; |
|
|
197 | } |
|
|
198 | } |
|
|
199 | } |
|
|
200 | |
|
|
201 | # g_mon0 family key - stop monitoring |
|
|
202 | $node_req{g_mon0} = sub { |
|
|
203 | delete $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}}; |
|
|
204 | delete $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]}; |
|
|
205 | }; |
|
|
206 | |
|
|
207 | # g_mon1 family key - start monitoring |
|
|
208 | $node_req{g_mon1} = sub { |
|
|
209 | undef $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]}; |
|
|
210 | undef $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}}; |
|
|
211 | #d# generate lots of initial change requests, or one big? |
|
|
212 | |
|
|
213 | snd $SRCNODE->{id}, g_chg0 => $_[0], $_[1] |
|
|
214 | }; |
|
|
215 | |
170 | ############################################################################# |
216 | ############################################################################# |
171 | # switch to global mode |
217 | # switch to global mode |
172 | |
218 | |
173 | $GLOBAL = 1; |
219 | $GLOBAL = 1; |
174 | $MASTER = $NODE; |
220 | $MASTER = $NODE; |
175 | undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave) |
221 | undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave) |
176 | undef $GLOBAL_MON; |
222 | $LOCAL_DBS{$NODE} = { %LOCAL_DB }; |
177 | |
223 | |
|
|
224 | # regularly try to connect to global nodes - maybe use seeding code? |
|
|
225 | $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub { |
|
|
226 | (add_node $_)->connect |
|
|
227 | for keys %{ $GLOBAL_DB{"'g"} }; |
|
|
228 | }; |
|
|
229 | |
|
|
230 | # instantly connect to other global nodes when we learn of them |
|
|
231 | # so we don't have to wait for the timer. |
|
|
232 | #TODO |
|
|
233 | # $GLOBAL_MON{"'g"}{""}{""} = sub { |
|
|
234 | # (add_node $_[1])->connect; |
|
|
235 | # }; |
|
|
236 | |
178 | # delete slaves on nodw-down |
237 | # delete slaves on node-down |
179 | # clear slave db on node-down |
238 | # clear slave db on node-down |
180 | $GLOBAL_MON = mon_nodes sub { |
239 | $MASTER_MON = mon_nodes sub { |
181 | return if $_[1]; |
240 | g_slave_disconnect $_[0] unless $_[1]; |
182 | |
|
|
183 | delete $GLOBAL_SLAVE{$_[0]}; |
|
|
184 | g_clr $_[0]; |
|
|
185 | ldb_set "'l" => $_[0]; |
|
|
186 | # clear listener and global database entries on node-down |
|
|
187 | #ldb_set "'g" => $_[0]; # really? |
|
|
188 | }; |
241 | }; |
189 | |
242 | |
190 | # tell everybody who connects that we are a global node |
243 | # tell everybody who connects that we are a global node |
191 | push @AnyEvent::MP::Transport::HOOK_GREET, sub { |
244 | push @AnyEvent::MP::Transport::HOOK_GREET, sub { |
192 | $_[0]{local_greeting}{global} = 1; |
245 | $_[0]{local_greeting}{global} = 1; |
193 | }; |
246 | }; |
194 | |
247 | |
|
|
248 | # connect from a global node |
|
|
249 | sub g_global_connect { |
|
|
250 | my ($node) = @_; |
|
|
251 | |
|
|
252 | # we need to set this currently, as to avoid race conditions |
|
|
253 | # because it takes a while until the other global node tells us it is global. |
|
|
254 | |
|
|
255 | undef $GLOBAL_DB{"'g"}{$node}; |
|
|
256 | undef $LOCAL_DBS{$node}{"'g"}{$node}; |
|
|
257 | |
|
|
258 | # global nodes send all local databases, merged, |
|
|
259 | # as their local database to global nodes |
|
|
260 | my %db; |
|
|
261 | |
|
|
262 | for (values %LOCAL_DBS) { |
|
|
263 | while (my ($f, $fv) = each %$_) { |
|
|
264 | while (my ($k, $kv) = each %$fv) { |
|
|
265 | $db{$f}{$k} = $kv; |
|
|
266 | } |
|
|
267 | } |
|
|
268 | } |
|
|
269 | |
|
|
270 | snd $node => g_set => \%db; |
|
|
271 | } |
|
|
272 | |
195 | # send our database to every global node that connects |
273 | # send our database to every global node that connects |
196 | push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { |
274 | push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { |
197 | return unless $_[0]{remote_greeting}{global}; |
275 | return unless $_[0]{remote_greeting}{global}; |
198 | |
276 | |
199 | # global nodes send all local databases, merged, |
277 | g_global_connect $_[0]{remote_node}; |
200 | # as their local database to global nodes |
|
|
201 | my %db; |
|
|
202 | |
|
|
203 | for (values %LOCAL_DBS) { |
|
|
204 | while (my ($f, $k) = each %$_) { |
|
|
205 | while (my ($kk, $kv) = each %$k) { |
|
|
206 | $db{$f}{$kk} = $kv; |
|
|
207 | } |
|
|
208 | } |
|
|
209 | } |
|
|
210 | |
|
|
211 | snd $_[0]{remote_node} => g_set => \%db; |
|
|
212 | }; |
278 | }; |
213 | |
279 | |
214 | # # tell our master else that we are global now |
280 | # tell our master that we are global now |
215 | # for (values %NODE) { |
281 | for (values %NODE) { |
216 | # if ($_->{transport} && $_->{transport}{remote_greeting}{global}) { |
282 | if ($_->{transport} && $_->{transport}{remote_greeting}{global}) { |
217 | # snd $_->{id} => "g_global"; |
283 | snd $_->{id} => "g_global"; |
218 | # snd $_->{id} => g_set => \%LOCAL_DB; |
284 | g_global_connect $_->{id}; |
219 | # } |
285 | } |
220 | # } |
286 | } |
221 | # |
287 | |
|
|
288 | $node_req{g_global} = sub { |
|
|
289 | g_slave_disconnect $SRCNODE->{id}; |
|
|
290 | $SRCNODE->{transport}{remote_greeting}{global} = 1; |
|
|
291 | g_global_connect $SRCNODE->{id}; |
|
|
292 | }; |
|
|
293 | |
222 | #d#d disconnect everybody to bootstrap development grr |
294 | #d#d disconnect everybody to bootstrap development grr |
223 | |
295 | |
224 | $_->transport_error # remove Self::transport_error |
296 | # $_->transport_error # remove Self::transport_error |
225 | for values %NODE; |
297 | # for values %NODE; |
226 | |
298 | |
227 | # now add us to the set of global nodes |
299 | # now add us to the set of global nodes |
228 | ldb_set "'g" => $NODE => undef; |
300 | ldb_set "'g" => $NODE => undef; |
229 | g_set $NODE => \%LOCAL_DB; |
|
|
230 | } |
301 | } |
231 | |
302 | |
232 | =item $guard = grp_reg $group, $port |
303 | =item $guard = grp_reg $group, $port |
233 | |
304 | |
234 | Register the given (local!) port in the named global group C<$group>. |
305 | Register the given (local!) port in the named global group C<$group>. |