=head1 NAME

AnyEvent::MP::Global - network backbone services

=head1 SYNOPSIS

   use AnyEvent::MP::Global;

=head1 DESCRIPTION

This module is usually run (or started on) seed nodes and provides a
variety of services to connected nodes, such as the distributed database.

The global nodes form a fully-meshed network, that is, all global nodes
currently maintain connections to all other global nodes.

Loading this module (e.g. as a service) transforms the local node into a
global node. There are no user-serviceable parts inside.

=cut

package AnyEvent::MP::Global;

use common::sense;
use Carp ();

use AnyEvent ();

use AnyEvent::MP;
use AnyEvent::MP::Kernel;

$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");

#############################################################################
# node protocol parts for global nodes

package AnyEvent::MP::Kernel;

# TODO: this is ugly (classical use vars vs. our),
# maybe this should go into MP::Kernel

our %NODE;
our $NODE;

our $GLOBAL;
our $MASTER;
our $MASTER_MON;
our $MASTER_TIMER;

our %GLOBAL_SLAVE; # {node}{family} - families a node monitors; also marks slaves

our %GLOBAL_DB;    # global db
our %LOCAL_DBS;    # local databases of other global nodes
our %LOCAL_DB;     # this node database

our $SRCNODE;      # the origin node id
our %NODE_REQ;

# only in global code
our %GLOBAL_MON;   # monitors {family}{node}

# returns the ids of all nodes listed in the "'g" family of the global
# database, excluding this node and any node for which node_is_up is false
sub other_globals() {
   grep $_ ne $NODE && node_is_up($_), keys %{ $GLOBAL_DB{"'g"} }
}

# broadcasts a message to all other global nodes
sub g_broadcast {
   snd $_, @_
      for other_globals;
}

# add/replace/del inside a family in the database
# @$del must not contain any key in %$set
#
#   $node   - node whose local database is being changed
#   $family - family name inside the database
#   $set    - hashref of key => value pairs to add/replace (may be undef)
#   $del    - arrayref of keys to delete (may be undef)
#
# note that %$set is modified in place: a deleted key that is still
# provided by another node is turned into a change and added to %$set.
sub g_upd {
   my ($node, $family, $set, $del) = @_;

   my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
   my $gdb = $GLOBAL_DB       {$family} ||= {};

   # add/replace keys
   while (my ($k, $v) = each %$set) {
      $ldb->{$k} =
      $gdb->{$k} = $v;
   }

   my @del; # actual deletes

   # take care of deletes
   KEY:
   for my $k (@$del) {
      delete $ldb->{$k};
      delete $gdb->{$k};

      # check if some other node still has the key, then we don't delete, but change
      for my $db (values %LOCAL_DBS) {
         # test the family first, so that merely looking does not create
         # empty family hashes in the other nodes' databases
         next
            unless exists $db->{$family}
                && exists $db->{$family}{$k};

         $set->{$k} =
         $gdb->{$k} = $db->{$family}{$k};

         next KEY;
      }

      push @del, $k;
   }

   # family could be empty now
   delete $GLOBAL_DB{$family}        unless %$gdb;
   delete $LOCAL_DBS{$node}{$family} unless %$ldb;

   # only changes originating from slaves are forwarded to the other globals
   g_broadcast g_upd => $family, $set, \@del
      if exists $GLOBAL_SLAVE{$node};

   # tell subscribers we have changed the family
   # ("|| {}" avoids creating an empty monitor entry for unmonitored families)
   snd $_ => g_chg2 => $family, $set, \@del
      for keys %{ $GLOBAL_MON{$family} || {} };
}

# set the whole (node-local) database - previous value must be empty
# $db is a hashref of family => { key => value }
sub g_set($$) {
   my ($node, $db) = @_;

   while (my ($f, $k) = each %$db) {
      g_upd $node, $f, $k;
   }
}

# delete all keys from a database
sub g_clr($) {
   my ($node) = @_;

   my $db = $LOCAL_DBS{$node};

   # g_upd removes each family from %$db once it is empty, which is
   # the entry most recently returned by each
   while (my ($f, $k) = each %$db) {
      g_upd $node, $f, undef, [keys %$k];
   }

   delete $LOCAL_DBS{$node};
}

# gather node databases from slaves

# other node wants to make us the master
$NODE_REQ{g_slave} = sub {
   my ($db) = @_
      or return; # empty g_slave is used to start global service

   my $node = $SRCNODE;
   undef $GLOBAL_SLAVE{$node};
   g_set $node, $db;
};

# replace the (empty) database of the sending node
$NODE_REQ{g_set} = sub {
   g_set $SRCNODE, @_;
};

# incremental update of the database of the sending node
$NODE_REQ{g_upd} = sub {
   g_upd $SRCNODE, @_;
};

# reply with the "'l" family entry stored for the given node
$NODE_REQ{g_find} = sub {
   my ($node) = @_;

   snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node};
};

# reply with the whole family (an empty hash if it does not exist)
$NODE_REQ{g_db_family} = sub {
   my ($family, $id) = @_;
   snd $SRCNODE, g_reply => $id, $GLOBAL_DB{$family} || {};
};

# reply with the keys of the family
# ("|| {}" as in g_db_family, so a query does not create the family)
$NODE_REQ{g_db_keys} = sub {
   my ($family, $id) = @_;
   snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} || {} }];
};

# reply with the values of the family
$NODE_REQ{g_db_values} = sub {
   my ($family, $id) = @_;
   snd $SRCNODE, g_reply => $id, [values %{ $GLOBAL_DB{$family} || {} }];
};

# monitoring

# forget everything about a node: its database and its monitors
sub g_slave_disconnect($) {
   my ($node) = @_;

   # must run before the slave entry is removed, as g_upd checks it
   g_clr $node;

   if (my $mon = delete $GLOBAL_SLAVE{$node}) {
      # %$mon maps family => undef (see g_mon1), so the reverse entry
      # to remove is $GLOBAL_MON{family}{node}
      for my $f (keys %$mon) {
         delete $GLOBAL_MON{$f}{$node};

         delete $GLOBAL_MON{$f}
            unless %{ $GLOBAL_MON{$f} };
      }
   }
}

# g_mon0 family - stop monitoring
$NODE_REQ{g_mon0} = sub {
   delete $GLOBAL_MON{$_[0]}{$SRCNODE};
   delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} };

   delete $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
};

# g_mon1 family key - start monitoring
$NODE_REQ{g_mon1} = sub {
   undef $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
   undef $GLOBAL_MON{$_[0]}{$SRCNODE};

   # send the current contents of the family as initial state
   snd $SRCNODE, g_chg1 => $_[0], $GLOBAL_DB{$_[0]};
};

#############################################################################
# switch to global mode

# regularly try to connect to global nodes - maybe use seeding code?
$MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
   (add_node $_)->connect
      for keys %{ $GLOBAL_DB{"'g"} };
};

# instantly connect to other global nodes when we learn of them
# so we don't have to wait for the timer.
#TODO
# $GLOBAL_MON{"'g"}{""}{""} = sub {
#    (add_node $_[1])->connect;
# };

# delete slaves on node-down
# clear slave db on node-down
$MASTER_MON = mon_nodes sub {
   g_slave_disconnect $_[0] unless $_[1];
};

# tell everybody who connects that we are a global node
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   $_[0]{local_greeting}{global} = 1;
};

# connect from a global node
sub g_global_connect {
   my ($node) = @_;

   # we need to set this currently, as to avoid race conditions
   # because it takes a while until the other global node tells us it is global.
   undef $GLOBAL_DB{"'g"}{$node};
   undef $LOCAL_DBS{$node}{"'g"}{$node};

   # global nodes send all local databases, merged,
   # as their local database to global nodes
   my %db;

   for (values %LOCAL_DBS) {
      while (my ($f, $fv) = each %$_) {
         while (my ($k, $kv) = each %$fv) {
            $db{$f}{$k} = $kv;
         }
      }
   }

   snd $node => g_set => \%db;
}

# send our database to every global node that connects
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   return unless $_[0]{remote_greeting}{global};

   g_global_connect $_[0]{remote_node};
};

# tell our master that we are global now
for (values %NODE) {
   if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
      snd $_->{id} => "g_global";
      g_global_connect $_->{id};
   }
}

# a connected node tells us that it has become a global node
$NODE_REQ{g_global} = sub {
   # it is no longer a slave, so drop its slave state first
   g_slave_disconnect $SRCNODE;

   my $node = $NODE{$SRCNODE};
   $node->{transport}{remote_greeting}{global} = 1;
   g_global_connect $SRCNODE;
};

# enable global mode
$GLOBAL = 1;

# global nodes are their own masters - this
# resends global requests and sets the local database.
master_set $NODE;

# now add us to the set of global nodes
db_set "'g" => $NODE => undef;

#############################################################################
# compatibility functions for aemp 1.0

#d#TODO#
# NOTE(review): the functions documented below are not defined anywhere
# in the code above - confirm whether they are provided elsewhere.

=head1 GLOBALS AND FUNCTIONS

=over 4

=item $guard = grp_reg $group, $port

Register the given (local!) port in the named global group C<$group>.

The port will be unregistered automatically when the port is destroyed.

When not called in void context, then a guard object will be returned that
will also cause the name to be unregistered when destroyed.

=item $ports = grp_get $group

Returns all the ports currently registered to the given group (as
read-only(!) array reference). When the group has no registered members,
return C<undef>.

=item $guard = grp_mon $group, $callback->($ports, $add, $del)

Installs a monitor on the given group. Each time there is a change it
will be called with the current group members as an arrayref as the
first argument. The second argument only contains ports added, the third
argument only ports removed.

Unlike C<grp_get>, all three arguments will always be array-refs, even if
the array is empty. None of the arrays must be modified in any way.

The first invocation will be with the first two arguments set to the
current members, as if all of them were just added, but only when the
group is actually non-empty.

Optionally returns a guard object that uninstalls the watcher when it is
destroyed.

=back

=head1 SEE ALSO

L<AnyEvent::MP>.

=head1 AUTHOR

 Marc Lehmann
 http://home.schmorp.de/

=cut

1