--- AnyEvent-MP/MP.pm 2009/08/13 15:29:58 1.49 +++ AnyEvent-MP/MP.pm 2009/08/14 14:07:44 1.51 @@ -24,19 +24,14 @@ snd $port, @msg; snd @msg_with_first_element_being_a_port; - # creating/using miniports - my $miniport = port { my @msg = @_; 0 }; + # creating/using ports, the simple way + my $somple_port = port { my @msg = @_; 0 }; - # creating/using full ports + # creating/using ports, type matching my $port = port; - rcv $port, smartmatch => $cb->(@msg); rcv $port, ping => sub { snd $_[0], "pong"; 0 }; rcv $port, pong => sub { warn "pong received\n"; 0 }; - # more, smarter, matches (_any_ is exported by this module) - rcv $port, [child_died => $pid] => sub { ... - rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3 - # create a port on another node my $port = spawn $node, $initfunc, @initdata; @@ -305,105 +300,72 @@ =item $local_port = port -Create a new local port object that can be used either as a pattern -matching port ("full port") or a single-callback port ("miniport"), -depending on how C callbacks are bound to the object. - -=item $port = port { my @msg = @_; $finished } +Create a new local port object and returns its port ID. Initially it has +no callbacks set and will throw an error when it receives messages. -Creates a "miniport", that is, a very lightweight port without any pattern -matching behind it, and returns its ID. Semantically the same as creating -a port and calling C on it. +=item $local_port = port { my @msg = @_ } -The block will be called for every message received on the port. When the -callback returns a true value its job is considered "done" and the port -will be destroyed. Otherwise it will stay alive. +Creates a new local port, and returns its ID. Semantically the same as +creating a port and calling C on it. -The message will be passed as-is, no extra argument (i.e. no port id) will -be passed to the callback. +The block will be called for every message received on the port, with the +global variable C<$SELF> set to the port ID. Runtime errors will cause the +port to be Ced. The message will be passed as-is, no extra argument +(i.e. no port ID) will be passed to the callback. -If you need the local port id in the callback, this works nicely: +If you want to stop/destroy the port, simply C it: - my $port; $port = port { - snd $otherport, reply => $port; + my $port = port { + my @msg = @_; + ... + kil $SELF; }; =cut sub rcv($@); +sub _kilme { + die "received message on port without callback"; +} + sub port(;&) { my $id = "$UNIQ." . $ID++; my $port = "$NODE#$id"; - if (@_) { - rcv $port, shift; - } else { - $PORT{$id} = sub { }; # nop - } + rcv $port, shift || \&_kilme; $port } -=item reg $port, $name - -=item reg $name - -Registers the given port (or C<$SELF><<< if missing) under the name -C<$name>. If the name already exists it is replaced. - -A port can only be registered under one well known name. - -A port automatically becomes unregistered when it is killed. - -=cut - -sub reg(@) { - my $port = @_ > 1 ? shift : $SELF || Carp::croak 'reg: called with one argument only, but $SELF not set,'; - - $REG{$_[0]} = $port; -} - -=item rcv $port, $callback->(@msg) - -Replaces the callback on the specified miniport (after converting it to -one if required). - -=item rcv $port, tagstring => $callback->(@msg), ... +=item rcv $local_port, $callback->(@msg) -=item rcv $port, $smartmatch => $callback->(@msg), ... - -=item rcv $port, [$smartmatch...] => $callback->(@msg), ... - -Register callbacks to be called on matching messages on the given full -port (after converting it to one if required) and return the port. - -The callback has to return a true value when its work is done, after -which is will be removed, or a false value in which case it will stay -registered. +Replaces the default callback on the specified port. There is no way to +remove the default callback: use C to disable it, or better +C the port when it is no longer needed. The global C<$SELF> (exported by this module) contains C<$port> while -executing the callback. +executing the callback. Runtime errors during callback execution will +result in the port being Ced. -Runtime errors during callback execution will result in the port being -Ced. +The default callback received all messages not matched by a more specific +C match. -If the match is an array reference, then it will be matched against the -first elements of the message, otherwise only the first element is being -matched. +=item rcv $local_port, tag => $callback->(@msg_without_tag), ... -Any element in the match that is specified as C<_any_> (a function -exported by this module) matches any single element of the message. +Register callbacks to be called on messages starting with the given tag on +the given port (and return the port), or unregister it (when C<$callback> +is C<$undef>). -While not required, it is highly recommended that the first matching -element is a string identifying the message. The one-string-only match is -also the most efficient match (by far). +The original message will be passed to the callback, after the first +element (the tag) has been removed. The callback will use the same +environment as the default callback (see above). Example: create a port and bind receivers on it in one go. my $port = rcv port, - msg1 => sub { ...; 0 }, - msg2 => sub { ...; 0 }, + msg1 => sub { ... }, + msg2 => sub { ... }, ; Example: create a port, bind receivers and send it in a message elsewhere @@ -411,7 +373,7 @@ snd $otherport, reply => rcv port, - msg1 => sub { ...; 0 }, + msg1 => sub { ... }, ... ; @@ -424,65 +386,47 @@ ($NODE{$noderef} || add_node $noderef) == $NODE{""} or Carp::croak "$port: rcv can only be called on local ports, caught"; - if (@_ == 1) { - my $cb = shift; - delete $PORT_DATA{$portid}; - $PORT{$portid} = sub { - local $SELF = $port; - eval { - &$cb - and kil $port; - }; - _self_die if $@; - }; - } else { - my $self = $PORT_DATA{$portid} ||= do { - my $self = bless { - id => $port, - }, "AnyEvent::MP::Port"; - - $PORT{$portid} = sub { - local $SELF = $port; - - eval { - for (@{ $self->{rc0}{$_[0]} }) { - $_ && &{$_->[0]} - && undef $_; - } - - for (@{ $self->{rcv}{$_[0]} }) { - $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; - } + while (@_) { + if (ref $_[0]) { + if (my $self = $PORT_DATA{$portid}) { + "AnyEvent::MP::Port" eq ref $self + or Carp::croak "$port: rcv can only be called on message matching ports, caught"; - for (@{ $self->{any} }) { - $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; + $self->[2] = shift; + } else { + my $cb = shift; + $PORT{$portid} = sub { + local $SELF = $port; + eval { &$cb }; _self_die if $@; + }; + } + } elsif (defined $_[0]) { + my $self = $PORT_DATA{$portid} ||= do { + my $self = bless [$PORT{$port} || sub { }, { }, $port], "AnyEvent::MP::Port"; + + $PORT{$portid} = sub { + local $SELF = $port; + + if (my $cb = $self->[1]{$_[0]}) { + shift; + eval { &$cb }; _self_die if $@; + } else { + &{ $self->[0] }; } }; - _self_die if $@; - }; - $self - }; + $self + }; - "AnyEvent::MP::Port" eq ref $self - or Carp::croak "$port: rcv can only be called on message matching ports, caught"; + "AnyEvent::MP::Port" eq ref $self + or Carp::croak "$port: rcv can only be called on message matching ports, caught"; - while (@_) { - my ($match, $cb) = splice @_, 0, 2; + my ($tag, $cb) = splice @_, 0, 2; - if (!ref $match) { - push @{ $self->{rc0}{$match} }, [$cb]; - } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { - my ($type, @match) = @$match; - @match - ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] - : push @{ $self->{rc0}{$match->[0]} }, [$cb]; + if (defined $cb) { + $self->[1]{$tag} = $cb; } else { - push @{ $self->{any} }, [$cb, $match]; + delete $self->[1]{$tag}; } } } @@ -796,21 +740,38 @@ This means that AEMP requires a less tightly controlled environment at the cost of longer node references and a slightly higher management overhead. +=item Erlang has a "remote ports are like local ports" philosophy, AEMP +uses "local ports are like remote ports". + +The failure modes for local ports are quite different (runtime errors +only) then for remote ports - when a local port dies, you I it dies, +when a connection to another node dies, you know nothing about the other +port. + +Erlang pretends remote ports are as reliable as local ports, even when +they are not. + +AEMP encourages a "treat remote ports differently" philosophy, with local +ports being the special case/exception, where transport errors cannot +occur. + =item * Erlang uses processes and a mailbox, AEMP does not queue. -Erlang uses processes that selctively receive messages, and therefore -needs a queue. AEMP is event based, queuing messages would serve no useful -purpose. +Erlang uses processes that selectively receive messages, and therefore +needs a queue. AEMP is event based, queuing messages would serve no +useful purpose. For the same reason the pattern-matching abilities of +AnyEvent::MP are more limited, as there is little need to be able to +filter messages without dequeing them. (But see L for a more Erlang-like process model on top of AEMP). =item * Erlang sends are synchronous, AEMP sends are asynchronous. -Sending messages in Erlang is synchronous and blocks the process. AEMP -sends are immediate, connection establishment is handled in the -background. +Sending messages in Erlang is synchronous and blocks the process (and +so does not need a queue that can overflow). AEMP sends are immediate, +connection establishment is handled in the background. -=item * Erlang can silently lose messages, AEMP cannot. +=item * Erlang suffers from silent message loss, AEMP does not. Erlang makes few guarantees on messages delivery - messages can get lost without any of the processes realising it (i.e. you send messages a, b, @@ -832,9 +793,9 @@ =item * Erlang can send messages to the wrong port, AEMP does not. -In Erlang it is quite possible that a node that restarts reuses a process -ID known to other nodes for a completely different process, causing -messages destined for that process to end up in an unrelated process. +In Erlang it is quite likely that a node that restarts reuses a process ID +known to other nodes for a completely different process, causing messages +destined for that process to end up in an unrelated process. AEMP never reuses port IDs, so old messages or old port IDs floating around in the network will not be sent to an unrelated port.