=head1 NAME

AnyEvent::MP::Global - some network-global services

=head1 SYNOPSIS

   use AnyEvent::MP::Global;

=head1 DESCRIPTION

This module maintains a fully-meshed network, if possible, and tries to
ensure that we are connected to at least one other node.

It also manages named port groups - ports can register themselves in any
number of groups that will be available network-wide, which is great for
discovering services.

Running it on one node will automatically run it on all nodes, although,
at the moment, the global service is started by default anyways.

=head1 GLOBALS AND FUNCTIONS

=over 4

=cut

package AnyEvent::MP::Global;

use common::sense;
use Carp ();
use MIME::Base64 ();

use AnyEvent ();
use AnyEvent::Util ();

use AnyEvent::MP;
use AnyEvent::MP::Kernel;

use base "Exporter";

our @EXPORT = qw(
   grp_reg
   grp_get
   grp_mon
);

our $VERSION = $AnyEvent::MP::VERSION;

our %addr; # node ID => [address...] mapping
our %port; # node ID => our rendezvous port on the other side
our %lreg; # local registry, name => [pid...]
our %lmon; # local registry monitoring, "name\x00pid" => mon
our %greg; # global registry, name => pid => undef
our %gmon; # group monitoring, group => [$cb...]
our $nodecnt; # number of nodes currently up, maintained by mon_node

$AnyEvent::MP::Kernel::WARN->(7, "starting global service.");

#############################################################################
# seednodes

our @SEEDS;        # seed addresses ("host:port") we may connect to
our %SEED_CONNECT; # seed => pending connection object
our $SEED_WATCHER; # periodic timer that re-runs more_seeding

# Start a connection attempt to the given seed address, unless one is
# already pending. Croaks when the address cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
      or Carp::croak "$seed: unparsable seed address";

   # ughhh
   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect $host, $port,
      seed => $seed,
      sub {
         # forget the pending attempt and retry seeding a second later
         delete $SEED_CONNECT{$seed};
         after 1, \&more_seeding;
      },
   ;
}

# Connect to a randomly chosen seed, but only when no node is connected
# and at least one seed is configured.
sub more_seeding {
   return if $nodecnt;
   return unless @SEEDS;

   $AnyEvent::MP::Kernel::WARN->(9, "no nodes connected, seeding.");

   seed_connect $SEEDS[rand @SEEDS];
}

# Remove the given address from the list of seeds.
sub avoid_seed($) {
   @SEEDS = grep $_ ne $_[0], @SEEDS;
}

# Replace the list of seeds, install the periodic seeding timer (once) and
# try every seed shortly afterwards, each after a small random delay.
sub set_seeds(@) {
   @SEEDS = @_;

   $SEED_WATCHER ||= AE::timer 5, $AnyEvent::MP::Kernel::MONITOR_TIMEOUT, \&more_seeding;

   for my $seed (@SEEDS) {
      after 0.100 * rand, sub { seed_connect $seed };
   }
}

#############################################################################

# Apply a membership change to a group in the global registry and notify
# all monitors of that group.
#
#   $group - group name
#   $add   - arrayref of port IDs to add
#   $del   - arrayref of port IDs to remove
#
# Removals are applied before additions. Every monitor is called with
# ($ports, $add, $del), where $ports is a fresh arrayref of all members.
sub _change {
   my ($group, $add, $del) = @_;

   my $kv = $greg{$group} ||= {};

   delete @$kv{@$del};
   @$kv{@$add} = ();

   my $ports = [keys %$kv];
   $_->($ports, $add, $del) for @{ $gmon{$group} };
}

# Remove every port that belongs to the given node from all groups.
# A port belongs to the node when its ID is the node ID itself or the
# node ID followed by "#".
sub unreg_groups($) {
   my ($node) = @_;

   my $qr = qr/^\Q$node\E(?:#|$)/;

   # Iterate over a snapshot of the group names: _change runs monitor
   # callbacks, and those must not be able to disturb an active
   # "each %greg" iterator by touching the registry.
   for my $group (keys %greg) {
      # A fresh array per group: the reference is handed to the monitor
      # callbacks and must not be overwritten by the next iteration.
      my @del = grep /$qr/, keys %{ $greg{$group} };

      _change $group, [], \@del
         if @del;
   }
}

# Merge a local registry received from another node (group => [port...])
# into the global registry. $node is currently unused.
sub set_groups($$) {
   my ($node, $lreg) = @_;

   while (my ($k, $v) = each %$lreg) {
      _change $k, $v, [];
   }
}

=item $guard = grp_reg $group, $port

Register the given (local!) port in the named global group C<$group>.

The port will be unregistered automatically when the port is destroyed.

When not called in void context, then a guard object will be returned that
will also cause the name to be unregistered when destroyed.
=cut

# unregister local port
#
# Removes $port from the local and the global registry of $group and tells
# all connected nodes about it. This can be reached twice for the same
# registration (from the port monitor and from the guard returned by
# grp_reg), so a call for a registration that is already gone does nothing.
sub unregister {
   my ($port, $group) = @_;

   my $key = "$group\x00$port";

   # already unregistered: avoid a second notification and broadcast
   return unless exists $lmon{$key};

   delete $lmon{$key};
   @{ $lreg{$group} } = grep $_ ne $port, @{ $lreg{$group} };

   _change $group, [], [$port];

   snd $_, reg0 => $group, $port
      for values %port;
}

# register local port
#
# Croaks when $port is not a local port or when it is already registered
# in $group. Returns a guard object unless called in void context.
sub grp_reg($$) {
   my ($group, $port) = @_;

   port_is_local $port
      or Carp::croak "AnyEvent::MP::Global::grp_reg can only be called for local ports, caught";

   grep $_ eq $port, @{ $lreg{$group} }
      and Carp::croak "'$group': group already registered, cannot register a second time";

   # unregister automatically when the port is destroyed
   $lmon{"$group\x00$port"} = mon $port, sub { unregister $port, $group };
   push @{ $lreg{$group} }, $port;

   snd $_, reg1 => $group, $port
      for values %port;

   _change $group, [$port], [];

   # "defined wantarray" is true in scalar and list context, so the
   # documented "$guard = grp_reg ..." form actually receives the guard;
   # plain "wantarray" is false in scalar context.
   defined wantarray && AnyEvent::Util::guard { unregister $port, $group }
}

=item $ports = grp_get $group

Returns all the ports currently registered to the given group (as
read-only(!) array reference). When the group has no registered members,
return C<undef>.

=cut

sub grp_get($) {
   # look the group up without creating a registry entry for unknown names
   my $members = $greg{$_[0]}
      or return undef;

   my @ports = keys %$members;

   @ports ? \@ports : undef
}

=item $guard = grp_mon $group, $callback->($ports, $add, $del)

Installs a monitor on the given group. Each time there is a change it
will be called with the current group members as an arrayref as the
first argument. The second argument only contains ports added, the third
argument only ports removed.

Unlike C<grp_get>, all three arguments will always be array-refs, even if
the array is empty. None of the arrays must be modified in any way.

The first invocation will be with the first two arguments set to the
current members, as if all of them were just added, but only when the
group is actually non-empty.

Optionally returns a guard object that uninstalls the watcher when it is
destroyed.
=cut

sub grp_mon($$) {
   my ($grp, $cb) = @_;

   # The monitor is installed from a delayed callback, not immediately.
   AnyEvent::MP::Kernel::delay sub {
      # the guard was destroyed before we got here
      return unless $cb;

      push @{ $gmon{$grp} }, $cb;

      # initial invocation: current members reported as "all added",
      # skipped entirely when the group is empty
      $cb->(((grp_get $grp) || return) x 2, []);
   };

   # Must be AnyEvent::Util::guard (as in grp_reg); a doubled "::" names a
   # different, non-existent package.
   defined wantarray && AnyEvent::Util::guard {
      # drop our callback from the group, keep all the others;
      # explicit empty list when no monitor list exists for the group
      my @mon = grep $_ != $cb, @{ (delete $gmon{$grp}) || [] };
      $gmon{$grp} = \@mon if @mon;

      # also stops a still-pending delayed installation
      undef $cb;
   }
}

# Set up the rendezvous port on the given node, unless we already have one
# or the node is ourselves, and send it our listener addresses, the node
# addresses we know and our local group registrations.
sub start_node {
   my ($node) = @_;

   return if exists $port{$node};
   return if $node eq $NODE; # do not connect to ourselves

   # establish connection
   # (the trailing arguments arrive in connect as $version and $node)
   my $port = $port{$node} = spawn $node, "AnyEvent::MP::Global::connect", 0, $NODE;

   mon $port, sub {
      # the rendezvous port is gone: drop the node's ports and forget it
      unreg_groups $node;
      delete $port{$node};
   };

   snd $port, addr => $AnyEvent::MP::Kernel::LISTENER;
   snd $port, nodes => \%addr if %addr;
   snd $port, set => \%lreg if %lreg;
}

# other nodes connect via this
#
# Spawned by start_node on the remote side. Installs the handlers for the
# messages that the connecting node ($node) sends to this port.
sub connect {
   my ($version, $node) = @_;

   # monitor them, silently die
   mon $node, psub { kil $SELF };

   rcv $SELF,
      # $node announces its own addresses
      addr => sub {
         my $addresses = shift;
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us its addresses (@$addresses).");
         $addr{$node} = $addresses;

         # to help listener-less nodes, we broadcast new addresses to them unconditionally
         #TODO: should be done by a node finding out about a listener-less one
         if (@$addresses) {
            for my $other (values %AnyEvent::MP::Kernel::NODE) {
               if ($other->{transport}) {
                  # known node with an empty address list
                  if ($addr{$other->{id}} && !@{ $addr{$other->{id}} }) {
                     $AnyEvent::MP::Kernel::WARN->(9, "helping $other->{id} to find $node.");
                     snd $port{$other->{id}}, nodes => { $node => $addresses };
                  }
               }
            }
         }
      },
      # $node tells us about other nodes (node ID => [address...])
      nodes => sub {
         my ($kv) = @_;

         use JSON::XS;#d#
         my $kv_txt = JSON::XS->new->encode ($kv);#d#
         $AnyEvent::MP::Kernel::WARN->(9, "$node told us it knows about $kv_txt.");#d#

         while (my ($id, $addresses) = each %$kv) {
            # this $node is the node object for $id, not the connecting node
            my $node = AnyEvent::MP::Kernel::add_node $id;
            $node->connect (@$addresses);
            start_node $id;
         }
      },
      # $node asks for the addresses of another node
      find => sub {
         my ($othernode) = @_;

         $AnyEvent::MP::Kernel::WARN->(9, "$node asked us to find $othernode.");
         snd $port{$node}, nodes => { $othernode => $addr{$othernode} }
            if $addr{$othernode};
      },
      # $node sends its complete local registry
      set => sub {
         set_groups $node, shift;
      },
      # a single port was unregistered from a group
      reg0 => sub {
         _change $_[0], [], [$_[1]];
      },
      # a single port was registered in a group
      reg1 => sub {
         _change $_[0], [$_[1]], [];
      },
   ;
}

# Node up/down handler, registered with mon_nodes below and also called
# once for every node that is already up when this module is loaded.
sub mon_node {
   my ($node, $is_up) = @_;

   if ($is_up) {
      ++$nodecnt;
      start_node $node;
   } else {
      --$nodecnt;
      # lost the last node: go back to the seeds
      more_seeding unless $nodecnt;

      unreg_groups $node;

      # forget about the node
      delete $addr{$node};
      # ask other nodes if they know the node
      snd $_, find => $node
         for values %port;
   }
   #warn "node<$node,$is_up>\n";#d#
}

mon_node $_, 1
   for up_nodes;

mon_nodes \&mon_node;

=back

=head1 SEE ALSO

L<AnyEvent::MP>.

=head1 AUTHOR

 Marc Lehmann
 http://home.schmorp.de/

=cut

1