=head1 NAME

AnyEvent::MP::Global - network backbone services

=head1 SYNOPSIS

   use AnyEvent::MP::Global;

=head1 DESCRIPTION

This module is usually run (or started on) seed nodes and provides a
variety of services to connected nodes, such as the distributed database.

The global nodes form a fully-meshed network, that is, all global nodes
currently maintain connections to all other global nodes.

Loading this module (e.g. as a service) transforms the local node into a
global node. There are no user-serviceable parts inside.

For a limited time, this module also exports some AEMP 1.x compatibility
functions (C<grp_reg>, C<grp_get> and C<grp_mon>).

=cut

package AnyEvent::MP::Global;

use common::sense;
use Carp ();

use AnyEvent ();

use AnyEvent::MP;
use AnyEvent::MP::Kernel;

AE::log 7 => "starting global service.";

#############################################################################
# node protocol parts for global nodes

package AnyEvent::MP::Kernel;

# TODO: this is ugly (classical use vars vs. our),
# maybe this should go into MP::Kernel

our %NODE;
our $NODE;

our $GLOBAL;
our $MASTER;
our $MASTER_TIMER;

our %GLOBAL_SLAVE; # node id => undef, or { family => undef } once g_mon1 was seen
our %GLOBAL_DB;    # global db: family => { key => value }
our %LOCAL_DBS;    # local databases of other nodes (global and slave)
our %LOCAL_DB;     # this node database

our $SRCNODE;      # the origin node id
our %NODE_REQ;

# only in global code
our %GLOBAL_MON;   # monitors: family => { node id => undef }

# broadcasts a message to all other global nodes
#
# the recipients are the keys of the 'g family that are not this node
# and are currently up. @_ is passed to snd unchanged after the node id.
sub g_broadcast {
   # "|| {}" avoids autovivifying an empty 'g family as a side effect of keys
   my $globals = $GLOBAL_DB{"'g"} || {};

   snd $_, @_
      for grep $_ ne $NODE && node_is_up ($_), keys %$globals;
}

# add/replace/del inside a family in the database
# @$del must not contain any key in %$set
#
# $node   - node id whose local database is being changed
# $family - family name
# $set    - hashref of key => value to add/replace (may be undef)
# $del    - arrayref of keys to delete (may be undef)
#
# note that %$set is modified in place: a deleted key that some other
# node still provides is turned into a change and added to %$set.
sub g_upd {
   my ($node, $family, $set, $del) = @_;

   $set ||= {};

   my $ldb = $LOCAL_DBS{$node}{$family} ||= {};
   my $gdb = $GLOBAL_DB {$family}       ||= {};

   # add/replace keys
   while (my ($k, $v) = each %$set) {
      $ldb->{$k} =
      $gdb->{$k} = $v;
   }

   my @del; # actual deletes

   # take care of deletes
   keydel:
   for my $k (@{ $del || [] }) {
      # keys this node never provided are ignored
      next unless exists $ldb->{$k};

      delete $ldb->{$k};
      delete $gdb->{$k};

      # check if some other node still has the key, then we don't delete, but change
      for my $other (values %LOCAL_DBS) {
         # look the family up first, so that the exists test below does not
         # autovivify an empty family hash in every other node's database
         my $other_family = $other->{$family}
            or next;

         next unless exists $other_family->{$k};

         $set->{$k} =
         $gdb->{$k} = $other_family->{$k};

         next keydel;
      }

      push @del, $k;
   }

   # family could be empty now
   delete $GLOBAL_DB{$family}        unless %$gdb;
   delete $LOCAL_DBS{$node}{$family} unless %$ldb;

   # nothing changed, nobody needs to be told
   return unless %$set || @del;

   # only updates originating from our slaves are forwarded to the other global nodes
   g_broadcast (g_upd => $family, $set, \@del)
      if exists $GLOBAL_SLAVE{$node};

   # tell subscribers we have changed the family
   # ("|| {}" keeps keys from autovivifying an empty monitor set per family)
   snd $_ => g_chg2 => $family, $set, \@del
      for keys %{ $GLOBAL_MON{$family} || {} };
}

# set the whole (node-local) database - previous value must be empty
#
# $db is a hashref of family => { key => value }, each family is
# applied with a separate g_upd.
sub g_set($$) {
   my ($node, $db) = @_;

   while (my ($family, $set) = each %$db) {
      g_upd ($node, $family, $set);
   }
}

# delete all keys from a database
#
# every family of the node is emptied via g_upd (so the usual
# notifications happen), then the node's database itself is dropped.
sub g_clr($) {
   my ($node) = @_;

   my $db = $LOCAL_DBS{$node};

   # g_upd may delete the family that each just returned, which is
   # the one deletion that is safe during iteration
   while (my ($family, $keys) = each %$db) {
      g_upd ($node, $family, undef, [keys %$keys]);
   }

   delete $LOCAL_DBS{$node};
}

# gather node databases from slaves

# other node wants to make us the master and sends us their db
$NODE_REQ{g_slave} = sub {
   # empty g_slave is used to start global service
   return unless @_;

   my ($db) = @_;
   my $node = $SRCNODE;

   # registers the node as slave (the key exists, the value is undef)
   undef $GLOBAL_SLAVE{$node};
   g_set $node, $db;
};

# other global node sends us their database
$NODE_REQ{g_set} = sub {
   my ($db) = @_;

   g_set $SRCNODE, $db;

   # a remote node always has to provide their listeners. for global
   # nodes, we mirror their 'l locally, just as we also set 'g.
   # that's not very efficient, but ensures that global nodes
   # find each other.
   db_set "'l" => $SRCNODE => $db->{"'l"}{$SRCNODE};
};

# other node (global and slave) sends us a family update
$NODE_REQ{g_upd} = sub {
   g_upd ($SRCNODE, @_);
};

# slave node wants to know the listeners of a node
$NODE_REQ{g_find} = sub {
   my ($node) = @_;

   snd $SRCNODE, g_found => $node, $GLOBAL_DB{"'l"}{$node};
};

# reply with the whole family (an empty hash if the family does not exist)
$NODE_REQ{g_db_family} = sub {
   my ($family, $id) = @_;

   snd $SRCNODE, g_reply => $id, $GLOBAL_DB{$family} || {};
};

# reply with the keys of a family
$NODE_REQ{g_db_keys} = sub {
   my ($family, $id) = @_;

   # "|| {}": a query must not autovivify an empty family in the global db
   snd $SRCNODE, g_reply => $id, [keys %{ $GLOBAL_DB{$family} || {} }];
};

# reply with the values of a family
$NODE_REQ{g_db_values} = sub {
   my ($family, $id) = @_;

   # "|| {}": a query must not autovivify an empty family in the global db
   snd $SRCNODE, g_reply => $id, [values %{ $GLOBAL_DB{$family} || {} }];
};

# monitoring

# forget everything we know about a node: its 'g and 'l entries,
# its local database and all family monitors it registered.
sub g_disconnect($) {
   my ($node) = @_;

   db_del "'g" => $node;
   db_del "'l" => $node;
   g_clr $node;

   if (my $mon = delete $GLOBAL_SLAVE{$node}) {
      # %$mon maps family => undef (see g_mon1), while %GLOBAL_MON is keyed
      # by family, then by node id - so the entry to remove is the one
      # for $node. iterating over the keys of the (undef) per-family value
      # would remove nothing and leave a stale monitor behind.
      for my $family (keys %$mon) {
         my $watchers = $GLOBAL_MON{$family}
            or next;

         delete $watchers->{$node};

         delete $GLOBAL_MON{$family}
            unless %$watchers;
      }
   }
}

# g_mon0 family - stop monitoring
$NODE_REQ{g_mon0} = sub {
   delete $GLOBAL_MON{$_[0]}{$SRCNODE};
   delete $GLOBAL_MON{$_[0]} unless %{ $GLOBAL_MON{$_[0]} };

   delete $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
};

# g_mon1 family key - start monitoring
$NODE_REQ{g_mon1} = sub {
   undef $GLOBAL_SLAVE{$SRCNODE}{$_[0]};
   undef $GLOBAL_MON{$_[0]}{$SRCNODE};

   # send the current contents of the family as initial state
   snd $SRCNODE, g_chg1 => $_[0], $GLOBAL_DB{$_[0]};
};

#############################################################################
# switch to global mode

# delete data from other nodes on node-down
mon_nodes sub {
   g_disconnect $_[0]
      unless $_[1];
};

# tell everybody who connects that we are a global node
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   $_[0]{local_greeting}{global} = 1;
};

# connect from a global node
sub g_global_connect {
   my ($node) = @_;

   # each node puts the set of connected global nodes into
   # 'g - this causes a big duplication and mergefest, but
   # is the easiest way to ensure global nodes have a list
   # of all other global nodes.
   # we also mirror 'l as soon as we receive it, causing
   # even more overhead.
   db_set "'g" => $node;

   # global nodes send all local databases of their slaves, merged,
   # as their database to other global nodes
   my %db;

   while (my ($slave, $families) = each %LOCAL_DBS) {
      next unless exists $GLOBAL_SLAVE{$slave};

      while (my ($family, $entries) = each %$families) {
         while (my ($key, $value) = each %$entries) {
            $db{$family}{$key} = $value;
         }
      }
   }

   snd $node => g_set => \%db;
}

# send our database to every global node that connects
push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
   return unless $_[0]{remote_greeting}{global};

   g_global_connect $_[0]{remote_node};
};

# tell other global nodes that we are global now
# TODO: there is probably a race when two connected nodes become global at the same time
# very ugly.
for (values %NODE) {
   if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
      snd $_->{id} => "g_global";
      g_global_connect $_->{id};
   }
}

# an already connected node tells us that it has become global
$NODE_REQ{g_global} = sub {
   g_disconnect $SRCNODE;

   my $node = $NODE{$SRCNODE};
   $node->{transport}{remote_greeting}{global} = 1;

   g_global_connect $SRCNODE;
};

# enable global mode
$GLOBAL = 1;

# global nodes are their own masters - this
# resends global requests and sets the local database.
master_set $NODE;

# from here on we should be able to act normally

# now add us to the set of global nodes
db_set "'g" => $NODE;

# maintain connections to all global nodes that we know of
db_mon "'g" => sub {
   keepalive_add $_ for @{ $_[1] };
   keepalive_del $_ for @{ $_[3] };
};

#############################################################################
# compatibility functions for aemp 1.0

package AnyEvent::MP::Global;

use base "Exporter";
our @EXPORT = qw(grp_reg grp_get grp_mon);

# the & call form passes our @_ on unchanged and without applying any
# prototype of db_reg, so it must not be "modernised" into db_reg (@_)
sub grp_reg($$) {
   &db_reg
}

# returns an arrayref of the keys of the given family in the global
# database, or undef if there are none
sub grp_get($) {
   # "|| {}": a lookup must not autovivify an empty family in the global db
   my @ports = keys %{ $AnyEvent::MP::Kernel::GLOBAL_DB{$_[0]} || {} };

   @ports ? \@ports : undef
}

# monitors a family via db_mon and calls $cb with an arrayref of all
# current keys, followed by the add and del arguments of the db_mon callback
sub grp_mon($$) {
   my ($grp, $cb) = @_;

   db_mon $grp => sub {
      my ($ports, $add, $chg, $del) = @_;

      $cb->([keys %$ports], $add, $del);
   };
}

=head1 SEE ALSO

L<AnyEvent::MP>.

=head1 AUTHOR

 Marc Lehmann
 http://home.schmorp.de/

=cut

1