--- AnyEvent-MP/MP.pm 2009/07/31 20:55:46 1.2 +++ AnyEvent-MP/MP.pm 2009/08/14 16:15:37 1.54 @@ -6,21 +6,48 @@ use AnyEvent::MP; - NODE # returns this node identifier - $NODE # contains this node identifier + $NODE # contains this node's noderef + NODE # returns this node's noderef + NODE $port # returns the noderef of the port + $SELF # receiving/own port id in rcv callbacks + + # initialise the node so it can send/receive messages + initialise_node; # -OR- + initialise_node "localhost:4040"; # -OR- + initialise_node "slave/", "localhost:4040" + + # ports are message endpoints + + # sending messages snd $port, type => data...; + snd $port, @msg; + snd @msg_with_first_element_being_a_port; + + # creating/using ports, the simple way + my $simple_port = port { my @msg = @_; 0 }; + + # creating/using ports, tagged message matching + my $port = port; + rcv $port, ping => sub { snd $_[0], "pong"; 0 }; + rcv $port, pong => sub { warn "pong received\n"; 0 }; + + # create a port on another node + my $port = spawn $node, $initfunc, @initdata; + + # monitoring + mon $port, $cb->(@msg) # callback is invoked on death + mon $port, $otherport # kill otherport on abnormal death + mon $port, $otherport, @msg # send message on death + +=head1 CURRENT STATUS - rcv $port, smartmatch => $cb->($port, @msg); + AnyEvent::MP - stable API, should work + AnyEvent::MP::Intro - outdated + AnyEvent::MP::Kernel - WIP + AnyEvent::MP::Transport - mostly stable - # examples: - rcv $port2, ping => sub { snd $_[0], "pong"; 0 }; - rcv $port1, pong => sub { warn "pong received\n" }; - snd $port2, ping => $port1; - - # more, smarter, matches (_any_ is exported by this module) - rcv $port, [child_died => $pid] => sub { ... - rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3 + stay tuned. =head1 DESCRIPTION @@ -29,47 +56,58 @@ Despite its simplicity, you can securely message other processes running on the same or other hosts. +For an introduction to this module family, see the L +manual page. + +At the moment, this module family is severly broken and underdocumented, +so do not use. This was uploaded mainly to reserve the CPAN namespace - +stay tuned! + =head1 CONCEPTS =over 4 =item port -A port is something you can send messages to with the C function, and -you can register C handlers with. All C handlers will receive -messages they match, messages will not be queued. - -=item port id - C - -A port id is always the node id, a hash-mark (C<#>) as separator, followed -by a port name. - -A port name can be a well known port (basically an identifier/bareword), -or a generated name, consisting of node id, a dot (C<.>), and an -identifier. +A port is something you can send messages to (with the C function). -=item node +Ports allow you to register C handlers that can match all or just +some messages. Messages will not be queued. -A node is a single process containing at least one port - the node -port. You can send messages to node ports to let them create new ports, -among other things. +=item port id - C -Initially, nodes are either private (single-process only) or hidden -(connected to a father node only). Only when they epxlicitly "go public" -can you send them messages form unrelated other nodes. +A port ID is the concatenation of a noderef, a hash-mark (C<#>) as +separator, and a port name (a printable string of unspecified format). An +exception is the the node port, whose ID is identical to its node +reference. -Public nodes automatically connect to all other public nodes in a network -when they connect, creating a full mesh. +=item node -=item node id - C, C, C +A node is a single process containing at least one port - the node port, +which provides nodes to manage each other remotely, and to create new +ports. -A node ID is a string that either uniquely identifies a given node (For -private and hidden nodes), or contains a recipe on how to reach a given +Nodes are either private (single-process only), slaves (connected to a +master node only) or public nodes (connectable from unrelated nodes). + +=item noderef - C, C, C + +A node reference is a string that either simply identifies the node (for +private and slave nodes), or contains a recipe on how to reach a given node (for public nodes). +This recipe is simply a comma-separated list of C pairs (for +TCP/IP, other protocols might look different). + +Node references come in two flavours: resolved (containing only numerical +addresses) or unresolved (where hostnames are used instead of addresses). + +Before using an unresolved node reference in a message you first have to +resolve it. + =back -=head1 FUNCTIONS +=head1 VARIABLES/FUNCTIONS =over 4 @@ -77,11 +115,8 @@ package AnyEvent::MP; -use AnyEvent::MP::Util (); -use AnyEvent::MP::Node; -use AnyEvent::MP::Transport; +use AnyEvent::MP::Kernel; -use utf8; use common::sense; use Carp (); @@ -90,192 +125,762 @@ use base "Exporter"; -our $VERSION = '0.0'; -our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_); +our $VERSION = $AnyEvent::MP::Kernel::VERSION; -our $DEFAULT_SECRET; -our $DEFAULT_PORT = "4040"; +our @EXPORT = qw( + NODE $NODE *SELF node_of _any_ + resolve_node initialise_node + snd rcv mon kil reg psub spawn + port +); + +our $SELF; + +sub _self_die() { + my $msg = $@; + $msg =~ s/\n+$// unless ref $msg; + kil $SELF, die => $msg; +} -our $CONNECT_INTERVAL = 5; # new connect every 5s, at least -our $CONNECT_TIMEOUT = 30; # includes handshake +=item $thisnode = NODE / $NODE -sub default_secret { - unless (defined $DEFAULT_SECRET) { - if (open my $fh, "<$ENV{HOME}/.aemp-secret") { - sysread $fh, $DEFAULT_SECRET, -s $fh; - } else { - $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32; - } - } +The C function returns, and the C<$NODE> variable contains the +noderef of the local node. The value is initialised by a call to +C. - $DEFAULT_SECRET -} +=item $noderef = node_of $port -our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie -our $PUBLIC = 0; -our $NODE; -our $PORT; - -our %NODE; # node id to transport mapping, or "undef", for local node -our %PORT; # local ports -our %LISTENER; # local transports - -sub NODE() { $NODE } - -{ - use POSIX (); - my $nodename = (POSIX::uname)[1]; - $NODE = "$$\@$nodename"; -} +Extracts and returns the noderef from a port ID or a noderef. -sub _ANY_() { 1 } -sub _any_() { \&_ANY_ } +=item initialise_node $noderef, $seednode, $seednode... -sub add_node { - my ($noderef) = @_; +=item initialise_node "slave/", $master, $master... - return $NODE{$noderef} - if exists $NODE{$noderef}; +Before a node can talk to other nodes on the network it has to initialise +itself - the minimum a node needs to know is it's own name, and optionally +it should know the noderefs of some other nodes in the network. - for (split /,/, $noderef) { - return $NODE{$noderef} = $NODE{$_} - if exists $NODE{$_}; - } +This function initialises a node - it must be called exactly once (or +never) before calling other AnyEvent::MP functions. - # for indirect sends, use a different class - my $node = new AnyEvent::MP::Node::Direct $noderef; +All arguments (optionally except for the first) are noderefs, which can be +either resolved or unresolved. - $NODE{$_} = $node - for $noderef, split /,/, $noderef; +The first argument will be looked up in the configuration database first +(if it is C then the current nodename will be used instead) to find +the relevant configuration profile (see L). If none is found then +the default configuration is used. The configuration supplies additional +seed/master nodes and can override the actual noderef. - $node -} +There are two types of networked nodes, public nodes and slave nodes: -sub snd($@) { - my ($noderef, $port) = split /#/, shift, 2; +=over 4 + +=item public nodes + +For public nodes, C<$noderef> (supplied either directly to +C or indirectly via a profile or the nodename) must be a +noderef (possibly unresolved, in which case it will be resolved). + +After resolving, the node will bind itself on all endpoints and try to +connect to all additional C<$seednodes> that are specified. Seednodes are +optional and can be used to quickly bootstrap the node into an existing +network. + +=item slave nodes + +When the C<$noderef> (either as given or overriden by the config file) +is the special string C, then the node will become a slave +node. Slave nodes cannot be contacted from outside and will route most of +their traffic to the master node that they attach to. + +At least one additional noderef is required (either by specifying it +directly or because it is part of the configuration profile): The node +will try to connect to all of them and will become a slave attached to the +first node it can successfully connect to. + +=back + +This function will block until all nodes have been resolved and, for slave +nodes, until it has successfully established a connection to a master +server. + +Example: become a public node listening on the guessed noderef, or the one +specified via C for the current node. This should be the most common +form of invocation for "daemon"-type nodes. + + initialise_node; + +Example: become a slave node to any of the the seednodes specified via +C. This form is often used for commandline clients. + + initialise_node "slave/"; + +Example: become a slave node to any of the specified master servers. This +form is also often used for commandline clients. + + initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net"; + +Example: become a public node, and try to contact some well-known master +servers to become part of the network. + + initialise_node undef, "master1", "master2"; + +Example: become a public node listening on port C<4041>. + + initialise_node 4041; + +Example: become a public node, only visible on localhost port 4044. + + initialise_node "localhost:4044"; + +=item $cv = resolve_node $noderef + +Takes an unresolved node reference that may contain hostnames and +abbreviated IDs, resolves all of them and returns a resolved node +reference. + +In addition to C pairs allowed in resolved noderefs, the +following forms are supported: + +=over 4 + +=item the empty string + +An empty-string component gets resolved as if the default port (4040) was +specified. + +=item naked port numbers (e.g. C<1234>) + +These are resolved by prepending the local nodename and a colon, to be +further resolved. + +=item hostnames (e.g. C, C) + +These are resolved by using AnyEvent::DNS to resolve them, optionally +looking up SRV records for the C port, if no port was +specified. + +=back + +=item $SELF + +Contains the current port id while executing C callbacks or C +blocks. - add_node $noderef - unless exists $NODE{$noderef}; +=item SELF, %SELF, @SELF... - $NODE{$noderef}->send ([$port, [@_]]); +Due to some quirks in how perl exports variables, it is impossible to +just export C<$SELF>, all the symbols called C are exported by this +module, but only C<$SELF> is currently used. + +=item snd $port, type => @data + +=item snd $port, @msg + +Send the given message to the given port ID, which can identify either +a local or a remote port, and must be a port ID. + +While the message can be about anything, it is highly recommended to use a +string as first element (a port ID, or some word that indicates a request +type etc.). + +The message data effectively becomes read-only after a call to this +function: modifying any argument is not allowed and can cause many +problems. + +The type of data you can transfer depends on the transport protocol: when +JSON is used, then only strings, numbers and arrays and hashes consisting +of those are allowed (no objects). When Storable is used, then anything +that Storable can serialise and deserialise is allowed, and for the local +node, anything can be passed. + +=item $local_port = port + +Create a new local port object and returns its port ID. Initially it has +no callbacks set and will throw an error when it receives messages. + +=item $local_port = port { my @msg = @_ } + +Creates a new local port, and returns its ID. Semantically the same as +creating a port and calling C on it. + +The block will be called for every message received on the port, with the +global variable C<$SELF> set to the port ID. Runtime errors will cause the +port to be Ced. The message will be passed as-is, no extra argument +(i.e. no port ID) will be passed to the callback. + +If you want to stop/destroy the port, simply C it: + + my $port = port { + my @msg = @_; + ... + kil $SELF; + }; + +=cut + +sub rcv($@); + +sub _kilme { + die "received message on port without callback"; } -sub _inject { - my ($port, $msg) = @{+shift}; +sub port(;&) { + my $id = "$UNIQ." . $ID++; + my $port = "$NODE#$id"; - $port = $PORT{$port} - or return; + rcv $port, shift || \&_kilme; - use Data::Dumper; - warn Dumper $msg; + $port } -sub normalise_noderef($) { - my ($noderef) = @_; +=item rcv $local_port, $callback->(@msg) - my $cv = AE::cv; - my @res; +Replaces the default callback on the specified port. There is no way to +remove the default callback: use C to disable it, or better +C the port when it is no longer needed. - $cv->begin (sub { - my %seen; - my @refs; - for (sort { $a->[0] <=> $b->[0] } @res) { - push @refs, $_->[1] unless $seen{$_->[1]}++ - } - shift->send (join ",", @refs); - }); +The global C<$SELF> (exported by this module) contains C<$port> while +executing the callback. Runtime errors during callback execution will +result in the port being Ced. + +The default callback received all messages not matched by a more specific +C match. + +=item rcv $local_port, tag => $callback->(@msg_without_tag), ... + +Register (or replace) callbacks to be called on messages starting with the +given tag on the given port (and return the port), or unregister it (when +C<$callback> is C<$undef> or missing). There can only be one callback +registered for each tag. + +The original message will be passed to the callback, after the first +element (the tag) has been removed. The callback will use the same +environment as the default callback (see above). + +Example: create a port and bind receivers on it in one go. + + my $port = rcv port, + msg1 => sub { ... }, + msg2 => sub { ... }, + ; + +Example: create a port, bind receivers and send it in a message elsewhere +in one go: + + snd $otherport, reply => + rcv port, + msg1 => sub { ... }, + ... + ; + +Example: temporarily register a rcv callback for a tag matching some port +(e.g. for a rpc reply) and unregister it after a message was received. + + rcv $port, $otherport => sub { + my @reply = @_; + + rcv $SELF, $otherport; + }; - $noderef = $DEFAULT_PORT unless length $noderef; +=cut + +sub rcv($@) { + my $port = shift; + my ($noderef, $portid) = split /#/, $port, 2; + + ($NODE{$noderef} || add_node $noderef) == $NODE{""} + or Carp::croak "$port: rcv can only be called on local ports, caught"; + + while (@_) { + if (ref $_[0]) { + if (my $self = $PORT_DATA{$portid}) { + "AnyEvent::MP::Port" eq ref $self + or Carp::croak "$port: rcv can only be called on message matching ports, caught"; + + $self->[2] = shift; + } else { + my $cb = shift; + $PORT{$portid} = sub { + local $SELF = $port; + eval { &$cb }; _self_die if $@; + }; + } + } elsif (defined $_[0]) { + my $self = $PORT_DATA{$portid} ||= do { + my $self = bless [$PORT{$port} || sub { }, { }, $port], "AnyEvent::MP::Port"; + + $PORT{$portid} = sub { + local $SELF = $port; + + if (my $cb = $self->[1]{$_[0]}) { + shift; + eval { &$cb }; _self_die if $@; + } else { + &{ $self->[0] }; + } + }; - my $idx; - for my $t (split /,/, $noderef) { - my $pri = ++$idx; - - #TODO: this should be outside normalise_noderef and in become_public - if ($t =~ /^\d*$/) { - my $nodename = (POSIX::uname)[1]; - - $cv->begin; - AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub { - for (@_) { - my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; - push @res, [ - $pri += 1e-5, - AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service - ]; - } - $cv->end; + $self }; -# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename; -# -# for (@ipv4) { -# push @res, [ -# $pri, -# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT, -# ]; -# } - } else { - my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT" - or Carp::croak "$t: unparsable transport descriptor"; + "AnyEvent::MP::Port" eq ref $self + or Carp::croak "$port: rcv can only be called on message matching ports, caught"; - $cv->begin; - AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub { - for (@_) { - my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; - push @res, [ - $pri += 1e-5, - AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service - ]; - } - $cv->end; + my ($tag, $cb) = splice @_, 0, 2; + + if (defined $cb) { + $self->[1]{$tag} = $cb; + } else { + delete $self->[1]{$tag}; } } } - $cv->end; + $port +} + +=item $closure = psub { BLOCK } + +Remembers C<$SELF> and creates a closure out of the BLOCK. When the +closure is executed, sets up the environment in the same way as in C +callbacks, i.e. runtime errors will cause the port to get Ced. + +This is useful when you register callbacks from C callbacks: + + rcv delayed_reply => sub { + my ($delay, @reply) = @_; + my $timer = AE::timer $delay, 0, psub { + snd @reply, $SELF; + }; + }; - $cv +=cut + +sub psub(&) { + my $cb = shift; + + my $port = $SELF + or Carp::croak "psub can only be called from within rcv or psub callbacks, not"; + + sub { + local $SELF = $port; + + if (wantarray) { + my @res = eval { &$cb }; + _self_die if $@; + @res + } else { + my $res = eval { &$cb }; + _self_die if $@; + $res + } + } } -sub become_public { - return if $PUBLIC; +=item $guard = mon $port, $cb->(@reason) + +=item $guard = mon $port, $rcvport + +=item $guard = mon $port + +=item $guard = mon $port, $rcvport, @msg + +Monitor the given port and do something when the port is killed or +messages to it were lost, and optionally return a guard that can be used +to stop monitoring again. + +C effectively guarantees that, in the absence of hardware failures, +that after starting the monitor, either all messages sent to the port +will arrive, or the monitoring action will be invoked after possible +message loss has been detected. No messages will be lost "in between" +(after the first lost message no further messages will be received by the +port). After the monitoring action was invoked, further messages might get +delivered again. + +In the first form (callback), the callback is simply called with any +number of C<@reason> elements (no @reason means that the port was deleted +"normally"). Note also that I<< the callback B never die >>, so use +C if unsure. - my $noderef = join ",", ref $_[0] ? @{+shift} : shift; - my @args = @_; +In the second form (another port given), the other port (C<$rcvport>) +will be C'ed with C<@reason>, iff a @reason was specified, i.e. on +"normal" kils nothing happens, while under all other conditions, the other +port is killed with the same reason. - $NODE = (normalise_noderef $noderef)->recv; +The third form (kill self) is the same as the second form, except that +C<$rvport> defaults to C<$SELF>. - my $self = new AnyEvent::MP::Node::Self noderef => $NODE; +In the last form (message), a message of the form C<@msg, @reason> will be +C. - $NODE{""} = $self; # empty string == local node +As a rule of thumb, monitoring requests should always monitor a port from +a local port (or callback). The reason is that kill messages might get +lost, just like any other message. Another less obvious reason is that +even monitoring requests can get lost (for exmaple, when the connection +to the other node goes down permanently). When monitoring a port locally +these problems do not exist. - for my $t (split /,/, $NODE) { - $NODE{$t} = $self; +Example: call a given callback when C<$port> is killed. - my ($host, $port) = AnyEvent::Socket::parse_hostport $t; + mon $port, sub { warn "port died because of <@_>\n" }; - $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, - @args, - on_error => sub { - die "on_error<@_>\n";#d# - }, - on_connect => sub { - my ($tp) = @_; +Example: kill ourselves when C<$port> is killed abnormally. + + mon $port; + +Example: send us a restart message when another C<$port> is killed. + + mon $port, $self => "restart"; + +=cut + +sub mon { + my ($noderef, $port) = split /#/, shift, 2; - $NODE{$tp->{remote_id}} = $_[0]; - }, - sub { - my ($tp) = @_; + my $node = $NODE{$noderef} || add_node $noderef; - $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp; - }, - ; + my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,'; + + unless (ref $cb) { + if (@_) { + # send a kill info message + my (@msg) = ($cb, @_); + $cb = sub { snd @msg, @_ }; + } else { + # simply kill other port + my $port = $cb; + $cb = sub { kil $port, @_ if @_ }; + } + } + + $node->monitor ($port, $cb); + + defined wantarray + and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } +} + +=item $guard = mon_guard $port, $ref, $ref... + +Monitors the given C<$port> and keeps the passed references. When the port +is killed, the references will be freed. + +Optionally returns a guard that will stop the monitoring. + +This function is useful when you create e.g. timers or other watchers and +want to free them when the port gets killed: + + $port->rcv (start => sub { + my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub { + undef $timer if 0.9 < rand; + }); + }); + +=cut + +sub mon_guard { + my ($port, @refs) = @_; + + #TODO: mon-less form? + + mon $port, sub { 0 && @refs } +} + +=item kil $port[, @reason] + +Kill the specified port with the given C<@reason>. + +If no C<@reason> is specified, then the port is killed "normally" (linked +ports will not be kileld, or even notified). + +Otherwise, linked ports get killed with the same reason (second form of +C, see below). + +Runtime errors while evaluating C callbacks or inside C blocks +will be reported as reason C<< die => $@ >>. + +Transport/communication errors are reported as C<< transport_error => +$message >>. + +=cut + +=item $port = spawn $node, $initfunc[, @initdata] + +Creates a port on the node C<$node> (which can also be a port ID, in which +case it's the node where that port resides). + +The port ID of the newly created port is return immediately, and it is +permissible to immediately start sending messages or monitor the port. + +After the port has been created, the init function is +called. This function must be a fully-qualified function name +(e.g. C). To specify a function in the main +program, use C<::name>. + +If the function doesn't exist, then the node tries to C +the package, then the package above the package and so on (e.g. +C, C, C) until the function +exists or it runs out of package names. + +The init function is then called with the newly-created port as context +object (C<$SELF>) and the C<@initdata> values as arguments. + +A common idiom is to pass your own port, monitor the spawned port, and +in the init function, monitor the original port. This two-way monitoring +ensures that both ports get cleaned up when there is a problem. + +Example: spawn a chat server port on C<$othernode>. + + # this node, executed from within a port context: + my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF; + mon $server; + + # init function on C<$othernode> + sub connect { + my ($srcport) = @_; + + mon $srcport; + + rcv $SELF, sub { + ... + }; } - $PUBLIC = 1; +=cut + +sub _spawn { + my $port = shift; + my $init = shift; + + local $SELF = "$NODE#$port"; + eval { + &{ load_func $init } + }; + _self_die if $@; } +sub spawn(@) { + my ($noderef, undef) = split /#/, shift, 2; + + my $id = "$RUNIQ." . $ID++; + + $_[0] =~ /::/ + or Carp::croak "spawn init function must be a fully-qualified name, caught"; + + ($NODE{$noderef} || add_node $noderef) + ->send (["", "AnyEvent::MP::_spawn" => $id, @_]); + + "$noderef#$id" +} + +=back + +=head1 NODE MESSAGES + +Nodes understand the following messages sent to them. Many of them take +arguments called C<@reply>, which will simply be used to compose a reply +message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and +the remaining arguments are simply the message data. + +While other messages exist, they are not public and subject to change. + +=over 4 + +=cut + +=item lookup => $name, @reply + +Replies with the port ID of the specified well-known port, or C. + +=item devnull => ... + +Generic data sink/CPU heat conversion. + +=item relay => $port, @msg + +Simply forwards the message to the given port. + +=item eval => $string[ @reply] + +Evaluates the given string. If C<@reply> is given, then a message of the +form C<@reply, $@, @evalres> is sent. + +Example: crash another node. + + snd $othernode, eval => "exit"; + +=item time => @reply + +Replies the the current node time to C<@reply>. + +Example: tell the current node to send the current time to C<$myport> in a +C message. + + snd $NODE, time => $myport, timereply => 1, 2; + # => snd $myport, timereply => 1, 2,