=head1 NAME

AnyEvent::MP::Global - some network-global services

=head1 SYNOPSIS

   use AnyEvent::MP::Global;

=head1 DESCRIPTION

This module maintains a fully-meshed network between global nodes and
tries to have connections with all nodes in the network.

It also manages named port groups - ports can register themselves in any
number of groups that will be available network-wide, which is great for
discovering services.

Running it on one node will automatically run it on all nodes, although,
at the moment, the global service is started by default anyways.

=head1 GLOBALS AND FUNCTIONS

=over 4

=cut

package AnyEvent::MP::Global;

use common::sense;
use Carp ();

use AnyEvent ();
use AnyEvent::Util ();

use AnyEvent::MP;
use AnyEvent::MP::Kernel;
use AnyEvent::MP::Transport ();

use base "Exporter";

our @EXPORT = qw(
   grp_reg
   grp_get
   grp_mon
);

$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");

#############################################################################
# node protocol parts for global nodes

{
   package AnyEvent::MP::Kernel;

   # TODO: this is ugly, maybe this should go into MP::Kernel or a separate module #d#

   our %NODE;
   our $NODE;
   our $LISTENER;

   our $GLOBAL;
   our $MASTER;
   our $MASTER_MON;
   our $MASTER_TIMER;

   our %GLOBAL_SLAVE; # {node} => undef, or {node}{family}{key} for monitors installed by that node

   our %GLOBAL_DB;  # global db, {family}{key} => value
   our %LOCAL_DBS;  # local databases of other global nodes, {node}{family}{key} => value
   our %LOCAL_DB;   # this node database

   our $SRCNODE;
   our %node_req;

   # only in global code
   our %GLOBAL_MON; # monitors {family}{"" or key}{node}

   # returns the ids of all nodes listed in the "'g" family of the global
   # database, except ourselves, for which node_is_up is true.
   sub other_globals() {
      grep $_ ne $NODE && node_is_up $_, keys %{ $GLOBAL_DB{"'g"} }
   }

   # broadcasts a message to all other global nodes
   sub g_broadcast {
      snd $_, @_
         for other_globals;
   }

   # notifies monitoring nodes about a change of a database key.
   # always entered via "&g_mon_check;" from g_add/g_del, so @_ is
   # (node, family, key) for a delete and (node, family, key, value) for an add.
   sub g_mon_check {
      warn "g_mon_check<@_>\n";#d#

      # union of the nodes monitoring this exact key, the whole family,
      # and everything. "|| {}" keeps a missing entry from being
      # dereferenced as undef, so this does not depend on strict refs
      # being disabled.
      my %node = (
         %{ $GLOBAL_MON{$_[1]}{$_[2]} || {} },
         %{ $GLOBAL_MON{$_[1]}{""   } || {} },
         %{ $GLOBAL_MON{""   }{""   } || {} },
      );

      # only append the value when one was actually passed (add): a delete
      # has exactly three arguments and must not be sent with a trailing
      # undef, as undef is also a legal value for an added key.
      snd $_ => g_chg1 => $_[1], $_[2], @_ > 3 ? $_[3] : ()
         for keys %node;
   }

   # add/replace a key in the database
   # arguments: node, family, key, value
   sub g_add($$$$) {
      $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]} =
      $GLOBAL_DB{$_[1]}{$_[2]}        = $_[3];

      # changes originating from our slaves are forwarded to the other global nodes
      g_broadcast g_add => $_[1] => $_[2] => $_[3]
         if exists $GLOBAL_SLAVE{$_[0]};

      warn "g_add<@_>\n";#d#
      &g_mon_check; # passes our @_ through unchanged
   }

   # delete a key from the database
   # arguments: node, family, key
   sub g_del($$$) {
      delete $LOCAL_DBS{$_[0]}{$_[1]}{$_[2]};

      g_broadcast g_del => $_[1] => $_[2]
         if exists $GLOBAL_SLAVE{$_[0]};

      delete $GLOBAL_DB{$_[1]}{$_[2]};

      # check if other node maybe still has the key, then we don't delete, but add
      for (values %LOCAL_DBS) {
         if (exists $_->{$_[1]}{$_[2]}) {
            $GLOBAL_DB{$_[1]}{$_[2]} = $_->{$_[1]}{$_[2]};
            last;
         }
      }

      # NOTE(review): monitors are told about a deletion even when the key
      # was just restored from another node's database above - confirm
      # whether that is intended.
      warn "g_del<@_>\n";#d#
      &g_mon_check; # passes our @_ through unchanged
   }

   # delete all keys from a database
   sub g_clr($) {
      my ($node) = @_;

      my $db = $LOCAL_DBS{$node};

      # g_del only removes entries from the inner (per-family) hashes,
      # whose key lists were already copied by "keys".
      while (my ($f, $k) = each %$db) {
         g_del $node, $f => $_
            for keys %$k;
      }

      delete $LOCAL_DBS{$node};
   }

   # set the whole (node-local) database - previous value must be empty
   # note that the values are removed from the passed $db while it is processed.
   sub g_set($$) {
      my ($node, $db) = @_;

      while (my ($f, $k) = each %$db) {
         g_add $node, $f => $_ => delete $k->{$_}
            for keys %$k;
      }
   }

   # gather node databases from slaves

   # other node wants to make us the master
   $node_req{g_slave} = sub {
      my ($db) = @_;

      my $node = $SRCNODE->{id};
      undef $GLOBAL_SLAVE{$node}; # creates the key - only its existence is tested
      g_set $node, $db;
   };

   # g_add family key value - the "&" form bypasses the prototype so @_ can be spliced in
   $node_req{g_add} = sub {
      &g_add ($SRCNODE->{id}, @_);
   };

   # g_del family key
   $node_req{g_del} = sub {
      &g_del ($SRCNODE->{id}, @_);
   };

   # g_set db - the sending node supplies its whole database
   $node_req{g_set} = sub {
      g_set $SRCNODE->{id}, $_[0];
   };

   # g_find node - replies with g_found, the node and its entry in the "'l" family
   $node_req{g_find} = sub {
      my ($node) = @_;

      snd $SRCNODE->{id}, g_found => $node, $GLOBAL_DB{"'l"}{$node};
   };

   # monitoring

   # forgets everything about a node: its local database, its slave status
   # and the monitors it installed via g_mon1.
   sub g_slave_disconnect($) {
      my ($node) = @_;

      g_clr $node;

      if (my $mon = delete $GLOBAL_SLAVE{$node}) {
         while (my ($f, $fv) = each %$mon) {
            # only remove this node's entry - other nodes may monitor the
            # same family/key and must keep receiving change notifications.
            delete $GLOBAL_MON{$f}{$_}{$node}
               for keys %$fv;
         }
      }
   }

   # g_mon0 family key - stop monitoring
   $node_req{g_mon0} = sub {
      delete $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}};
      delete $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]};
   };

   # g_mon1 family key - start monitoring
   $node_req{g_mon1} = sub {
      undef $GLOBAL_SLAVE{$SRCNODE->{id}}{$_[0]}{$_[1]};
      undef $GLOBAL_MON{$_[0]}{$_[1]}{$SRCNODE->{id}};
      #d# generate lots of initial change requests, or one big?
      snd $SRCNODE->{id}, g_chg0 => $_[0], $_[1]
   };

   #############################################################################
   # switch to global mode

   $GLOBAL = 1;
   $MASTER = $NODE;
   undef $GLOBAL_SLAVE{$NODE}; # we are our own master (and slave)
   $LOCAL_DBS{$NODE} = { %LOCAL_DB };

   # regularly try to connect to global nodes - maybe use seeding code?
   $MASTER_TIMER = AE::timer 0, $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}, sub {
      (add_node $_)->connect
         for keys %{ $GLOBAL_DB{"'g"} };
   };

   # instantly connect to other global nodes when we learn of them
   # so we don't have to wait for the timer.
   #TODO
   #   $GLOBAL_MON{"'g"}{""}{""} = sub {
   #      (add_node $_[1])->connect;
   #   };

   # delete slaves on node-down
   # clear slave db on node-down
   $MASTER_MON = mon_nodes sub {
      g_slave_disconnect $_[0] unless $_[1];
   };

   # tell everybody who connects that we are a global node
   push @AnyEvent::MP::Transport::HOOK_GREET, sub {
      $_[0]{local_greeting}{global} = 1;
   };

   # connect from a global node
   sub g_global_connect {
      my ($node) = @_;

      # we need to set this currently, as to avoid race conditions
      # because it takes a while until the other global node tells us it is global.
      undef $GLOBAL_DB{"'g"}{$node};
      undef $LOCAL_DBS{$node}{"'g"}{$node};

      # global nodes send all local databases, merged,
      # as their local database to global nodes
      my %db;

      for (values %LOCAL_DBS) {
         while (my ($f, $fv) = each %$_) {
            while (my ($k, $kv) = each %$fv) {
               $db{$f}{$k} = $kv;
            }
         }
      }

      snd $node => g_set => \%db;
   }

   # send our database to every global node that connects
   push @AnyEvent::MP::Transport::HOOK_CONNECT, sub {
      return unless $_[0]{remote_greeting}{global};

      g_global_connect $_[0]{remote_node};
   };

   # tell our master that we are global now
   for (values %NODE) {
      if ($_->{transport} && $_->{transport}{remote_greeting}{global}) {
         snd $_->{id} => "g_global";
         g_global_connect $_->{id};
      }
   }

   # g_global - the sending node announces that it has become a global node
   $node_req{g_global} = sub {
      g_slave_disconnect $SRCNODE->{id};
      $SRCNODE->{transport}{remote_greeting}{global} = 1;
      g_global_connect $SRCNODE->{id};
   };

   #d#d disconnect everybody to bootstrap development grr
   #   $_->transport_error # remove Self::transport_error
   #      for values %NODE;

   # now add us to the set of global nodes
   ldb_set "'g" => $NODE => undef;
}

=item $guard = grp_reg $group, $port

Register the given (local!) port in the named global group C<$group>.

The port will be unregistered automatically when the port is destroyed.

When not called in void context, then a guard object will be returned that
will also cause the name to be unregistered when destroyed.

=cut

# register local port
sub grp_reg($$) {
   my ($group, $port) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";

   # NOTE(review): nothing is actually registered here yet, and "unregister"
   # is not defined anywhere in the visible part of this file, so destroying
   # the guard would presumably die with an undefined subroutine error -
   # confirm and complete.
   defined wantarray && AnyEvent::Util::guard { unregister ($port, $group) }
}

=item $ports = grp_get $group

Returns all the ports currently registered to the given group (as
read-only(!) array reference). When the group has no registered members,
return C<undef>.

=cut

sub grp_get($) {
   # NOTE(review): not implemented - the body is empty, so nothing is
   # returned regardless of the group.
}

=item $guard = grp_mon $group, $callback->($ports, $add, $del)

Installs a monitor on the given group. Each time there is a change it
will be called with the current group members as an arrayref as the
first argument. The second argument only contains ports added, the third
argument only ports removed.

Unlike C<grp_get>, all three arguments will always be array-refs, even if
the array is empty. None of the arrays must be modified in any way.

The first invocation will be with the first two arguments set to the
current members, as if all of them were just added, but only when the
group is actually non-empty.

Optionally returns a guard object that uninstalls the watcher when it is
destroyed.

=cut

sub grp_mon($$) {
   my ($grp, $cb) = @_;

   # NOTE(review): not implemented - no monitor is installed and the
   # callback is never invoked.
}

=back

=head1 SEE ALSO

L<AnyEvent::MP>.

=head1 AUTHOR

 Marc Lehmann
 http://home.schmorp.de/

=cut

1