--- AnyEvent-MP/MP.pm 2009/08/04 22:16:54 1.28 +++ AnyEvent-MP/MP.pm 2009/08/30 19:52:56 1.71 @@ -10,34 +10,55 @@ NODE # returns this node's noderef NODE $port # returns the noderef of the port + $SELF # receiving/own port id in rcv callbacks + + # initialise the node so it can send/receive messages + initialise_node; + + # ports are message endpoints + + # sending messages snd $port, type => data...; + snd $port, @msg; + snd @msg_with_first_element_being_a_port; - $SELF # receiving/own port id in rcv callbacks + # creating/using ports, the simple way + my $simple_port = port { my @msg = @_; 0 }; - rcv $port, smartmatch => $cb->($port, @msg); + # creating/using ports, tagged message matching + my $port = port; + rcv $port, ping => sub { snd $_[0], "pong"; 0 }; + rcv $port, pong => sub { warn "pong received\n"; 0 }; + + # create a port on another node + my $port = spawn $node, $initfunc, @initdata; + + # monitoring + mon $port, $cb->(@msg) # callback is invoked on death + mon $port, $otherport # kill otherport on abnormal death + mon $port, $otherport, @msg # send message on death + +=head1 CURRENT STATUS + + bin/aemp - stable. + AnyEvent::MP - stable API, should work. + AnyEvent::MP::Intro - uptodate, but incomplete. + AnyEvent::MP::Kernel - mostly stable. + AnyEvent::MP::Global - stable API, protocol not yet final. - # examples: - rcv $port2, ping => sub { snd $_[0], "pong"; 0 }; - rcv $port1, pong => sub { warn "pong received\n" }; - snd $port2, ping => $port1; - - # more, smarter, matches (_any_ is exported by this module) - rcv $port, [child_died => $pid] => sub { ... - rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3 + stay tuned. =head1 DESCRIPTION This module (-family) implements a simple message passing framework. Despite its simplicity, you can securely message other processes running -on the same or other hosts. +on the same or other hosts, and you can supervise entities remotely. For an introduction to this module family, see the L -manual page. +manual page and the examples under F. -At the moment, this module family is severly broken and underdocumented, -so do not use. This was uploaded mainly to reserve the CPAN namespace - -stay tuned! The basic API should be finished, however. +At the moment, this module family is a bit underdocumented. =head1 CONCEPTS @@ -45,30 +66,55 @@ =item port -A port is something you can send messages to with the C function, and -you can register C handlers with. All C handlers will receive -messages they match, messages will not be queued. +A port is something you can send messages to (with the C function). + +Ports allow you to register C handlers that can match all or just +some messages. Messages send to ports will not be queued, regardless of +anything was listening for them or not. -=item port id - C +=item port ID - C -A port id is always the noderef, a hash-mark (C<#>) as separator, followed -by a port name (a printable string of unspecified format). +A port ID is the concatenation of a node ID, a hash-mark (C<#>) as +separator, and a port name (a printable string of unspecified format). =item node -A node is a single process containing at least one port - the node -port. You can send messages to node ports to let them create new ports, -among other things. - -Initially, nodes are either private (single-process only) or hidden -(connected to a master node only). Only when they epxlicitly "become -public" can you send them messages from unrelated other nodes. - -=item noderef - C, C, C - -A noderef is a string that either uniquely identifies a given node (for -private and hidden nodes), or contains a recipe on how to reach a given -node (for public nodes). +A node is a single process containing at least one port - the node port, +which enables nodes to manage each other remotely, and to create new +ports. + +Nodes are either public (have one or more listening ports) or private +(no listening ports). Private nodes cannot talk to other private nodes +currently. + +=item node ID - C<[a-za-Z0-9_\-.:]+> + +A node ID is a string that uniquely identifies the node within a +network. Depending on the configuration used, node IDs can look like a +hostname, a hostname and a port, or a random string. AnyEvent::MP itself +doesn't interpret node IDs in any way. + +=item binds - C + +Nodes can only talk to each other by creating some kind of connection to +each other. To do this, nodes should listen on one or more local transport +endpoints - binds. Currently, only standard C specifications can +be used, which specify TCP ports to listen on. + +=item seeds - C + +When a node starts, it knows nothing about the network. To teach the node +about the network it first has to contact some other node within the +network. This node is called a seed. + +Seeds are transport endpoint(s) of as many nodes as one wants. Those nodes +are expected to be long-running, and at least one of those should always +be available. When nodes run out of connections (e.g. due to a network +error), they try to re-establish connections to some seednodes again to +join the network. + +Apart from being sued for seeding, seednodes are not special in any way - +every public node can be a seednode. =back @@ -80,7 +126,7 @@ package AnyEvent::MP; -use AnyEvent::MP::Base; +use AnyEvent::MP::Kernel; use common::sense; @@ -90,11 +136,12 @@ use base "Exporter"; -our $VERSION = '0.1'; +our $VERSION = $AnyEvent::MP::Kernel::VERSION; + our @EXPORT = qw( - NODE $NODE *SELF node_of _any_ - become_slave become_public - snd rcv mon kil reg psub + NODE $NODE *SELF node_of after + initialise_node + snd rcv mon mon_guard kil reg psub spawn port ); @@ -108,301 +155,252 @@ =item $thisnode = NODE / $NODE -The C function returns, and the C<$NODE> variable contains -the noderef of the local node. The value is initialised by a call -to C or C, after which all local port -identifiers become invalid. +The C function returns, and the C<$NODE> variable contains, the node +ID of the node running in the current process. This value is initialised by +a call to C. -=item $noderef = node_of $portid +=item $nodeid = node_of $port -Extracts and returns the noderef from a portid or a noderef. +Extracts and returns the node ID from a port ID or a node ID. -=item $SELF +=item initialise_node $profile_name, key => value... -Contains the current port id while executing C callbacks or C -blocks. +Before a node can talk to other nodes on the network (i.e. enter +"distributed mode") it has to initialise itself - the minimum a node needs +to know is its own name, and optionally it should know the addresses of +some other nodes in the network to discover other nodes. -=item SELF, %SELF, @SELF... +This function initialises a node - it must be called exactly once (or +never) before calling other AnyEvent::MP functions. -Due to some quirks in how perl exports variables, it is impossible to -just export C<$SELF>, all the symbols called C are exported by this -module, but only C<$SELF> is currently used. +The first argument is a profile name. If it is C or missing, then +the current nodename will be used instead (i.e. F). -=item snd $portid, type => @data +The function first looks up the profile in the aemp configuration (see the +L commandline utility). the profile is calculated as follows: -=item snd $portid, @msg +First, all remaining key => value pairs (all of which are conviniently +undocumented at the moment) will be used. Then they will be overwritten by +any values specified in the global default configuration (see the F +utility), then the chain of profiles selected, if any. That means that +the values specified in the profile have highest priority and the values +specified via C have lowest priority. -Send the given message to the given port ID, which can identify either -a local or a remote port, and can be either a string or soemthignt hat -stringifies a sa port ID (such as a port object :). +If the profile specifies a node ID, then this will become the node ID of +this process. If not, then the profile name will be used as node ID. The +special node ID of C will be replaced by a random node ID. -While the message can be about anything, it is highly recommended to use a -string as first element (a portid, or some word that indicates a request -type etc.). +The next step is to look up the binds in the profile, followed by binding +aemp protocol listeners on all binds specified (it is possible and valid +to have no binds, meaning that the node cannot be contacted form the +outside. This means the node cannot talk to other nodes that also have no +binds, but it can still talk to all "normal" nodes). -The message data effectively becomes read-only after a call to this -function: modifying any argument is not allowed and can cause many -problems. +If the profile does not specify a binds list, then a default of C<*> is +used. -The type of data you can transfer depends on the transport protocol: when -JSON is used, then only strings, numbers and arrays and hashes consisting -of those are allowed (no objects). When Storable is used, then anything -that Storable can serialise and deserialise is allowed, and for the local -node, anything can be passed. - -=item kil $portid[, @reason] - -Kill the specified port with the given C<@reason>. - -If no C<@reason> is specified, then the port is killed "normally" (linked -ports will not be kileld, or even notified). - -Otherwise, linked ports get killed with the same reason (second form of -C, see below). +Lastly, the seeds list from the profile is passed to the +L module, which will then use it to keep +connectivity with at least on of those seed nodes at any point in time. -Runtime errors while evaluating C callbacks or inside C blocks -will be reported as reason C<< die => $@ >>. +Example: become a distributed node listening on the guessed noderef, or +the one specified via C for the current node. This should be the +most common form of invocation for "daemon"-type nodes. -Transport/communication errors are reported as C<< transport_error => -$message >>. + initialise_node; -=item $guard = mon $portid, $cb->(@reason) +Example: become an anonymous node. This form is often used for commandline +clients. -=item $guard = mon $portid, $otherport + initialise_node "anon/"; -=item $guard = mon $portid, $otherport, @msg +Example: become a distributed node. If there is no profile of the given +name, or no binds list was specified, resolve C and bind +on the resulting addresses. -Monitor the given port and do something when the port is killed. + initialise_node "localhost:4044"; -In the first form, the callback is simply called with any number -of C<@reason> elements (no @reason means that the port was deleted -"normally"). Note also that I<< the callback B never die >>, so use -C if unsure. - -In the second form, the other port will be C'ed with C<@reason>, iff -a @reason was specified, i.e. on "normal" kils nothing happens, while -under all other conditions, the other port is killed with the same reason. - -In the last form, a message of the form C<@msg, @reason> will be C. - -Example: call a given callback when C<$port> is killed. - - mon $port, sub { warn "port died because of <@_>\n" }; +=item $SELF -Example: kill ourselves when C<$port> is killed abnormally. +Contains the current port id while executing C callbacks or C +blocks. - mon $port, $self; +=item *SELF, SELF, %SELF, @SELF... -Example: send us a restart message another C<$port> is killed. +Due to some quirks in how perl exports variables, it is impossible to +just export C<$SELF>, all the symbols named C are exported by this +module, but only C<$SELF> is currently used. - mon $port, $self => "restart"; +=item snd $port, type => @data -=cut +=item snd $port, @msg -sub mon { - my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift); +Send the given message to the given port, which can identify either a +local or a remote port, and must be a port ID. - my $node = $NODE{$noderef} || add_node $noderef; +While the message can be almost anything, it is highly recommended to +use a string as first element (a port ID, or some word that indicates a +request type etc.) and to consist if only simple perl values (scalars, +arrays, hashes) - if you think you need to pass an object, think again. + +The message data logically becomes read-only after a call to this +function: modifying any argument (or values referenced by them) is +forbidden, as there can be considerable time between the call to C +and the time the message is actually being serialised - in fact, it might +never be copied as within the same process it is simply handed to the +receiving port. - #TODO: ports must not be references - if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) { - if (@_) { - # send a kill info message - my (@msg) = ($cb, @_); - $cb = sub { snd @msg, @_ }; - } else { - # simply kill other port - my $port = $cb; - $cb = sub { kil $port, @_ if @_ }; - } - } +The type of data you can transfer depends on the transport protocol: when +JSON is used, then only strings, numbers and arrays and hashes consisting +of those are allowed (no objects). When Storable is used, then anything +that Storable can serialise and deserialise is allowed, and for the local +node, anything can be passed. Best rely only on the common denominator of +these. - $node->monitor ($port, $cb); +=item $local_port = port - defined wantarray - and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } -} +Create a new local port object and returns its port ID. Initially it has +no callbacks set and will throw an error when it receives messages. -=item $guard = mon_guard $port, $ref, $ref... +=item $local_port = port { my @msg = @_ } -Monitors the given C<$port> and keeps the passed references. When the port -is killed, the references will be freed. +Creates a new local port, and returns its ID. Semantically the same as +creating a port and calling C on it. -Optionally returns a guard that will stop the monitoring. +The block will be called for every message received on the port, with the +global variable C<$SELF> set to the port ID. Runtime errors will cause the +port to be Ced. The message will be passed as-is, no extra argument +(i.e. no port ID) will be passed to the callback. -This function is useful when you create e.g. timers or other watchers and -want to free them when the port gets killed: +If you want to stop/destroy the port, simply C it: - $port->rcv (start => sub { - my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub { - undef $timer if 0.9 < rand; - }); - }); + my $port = port { + my @msg = @_; + ... + kil $SELF; + }; =cut -sub mon_guard { - my ($port, @refs) = @_; +sub rcv($@); - mon $port, sub { 0 && @refs } +sub _kilme { + die "received message on port without callback"; } -=item lnk $port1, $port2 - -Link two ports. This is simply a shorthand for: - - mon $port1, $port2; - mon $port2, $port1; - -It means that if either one is killed abnormally, the other one gets -killed as well. - -=item $local_port = port - -Create a new local port object that supports message matching. - -=item $portid = port { my @msg = @_; $finished } - -Creates a "mini port", that is, a very lightweight port without any -pattern matching behind it, and returns its ID. - -The block will be called for every message received on the port. When the -callback returns a true value its job is considered "done" and the port -will be destroyed. Otherwise it will stay alive. - -The message will be passed as-is, no extra argument (i.e. no port id) will -be passed to the callback. - -If you need the local port id in the callback, this works nicely: - - my $port; $port = miniport { - snd $otherport, reply => $port; - }; - -=cut - sub port(;&) { my $id = "$UNIQ." . $ID++; my $port = "$NODE#$id"; - if (@_) { - my $cb = shift; - $PORT{$id} = sub { - local $SELF = $port; - eval { - &$cb - and kil $id; - }; - _self_die if $@; - }; - } else { - my $self = bless { - id => "$NODE#$id", - }, "AnyEvent::MP::Port"; - - $PORT_DATA{$id} = $self; - $PORT{$id} = sub { - local $SELF = $port; - - eval { - for (@{ $self->{rc0}{$_[0]} }) { - $_ && &{$_->[0]} - && undef $_; - } - - for (@{ $self->{rcv}{$_[0]} }) { - $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; - } - - for (@{ $self->{any} }) { - $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; - } - }; - _self_die if $@; - }; - } + rcv $port, shift || \&_kilme; $port } -=item reg $portid, $name - -Registers the given port under the name C<$name>. If the name already -exists it is replaced. +=item rcv $local_port, $callback->(@msg) -A port can only be registered under one well known name. +Replaces the default callback on the specified port. There is no way to +remove the default callback: use C to disable it, or better +C the port when it is no longer needed. -A port automatically becomes unregistered when it is killed. +The global C<$SELF> (exported by this module) contains C<$port> while +executing the callback. Runtime errors during callback execution will +result in the port being Ced. -=cut +The default callback received all messages not matched by a more specific +C match. -sub reg(@) { - my ($portid, $name) = @_; +=item rcv $local_port, tag => $callback->(@msg_without_tag), ... - $REG{$name} = $portid; -} +Register (or replace) callbacks to be called on messages starting with the +given tag on the given port (and return the port), or unregister it (when +C<$callback> is C<$undef> or missing). There can only be one callback +registered for each tag. -=item rcv $portid, tagstring => $callback->(@msg), ... +The original message will be passed to the callback, after the first +element (the tag) has been removed. The callback will use the same +environment as the default callback (see above). -=item rcv $portid, $smartmatch => $callback->(@msg), ... +Example: create a port and bind receivers on it in one go. -=item rcv $portid, [$smartmatch...] => $callback->(@msg), ... + my $port = rcv port, + msg1 => sub { ... }, + msg2 => sub { ... }, + ; -Register callbacks to be called on matching messages on the given port. +Example: create a port, bind receivers and send it in a message elsewhere +in one go: -The callback has to return a true value when its work is done, after -which is will be removed, or a false value in which case it will stay -registered. + snd $otherport, reply => + rcv port, + msg1 => sub { ... }, + ... + ; -The global C<$SELF> (exported by this module) contains C<$portid> while -executing the callback. +Example: temporarily register a rcv callback for a tag matching some port +(e.g. for a rpc reply) and unregister it after a message was received. -Runtime errors wdurign callback execution will result in the port being -Ced. + rcv $port, $otherport => sub { + my @reply = @_; -If the match is an array reference, then it will be matched against the -first elements of the message, otherwise only the first element is being -matched. - -Any element in the match that is specified as C<_any_> (a function -exported by this module) matches any single element of the message. - -While not required, it is highly recommended that the first matching -element is a string identifying the message. The one-string-only match is -also the most efficient match (by far). + rcv $SELF, $otherport; + }; =cut sub rcv($@) { - my ($noderef, $port) = split /#/, shift, 2; + my $port = shift; + my ($noderef, $portid) = split /#/, $port, 2; - ($NODE{$noderef} || add_node $noderef) == $NODE{""} - or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught"; + $NODE{$noderef} == $NODE{""} + or Carp::croak "$port: rcv can only be called on local ports, caught"; - my $self = $PORT_DATA{$port} - or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; + while (@_) { + if (ref $_[0]) { + if (my $self = $PORT_DATA{$portid}) { + "AnyEvent::MP::Port" eq ref $self + or Carp::croak "$port: rcv can only be called on message matching ports, caught"; + + $self->[2] = shift; + } else { + my $cb = shift; + $PORT{$portid} = sub { + local $SELF = $port; + eval { &$cb }; _self_die if $@; + }; + } + } elsif (defined $_[0]) { + my $self = $PORT_DATA{$portid} ||= do { + my $self = bless [$PORT{$port} || sub { }, { }, $port], "AnyEvent::MP::Port"; + + $PORT{$portid} = sub { + local $SELF = $port; + + if (my $cb = $self->[1]{$_[0]}) { + shift; + eval { &$cb }; _self_die if $@; + } else { + &{ $self->[0] }; + } + }; - "AnyEvent::MP::Port" eq ref $self - or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; + $self + }; - while (@_) { - my ($match, $cb) = splice @_, 0, 2; + "AnyEvent::MP::Port" eq ref $self + or Carp::croak "$port: rcv can only be called on message matching ports, caught"; - if (!ref $match) { - push @{ $self->{rc0}{$match} }, [$cb]; - } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { - my ($type, @match) = @$match; - @match - ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] - : push @{ $self->{rc0}{$match->[0]} }, [$cb]; - } else { - push @{ $self->{any} }, [$cb, $match]; + my ($tag, $cb) = splice @_, 0, 2; + + if (defined $cb) { + $self->[1]{$tag} = $cb; + } else { + delete $self->[1]{$tag}; + } } } + + $port } =item $closure = psub { BLOCK } @@ -443,137 +441,300 @@ } } -=back +=item $guard = mon $port, $cb->(@reason) # call $cb when $port dies -=head1 FUNCTIONS FOR NODES +=item $guard = mon $port, $rcvport # kill $rcvport when $port dies -=over 4 +=item $guard = mon $port # kill $SELF when $port dies + +=item $guard = mon $port, $rcvport, @msg # send a message when $port dies + +Monitor the given port and do something when the port is killed or +messages to it were lost, and optionally return a guard that can be used +to stop monitoring again. + +C effectively guarantees that, in the absence of hardware failures, +after starting the monitor, either all messages sent to the port will +arrive, or the monitoring action will be invoked after possible message +loss has been detected. No messages will be lost "in between" (after +the first lost message no further messages will be received by the +port). After the monitoring action was invoked, further messages might get +delivered again. + +Note that monitoring-actions are one-shot: once messages are lost (and a +monitoring alert was raised), they are removed and will not trigger again. + +In the first form (callback), the callback is simply called with any +number of C<@reason> elements (no @reason means that the port was deleted +"normally"). Note also that I<< the callback B never die >>, so use +C if unsure. + +In the second form (another port given), the other port (C<$rcvport>) +will be C'ed with C<@reason>, iff a @reason was specified, i.e. on +"normal" kils nothing happens, while under all other conditions, the other +port is killed with the same reason. + +The third form (kill self) is the same as the second form, except that +C<$rvport> defaults to C<$SELF>. + +In the last form (message), a message of the form C<@msg, @reason> will be +C. + +As a rule of thumb, monitoring requests should always monitor a port from +a local port (or callback). The reason is that kill messages might get +lost, just like any other message. Another less obvious reason is that +even monitoring requests can get lost (for exmaple, when the connection +to the other node goes down permanently). When monitoring a port locally +these problems do not exist. + +Example: call a given callback when C<$port> is killed. + + mon $port, sub { warn "port died because of <@_>\n" }; + +Example: kill ourselves when C<$port> is killed abnormally. -=item become_public endpoint... + mon $port; -Tells the node to become a public node, i.e. reachable from other nodes. +Example: send us a restart message when another C<$port> is killed. -If no arguments are given, or the first argument is C, then -AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the -local nodename resolves to. - -Otherwise the first argument must be an array-reference with transport -endpoints ("ip:port", "hostname:port") or port numbers (in which case the -local nodename is used as hostname). The endpoints are all resolved and -will become the node reference. + mon $port, $self => "restart"; =cut -=back +sub mon { + my ($noderef, $port) = split /#/, shift, 2; -=head1 NODE MESSAGES + my $node = $NODE{$noderef} || add_node $noderef; -Nodes understand the following messages sent to them. Many of them take -arguments called C<@reply>, which will simply be used to compose a reply -message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and -the remaining arguments are simply the message data. + my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,'; -=over 4 + unless (ref $cb) { + if (@_) { + # send a kill info message + my (@msg) = ($cb, @_); + $cb = sub { snd @msg, @_ }; + } else { + # simply kill other port + my $port = $cb; + $cb = sub { kil $port, @_ if @_ }; + } + } + + $node->monitor ($port, $cb); + + defined wantarray + and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } +} + +=item $guard = mon_guard $port, $ref, $ref... + +Monitors the given C<$port> and keeps the passed references. When the port +is killed, the references will be freed. + +Optionally returns a guard that will stop the monitoring. + +This function is useful when you create e.g. timers or other watchers and +want to free them when the port gets killed (note the use of C): + + $port->rcv (start => sub { + my $timer; $timer = mon_guard $port, AE::timer 1, 1, psub { + undef $timer if 0.9 < rand; + }); + }); + +=cut + +sub mon_guard { + my ($port, @refs) = @_; + + #TODO: mon-less form? + + mon $port, sub { 0 && @refs } +} + +=item kil $port[, @reason] + +Kill the specified port with the given C<@reason>. + +If no C<@reason> is specified, then the port is killed "normally" (ports +monitoring other ports will not necessarily die because a port dies +"normally"). + +Otherwise, linked ports get killed with the same reason (second form of +C, see above). + +Runtime errors while evaluating C callbacks or inside C blocks +will be reported as reason C<< die => $@ >>. + +Transport/communication errors are reported as C<< transport_error => +$message >>. =cut -=item lookup => $name, @reply +=item $port = spawn $node, $initfunc[, @initdata] + +Creates a port on the node C<$node> (which can also be a port ID, in which +case it's the node where that port resides). + +The port ID of the newly created port is returned immediately, and it is +possible to immediately start sending messages or to monitor the port. + +After the port has been created, the init function is called on the remote +node, in the same context as a C callback. This function must be a +fully-qualified function name (e.g. C). To +specify a function in the main program, use C<::name>. + +If the function doesn't exist, then the node tries to C +the package, then the package above the package and so on (e.g. +C, C, C) until the function +exists or it runs out of package names. + +The init function is then called with the newly-created port as context +object (C<$SELF>) and the C<@initdata> values as arguments. + +A common idiom is to pass a local port, immediately monitor the spawned +port, and in the remote init function, immediately monitor the passed +local port. This two-way monitoring ensures that both ports get cleaned up +when there is a problem. + +Example: spawn a chat server port on C<$othernode>. + + # this node, executed from within a port context: + my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF; + mon $server; + + # init function on C<$othernode> + sub connect { + my ($srcport) = @_; + + mon $srcport; -Replies with the port ID of the specified well-known port, or C. + rcv $SELF, sub { + ... + }; + } + +=cut + +sub _spawn { + my $port = shift; + my $init = shift; + + local $SELF = "$NODE#$port"; + eval { + &{ load_func $init } + }; + _self_die if $@; +} -=item devnull => ... +sub spawn(@) { + my ($noderef, undef) = split /#/, shift, 2; -Generic data sink/CPU heat conversion. + my $id = "$RUNIQ." . $ID++; -=item relay => $port, @msg + $_[0] =~ /::/ + or Carp::croak "spawn init function must be a fully-qualified name, caught"; -Simply forwards the message to the given port. + snd_to_func $noderef, "AnyEvent::MP::_spawn" => $id, @_; -=item eval => $string[ @reply] + "$noderef#$id" +} -Evaluates the given string. If C<@reply> is given, then a message of the -form C<@reply, $@, @evalres> is sent. +=item after $timeout, @msg -Example: crash another node. +=item after $timeout, $callback - snd $othernode, eval => "exit"; +Either sends the given message, or call the given callback, after the +specified number of seconds. -=item time => @reply +This is simply a utility function that comes in handy at times - the +AnyEvent::MP author is not convinced of the wisdom of having it, though, +so it may go away in the future. -Replies the the current node time to C<@reply>. +=cut -Example: tell the current node to send the current time to C<$myport> in a -C message. +sub after($@) { + my ($timeout, @action) = @_; - snd $NODE, time => $myport, timereply => 1, 2; - # => snd $myport, timereply => 1, 2,