=head1 NAME

AnyEvent::MP::Kernel - the actual message passing kernel

=head1 SYNOPSIS

   use AnyEvent::MP::Kernel;

=head1 DESCRIPTION

This module provides most of the basic functionality of AnyEvent::MP,
exposed through higher level interfaces such as L<AnyEvent::MP> and
L<Coro::MP>.

This module is mainly of interest when knowledge about connectivity,
connected nodes etc. is sought.

=head1 GLOBALS AND FUNCTIONS

=over 4

=cut

package AnyEvent::MP::Kernel;

use common::sense;
use POSIX ();
use Carp ();

use AnyEvent ();
use Guard ();

use AnyEvent::MP::Node;
use AnyEvent::MP::Transport;

use base "Exporter";

our @EXPORT_OK = qw(
   %NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID
);

our @EXPORT = qw(
   add_node load_func snd_to_func snd_on eval_on

   NODE $NODE node_of snd kil port_is_local
   configure
   up_nodes mon_nodes node_is_up
   db_set db_del db_reg
   db_mon db_family db_keys db_values
);

=item $AnyEvent::MP::Kernel::WARN->($level, $msg)

This value is called with an error or warning message, when e.g. a
connection could not be created, authorisation failed and so on.

It I<must not> block or send messages - queue it and use an idle watcher
if you need to do any of these things.

C<$level> should be C<0> for messages to be logged always, C<1> for
unexpected messages and errors, C<2> for warnings, C<7> for messages about
node connectivity and services, C<8> for debugging messages and C<9> for
tracing messages.

The default simply logs the message to STDERR.

=item @AnyEvent::MP::Kernel::WARN

All code references in this array are called for every log message, from
the default C<$WARN> handler. This is an easy way to tie into the log
messages without disturbing others.

=cut

our $WARNLEVEL = exists $ENV{PERL_ANYEVENT_MP_WARNLEVEL} ? $ENV{PERL_ANYEVENT_MP_WARNLEVEL} : 5;
our @WARN;
our $WARN = sub {
   # every hook in @WARN sees every message, regardless of $WARNLEVEL
   &$_ for @WARN;

   return if $WARNLEVEL < $_[0];

   my ($level, $msg) = @_;

   $msg =~ s/\n$//;

   printf STDERR "%s <%d> %s\n",
          (POSIX::strftime "%Y-%m-%d %H:%M:%S", localtime time),
          $level,
          $msg;
};

=item $AnyEvent::MP::Kernel::WARNLEVEL [default 5 or $ENV{PERL_ANYEVENT_MP_WARNLEVEL}]

The maximum level at which warning messages will be printed to STDERR by
the default warn handler.

=cut

# resolves a fully qualified function name to a code reference.
# if the function is not defined yet, successively shorter package prefixes
# of the name are require'd until it is. on failure, a code reference that
# dies with the error message when called is returned instead.
sub load_func($) {
   my $func = $_[0];

   unless (defined &$func) {
      my $pkg = $func;
      do {
         # strip the last name component; give up when none is left
         $pkg =~ s/::[^:]+$//
            or return sub { die "unable to resolve function '$func'" };

         local $@;
         # NOTE(review): string eval on a name that can originate from a
         # remote message - callers are expected to have run _secure_check.
         unless (eval "require $pkg; 1") {
            my $error = $@;
            # only a missing module file lets us retry with a shorter
            # prefix, any other load error is final
            $error =~ /^Can't locate .*\.pm in \@INC \(/
               or return sub { die $error };
         }
      } until defined &$func;
   }

   \&$func
}

my @alnum = ('0' .. '9', 'A' .. 'Z', 'a' .. 'z');

# returns a string of $_[0] random characters with codes 0..255
sub nonce($) {
   join "", map chr rand 256, 1 .. $_[0]
}

# returns a string of $_[0] random characters from [0-9A-Za-z]
sub nonce62($) {
   join "", map $alnum[rand 62], 1 .. $_[0]
}

# builds a 9 character identifier: five characters derived from the pid and
# the current event loop time, followed by four random ones
sub gen_uniq {
   my $now = AE::now;
   (join "",
      map $alnum[$_],
         $$ / 62 % 62,
         $$ % 62,
         (int $now         ) % 62,
         (int $now *   100) % 62,
         (int $now * 10000) % 62,
   ) . nonce62 4;
}

our $CONFIG; # this node's configuration
our $SECURE = sub { 1 };

our $RUNIQ; # remote uniq value
our $UNIQ;  # per-process/node unique cookie
our $NODE;
our $ID = "a";

our %NODE; # node id to transport mapping, or "undef", for local node
our (%PORT, %PORT_DATA); # local ports

our %RMON; # local ports monitored by remote nodes ($RMON{nodeid}{portid} == cb)
our %LMON; # monitored _local_ ports

our $GLOBAL; # true if node is a global ("directory") node
our %BINDS;
our $BINDS; # our listeners, as arrayref

our $SRCNODE; # holds the sending node _object_ during _inject

# (re-)generates $UNIQ and $RUNIQ and resets $NODE to an anonymous node ID
sub _init_names {
   # ~54 bits, for local port names, lowercase $ID appended
   $UNIQ = gen_uniq;

   # ~59 bits, for remote port names, one longer than $UNIQ and uppercase at the end to avoid clashes
   $RUNIQ = nonce62 10;
   $RUNIQ =~ s/(.)$/\U$1/;

   $NODE = "anon/$RUNIQ";
}

_init_names;

# returns the node ID of the local node
sub NODE() {
   $NODE
}

# returns the node ID part (everything before the first "#") of a port ID
sub node_of($) {
   my ($node, undef) = split /#/, $_[0], 2;

   $node
}

BEGIN {
   # compile-time constant, so tracing code is optimised away when disabled
   *TRACE = $ENV{PERL_ANYEVENT_MP_TRACE}
      ? sub () { 1 }
      : sub () { 0 };
}

our $DELAY_TIMER;
our @DELAY_QUEUE;

# runs queued callbacks in order; once the queue is empty the timer is
# cleared and the loop is left via return
sub _delay_run {
   (shift @DELAY_QUEUE or return undef $DELAY_TIMER)->() while 1;
}

# queues a callback and makes sure a zero-delay timer runs the queue
sub delay($) {
   push @DELAY_QUEUE, shift;
   $DELAY_TIMER ||= AE::timer 0, 0, \&_delay_run;
}

# delivers a message (port ID followed by the message elements) to the
# local port callback; messages for unknown ports are silently dropped
sub _inject {
   warn "RCV $SRCNODE->{id} -> " . eval { JSON::XS->new->encode (\@_) } . "\n" if TRACE && @_;#d#
   &{ $PORT{+shift} or return };
}

# this function adds a node-ref, so you can send stuff to it
# it is basically the central routing component.
sub add_node {
   my ($node) = @_;

   $NODE{$node} ||= new AnyEvent::MP::Node::Remote $node
}

# sends a message: the first argument is the destination port
# ("nodeid#portid"), the remaining arguments form the message
sub snd(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   warn "SND $nodeid <- " . eval { JSON::XS->new->encode ([$portid, @_]) } . "\n" if TRACE && @_;#d#

   defined $nodeid #d#UGLY
      or Carp::croak "'undef' is not a valid node ID/port ID";

   ($NODE{$nodeid} || add_node $nodeid)
      ->{send} (["$portid", @_]);
}

=item $is_local = port_is_local $port

Returns true iff the port is a local port.

=cut

sub port_is_local($) {
   my ($nodeid, undef) = split /#/, $_[0], 2;

   # local iff the node part maps to the same node object as the local node
   $NODE{$nodeid} == $NODE{""}
}

=item snd_to_func $node, $func, @args

Expects a node ID and a name of a function. Asynchronously tries to call
this function with the given arguments on that node.

This function can be used to implement C<spawn>-like interfaces.

=cut

sub snd_to_func($$;@) {
   my $nodeid = shift;

   # validate first, so a bad call croaks without touching global state
   defined $nodeid #d#UGLY
      or Carp::croak "'undef' is not a valid node ID/port ID";

   # on $NODE, we artificially delay... (for spawn)
   # this is very ugly - maybe we should simply delay ALL messages,
   # to avoid deep recursion issues. but that's so... slow...
   # NOTE(review): the comment talks about $NODE, but the flag is set when
   # the target is NOT $NODE - confirm which one is intended.
   $AnyEvent::MP::Node::Self::DELAY = 1
      if $nodeid ne $NODE;

   # the empty port ID addresses the node port, which dispatches on $func
   ($NODE{$nodeid} || add_node $nodeid)->{send} (["", @_]);
}

=item snd_on $node, @msg

Executes C<snd> with the given C<@msg> (which must include the destination
port) on the given node.

=cut

sub snd_on($@) {
   my $node = shift;
   snd $node, snd => @_;
}

=item eval_on $node, $string[, @reply]

Evaluates the given string as Perl expression on the given node. When
@reply is specified, then it is used to construct a reply message with
C<"$@"> and any results from the eval appended.

=cut

sub eval_on($$;@) {
   my $node = shift;
   snd $node, eval => @_;
}

# kills the given port ("nodeid#portid"), passing the remaining arguments
# on to the node's kill method. croaks when the port part is empty.
sub kil(@) {
   my ($nodeid, $portid) = split /#/, shift, 2;

   length $portid
      or Carp::croak "$nodeid#$portid: killing a node port is not allowed, caught";

   ($NODE{$nodeid} || add_node $nodeid)
      ->kill ("$portid", @_);
}

#############################################################################
# node monitoring and info

=item node_is_known $nodeid

#TODO#

Returns true iff the given node is currently known to this node.

=cut

sub node_is_known($) {
   exists $NODE{$_[0]}
}

=item node_is_up $nodeid

Returns true if the given node is "up", that is, the kernel thinks it has
a working connection to it.

If the node is known (to this local node) but not currently connected,
returns C<0>. If the node is not known, returns C<undef>.

=cut

sub node_is_up($) {
   ($NODE{$_[0]} or return)->{transport}
      ? 1 : 0
}

=item up_nodes

Return the node IDs of all nodes that are currently connected (excluding
the node itself).

=cut

sub up_nodes() {
   map $_->{id}, grep $_->{transport}, values %NODE
}

=item $guard = mon_nodes $callback->($nodeid, $is_up, @reason)

Registers a callback that is called each time a node goes up (a connection
is established) or down (the connection is lost).

Node up messages can only be followed by node down messages for the same
node, and vice versa.

Note that monitoring a node is usually better done by monitoring its node
port. This function is mainly of interest to modules that are concerned
about the network topology and low-level connection handling.

Callbacks I<must not> block and I<should not> send any messages.

The function returns an optional guard which can be used to unregister
the monitoring callback again.

Example: make sure you call function C<newnode> for all nodes that are up
or go up (and down).

   newnode $_, 1 for up_nodes;
   mon_nodes \&newnode;

=cut

our %MON_NODES;

sub mon_nodes($) {
   my ($cb) = @_;

   # keyed by the numeric value of the code reference
   $MON_NODES{$cb+0} = $cb;

   # only create a guard when the caller actually wants one
   defined wantarray && Guard::guard { delete $MON_NODES{$cb+0} }
}

# calls all mon_nodes callbacks with (node ID, up flag, reason) and logs
# the event; exceptions thrown by callbacks are logged, not propagated
sub _inject_nodeevent($$;@) {
   my ($node, $up, @reason) = @_;

   for my $cb (values %MON_NODES) {
      eval { $cb->($node->{id}, $up, @reason); 1 }
         or $WARN->(1, $@);
   }

   $WARN->(7, "$node->{id} is " . ($up ? "up" : "down") . " (@reason)");
}

#############################################################################
# self node code

# removes a local port and its data and calls all local monitors of that
# port with the kill reason (the remaining arguments)
sub _kill {
   my $port = shift;

   delete $PORT{$port}
      or return; # killing nonexistent ports is O.K.
   delete $PORT_DATA{$port};

   # warn only when there are no monitors AND a kill reason was given
   my $mon = delete $LMON{$port}
      or !@_
      or $WARN->(2, "unmonitored local port $port died with reason: @_");

   $_->(@_) for values %$mon;
}

# registers callback $_[2] as monitor of local port $_[1]; when the port
# does not exist, the callback is invoked with no_such_port instead.
# $_[0] is not used.
sub _monitor {
   return $_[2](no_such_port => "cannot monitor nonexistent port", "$NODE#$_[1]")
      unless exists $PORT{$_[1]};

   $LMON{$_[1]}{$_[2]+0} = $_[2];
}

# removes monitor callback $_[2] from local port $_[1]. $_[0] is not used.
sub _unmonitor {
   delete $LMON{$_[1]}{$_[2]+0}
      if exists $LMON{$_[1]};
}

# dies unless $SECURE approves the node that sent the current message
sub _secure_check {
   $SECURE->($SRCNODE->{id})
      or die "remote execution attempt by insecure node\n";
}

our %NODE_REQ = (
   # internal services

   # monitoring
   mon0 => sub { # stop monitoring a port for another node
      my $portid = shift;
      _unmonitor undef, $portid, delete $SRCNODE->{rmon}{$portid};
   },
   mon1 => sub { # start monitoring a port for another node
      my $portid = shift;
      Scalar::Util::weaken (my $node = $SRCNODE);
      _monitor undef, $portid, $node->{rmon}{$portid} = sub {
         delete $node->{rmon}{$portid};
         $node->send (["", kil0 => $portid, @_])
            if $node && $node->{transport};
      };
   },
   # another node has killed a monitored port
   kil0 => sub {
      my $cbs = delete $SRCNODE->{lmon}{+shift}
         or return;

      $_->(@_) for @$cbs;
   },

   # "public" services - not actually public

   # another node wants to kill a local port
   kil => \&_kill,

   # relay message to another node / generic echo
   snd => \&snd,
   snd_multiple => sub {
      snd @$_ for @_
   },

   # random utilities
   eval => sub {
      &_secure_check;
      my @res = do { package main; eval shift };
      snd @_, "$@", @res if @_;
   },
   time => sub {
      snd @_, AE::now;
   },
   devnull => sub {
      #
   },
   "" => sub {
      # empty messages are keepalives or similar devnull-applications
   },
);

$NODE{""} = $NODE{$NODE} = new AnyEvent::MP::Node::Self $NODE;

# the node port: the first message element selects the service
$PORT{""} = sub {
   my $tag = shift;
   eval {
      # registered services are dispatched directly. anything else names a
      # perl function - it is deliberately NOT cached in %NODE_REQ, so that
      # _secure_check runs for every call and not just for the first sender,
      # and so that a failed load can be retried later.
      my $handler = $NODE_REQ{$tag} || do { &_secure_check; load_func $tag };
      &$handler;
   };
   $WARN->(2, "error processing node message from $SRCNODE->{id}: $@") if $@;
};

our $NPROTO = 1;

# tell everybody who connects our nproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   $_[0]{local_greeting}{nproto} = $NPROTO;
};
#############################################################################
# seed management, try to keep connections to all seeds at all times

our %SEED_NODE;    # seed ID => node ID|undef
our %NODE_SEED;    # map node ID to seed ID
our %SEED_CONNECT; # $seed => transport_connector | 1=connected | 2=connecting
our $SEED_WATCHER;
our $SEED_RETRY;

# starts a connection attempt to the given seed ("host:port") unless one is
# already recorded in %SEED_CONNECT. croaks if the seed cannot be parsed.
sub seed_connect {
   my ($seed) = @_;

   my ($host, $port) = AnyEvent::Socket::parse_hostport $seed
      or Carp::croak "$seed: unparsable seed address";

   $AnyEvent::MP::Kernel::WARN->(9, "trying connect to seed node $seed.");

   $SEED_CONNECT{$seed} ||= AnyEvent::MP::Transport::mp_connect
      $host, $port,
      on_greeted => sub {
         # called after receiving remote greeting, learn remote node name

         # we rely on untrusted data here (the remote node name) this is
         # hopefully ok, as this can at most be used for DOSing, which is easy
         # when you can do MITM anyway.

         # if we connect to ourselves, nuke this seed, but make sure we act like a seed
         if ($_[0]{remote_node} eq $AnyEvent::MP::Kernel::NODE) {
            require AnyEvent::MP::Global; # every seed becomes a global node currently
            delete $SEED_NODE{$seed};
            # %NODE_SEED is keyed by node ID, not by seed, so remove every
            # node entry that still points at this seed
            delete @NODE_SEED{ grep $NODE_SEED{$_} eq $seed, keys %NODE_SEED };
         } else {
            $SEED_NODE{$seed} = $_[0]{remote_node};
            $NODE_SEED{$_[0]{remote_node}} = $seed;
         }
      },
      on_destroy => sub {
         delete $SEED_CONNECT{$seed};
      },
      sub {
         $SEED_CONNECT{$seed} = 1;
      }
   ;
}

# connects to every seed that has neither a pending/established connection
# nor an up node, and re-arms the retry timer with a growing interval
sub seed_all {
   my @seeds = grep {
      !exists $SEED_CONNECT{$_}
      && !(defined $SEED_NODE{$_} && node_is_up $SEED_NODE{$_})
   } keys %SEED_NODE;

   if (@seeds) {
      # start connection attempt for every seed we are not connected to yet
      seed_connect $_
         for @seeds;

      # roughly double the interval each time, capped at monitor_timeout
      $SEED_RETRY = $SEED_RETRY * 2 + rand;
      $SEED_RETRY = $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout}
         if $SEED_RETRY > $AnyEvent::MP::Kernel::CONFIG->{monitor_timeout};

      $SEED_WATCHER = AE::timer $SEED_RETRY, 0, \&seed_all;

   } else {
      # all seeds connected or connecting, no need to restart timer
      undef $SEED_WATCHER;
   }
}

# resets the retry interval and schedules seed_all, unless already scheduled
sub seed_again {
   $SEED_RETRY = 1;
   $SEED_WATCHER ||= AE::timer 1, 0, \&seed_all;
}

# sets new seed list, starts connecting
sub set_seeds(@) {
   %SEED_NODE     = ();
   %NODE_SEED     = ();
   %SEED_CONNECT  = ();

   @SEED_NODE{@_} = ();

   seed_again;#d#
   seed_all;
}

mon_nodes sub {
   # if we lost the connection to a seed node, make sure we are seeding
   seed_again
      if !$_[1] && exists $NODE_SEED{$_[0]};
};

#############################################################################
# talk with/to global nodes

# protocol messages:
#
# sent by all slave nodes (slave to master)
# g_slave database                  - make other global node master of the sender
#
# sent by any node to global nodes
# g_set database                    - set whole database
# g_add family key val              - add/replace key to database
# g_del family key                  - delete key from database
# g_get family key reply...         - send reply with data
#
# send by global nodes
# g_global                          - node became global, similar to global=1 greeting
#
# database families
# "'l" -> node -> listeners
# "'g" -> node -> undef
# ...
#

# used on all nodes:
our $MASTER; # the global node we bind ourselves to, unless we are global ourselves
our $MASTER_MON;
our %LOCAL_DB; # this node database

our %GLOBAL_DB; # all local databases, merged - empty on non-global nodes

our $GPROTO = 1;

# tell everybody who connects our gproto
push @AnyEvent::MP::Transport::HOOK_GREET, sub {
   $_[0]{local_greeting}{gproto} = $GPROTO;
};

#############################################################################
# master selection

# master requests
our %GLOBAL_REQ; # $id => \@req

# remembers request $req (an array reference) under $id and sends it to
# the master if there is one; does nothing if $id is already registered
sub global_req_add {
   my ($id, $req) = @_;

   return if exists $GLOBAL_REQ{$id};

   $GLOBAL_REQ{$id} = $req;

   snd $MASTER, @$req
      if $MASTER;
}

# forgets the request registered under the given id
sub global_req_del {
   delete $GLOBAL_REQ{$_[0]};
}

# registers a g_find request for the given node ID
sub g_find {
   global_req_add "g_find $_[0]", [g_find => $_[0]];
}

# reply for g_find started in Node.pm
$NODE_REQ{g_found} = sub {
   global_req_del "g_find $_[0]";

   my $node = $NODE{$_[0]} or return;

   $node->connect_to ($_[1]);
};

# makes the given node our master: sends it our local database and all
# requests that are still registered
sub master_set {
   $MASTER = $_[0];

   snd $MASTER, g_slave => \%LOCAL_DB;

   # (re-)send queued requests
   snd $MASTER, @$_
      for values %GLOBAL_REQ;
}
# picks a master: uses the first seed node that is already up, otherwise
# waits for a seed node to come up. once a master is set, its loss triggers
# a new search.
sub master_search {
   #TODO: should also look for other global nodes, but we don't know them #d#
   for (keys %NODE_SEED) {
      if (node_is_up $_) {
         master_set $_;
         return;
      }
   }

   $MASTER_MON = mon_nodes sub {
      return unless $_[1]; # we are only interested in node-ups
      return unless $NODE_SEED{$_[0]}; # we are only interested in seed nodes

      master_set $_[0];

      # replace this monitor by one that watches for the master going down
      $MASTER_MON = mon_nodes sub {
         if ($_[0] eq $MASTER && !$_[1]) {
            undef $MASTER;
            master_search ();
         }
      };
   };
}

# other node wants to make us the master
$NODE_REQ{g_slave} = sub {
   my ($db) = @_;

   # load global module and redo the request
   # NOTE(review): this relies on AnyEvent::MP::Global replacing
   # $NODE_REQ{g_slave} while loading, otherwise this recurses - confirm.
   require AnyEvent::MP::Global;
   &{ $NODE_REQ{g_slave} }
};

#############################################################################
# local database operations

# local database management

# sets $LOCAL_DB{family}{key} = value and tells the master, if any
sub db_set($$$) {
   $LOCAL_DB{$_[0]}{$_[1]} = $_[2];
   snd $MASTER, g_add => $_[0] => $_[1] => $_[2]
      if defined $MASTER;
}

# deletes $LOCAL_DB{family}{key} and tells the master, if any
sub db_del($$) {
   delete $LOCAL_DB{$_[0]}{$_[1]};
   snd $MASTER, g_del => $_[0] => $_[1]
      if defined $MASTER;
}

# like db_set, but returns a guard that deletes the key again when destroyed
sub db_reg($$;$) {
   my ($family, $key) = @_;
   &db_set;
   Guard::guard { db_del $family => $key }
}

sub db_keys($$$) {
   #d#
}

#d# db_values
#d# db_family
#d# db_key

our %LOCAL_MON; # f, reply
our %MON_DB;    # f, k, value

# monitors a database family: $cb is called with the family hash and an
# array reference of changed keys. returns a guard that removes the monitor
# again; like mon_nodes, no guard is created in void context, in which case
# the monitor stays registered.
sub db_mon($@) {
   my ($family, $cb) = @_;

   if (my $db = $MON_DB{$family}) {
      # if we already monitor this thingy, generate
      # create events for all of them
      $cb->($db, [keys %$db]);
   } else {
      # new monitor, request chg1 from upstream
      global_req_add "mon1 $family" => [g_mon1 => $family];
      $MON_DB{$family} = {};
   }

   $LOCAL_MON{$family}{$cb+0} = $cb;

   # a guard created in void context would be destroyed right away and
   # cancel the monitor we just registered
   defined wantarray
      and Guard::guard {
         my $mon = $LOCAL_MON{$family};
         delete $mon->{$cb+0};

         unless (%$mon) {
            global_req_del "mon1 $family";

            # no global_req, because we don't care if we are not connected
            snd $MASTER, g_mon0 => $family
               if $MASTER;

            delete $LOCAL_MON{$family};
            delete $MON_DB{$family};
         }
      }
}

# full update
$NODE_REQ{g_chg1} = sub {
   my ($f, $ndb) = @_;

   # ignore updates for families we do not (or no longer) monitor, instead
   # of autovivifying an empty %LOCAL_MON entry for them
   my $db = $MON_DB{$f}
      or return;
   my @k;

   # add or replace keys
   while (my ($k, $v) = each %$ndb) {
      $db->{$k} = $v;
      push @k, $k;
   }

   # delete keys that are no longer present
   for (grep !exists $ndb->{$_}, keys %$db) {
      delete $db->{$_};
      push @k, $_;
   }

   $_->($db, \@k)
      for values %{ $LOCAL_MON{$f} };
};

# incremental update
$NODE_REQ{g_chg2} = sub {
   # ignore updates for families we do not (or no longer) monitor
   my $db = $MON_DB{$_[0]}
      or return;

   # family, key, value => set; family, key => delete
   @_ >= 3
      ? $db->{$_[1]} = $_[2]
      : delete $db->{$_[1]};

   $_->($db, [$_[1]])
      for values %{ $LOCAL_MON{$_[0]} };
};

#############################################################################
# configure

# returns the nodename field of uname
sub nodename {
   require POSIX;
   (POSIX::uname ())[1]
}

# resolves a comma-separated list of transport descriptors. returns a
# condvar that is sent the resulting "host:port" strings, without
# duplicates, ordered by their position in the list.
sub _resolve($) {
   my ($nodeid) = @_;

   my $cv = AE::cv;
   my @res;

   $cv->begin (sub {
      my %seen;
      my @refs;
      for (sort { $a->[0] <=> $b->[0] } @res) {
         push @refs, $_->[1] unless $seen{$_->[1]}++
      }
      shift->send (@refs);
   });

   my $idx;
   for my $t (split /,/, $nodeid) {
      my $pri = ++$idx;

      # empty or all-digit descriptors mean "our nodename[:port]"
      $t = length $t ? nodename . ":$t" : nodename
         if $t =~ /^\d*$/;

      my ($host, $port) = AnyEvent::Socket::parse_hostport $t, 0
         or Carp::croak "$t: unparsable transport descriptor";

      $port = "0" if $port eq "*";

      if ($host eq "*") {
         $cv->begin;
         # use fork_call, as Net::Interface is big, and we need it rarely.
         require AnyEvent::Util;
         AnyEvent::Util::fork_call (
            sub {
               my @addr;

               require Net::Interface;

               for my $if (Net::Interface->interfaces) {
                  # we statically lower-prioritise ipv6 here, TODO :()
                  for $_ ($if->address (Net::Interface::AF_INET  ())) {
                     next if /^\x7f/; # skip localhost etc.
                     push @addr, $_;
                  }
                  for ($if->address (Net::Interface::AF_INET6 ())) {
                     #next if $if->scope ($_) <= 2;
                     next unless /^[\x20-\x3f\xfc\xfd]/; # global unicast, site-local unicast
                     push @addr, $_;
                  }
               }
               @addr
            }, sub {
               for my $ip (@_) {
                  push @res, [
                     $pri += 1e-5,
                     AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $ip, $port
                  ];
               }
               $cv->end;
            }
         );
      } else {
         $cv->begin;
         AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub {
            for (@_) {
               my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3];
               push @res, [
                  $pri += 1e-5,
                  AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service
               ];
            }
            $cv->end;
         };
      }
   }

   $cv->end;

   $cv
}

# configures the local node from a profile plus key => value overrides (an
# odd number of arguments means the first one is the profile name): sets
# the node ID, creates the listeners, starts seeding and master search and
# finally loads the configured services.
sub configure(@) {
   unshift @_, "profile" if @_ & 1;
   my (%kv) = @_;

   delete $NODE{$NODE}; # we do not support doing stuff before configure
   _init_names;

   my $profile = delete $kv{profile};

   $profile = nodename
      unless defined $profile;

   $CONFIG = AnyEvent::MP::Config::find_profile $profile, %kv;

   if (exists $CONFIG->{secure}) {
      # a true "secure" setting makes every _secure_check fail
      my $pass = !$CONFIG->{secure};
      $SECURE = sub { $pass };
   }

   my $node = exists $CONFIG->{nodeid} ? $CONFIG->{nodeid} : "$profile/";

   $node or Carp::croak "$node: illegal node ID (see AnyEvent::MP manpage for syntax)\n";

   $NODE = $node;

   $NODE =~ s/%n/nodename/ge;

   # a trailing slash or %u is replaced by the random $RUNIQ
   if ($NODE =~ s!(?:(?<=/)$|%u)!$RUNIQ!g) {
      # nodes with randomised node names do not need randomised port names
      $UNIQ = "";
   }

   $NODE{$NODE} = $NODE{""};
   $NODE{$NODE}{id} = $NODE;

   my $seeds = $CONFIG->{seeds};
   my $binds = $CONFIG->{binds};

   $binds ||= ["*"];

   $WARN->(8, "node $NODE starting up.");

   $BINDS = [];
   %BINDS = ();

   for (map _resolve $_, @$binds) {
      for my $bind ($_->recv) {
         my ($host, $port) = AnyEvent::Socket::parse_hostport $bind
            or Carp::croak "$bind: unparsable local bind address";

         my $listener = AnyEvent::MP::Transport::mp_server
            $host,
            $port,
            prepare => sub {
               # remember the address that was actually bound
               my (undef, $host, $port) = @_;
               $bind = AnyEvent::Socket::format_hostport $host, $port;
               0
            },
         ;
         $BINDS{$bind} = $listener;
         push @$BINDS, $bind;
      }
   }

   db_set "'l" => $NODE => $BINDS;

   $WARN->(8, "node listens on [@$BINDS].");

   # connect to all seednodes
   set_seeds map $_->recv, map _resolve $_, @$seeds;

   master_search;

   if ($NODE eq "atha") {;#d#
      my $w; $w = AE::timer 4, 0, sub { undef $w; require AnyEvent::MP::Global };#d#
   }

   # services: [func, args...] calls func with args, "Module::" loads the
   # module, anything else is called as a function without arguments
   for (@{ $CONFIG->{services} }) {
      if (ref) {
         my ($func, @args) = @$_;
         (load_func $func)->(@args);
      } elsif (s/::$//) {
         eval "require $_";
         die $@ if $@;
      } else {
         (load_func $_)->();
      }
   }
}

=back

=head1 SEE ALSO

L<AnyEvent::MP>.

=head1 AUTHOR

 Marc Lehmann
 http://home.schmorp.de/

=cut

1