=head1 NAME

AnyEvent::MP::Global - some network-global services

=head1 SYNOPSIS

   use AnyEvent::MP::Global;

=head1 DESCRIPTION

This module maintains a fully-meshed network between global nodes and
tries to have connections with all nodes in the network.

It also manages named port groups - ports can register themselves in any
number of groups that will be available network-wide, which is great for
discovering services.

Running it on one node will automatically run it on all nodes, although,
at the moment, the global service is started by default anyways.

=head1 GLOBALS AND FUNCTIONS

=over 4

=cut

package AnyEvent::MP::Global;

use common::sense;
use Carp ();

use AnyEvent ();
use AnyEvent::Util ();

use AnyEvent::MP;
use AnyEvent::MP::Kernel;
use AnyEvent::MP::Transport ();

use base "Exporter";

our @EXPORT = qw(
   grp_reg
   grp_get
   grp_mon
);

$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");

#############################################################################
# node protocol parts for global nodes

{
   package AnyEvent::MP::Kernel;

   # TODO: this is ugly, maybe this should go into MP::Kernel or a separate module #d#

   # NOTE(review): statements tagged with a trailing "#d#" look like temporary
   # debugging aids (they print to STDERR and pull in Data::Dump) - confirm
   # before shipping.

   our %NODE;
   our $NODE;
   our $LISTENER;

   our $GLOBAL;
   our $MASTER;
   our %GLOBAL_SLAVE;
   our $GLOBAL_MON;

   our %GLOBAL_DB; # global db
   our %LOCAL_DBS; # local databases of other global nodes
   our %LOCAL_DB;  # this node database

   our $SRCNODE;
   our %node_req;

   # broadcasts a message to all other global nodes
   # (sends @_ unchanged to every entry returned by other_globals)
   sub g_broadcast {
      snd $_, @_
         for other_globals;
   }

   # debugging hook, called after every database change with the
   # arguments of the changing function: dumps the arguments and the
   # whole global database
   sub g_mon_check {
      warn "g_mon_check<@_>\n";#d#
      use Data::Dump; ddx \%GLOBAL_DB;#d#
   }

   # add/replace a key in the database
   # arguments: $node, $family, $key, $value
   # the value is stored both in the per-node database of $node and in the
   # merged global database; the change is forwarded to the other global
   # nodes only when $node is one of our slaves
   sub g_add($$$$) {
      $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} =
      $GLOBAL_DB       {$_[1]}{$_[2]} = $_[3];

      g_broadcast g_add => $_[1] => $_[2] => $_[3]
         if exists $GLOBAL_SLAVE{$_[0]};

      warn "g_add<@_>\n";#d#
      &g_mon_check; # deliberately passes our own @_ through
   }

   # delete a key from the database
   # arguments: $node, $family, $key
   sub g_del($$$) {
      delete $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]};

      g_broadcast g_del => $_[1] => $_[2]
         if exists $GLOBAL_SLAVE{$_[0]};

      delete $GLOBAL_DB{$_[1]}{$_[2]};

      # check if other node maybe still has the key, then we don't delete, but add
      for (values %LOCAL_DBS) {
         # look the family up as an rvalue first: a nested
         # "exists $_->{family}{key}" would autovivify an empty family hash
         # in every node database that does not have this family
         my $family = $_->{$_[1]}
            or next;

         if (exists $family->{$_[2]}) {
            $GLOBAL_DB{$_[1]}{$_[2]} = $family->{$_[2]};
            last;
         }
      }

      warn "g_del<@_>\n";#d#
      &g_mon_check; # deliberately passes our own @_ through
   }

   # delete all keys from a database
   # goes through g_del for every key, so the global database (and our
   # peers, if $node is a slave) are updated as well, then drops the
   # per-node database itself
   sub g_clr($) {
      my ($node) = @_;

      my $db = $LOCAL_DBS{$node};

      while (my ($f, $k) = each %$db) {
         # keys returns a copy of the key list, so deleting while looping is fine
         g_del $node, $f => $_
            for keys %$k;
      }

      delete $LOCAL_DBS{$node};
   }

   # set the whole (node-local) database - previous value must be empty
   # note that the entries are removed from the passed-in $db while they
   # are being added
   sub g_set($$) {
      my ($node, $db) = @_;

      while (my ($f, $k) = each %$db) {
         g_add $node, $f => $_ => delete $k->{$_}
            for keys %$k;
      }
   }

   # gather node databases from slaves

   # other node wants to make us the master
   $node_req{g_slave} = sub {
      my ($db) = @_;

      my $node = $SRCNODE->{id};
      undef $GLOBAL_SLAVE{$node}; # only the existence of the key matters

      g_set $node, $db;
   };

   # the remaining requests apply the change to the database of the
   # sending node; "&" is used to bypass the prototypes so @_ is flattened
   $node_req{g_add} = sub {
      &g_add ($SRCNODE->{id}, @_);
   };

   $node_req{g_del} = sub {
      &g_del ($SRCNODE->{id}, @_);
   };

   $node_req{g_set} = sub {
      g_set $SRCNODE->{id}, $_[0];
   };

   # look up $node in the "'l" family of the global database and send the
   # result back to the requester as a g_found message
   $node_req{g_find} = sub {
      my ($node) = @_;

      snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node};
   };

   #############################################################################
   # switch to global mode

   $GLOBAL = 1;
   $MASTER = $NODE;
   undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave)

   undef $GLOBAL_MON;

   # delete slaves on node-down
   # clear slave db on node-down
   $GLOBAL_MON = mon_nodes sub {
      return if $_[1]; # second argument true: nothing to clean up

      delete $GLOBAL_SLAVE{$_[0]};
      g_clr $_[0];
      ldb_set "'l" => $_[0]; # clear listener and global database entries on node-down
      #ldb_set "'g" => $_[0]; # really?
   };

   # tell everybody who connects that we are a global node
   push @AnyEvent::MP::Transport::HOOK_GREET, sub {
      $_[0]{local_greeting}{global} = 1;
   };

   # send our database to every global node that connects
   push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
      return unless $_[0]{remote_greeting}{global};

      # global nodes send all local databases, merged,
      # as their local database to global nodes
      my %db;

      for (values %LOCAL_DBS) {
         while (my ($f, $k) = each %$_) {
            while (my ($kk, $kv) = each %$k) {
               $db{$f}{$kk} = $kv;
            }
         }
      }

      snd $_[0]{remote_node} => g_set => \%db;
   };

#   # tell our master else that we are global now
#   for (values %NODE) {
#      if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
#         snd $_->{id} => "g_global";
#         snd $_->{id} => g_set => \%LOCAL_DB;
#      }
#   }
#
   #d#d disconnect everybody to bootstrap development grr
   $_->transport_error # remove Self::transport_error
      for values %NODE;

   # now add us to the set of global nodes
   ldb_set "'g" => $NODE => undef;
   g_set $NODE => \%LOCAL_DB;
}

=item $guard = grp_reg $group, $port

Register the given (local!) port in the named global group C<$group>.

The port will be unregistered automatically when the port is destroyed.

When not called in void context, then a guard object will be returned that
will also cause the name to be unregistered when destroyed.

=cut

# register local port
# NOTE(review): this currently only validates the port and builds the guard;
# nothing is registered here, and "unregister" is not defined in the part of
# the source that is visible - confirm where it is supposed to come from.
sub grp_reg($$) {
   my ($group, $port) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";

   # only create the guard when the caller actually keeps the return value
   defined wantarray && AnyEvent::Util::guard { unregister ($port, $group) }
}

=item $ports = grp_get $group

Returns all the ports currently registered to the given group (as
read-only(!) array reference). When the group has no registered members,
return C<undef>.

=cut

# NOTE(review): not implemented yet - the body is empty, so nothing is
# returned regardless of the group.
sub grp_get($) {
}

=item $guard = grp_mon $group, $callback->($ports, $add, $del)

Installs a monitor on the given group. Each time there is a change it
will be called with the current group members as an arrayref as the
first argument. The second argument only contains ports added, the third
argument only ports removed.

Unlike C<grp_get>, all three arguments will always be array-refs, even if
the array is empty. None of the arrays must be modified in any way.

The first invocation will be with the first two arguments set to the
current members, as if all of them were just added, but only when the
group is actually non-empty.

Optionally returns a guard object that uninstalls the watcher when it is
destroyed.

=cut

# NOTE(review): not implemented yet - the arguments are unpacked but no
# monitor is installed and the callback is never invoked.
sub grp_mon($$) {
   my ($grp, $cb) = @_;
}

=back

=head1 SEE ALSO

L<AnyEvent::MP>.

=head1 AUTHOR

 Marc Lehmann
 http://home.schmorp.de/

=cut

1