--- AnyEvent-MP/MP.pm 2009/08/08 00:22:16 1.40 +++ AnyEvent-MP/MP.pm 2009/08/15 04:34:34 1.57 @@ -12,6 +12,11 @@ $SELF # receiving/own port id in rcv callbacks + # initialise the node so it can send/receive messages + initialise_node; # -OR- + initialise_node "localhost:4040"; # -OR- + initialise_node "slave/", "localhost:4040" + # ports are message endpoints # sending messages @@ -19,27 +24,31 @@ snd $port, @msg; snd @msg_with_first_element_being_a_port; - # miniports - my $miniport = port { my @msg = @_; 0 }; + # creating/using ports, the simple way + my $simple_port = port { my @msg = @_; 0 }; - # full ports + # creating/using ports, tagged message matching my $port = port; - rcv $port, smartmatch => $cb->(@msg); rcv $port, ping => sub { snd $_[0], "pong"; 0 }; rcv $port, pong => sub { warn "pong received\n"; 0 }; - # remote ports + # create a port on another node my $port = spawn $node, $initfunc, @initdata; - # more, smarter, matches (_any_ is exported by this module) - rcv $port, [child_died => $pid] => sub { ... - rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3 - # monitoring mon $port, $cb->(@msg) # callback is invoked on death mon $port, $otherport # kill otherport on abnormal death mon $port, $otherport, @msg # send message on death +=head1 CURRENT STATUS + + AnyEvent::MP - stable API, should work + AnyEvent::MP::Intro - outdated + AnyEvent::MP::Kernel - WIP + AnyEvent::MP::Transport - mostly stable + + stay tuned. + =head1 DESCRIPTION This module (-family) implements a simple message passing framework. @@ -52,7 +61,7 @@ At the moment, this module family is severly broken and underdocumented, so do not use. This was uploaded mainly to reserve the CPAN namespace - -stay tuned! The basic API should be finished, however. +stay tuned! =head1 CONCEPTS @@ -62,22 +71,21 @@ A port is something you can send messages to (with the C function). -Some ports allow you to register C handlers that can match specific -messages. All C handlers will receive messages they match, messages -will not be queued. +Ports allow you to register C handlers that can match all or just +some messages. Messages will not be queued. =item port id - C -A port id is normaly the concatenation of a noderef, a hash-mark (C<#>) as +A port ID is the concatenation of a noderef, a hash-mark (C<#>) as separator, and a port name (a printable string of unspecified format). An exception is the the node port, whose ID is identical to its node reference. =item node -A node is a single process containing at least one port - the node -port. You can send messages to node ports to find existing ports or to -create new ports, among other things. +A node is a single process containing at least one port - the node port, +which provides nodes to manage each other remotely, and to create new +ports. Nodes are either private (single-process only), slaves (connected to a master node only) or public nodes (connectable from unrelated nodes). @@ -107,7 +115,7 @@ package AnyEvent::MP; -use AnyEvent::MP::Base; +use AnyEvent::MP::Kernel; use common::sense; @@ -117,7 +125,8 @@ use base "Exporter"; -our $VERSION = '0.1'; +our $VERSION = $AnyEvent::MP::Kernel::VERSION; + our @EXPORT = qw( NODE $NODE *SELF node_of _any_ resolve_node initialise_node @@ -135,14 +144,13 @@ =item $thisnode = NODE / $NODE -The C function returns, and the C<$NODE> variable contains -the noderef of the local node. The value is initialised by a call -to C or C, after which all local port -identifiers become invalid. +The C function returns, and the C<$NODE> variable contains the +noderef of the local node. The value is initialised by a call to +C. =item $noderef = node_of $port -Extracts and returns the noderef from a portid or a noderef. +Extracts and returns the noderef from a port ID or a noderef. =item initialise_node $noderef, $seednode, $seednode... @@ -155,7 +163,14 @@ This function initialises a node - it must be called exactly once (or never) before calling other AnyEvent::MP functions. -All arguments are noderefs, which can be either resolved or unresolved. +All arguments (optionally except for the first) are noderefs, which can be +either resolved or unresolved. + +The first argument will be looked up in the configuration database first +(if it is C then the current nodename will be used instead) to find +the relevant configuration profile (see L). If none is found then +the default configuration is used. The configuration supplies additional +seed/master nodes and can override the actual noderef. There are two types of networked nodes, public nodes and slave nodes: @@ -163,23 +178,31 @@ =item public nodes -For public nodes, C<$noderef> must either be a (possibly unresolved) -noderef, in which case it will be resolved, or C (or missing), in -which case the noderef will be guessed. - -Afterwards, the node will bind itself on all endpoints and try to connect -to all additional C<$seednodes> that are specified. Seednodes are optional -and can be used to quickly bootstrap the node into an existing network. +For public nodes, C<$noderef> (supplied either directly to +C or indirectly via a profile or the nodename) must be a +noderef (possibly unresolved, in which case it will be resolved). + +After resolving, the node will bind itself on all endpoints and try to +connect to all additional C<$seednodes> that are specified. Seednodes are +optional and can be used to quickly bootstrap the node into an existing +network. =item slave nodes -When the C<$noderef> is the special string C, then the node will -become a slave node. Slave nodes cannot be contacted from outside and will -route most of their traffic to the master node that they attach to. - -At least one additional noderef is required: The node will try to connect -to all of them and will become a slave attached to the first node it can -successfully connect to. +When the C<$noderef> (either as given or overriden by the config file) +is the special string C, then the node will become a slave +node. Slave nodes cannot be contacted from outside and will route most of +their traffic to the master node that they attach to. + +At least one additional noderef is required (either by specifying it +directly or because it is part of the configuration profile): The node +will try to connect to all of them and will become a slave attached to the +first node it can successfully connect to. + +Note that slave nodes cannot change their name, and consequently, their +master, so if the master goes down, the slave node will not function well +anymore until it can re-establish conenciton to its master. This makes +slave nodes unsuitable for long-term nodes or fault-tolerant networks. =back @@ -187,10 +210,25 @@ nodes, until it has successfully established a connection to a master server. -Example: become a public node listening on the default node. +All the seednodes will also be specially marked to automatically retry +connecting to them infinitely. + +Example: become a public node listening on the guessed noderef, or the one +specified via C for the current node. This should be the most common +form of invocation for "daemon"-type nodes. initialise_node; +Example: become a slave node to any of the the seednodes specified via +C. This form is often used for commandline clients. + + initialise_node "slave/"; + +Example: become a slave node to any of the specified master servers. This +form is also often used for commandline clients. + + initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net"; + Example: become a public node, and try to contact some well-known master servers to become part of the network. @@ -202,11 +240,7 @@ Example: become a public node, only visible on localhost port 4044. - initialise_node "locahost:4044"; - -Example: become a slave node to any of the specified master servers. - - initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net"; + initialise_node "localhost:4044"; =item $cv = resolve_node $noderef @@ -253,11 +287,10 @@ =item snd $port, @msg Send the given message to the given port ID, which can identify either -a local or a remote port, and can be either a string or soemthignt hat -stringifies a sa port ID (such as a port object :). +a local or a remote port, and must be a port ID. While the message can be about anything, it is highly recommended to use a -string as first element (a portid, or some word that indicates a request +string as first element (a port ID, or some word that indicates a request type etc.). The message data effectively becomes read-only after a call to this @@ -272,105 +305,73 @@ =item $local_port = port -Create a new local port object that can be used either as a pattern -matching port ("full port") or a single-callback port ("miniport"), -depending on how C callbacks are bound to the object. - -=item $port = port { my @msg = @_; $finished } +Create a new local port object and returns its port ID. Initially it has +no callbacks set and will throw an error when it receives messages. -Creates a "miniport", that is, a very lightweight port without any pattern -matching behind it, and returns its ID. Semantically the same as creating -a port and calling C on it. +=item $local_port = port { my @msg = @_ } -The block will be called for every message received on the port. When the -callback returns a true value its job is considered "done" and the port -will be destroyed. Otherwise it will stay alive. +Creates a new local port, and returns its ID. Semantically the same as +creating a port and calling C on it. -The message will be passed as-is, no extra argument (i.e. no port id) will -be passed to the callback. +The block will be called for every message received on the port, with the +global variable C<$SELF> set to the port ID. Runtime errors will cause the +port to be Ced. The message will be passed as-is, no extra argument +(i.e. no port ID) will be passed to the callback. -If you need the local port id in the callback, this works nicely: +If you want to stop/destroy the port, simply C it: - my $port; $port = port { - snd $otherport, reply => $port; + my $port = port { + my @msg = @_; + ... + kil $SELF; }; =cut sub rcv($@); +sub _kilme { + die "received message on port without callback"; +} + sub port(;&) { my $id = "$UNIQ." . $ID++; my $port = "$NODE#$id"; - if (@_) { - rcv $port, shift; - } else { - $PORT{$id} = sub { }; # nop - } + rcv $port, shift || \&_kilme; $port } -=item reg $port, $name - -=item reg $name - -Registers the given port (or C<$SELF><<< if missing) under the name -C<$name>. If the name already exists it is replaced. +=item rcv $local_port, $callback->(@msg) -A port can only be registered under one well known name. - -A port automatically becomes unregistered when it is killed. - -=cut - -sub reg(@) { - my $port = @_ > 1 ? shift : $SELF || Carp::croak 'reg: called with one argument only, but $SELF not set,'; - - $REG{$_[0]} = $port; -} - -=item rcv $port, $callback->(@msg) - -Replaces the callback on the specified miniport (after converting it to -one if required). - -=item rcv $port, tagstring => $callback->(@msg), ... - -=item rcv $port, $smartmatch => $callback->(@msg), ... - -=item rcv $port, [$smartmatch...] => $callback->(@msg), ... - -Register callbacks to be called on matching messages on the given full -port (after converting it to one if required) and return the port. - -The callback has to return a true value when its work is done, after -which is will be removed, or a false value in which case it will stay -registered. +Replaces the default callback on the specified port. There is no way to +remove the default callback: use C to disable it, or better +C the port when it is no longer needed. The global C<$SELF> (exported by this module) contains C<$port> while -executing the callback. - -Runtime errors during callback execution will result in the port being -Ced. +executing the callback. Runtime errors during callback execution will +result in the port being Ced. -If the match is an array reference, then it will be matched against the -first elements of the message, otherwise only the first element is being -matched. +The default callback received all messages not matched by a more specific +C match. -Any element in the match that is specified as C<_any_> (a function -exported by this module) matches any single element of the message. +=item rcv $local_port, tag => $callback->(@msg_without_tag), ... -While not required, it is highly recommended that the first matching -element is a string identifying the message. The one-string-only match is -also the most efficient match (by far). +Register (or replace) callbacks to be called on messages starting with the +given tag on the given port (and return the port), or unregister it (when +C<$callback> is C<$undef> or missing). There can only be one callback +registered for each tag. + +The original message will be passed to the callback, after the first +element (the tag) has been removed. The callback will use the same +environment as the default callback (see above). Example: create a port and bind receivers on it in one go. my $port = rcv port, - msg1 => sub { ...; 0 }, - msg2 => sub { ...; 0 }, + msg1 => sub { ... }, + msg2 => sub { ... }, ; Example: create a port, bind receivers and send it in a message elsewhere @@ -378,10 +379,19 @@ snd $otherport, reply => rcv port, - msg1 => sub { ...; 0 }, + msg1 => sub { ... }, ... ; +Example: temporarily register a rcv callback for a tag matching some port +(e.g. for a rpc reply) and unregister it after a message was received. + + rcv $port, $otherport => sub { + my @reply = @_; + + rcv $SELF, $otherport; + }; + =cut sub rcv($@) { @@ -391,65 +401,47 @@ ($NODE{$noderef} || add_node $noderef) == $NODE{""} or Carp::croak "$port: rcv can only be called on local ports, caught"; - if (@_ == 1) { - my $cb = shift; - delete $PORT_DATA{$portid}; - $PORT{$portid} = sub { - local $SELF = $port; - eval { - &$cb - and kil $port; - }; - _self_die if $@; - }; - } else { - my $self = $PORT_DATA{$portid} ||= do { - my $self = bless { - id => $port, - }, "AnyEvent::MP::Port"; - - $PORT{$portid} = sub { - local $SELF = $port; - - eval { - for (@{ $self->{rc0}{$_[0]} }) { - $_ && &{$_->[0]} - && undef $_; - } - - for (@{ $self->{rcv}{$_[0]} }) { - $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; - } + while (@_) { + if (ref $_[0]) { + if (my $self = $PORT_DATA{$portid}) { + "AnyEvent::MP::Port" eq ref $self + or Carp::croak "$port: rcv can only be called on message matching ports, caught"; - for (@{ $self->{any} }) { - $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; + $self->[2] = shift; + } else { + my $cb = shift; + $PORT{$portid} = sub { + local $SELF = $port; + eval { &$cb }; _self_die if $@; + }; + } + } elsif (defined $_[0]) { + my $self = $PORT_DATA{$portid} ||= do { + my $self = bless [$PORT{$port} || sub { }, { }, $port], "AnyEvent::MP::Port"; + + $PORT{$portid} = sub { + local $SELF = $port; + + if (my $cb = $self->[1]{$_[0]}) { + shift; + eval { &$cb }; _self_die if $@; + } else { + &{ $self->[0] }; } }; - _self_die if $@; - }; - $self - }; + $self + }; - "AnyEvent::MP::Port" eq ref $self - or Carp::croak "$port: rcv can only be called on message matching ports, caught"; + "AnyEvent::MP::Port" eq ref $self + or Carp::croak "$port: rcv can only be called on message matching ports, caught"; - while (@_) { - my ($match, $cb) = splice @_, 0, 2; + my ($tag, $cb) = splice @_, 0, 2; - if (!ref $match) { - push @{ $self->{rc0}{$match} }, [$cb]; - } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { - my ($type, @match) = @$match; - @match - ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] - : push @{ $self->{rc0}{$match->[0]} }, [$cb]; + if (defined $cb) { + $self->[1]{$tag} = $cb; } else { - push @{ $self->{any} }, [$cb, $match]; + delete $self->[1]{$tag}; } } } @@ -503,15 +495,24 @@ =item $guard = mon $port, $rcvport, @msg -Monitor the given port and do something when the port is killed, and -optionally return a guard that can be used to stop monitoring again. +Monitor the given port and do something when the port is killed or +messages to it were lost, and optionally return a guard that can be used +to stop monitoring again. + +C effectively guarantees that, in the absence of hardware failures, +that after starting the monitor, either all messages sent to the port +will arrive, or the monitoring action will be invoked after possible +message loss has been detected. No messages will be lost "in between" +(after the first lost message no further messages will be received by the +port). After the monitoring action was invoked, further messages might get +delivered again. In the first form (callback), the callback is simply called with any number of C<@reason> elements (no @reason means that the port was deleted "normally"). Note also that I<< the callback B never die >>, so use C if unsure. -In the second form (another port given), the other port (C<$rcvport) +In the second form (another port given), the other port (C<$rcvport>) will be C'ed with C<@reason>, iff a @reason was specified, i.e. on "normal" kils nothing happens, while under all other conditions, the other port is killed with the same reason. @@ -548,12 +549,12 @@ my $node = $NODE{$noderef} || add_node $noderef; - my $cb = @_ ? $_[0] : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,'; + my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,'; unless (ref $cb) { if (@_) { # send a kill info message - my (@msg) = @_; + my (@msg) = ($cb, @_); $cb = sub { snd @msg, @_ }; } else { # simply kill other port @@ -675,60 +676,13 @@ $_[0] =~ /::/ or Carp::croak "spawn init function must be a fully-qualified name, caught"; - ($NODE{$noderef} || add_node $noderef) - ->send (["", "AnyEvent::MP::_spawn" => $id, @_]); + snd_to_func $noderef, "AnyEvent::MP::_spawn" => $id, @_; "$noderef#$id" } =back -=head1 NODE MESSAGES - -Nodes understand the following messages sent to them. Many of them take -arguments called C<@reply>, which will simply be used to compose a reply -message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and -the remaining arguments are simply the message data. - -While other messages exist, they are not public and subject to change. - -=over 4 - -=cut - -=item lookup => $name, @reply - -Replies with the port ID of the specified well-known port, or C. - -=item devnull => ... - -Generic data sink/CPU heat conversion. - -=item relay => $port, @msg - -Simply forwards the message to the given port. - -=item eval => $string[ @reply] - -Evaluates the given string. If C<@reply> is given, then a message of the -form C<@reply, $@, @evalres> is sent. - -Example: crash another node. - - snd $othernode, eval => "exit"; - -=item time => @reply - -Replies the the current node time to C<@reply>. - -Example: tell the current node to send the current time to C<$myport> in a -C message. - - snd $NODE, time => $myport, timereply => 1, 2; - # => snd $myport, timereply => 1, 2,