--- AnyEvent-MP/MP/Global.pm 2012/02/29 18:58:23 1.45 +++ AnyEvent-MP/MP/Global.pm 2012/03/01 18:11:56 1.46 @@ -54,34 +54,137 @@ # TODO: this is ugly, maybe this should go into MP::Kernel or a separate module #d# + our %NODE; + our $NODE; + our $LISTENER; + our $GLOBAL; our $MASTER; - our $NODE_ADDR; - our $GLOBAL_ADDR; - our $NODE; + our %GLOBAL_SLAVE; our $GLOBAL_MON; - our $LISTENER; + our %GLOBAL_DB; # global db + our %LOCAL_DBS; # local databases of other global nodes + our %LOCAL_DB; # this node database + + our $SRCNODE; + our %node_req; + + # broadcasts a message to all other global nodes + sub g_broadcast { + snd $_, @_ + for other_globals; + } + + sub g_mon_check { + warn "g_mon_check<@_>\n";#d# + use Data::Dump; ddx \%GLOBAL_DB;#d# + } + + # add/replace a key in the database + sub g_add($$$$) { + $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} = + $GLOBAL_DB {$_[1]}{$_[2]} = $_[3]; + + g_broadcast g_add => $_[1] => $_[2] => $_[3] + if exists $GLOBAL_SLAVE{$_[0]}; + + warn "g_add<@_>\n";#d# + &g_mon_check; + } + + # delete a key from the database + sub g_del($$$) { + delete $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]}; + + g_broadcast g_del => $_[1] => $_[2] + if exists $GLOBAL_SLAVE{$_[0]}; + + delete $GLOBAL_DB{$_[1]}{$_[2]}; + + # check if other node maybe still has the key, then we don't delete, but add + for (values %LOCAL_DBS) { + if (exists $_->{$_[1]}{$_[2]}) { + $GLOBAL_DB{$_[1]}{$_[2]} = $_->{$_[1]}{$_[2]}; + last; + } + } + + warn "g_del<@_>\n";#d# + &g_mon_check; + } + + # delete all keys from a database + sub g_clr($) { + my ($node) = @_; + + my $db = $LOCAL_DBS{$node}; + while (my ($f, $k) = each %$db) { + g_del $node, $f => $_ + for keys %$k; + } + + delete $LOCAL_DBS{$node}; + } + + # set the whole (node-local) database - previous value must be empty + sub g_set($$) { + my ($node, $db) = @_; + + while (my ($f, $k) = each %$db) { + g_add $node, $f => $_ => delete $k->{$_} + for keys %$k; + } + } + + # gather node databases from slaves + + # other node wants to make us the master + $node_req{g_slave} = sub { + my ($db) = @_; + + my $node = $SRCNODE->{id}; + undef $GLOBAL_SLAVE{$node}; + g_set $node, $db; + }; + + $node_req{g_add} = sub { + &g_add ($SRCNODE->{id}, @_); + }; + + $node_req{g_del} = sub { + &g_del ($SRCNODE->{id}, @_); + }; + + $node_req{g_set} = sub { + g_set $SRCNODE->{id}, $_[0]; + }; + + $node_req{g_find} = sub { + my ($node) = @_; + + snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node}; + }; + + ############################################################################# # switch to global mode + $GLOBAL = 1; $MASTER = $NODE; + undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave) undef $GLOBAL_MON; - $GLOBAL_ADDR->{$NODE} = $LISTENER; - + # delete slaves on nodw-down + # clear slave db on node-down $GLOBAL_MON = mon_nodes sub { return if $_[1]; - delete $NODE_ADDR->{$_[0]}; - - if (delete $GLOBAL_ADDR->{$_[0]}) { - # if the node is global, tell our slaves - - our %GLOBAL_SLAVE; # ugh, will be in AnyEvent::MP::Global - snd $_, g_del => $_[0] - for keys %GLOBAL_SLAVE; - } + delete $GLOBAL_SLAVE{$_[0]}; + g_clr $_[0]; + ldb_set "'l" => $_[0]; + # clear listener and global database entries on node-down + #ldb_set "'g" => $_[0]; # really? }; # tell everybody who connects that we are a global node @@ -89,15 +192,41 @@ $_[0]{local_greeting}{global} = 1; }; - # tell every global node that connects that we are global too + # send our database to every global node that connects push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { - snd $_[0], g_add => $NODE, $LISTENER - if $_[0]{remote_greeting}{global}; + return unless $_[0]{remote_greeting}{global}; + + # global nodes send all local databases, merged, + # as their local database to global nodes + my %db; + + for (values %LOCAL_DBS) { + while (my ($f, $k) = each %$_) { + while (my ($kk, $kv) = each %$k) { + $db{$f}{$kk} = $kv; + } + } + } + + snd $_[0]{remote_node} => g_set => \%db; }; - # tell everybody else that we are global now - snd $_ => g_add => $NODE, $LISTENER - for up_nodes; +# # tell our master else that we are global now +# for (values %NODE) { +# if ($_->{transport} && $_->{transport}{remote_greeting}{global}) { +# snd $_->{id} => "g_global"; +# snd $_->{id} => g_set => \%LOCAL_DB; +# } +# } +# + #d#d disconnect everybody to bootstrap development grr + + $_->transport_error # remove Self::transport_error + for values %NODE; + + # now add us to the set of global nodes + ldb_set "'g" => $NODE => undef; + g_set $NODE => \%LOCAL_DB; } =item $guard = grp_reg $group, $port