=head1 NAME Coro::MP - erlang-style multi-processing/message-passing framework =head1 SYNOPSIS use Coro::MP; $NODE # contains this node's node ID NODE # returns this node's node ID $SELF # receiving/own port id in rcv callbacks # initialise the node so it can send/receive messages configure; # ports are message destinations # sending messages snd $port, type => data...; snd $port, @msg; snd @msg_with_first_element_being_a_port; # creating/using ports my $port = port_async { # thread context, $SELF is set to $port # returning will "kil" the $port with an empty reason }; # simple receive my $port = port { my (undef, @data) = get "tag"; }; snd $port, tag => "data0", "data1"; # create a port on another node my $port = spawn $node, $initfunc, @initdata; # monitoring mon $localport, $cb->(@msg) # callback is invoked on death mon $localport, $otherport # kill otherport on abnormal death mon $localport, $otherport, @msg # send message on death =head1 DESCRIPTION This module (-family) implements a simple message passing framework. Despite its simplicity, you can securely message other processes running on the same or other hosts, and you can supervise entities remotely. This module depends heavily on L, in fact, many functions exported by this module are identical to AnyEvent::MP functions. This module family is simply the Coro API to AnyEvent::MP. Care has been taken to stay compatible with AnyEvent::MP, even if sometimes this required a less natural API (C should indeed spawn a thread, not just call an initfunc for example). For an introduction to AnyEvent::MP, see the L manual page. =head1 VARIABLES/FUNCTIONS =over 4 =cut package Coro::MP; use common::sense; use Carp (); use AnyEvent::MP::Kernel; use AnyEvent::MP qw(snd rcv mon kil psub spawn); # TODO, should use AnyEvent::MP::Kernel only use Coro; use Coro::AnyEvent (); use AE (); use base "Exporter"; our $VERSION = "0.1"; our @EXPORT = qw( NODE $NODE *SELF node_of configure snd rcv mon kil psub spawn port_async rcv_async get get_cond ); our $SELF; sub _self_die() { my $msg = $@; $msg =~ s/\n+$// unless ref $msg; kil $SELF, die => $msg; } =item NODE, $NODE, node_of, configure =item $SELF, *SELF, SELF, %SELF, @SELF... =item snd, rcv, mon, kil, psub These variables and functions work exactly as in AnyEvent::MP, in fact, they are usually exactly the same functions. =item spawn This function is also identical to C. This means that it doesn't spawn a new thread as one would expect, but simply calls an init function. The init function, however, can attach a new thread easily: sub initfun { my (@args) = @_; attach $SELF, sub { # thread-code }; } =item $local_port = port_async { ... } Creates a new local port, and returns its ID. A new thread is created and attached to the port (see C, below, for details). =cut sub rcv_async($$); sub port_async(;&) { my $id = "$UNIQ." . $ID++; my $port = "$NODE#$id"; @_ ? rcv_async $port, shift : AnyEvent::MP::rcv $port, undef; $port } =item rcv_async $port, $threadcb This function creates and attaches a thread on a port. The thread is set to execute C<$threadcb> and is put into the ready queue. The thread will receive all messages not filtered away by tagged receive callbacks (as set by C) - it simply replaces the default callback of an AnyEvent::MP port. The special variable C<$SELF> will be set to C<$port> during thread execution. When C<$threadcb> returns or the thread is canceled, the return/cancel values become the C reason. It is not allowed to call C more than once on a given port. =cut sub rcv_async($$) { my ($port, $threadcb) = @_; my (@queue, $coro); AnyEvent::MP::rcv $port, sub { push @queue, \@_; # TODO, take copy? $coro->ready; # TODO, maybe too many unwanted wake-ups? }; $coro = async { # this SELF swapping is horrible my $old_self; Coro::on_enter { $old_self = $SELF; $SELF = $port }; Coro::on_leave { $SELF = $old_self }; $threadcb->(); }; $coro->{_coro_mp_queue} = \@queue; mon $port, sub { $coro->cancel (@_) }; $coro->on_destroy (sub { kil $port, @_ }); } =item @msg = get $tag =item @msg = get $tag, $timeout Find, dequeue and return the next message with the specified C<$tag>. If no matching message is currently queued, wait up to C<$timeout> seconds (or forever if no C<$timeout> has been specified or it is C) for one to arrive. Returns the message with the initial tag removed. In case of a timeout, the empty list. The function I be called in list context. Note that empty messages cannot be distinguished from a timeout when using C. =cut sub get($;$) { my ($tag, $timeout) = @_; my $queue = $Coro::current->{_coro_mp_queue} or Carp::croak "Coro::MP::get called from thread not attached to any port"; my $i; while () { $queue->[$_][0] eq $tag and return @{ splice @$queue, $_, 1 } for $i..$#$queue; $i = @$queue; # wait for more messages if (ref $timeout) { schedule; defined $i or return; # timeout } elsif (defined $timeout) { $timeout or return; my $current = $Coro::current; $timeout = AE::timer $timeout, 0, sub { undef $i; $current->ready; }; } else { $timeout = \$i; # dummy } } } sub get_cond { die "nyi"; } =item calxxxTODO $port, @msg, $callback[, $timeout] A simple form of RPC - sends a message to the given C<$port> with the given contents (C<@msg>), but adds a reply port to the message. The reply port is created temporarily just for the purpose of receiving the reply, and will be Ced when no longer needed. A reply message sent to the port is passed to the C<$callback> as-is. If an optional time-out (in seconds) is given and it is not C, then the callback will be called without any arguments after the time-out elapsed and the port is Ced. If no time-out is given, then the local port will monitor the remote port instead, so it eventually gets cleaned-up. Currently this function returns the temporary port, but this "feature" might go in future versions unless you can make a convincing case that this is indeed useful for something. =cut sub calxxxTODO(@) { die "nyi"; } =back =head1 SEE ALSO L - a gentle introduction. L - like Coro::MP, but event-based. L. =head1 AUTHOR Marc Lehmann http://home.schmorp.de/ =cut 1