=head1 NAME AnyEvent::MP::Global - network backbone services =head1 SYNOPSIS use AnyEvent::MP::Global; =head1 DESCRIPTION This module is usually run (or started on) seed nodes and provides a variety of services to connected nodes, such as the distributed database. The global nodes form a fully-meshed network, that is, all global nodes currently maintain connections to all other global nodes. Loading this module (e.g. as a service) transforms the local node into a global node. There are no user-servicable parts inside. For a limited time, this module also exports some AEMP 1.x compatibility functions (C, C and C). =cut package AnyEvent::MP::Global; use common::sense; use Carp (); use AnyEvent (); use AnyEvent::MP; use AnyEvent::MP::Kernel; AE::log 7 => "starting global service."; ############################################################################# # node protocol parts for global nodes package AnyEvent::MP::Kernel; # TODO: this is ugly (classical use vars vs. our), # maybe this should go into MP::Kernel our %NODE; our $NODE; our $GLOBAL; our $MASTER; our $MASTER_MON; our $MASTER_TIMER; our %GLOBAL_SLAVE; our %GLOBAL_DB; # global db our %LOCAL_DBS; # local databases of other global nodes our %LOCAL_DB; # this node database our $SRCNODE; # the origin node id our %NODE_REQ; # only in global code our %GLOBAL_MON; # monitors {family} sub other_globals() { grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} } } # broadcasts a message to all other global nodes sub g_broadcast { snd $_, @_ for other_globals; } # add/replace/del inside a family in the database # @$dle must not contain any key in %$set sub g_upd { my ($node, $family, $set, $del) = @_; my $ldb = $LOCAL_DBS{$node}{$family} ||= {}; my $gdb = $GLOBAL_DB {$family} ||= {}; # add/replace keys while (my ($k, $v) = each %$set) { $ldb->{$k} = $gdb->{$k} = $v; } my @del; # actual deletes # take care of deletes keydel: for my $k (@$del) { delete $ldb->{$k}; delete $gdb->{$k}; # check if some other node still has the key, then we don't delete, but change for (values %LOCAL_DBS) { if (exists $_->{$family}{$k}) { $set->{$k} = $gdb->{$k} = $_->{$family}{$k}; next keydel; } } push @del, $k; } # family could be empty now delete $GLOBAL_DB{$family} unless %$gdb; delete $LOCAL_DBS{$node}{$family} unless %$ldb; g_broadcast g_upd => $family, $set, \@del if exists $GLOBAL_SLAVE{$node}; # tell subscribers we have changed the family snd $_ => g_chg2 => $family, $set, \@del for keys %{ $GLOBAL_MON{$family} }; } # set the whole (node-local) database - previous value must be empty sub g_set($$) { my ($node, $db) = @_; while (my ($f, $k) = each %$db) { g_upd $node, $f, $k; } } # delete all keys from a database sub g_clr($) { my ($node) = @_; my $db = $LOCAL_DBS{$node}; while (my ($f, $k) = each %$db) { g_upd $node, $f, undef, [keys %$k]; } delete $LOCAL_DBS{$node}; } # gather node databases from slaves # other node wants to make us the master $NODE_REQ{g_slave} = sub { my ($db) = @_ or return; # empty g_slave is used to start global service my $node = $SRCNODE; undef $GLOBAL_SLAVE{$node}; g_set $node, $db; }; $NODE_REQ{g_set} = sub { &g_set ($SRCNODE, @_); }; $NODE_REQ{g_upd} = sub { &g_upd ($SRCNODE, @_); }; $NODE_REQ{g_find} = sub { my ($node) = @_; snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node}; }; $NODE_REQ{g_db_family} = sub { my ($family, $id) = @_; snd $SRCNODE, g_reply => $id, $GLOBAL_DB{$family} || {}; }; $NODE_REQ{g_db_keys} = sub { my ($family, $id) = @_; snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} } ]; }; $NODE_REQ{g_db_values} = sub { my ($family, $id) = @_; snd $SRCNODE, g_reply => $id, [values %{ $GLOBAL_DB{$family} } ]; }; # monitoring sub g_slave_disconnect($) { my ($node) = @_; g_clr $node; if (my $mon = delete $GLOBAL_SLAVE{$node}) { while (my ($f, $fv) = each %$mon) { delete $GLOBAL_MON{$f}{$_} for keys %$fv; delete $GLOBAL_MON{$f} unless %{ $GLOBAL_MON{$f} }; } } } # g_mon0 family - stop monitoring $NODE_REQ{g_mon0} = sub { delete $GLOBAL_MON{$_[0]}{$SRCNODE}; delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} }; delete $GLOBAL_SLAVE{$SRCNODE}{$_[0]}; }; # g_mon1 family key - start monitoring $NODE_REQ{g_mon1} = sub { undef $GLOBAL_SLAVE{$SRCNODE}{$_[0]}; undef $GLOBAL_MON{$_[0]}{$SRCNODE}; snd $SRCNODE, g_chg1 => $_[0], $GLOBAL_DB{$_[0]}; }; ############################################################################# # switch to global mode # regularly try to connect to global nodes - maybe use seeding code? $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub { (add_node $_)->connect for other_globals; }; # instantly connect to other global nodes when we learn of them # so we don't have to wait for the timer. #TODO # $GLOBAL_MON{"'g"}{""}{""} = sub { # (add_node $_[1])->connect; # }; # delete slaves on node-down # clear slave db on node-down $MASTER_MON = mon_nodes sub { g_slave_disconnect $_[0] unless $_[1]; }; # tell everybody who connects that we are a global node push @AnyEvent::MP::Transport::HOOK_GREET, sub { $_[0]{local_greeting}{global} = 1; }; # connect from a global node sub g_global_connect { my ($node) = @_; # we need to set this currently, as to avoid race conditions # because it takes a while until the other global node tells us it is global. undef $GLOBAL_DB{"'g"}{$node}; undef $LOCAL_DBS{$node}{"'g"}{$node}; # global nodes send all local databases, merged, # as their local database to global nodes my %db; for (values %LOCAL_DBS) { while (my ($f, $fv) = each %$_) { while (my ($k, $kv) = each %$fv) { $db{$f}{$k} = $kv; } } } snd $node => g_set => \%db; } # send our database to every global node that connects push @AnyEvent::MP::Transport::HOOK_CONNECT, sub { return unless $_[0]{remote_greeting}{global}; g_global_connect $_[0]{remote_node}; }; # tell our master that we are global now for (values %NODE) { if ($_->{transport} && $_->{transport}{remote_greeting}{global}) { snd $_->{id} => "g_global"; g_global_connect $_->{id}; } } $NODE_REQ{g_global} = sub { g_slave_disconnect $SRCNODE; my $node = $NODE{$SRCNODE}; $node->{transport}{remote_greeting}{global} = 1; g_global_connect $SRCNODE; }; # enable global mode $GLOBAL = 1; # global nodes are their own masters - this # resends global requests and sets the local database. master_set $NODE; # now add us to the set of global nodes db_set "'g" => $NODE => undef; ############################################################################# # compatibility functions for aemp 1.0 package AnyEvent::MP::Global; use base "Exporter"; our @EXPORT = qw(grp_reg grp_get grp_mon); sub grp_reg($$) { &db_reg } sub grp_get($) { my @ports = keys %{ $AnyEvent::MP::Kernel::GLOBAL_DB{$_[0]} }; @ports ? \@ports : undef } sub grp_mon($$) { my ($grp, $cb) = @_; db_mon $grp => sub { my ($ports, $add, $chg, $del) = @_; $cb->([keys %$ports], $add, $del); }; } =head1 SEE ALSO L. =head1 AUTHOR Marc Lehmann http://home.schmorp.de/ =cut 1