=head1 NAME AnyEvent::MP::Kernel - the actual message passing kernel =head1 SYNOPSIS use AnyEvent::MP::Kernel; =head1 DESCRIPTION This module provides most of the basic functionality of AnyEvent::MP, exposed through higher level interfaces such as L and L. This module is mainly of interest when knowledge about connectivity, connected nodes etc. are needed. =head1 GLOBALS AND FUNCTIONS =over 4 =cut package AnyEvent::MP::Kernel; use common::sense; use POSIX (); use Carp (); use MIME::Base64 (); use AE (); use AnyEvent::MP::Node; use AnyEvent::MP::Transport; use base "Exporter"; our $VERSION = '0.8'; our @EXPORT = qw( %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID add_node load_func snd_to_func snd_on eval_on NODE $NODE node_of snd kil port_is_local resolve_node initialise_node known_nodes up_nodes mon_nodes node_is_known node_is_up ); our $DEFAULT_PORT = "4040"; our $CONNECT_INTERVAL = 2; # new connect every 2s, at least our $NETWORK_LATENCY = 3; # activity timeout our $MONITOR_TIMEOUT = 15; # fail monitoring after this time =item $AnyEvent::MP::Kernel::WARN->($level, $msg) This value is called with an error or warning message, when e.g. a connection could not be created, authorisation failed and so on. C<$level> sould be C<0> for messages ot be logged always, C<1> for unexpected messages and errors, C<2> for warnings, C<7> for messages about node connectivity and services, C<8> for debugging messages and C<9> for tracing messages. The default simply logs the message to STDERR. =cut our $WARN = sub { my ($level, $msg) = @_; $msg =~ s/\n$//; printf STDERR "%s <%d> %s\n", (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time), $level, $msg; }; sub load_func($) { my $func = $_[0]; unless (defined &$func) { my $pkg = $func; do { $pkg =~ s/::[^:]+$// or return sub { die "unable to resolve '$func'" }; eval "require $pkg"; } until defined &$func; } \&$func } sub nonce($) { my $nonce; if (open my $fh, "{id} -> @_\n" if TRACE && @_;#d# &{ $PORT{+shift} or return }; } # this function adds a node-ref, so you can send stuff to it # it is basically the central routing component. sub add_node { my ($node) = @_; $NODE{$node} ||= new AnyEvent::MP::Node::Direct $node } sub snd(@) { my ($nodeid, $portid) = split /#/, shift, 2; warn "SND $nodeid <- $portid @_\n" if TRACE;#d# ($NODE{$nodeid} || add_node $nodeid) ->{send} (["$portid", @_]); } =item $is_local = port_is_local $port Returns true iff the port is a local port. =cut sub port_is_local($) { my ($nodeid, undef) = split /#/, $_[0], 2; $NODE{$nodeid} == $NODE{""} } =item snd_to_func $node, $func, @args Expects a node ID and a name of a function. Asynchronously tries to call this function with the given arguments on that node. This function can be used to implement C-like interfaces. =cut sub snd_to_func($$;@) { my $nodeid = shift; ($NODE{$nodeid} || add_node $nodeid) ->send (["", @_]); } =item snd_on $node, @msg Executes C with the given C<@msg> (which must include the destination port) on the given node. =cut sub snd_on($@) { my $node = shift; snd $node, snd => @_; } =item eval_on $node, $string Evaluates the given string as Perl expression on the given node. =cut sub eval_on($@) { my $node = shift; snd $node, eval => @_; } sub kil(@) { my ($nodeid, $portid) = split /#/, shift, 2; length $portid or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught"; ($NODE{$nodeid} || add_node $nodeid) ->kill ("$portid", @_); } sub _nodename { require POSIX; (POSIX::uname ())[1] } sub _resolve($) { my ($nodeid) = @_; my $cv = AE::cv; my @res; $cv->begin (sub { my %seen; my @refs; for (sort { $a->[0] <=> $b->[0] } @res) { push @refs, $_->[1] unless $seen{$_->[1]}++ } shift->send (@refs); }); $nodeid = $DEFAULT_PORT unless length $nodeid; my $idx; for my $t (split /,/, $nodeid) { my $pri = ++$idx; $t = length $t ? _nodename . ":$t" : _nodename if $t =~ /^\d*$/; my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT" or Carp::croak "$t: unparsable transport descriptor"; $cv->begin; AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub { for (@_) { my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; push @res, [ $pri += 1e-5, AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service ]; } $cv->end; }; } $cv->end; $cv } sub initialise_node(;$%) { my ($profile) = @_; $profile = _nodename unless defined $profile; $CONFIG = AnyEvent::MP::Config::find_profile $profile; my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : $profile; $NODE = $node unless $node eq "anon/"; $NODE{$NODE} = $NODE{""}; $NODE{$NODE}{id} = $NODE; my $seeds = $CONFIG->{seeds}; my $binds = $CONFIG->{binds}; $binds ||= [$NODE]; $WARN->(8, "node $NODE starting up."); $LISTENER = []; %LISTENER = (); for (map _resolve $_, @$binds) { for my $bind ($_->recv) { my ($host, $port) = AnyEvent::Socket::parse_hostport $bind or Carp::croak "$bind: unparsable local bind address"; $LISTENER{$bind} = AnyEvent::MP::Transport::mp_server $host, $port; push @$LISTENER, $bind; } } # the global service is mandatory currently require AnyEvent::MP::Global; # connect to all seednodes AnyEvent::MP::Global::set_seeds (map $_->recv, map _resolve $_, @$seeds); for (@{ $CONFIG->{services} }) { if (s/::$//) { eval "require $_"; die $@ if $@; } else { (load_func $_)->(); } } } ############################################################################# # node monitoring and info sub _uniq_nodes { my %node; @node{values %NODE} = values %NODE; values %node; } sub _public_nodes { &_uniq_nodes } =item node_is_known $nodeid Returns true iff the given node is currently known to the system. =cut sub node_is_known($) { exists $NODE{$_[0]} } =item node_is_up $nodeid Returns true if the given node is "up", that is, the kernel thinks it has a working connection to it. If the node is known but not currently connected, returns C<0>. If the node is not known, returns C. =cut sub node_is_up($) { ($NODE{$_[0]} or return)->{transport} ? 1 : 0 } =item known_nodes Returns the node IDs of all public nodes connected to this node, including itself. =cut sub known_nodes { map $_->{id}, _public_nodes } =item up_nodes Return the node IDs of all public nodes that are currently connected (excluding the node itself). =cut sub up_nodes { map $_->{id}, grep $_->{transport}, _public_nodes } =item $guard = mon_nodes $callback->($nodeid, $is_up, @reason) Registers a callback that is called each time a node goes up (connection is established) or down (connection is lost). Node up messages can only be followed by node down messages for the same node, and vice versa. The function returns an optional guard which can be used to de-register the monitoring callback again. =cut our %MON_NODES; sub mon_nodes($) { my ($cb) = @_; $MON_NODES{$cb+0} = $cb; wantarray && AnyEvent::Util::guard { delete $MON_NODES{$cb+0} } } sub _inject_nodeevent($$;@) { my ($node, $up, @reason) = @_; for my $cb (values %MON_NODES) { eval { $cb->($node->{id}, $up, @reason); 1 } or $WARN->(1, $@); } $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)"); } ############################################################################# # self node code our %node_req = ( # internal services # monitoring mon0 => sub { # disable monitoring my $portid = shift; my $node = $SRCNODE; $NODE{""}->unmonitor ($portid, delete $node->{rmon}{$portid}); }, mon1 => sub { # enable monitoring my $portid = shift; my $node = $SRCNODE; $NODE{""}->monitor ($portid, $node->{rmon}{$portid} = sub { $node->send (["", kil => $portid, @_]); }); }, kil => sub { my $cbs = delete $SRCNODE->{lmon}{+shift} or return; $_->(@_) for @$cbs; }, # "public" services - not actually public # relay message to another node / generic echo snd => \&snd, snd_multi => sub { snd @$_ for @_ }, # informational info => sub { snd @_, $NODE; }, known_nodes => sub { snd @_, known_nodes; }, up_nodes => sub { snd @_, up_nodes; }, # random garbage eval => sub { my @res = eval shift; snd @_, "$@", @res if @_; }, time => sub { snd @_, AE::time; }, devnull => sub { # }, "" => sub { # empty messages are sent by monitoring }, ); $NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE; $PORT{""} = sub { my $tag = shift; eval { &{ $node_req{$tag} ||= load_func $tag } }; $WARN->(2, "error processing node message: $@") if $@; }; =back =head1 SEE ALSO L. =head1 AUTHOR Marc Lehmann http://home.schmorp.de/ =cut 1