--- AnyEvent-MP/MP/Global.pm 2012/03/02 19:21:16 1.48 +++ AnyEvent-MP/MP/Global.pm 2012/03/03 11:38:43 1.49 @@ -30,325 +30,259 @@ use Carp (); use AnyEvent (); -use AnyEvent::Util (); use AnyEvent::MP; use AnyEvent::MP::Kernel; -use AnyEvent::MP::Transport (); - -use base "Exporter"; - -our @EXPORT = qw( - grp_reg - grp_get - grp_mon -); $AnyEvent::MP::Kernel::WARN->(7, "starting global service."); ############################################################################# # node protocol parts for global nodes -{ - package AnyEvent::MP::Kernel; +package AnyEvent::MP::Kernel; - # TODO: this is ugly, maybe this should go into MP::Kernel or a separate module #d# +# TODO: this is ugly (classical use vars vs. our), +# maybe this should go into MP::Kernel - our %NODE; - our $NODE; - our $LISTENER; +our %NODE; +our $NODE; +our $LISTENER; - our $GLOBAL; - our $MASTER; - our $MASTER_MON; - our $MASTER_TIMER; +our $GLOBAL; +our $MASTER; +our $MASTER_MON; +our $MASTER_TIMER; - our %GLOBAL_SLAVE; +our %GLOBAL_SLAVE; - our %GLOBAL_DB; # global db - our %LOCAL_DBS; # local databases of other global nodes - our %LOCAL_DB; # this node database +our %GLOBAL_DB; # global db +our %LOCAL_DBS; # local databases of other global nodes +our %LOCAL_DB; # this node database - our $SRCNODE; - our %node_req; +our $SRCNODE; +our %NODE_REQ; - # only in global code - our %GLOBAL_MON; # monitors {family}{"" or key} +# only in global code +our %GLOBAL_MON; # monitors {family}{"" or key} - sub other_globals() { - grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} } - } +sub other_globals() { + grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} } +} - # broadcasts a message to all other global nodes - sub g_broadcast { - snd $_, @_ - for other_globals; - } +# broadcasts a message to all other global nodes +sub g_broadcast { + snd $_, @_ + for other_globals; +} - sub g_mon_check { - warn "g_mon_check<@_>\n";#d# +sub g_mon_check { + warn "g_mon_check<@_>\n";#d# - my %node = ( - %{ $GLOBAL_MON{$_[1]}{$_[2]} }, - %{ $GLOBAL_MON{$_[1]}{"" } }, - %{ $GLOBAL_MON{"" }{"" } }, - ); + my %node = ( + %{ $GLOBAL_MON{$_[1]}{$_[2]} }, + %{ $GLOBAL_MON{$_[1]}{"" } }, + %{ $GLOBAL_MON{"" }{"" } }, + ); - snd $_ => g_chg1 => $_[1], $_[2], @_ > 2 ? $_[3] : () - for keys %node; - } + snd $_ => g_chg1 => $_[1], $_[2], @_ > 2 ? $_[3] : () + for keys %node; +} - # add/replace a key in the database - sub g_add($$$$) { - $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} = - $GLOBAL_DB {$_[1]}{$_[2]} = $_[3]; +# add/replace a key in the database +sub g_add($$$$) { + $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} = + $GLOBAL_DB {$_[1]}{$_[2]} = $_[3]; - g_broadcast g_add => $_[1] => $_[2] => $_[3] - if exists $GLOBAL_SLAVE{$_[0]}; + g_broadcast g_add => $_[1] => $_[2] => $_[3] + if exists $GLOBAL_SLAVE{$_[0]}; - warn "g_add<@_>\n";#d# - &g_mon_check; - } + warn "g_add<@_>\n";#d# + &g_mon_check; +} - # delete a key from the database - sub g_del($$$) { - delete $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]}; - - g_broadcast g_del => $_[1] => $_[2] - if exists $GLOBAL_SLAVE{$_[0]}; - - delete $GLOBAL_DB{$_[1]}{$_[2]}; - - # check if other node maybe still has the key, then we don't delete, but add - for (values %LOCAL_DBS) { - if (exists $_->{$_[1]}{$_[2]}) { - $GLOBAL_DB{$_[1]}{$_[2]} = $_->{$_[1]}{$_[2]}; - last; - } +# delete a key from the database +sub g_del($$$) { + delete $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]}; + + g_broadcast g_del => $_[1] => $_[2] + if exists $GLOBAL_SLAVE{$_[0]}; + + delete $GLOBAL_DB{$_[1]}{$_[2]}; + + # check if other node maybe still has the key, then we don't delete, but add + for (values %LOCAL_DBS) { + if (exists $_->{$_[1]}{$_[2]}) { + $GLOBAL_DB{$_[1]}{$_[2]} = $_->{$_[1]}{$_[2]}; + last; } - - warn "g_del<@_>\n";#d# - &g_mon_check; } - # delete all keys from a database - sub g_clr($) { - my ($node) = @_; - - my $db = $LOCAL_DBS{$node}; - while (my ($f, $k) = each %$db) { - g_del $node, $f => $_ - for keys %$k; - } + warn "g_del<@_>\n";#d# + &g_mon_check; +} - delete $LOCAL_DBS{$node}; +# delete all keys from a database +sub g_clr($) { + my ($node) = @_; + + my $db = $LOCAL_DBS{$node}; + while (my ($f, $k) = each %$db) { + g_del $node, $f => $_ + for keys %$k; } - # set the whole (node-local) database - previous value must be empty - sub g_set($$) { - my ($node, $db) = @_; - - while (my ($f, $k) = each %$db) { - g_add $node, $f => $_ => delete $k->{$_} - for keys %$k; - } + delete $LOCAL_DBS{$node}; +} + +# set the whole (node-local) database - previous value must be empty +sub g_set($$) { + my ($node, $db) = @_; + + while (my ($f, $k) = each %$db) { + g_add $node, $f => $_ => delete $k->{$_} + for keys %$k; } +} - # gather node databases from slaves +# gather node databases from slaves - # other node wants to make us the master - $node_req{g_slave} = sub { - my ($db) = @_; +# other node wants to make us the master +$NODE_REQ{g_slave} = sub { + my ($db) = @_; - my $node = $SRCNODE->{id}; - undef $GLOBAL_SLAVE{$node}; - g_set $node, $db; - }; + my $node = $SRCNODE->{id}; + undef $GLOBAL_SLAVE{$node}; + g_set $node, $db; +}; - $node_req{g_add} = sub { - &g_add ($SRCNODE->{id}, @_); - }; +$NODE_REQ{g_add} = sub { + &g_add ($SRCNODE->{id}, @_); +}; - $node_req{g_del} = sub { - &g_del ($SRCNODE->{id}, @_); - }; +$NODE_REQ{g_del} = sub { + &g_del ($SRCNODE->{id}, @_); +}; - $node_req{g_set} = sub { - g_set $SRCNODE->{id}, $_[0]; - }; +$NODE_REQ{g_set} = sub { + g_set $SRCNODE->{id}, $_[0]; +}; - $node_req{g_find} = sub { - my ($node) = @_; +$NODE_REQ{g_find} = sub { + my ($node) = @_; - snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node}; - }; + snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node}; +}; - # monitoring +# monitoring - sub g_slave_disconnect($) { - my ($node) = @_; +sub g_slave_disconnect($) { + my ($node) = @_; - g_clr $node; + g_clr $node; - if (my $mon = delete $GLOBAL_SLAVE{$node}) { - while (my ($f, $fv) = each %$mon) { - delete $GLOBAL_MON{$f}{$_} - for keys %$fv; - } + if (my $mon = delete $GLOBAL_SLAVE{$node}) { + while (my ($f, $fv) = each %$mon) { + delete $GLOBAL_MON{$f}{$_} + for keys %$fv; } } +} + +# g_mon0 family key - stop monitoring +$NODE_REQ{g_mon0} = sub { + delete $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}}; + delete $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]}; +}; + +# g_mon1 family key - start monitoring +$NODE_REQ{g_mon1} = sub { + undef $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]}; + undef $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}}; + #d# generate lots of initial change requests, or one big? + + snd $SRCNODE->{id}, g_chg0 => $_[0], $_[1] +}; + +############################################################################# +# switch to global mode - # g_mon0 family key - stop monitoring - $node_req{g_mon0} = sub { - delete $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}}; - delete $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]}; - }; - - # g_mon1 family key - start monitoring - $node_req{g_mon1} = sub { - undef $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]}; - undef $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}}; - #d# generate lots of initial change requests, or one big? - - snd $SRCNODE->{id}, g_chg0 => $_[0], $_[1] - }; - - ############################################################################# - # switch to global mode - - $GLOBAL = 1; - $MASTER = $NODE; - undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave) - $LOCAL_DBS{$NODE} = { %LOCAL_DB }; - - # regularly try to connect to global nodes - maybe use seeding code? - $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub { - (add_node $_)->connect - for keys %{ $GLOBAL_DB{"'g"} }; - }; - - # instantly connect to other global nodes when we learn of them - # so we don't have to wait for the timer. - #TODO +$GLOBAL = 1; +$MASTER = $NODE; +undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave) +$LOCAL_DBS{$NODE} = { %LOCAL_DB }; + +# regularly try to connect to global nodes - maybe use seeding code? +$MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub { + (add_node $_)->connect + for keys %{ $GLOBAL_DB{"'g"} }; +}; + +# instantly connect to other global nodes when we learn of them +# so we don't have to wait for the timer. +#TODO # $GLOBAL_MON{"'g"}{""}{""} = sub { # (add_node $_[1])->connect; # }; - # delete slaves on node-down - # clear slave db on node-down - $MASTER_MON = mon_nodes sub { - g_slave_disconnect $_[0] unless $_[1]; - }; - - # tell everybody who connects that we are a global node - push @AnyEvent::MP::Transport::HOOK_GREET, sub { - $_[0]{local_greeting}{global} = 1; - }; - - # connect from a global node - sub g_global_connect { - my ($node) = @_; - - # we need to set this currently, as to avoid race conditions - # because it takes a while until the other global node tells us it is global. - - undef $GLOBAL_DB{"'g"}{$node}; - undef $LOCAL_DBS{$node}{"'g"}{$node}; - - # global nodes send all local databases, merged, - # as their local database to global nodes - my %db; - - for (values %LOCAL_DBS) { - while (my ($f, $fv) = each %$_) { - while (my ($k, $kv) = each %$fv) { - $db{$f}{$k} = $kv; - } +# delete slaves on node-down +# clear slave db on node-down +$MASTER_MON = mon_nodes sub { + g_slave_disconnect $_[0] unless $_[1]; +}; + +# tell everybody who connects that we are a global node +push @AnyEvent::MP::Transport::HOOK_GREET, sub { + $_[0]{local_greeting}{global} = 1; +}; + +# connect from a global node +sub g_global_connect { + my ($node) = @_; + + # we need to set this currently, as to avoid race conditions + # because it takes a while until the other global node tells us it is global. + + undef $GLOBAL_DB{"'g"}{$node}; + undef $LOCAL_DBS{$node}{"'g"}{$node}; + + # global nodes send all local databases, merged, + # as their local database to global nodes + my %db; + + for (values %LOCAL_DBS) { + while (my ($f, $fv) = each %$_) { + while (my ($k, $kv) = each %$fv) { + $db{$f}{$k} = $kv; } } - - snd $node => g_set => \%db; - } - - # send our database to every global node that connects - push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { - return unless $_[0]{remote_greeting}{global}; - - g_global_connect $_[0]{remote_node}; - }; - - # tell our master that we are global now - for (values %NODE) { - if ($_->{transport} && $_->{transport}{remote_greeting}{global}) { - snd $_->{id} => "g_global"; - g_global_connect $_->{id}; - } } - $node_req{g_global} = sub { - g_slave_disconnect $SRCNODE->{id}; - $SRCNODE->{transport}{remote_greeting}{global} = 1; - g_global_connect $SRCNODE->{id}; - }; - - # now add us to the set of global nodes - ldb_set "'g" => $NODE => undef; -} - -=item $guard = grp_reg $group, $port - -Register the given (local!) port in the named global group C<$group>. - -The port will be unregistered automatically when the port is destroyed. - -When not called in void context, then a guard object will be returned that -will also cause the name to be unregistered when destroyed. - -=cut - -# register local port -sub grp_reg($$) { - my ($group, $port) = @_; - - port_is_local $port - or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught"; - - defined wantarray && AnyEvent::Util::guard { unregister ($port, $group) } + snd $node => g_set => \%db; } -=item $ports = grp_get $group - -Returns all the ports currently registered to the given group (as -read-only(!) array reference). When the group has no registered members, -return C. - -=cut - -sub grp_get($) { +# send our database to every global node that connects +push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { + return unless $_[0]{remote_greeting}{global}; + + g_global_connect $_[0]{remote_node}; +}; + +# tell our master that we are global now +for (values %NODE) { + if ($_->{transport} && $_->{transport}{remote_greeting}{global}) { + snd $_->{id} => "g_global"; + g_global_connect $_->{id}; + } } -=item $guard = grp_mon $group, $callback->($ports, $add, $del) +$NODE_REQ{g_global} = sub { + g_slave_disconnect $SRCNODE->{id}; + $SRCNODE->{transport}{remote_greeting}{global} = 1; + g_global_connect $SRCNODE->{id}; +}; -Installs a monitor on the given group. Each time there is a change it -will be called with the current group members as an arrayref as the -first argument. The second argument only contains ports added, the third -argument only ports removed. - -Unlike C, all three arguments will always be array-refs, even if -the array is empty. None of the arrays must be modified in any way. - -The first invocation will be with the first two arguments set to the -current members, as if all of them were just added, but only when the -group is actually non-empty. - -Optionally returns a guard object that uninstalls the watcher when it is -destroyed. - -=cut - -sub grp_mon($$) { - my ($grp, $cb) = @_; -} +# now add us to the set of global nodes +db_set "'g" => $NODE => undef; =back