--- AnyEvent-MP/MP.pm 2009/08/05 19:55:58 1.31 +++ AnyEvent-MP/MP.pm 2009/08/06 10:21:48 1.35 @@ -25,6 +25,14 @@ rcv $port, [child_died => $pid] => sub { ... rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3 + # linking two ports, so they both crash together + lnk $port1, $port2; + + # monitoring + mon $port, $cb->(@msg) # callback is invoked on death + mon $port, $otherport # kill otherport on abnormal death + mon $port, $otherport, @msg # send message on death + =head1 DESCRIPTION This module (-family) implements a simple message passing framework. @@ -125,10 +133,74 @@ to C or C, after which all local port identifiers become invalid. -=item $noderef = node_of $portid +=item $noderef = node_of $port Extracts and returns the noderef from a portid or a noderef. +=item initialise_node $noderef, $seednode, $seednode... + +=item initialise_node "slave/", $master, $master... + +Before a node can talk to other nodes on the network it has to initialise +itself - the minimum a node needs to know is it's own name, and optionally +it should know the noderefs of some other nodes in the network. + +This function initialises a node - it must be called exactly once (or +never) before calling other AnyEvent::MP functions. + +All arguments are noderefs, which can be either resolved or unresolved. + +There are two types of networked nodes, public nodes and slave nodes: + +=over 4 + +=item public nodes + +For public nodes, C<$noderef> must either be a (possibly unresolved) +noderef, in which case it will be resolved, or C (or missing), in +which case the noderef will be guessed. + +Afterwards, the node will bind itself on all endpoints and try to connect +to all additional C<$seednodes> that are specified. Seednodes are optional +and can be used to quickly bootstrap the node into an existing network. + +=item slave nodes + +When the C<$noderef> is the special string C, then the node will +become a slave node. Slave nodes cannot be contacted from outside and will +route most of their traffic to the master node that they attach to. + +At least one additional noderef is required: The node will try to connect +to all of them and will become a slave attached to the first node it can +successfully connect to. + +=back + +This function will block until all nodes have been resolved and, for slave +nodes, until it has successfully established a connection to a master +server. + +Example: become a public node listening on the default node. + + initialise_node; + +Example: become a public node, and try to contact some well-known master +servers to become part of the network. + + initialise_node undef, "master1", "master2"; + +Example: become a public node listening on port C<4041>. + + initialise_node 4041; + +Example: become a public node, only visible on localhost port 4044. + + initialise_node "locahost:4044"; + +Example: become a slave node to any of the specified master servers. + + initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net"; + =item $cv = resolve_node $noderef Takes an unresolved node reference that may contain hostnames and @@ -169,9 +241,9 @@ just export C<$SELF>, all the symbols called C are exported by this module, but only C<$SELF> is currently used. -=item snd $portid, type => @data +=item snd $port, type => @data -=item snd $portid, @msg +=item snd $port, @msg Send the given message to the given port ID, which can identify either a local or a remote port, and can be either a string or soemthignt hat @@ -191,124 +263,17 @@ that Storable can serialise and deserialise is allowed, and for the local node, anything can be passed. -=item kil $portid[, @reason] - -Kill the specified port with the given C<@reason>. - -If no C<@reason> is specified, then the port is killed "normally" (linked -ports will not be kileld, or even notified). - -Otherwise, linked ports get killed with the same reason (second form of -C, see below). - -Runtime errors while evaluating C callbacks or inside C blocks -will be reported as reason C<< die => $@ >>. - -Transport/communication errors are reported as C<< transport_error => -$message >>. - -=item $guard = mon $portid, $cb->(@reason) - -=item $guard = mon $portid, $otherport - -=item $guard = mon $portid, $otherport, @msg - -Monitor the given port and do something when the port is killed. - -In the first form, the callback is simply called with any number -of C<@reason> elements (no @reason means that the port was deleted -"normally"). Note also that I<< the callback B never die >>, so use -C if unsure. - -In the second form, the other port will be C'ed with C<@reason>, iff -a @reason was specified, i.e. on "normal" kils nothing happens, while -under all other conditions, the other port is killed with the same reason. - -In the last form, a message of the form C<@msg, @reason> will be C. - -Example: call a given callback when C<$port> is killed. - - mon $port, sub { warn "port died because of <@_>\n" }; - -Example: kill ourselves when C<$port> is killed abnormally. - - mon $port, $self; - -Example: send us a restart message another C<$port> is killed. - - mon $port, $self => "restart"; - -=cut - -sub mon { - my ($noderef, $port) = split /#/, shift, 2; - - my $node = $NODE{$noderef} || add_node $noderef; - - my $cb = shift; - - unless (ref $cb) { - if (@_) { - # send a kill info message - my (@msg) = ($cb, @_); - $cb = sub { snd @msg, @_ }; - } else { - # simply kill other port - my $port = $cb; - $cb = sub { kil $port, @_ if @_ }; - } - } - - $node->monitor ($port, $cb); - - defined wantarray - and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } -} - -=item $guard = mon_guard $port, $ref, $ref... - -Monitors the given C<$port> and keeps the passed references. When the port -is killed, the references will be freed. - -Optionally returns a guard that will stop the monitoring. - -This function is useful when you create e.g. timers or other watchers and -want to free them when the port gets killed: - - $port->rcv (start => sub { - my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub { - undef $timer if 0.9 < rand; - }); - }); - -=cut - -sub mon_guard { - my ($port, @refs) = @_; - - mon $port, sub { 0 && @refs } -} - -=item lnk $port1, $port2 - -Link two ports. This is simply a shorthand for: - - mon $port1, $port2; - mon $port2, $port1; - -It means that if either one is killed abnormally, the other one gets -killed as well. - =item $local_port = port Create a new local port object that can be used either as a pattern matching port ("full port") or a single-callback port ("miniport"), depending on how C callbacks are bound to the object. -=item $portid = port { my @msg = @_; $finished } +=item $port = port { my @msg = @_; $finished } -Creates a "mini port", that is, a very lightweight port without any -pattern matching behind it, and returns its ID. +Creates a "miniport", that is, a very lightweight port without any pattern +matching behind it, and returns its ID. Semantically the same as creating +a port and calling C on it. The block will be called for every message received on the port. When the callback returns a true value its job is considered "done" and the port @@ -325,55 +290,22 @@ =cut +sub rcv($@); + sub port(;&) { my $id = "$UNIQ." . $ID++; my $port = "$NODE#$id"; if (@_) { - my $cb = shift; - $PORT{$id} = sub { - local $SELF = $port; - eval { - &$cb - and kil $id; - }; - _self_die if $@; - }; + rcv $port, shift; } else { - my $self = bless { - id => "$NODE#$id", - }, "AnyEvent::MP::Port"; - - $PORT_DATA{$id} = $self; - $PORT{$id} = sub { - local $SELF = $port; - - eval { - for (@{ $self->{rc0}{$_[0]} }) { - $_ && &{$_->[0]} - && undef $_; - } - - for (@{ $self->{rcv}{$_[0]} }) { - $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; - } - - for (@{ $self->{any} }) { - $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; - } - }; - _self_die if $@; - }; + $PORT{$id} = sub { }; # nop } $port } -=item reg $portid, $name +=item reg $port, $name Registers the given port under the name C<$name>. If the name already exists it is replaced. @@ -385,29 +317,30 @@ =cut sub reg(@) { - my ($portid, $name) = @_; + my ($port, $name) = @_; - $REG{$name} = $portid; + $REG{$name} = $port; } -=item rcv $portid, $callback->(@msg) +=item rcv $port, $callback->(@msg) -Replaces the callback on the specified miniport (or newly created port -object, see C). Full ports are configured with the following calls: +Replaces the callback on the specified miniport (after converting it to +one if required). -=item rcv $portid, tagstring => $callback->(@msg), ... +=item rcv $port, tagstring => $callback->(@msg), ... -=item rcv $portid, $smartmatch => $callback->(@msg), ... +=item rcv $port, $smartmatch => $callback->(@msg), ... -=item rcv $portid, [$smartmatch...] => $callback->(@msg), ... +=item rcv $port, [$smartmatch...] => $callback->(@msg), ... -Register callbacks to be called on matching messages on the given port. +Register callbacks to be called on matching messages on the given full +port (after converting it to one if required). The callback has to return a true value when its work is done, after which is will be removed, or a false value in which case it will stay registered. -The global C<$SELF> (exported by this module) contains C<$portid> while +The global C<$SELF> (exported by this module) contains C<$port> while executing the callback. Runtime errors wdurign callback execution will result in the port being @@ -427,34 +360,76 @@ =cut sub rcv($@) { - my $portid = shift; - my ($noderef, $port) = split /#/, $port, 2; + my $port = shift; + my ($noderef, $portid) = split /#/, $port, 2; ($NODE{$noderef} || add_node $noderef) == $NODE{""} - or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught"; + or Carp::croak "$port: rcv can only be called on local ports, caught"; - my $self = $PORT_DATA{$port} - or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; + if (@_ == 1) { + my $cb = shift; + delete $PORT_DATA{$portid}; + $PORT{$portid} = sub { + local $SELF = $port; + eval { + &$cb + and kil $port; + }; + _self_die if $@; + }; + } else { + my $self = $PORT_DATA{$portid} ||= do { + my $self = bless { + id => $port, + }, "AnyEvent::MP::Port"; + + $PORT{$portid} = sub { + local $SELF = $port; + + eval { + for (@{ $self->{rc0}{$_[0]} }) { + $_ && &{$_->[0]} + && undef $_; + } + + for (@{ $self->{rcv}{$_[0]} }) { + $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] + && &{$_->[0]} + && undef $_; + } + + for (@{ $self->{any} }) { + $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] + && &{$_->[0]} + && undef $_; + } + }; + _self_die if $@; + }; + + $self + }; - "AnyEvent::MP::Port" eq ref $self - or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; + "AnyEvent::MP::Port" eq ref $self + or Carp::croak "$port: rcv can only be called on message matching ports, caught"; - while (@_) { - my ($match, $cb) = splice @_, 0, 2; - - if (!ref $match) { - push @{ $self->{rc0}{$match} }, [$cb]; - } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { - my ($type, @match) = @$match; - @match - ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] - : push @{ $self->{rc0}{$match->[0]} }, [$cb]; - } else { - push @{ $self->{any} }, [$cb, $match]; + while (@_) { + my ($match, $cb) = splice @_, 0, 2; + + if (!ref $match) { + push @{ $self->{rc0}{$match} }, [$cb]; + } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { + my ($type, @match) = @$match; + @match + ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] + : push @{ $self->{rc0}{$match->[0]} }, [$cb]; + } else { + push @{ $self->{any} }, [$cb, $match]; + } } } - $portid + $port } =item $closure = psub { BLOCK } @@ -495,25 +470,114 @@ } } -=back +=item $guard = mon $port, $cb->(@reason) -=head1 FUNCTIONS FOR NODES +=item $guard = mon $port, $otherport -=over 4 +=item $guard = mon $port, $otherport, @msg + +Monitor the given port and do something when the port is killed. + +In the first form, the callback is simply called with any number +of C<@reason> elements (no @reason means that the port was deleted +"normally"). Note also that I<< the callback B never die >>, so use +C if unsure. + +In the second form, the other port will be C'ed with C<@reason>, iff +a @reason was specified, i.e. on "normal" kils nothing happens, while +under all other conditions, the other port is killed with the same reason. + +In the last form, a message of the form C<@msg, @reason> will be C. + +Example: call a given callback when C<$port> is killed. + + mon $port, sub { warn "port died because of <@_>\n" }; + +Example: kill ourselves when C<$port> is killed abnormally. + + mon $port, $self; + +Example: send us a restart message another C<$port> is killed. + + mon $port, $self => "restart"; + +=cut + +sub mon { + my ($noderef, $port) = split /#/, shift, 2; + + my $node = $NODE{$noderef} || add_node $noderef; -=item become_public $noderef + my $cb = shift; -Tells the node to become a public node, i.e. reachable from other nodes. + unless (ref $cb) { + if (@_) { + # send a kill info message + my (@msg) = ($cb, @_); + $cb = sub { snd @msg, @_ }; + } else { + # simply kill other port + my $port = $cb; + $cb = sub { kil $port, @_ if @_ }; + } + } -The first argument is the (unresolved) node reference of the local node -(if missing then the empty string is used). + $node->monitor ($port, $cb); -It is quite common to not specify anything, in which case the local node -tries to listen on the default port, or to only specify a port number, in -which case AnyEvent::MP tries to guess the local addresses. + defined wantarray + and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } +} + +=item $guard = mon_guard $port, $ref, $ref... + +Monitors the given C<$port> and keeps the passed references. When the port +is killed, the references will be freed. + +Optionally returns a guard that will stop the monitoring. + +This function is useful when you create e.g. timers or other watchers and +want to free them when the port gets killed: + + $port->rcv (start => sub { + my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub { + undef $timer if 0.9 < rand; + }); + }); =cut +sub mon_guard { + my ($port, @refs) = @_; + + mon $port, sub { 0 && @refs } +} + +=item lnk $port1, $port2 + +Link two ports. This is simply a shorthand for: + + mon $port1, $port2; + mon $port2, $port1; + +It means that if either one is killed abnormally, the other one gets +killed as well. + +=item kil $port[, @reason] + +Kill the specified port with the given C<@reason>. + +If no C<@reason> is specified, then the port is killed "normally" (linked +ports will not be kileld, or even notified). + +Otherwise, linked ports get killed with the same reason (second form of +C, see below). + +Runtime errors while evaluating C callbacks or inside C blocks +will be reported as reason C<< die => $@ >>. + +Transport/communication errors are reported as C<< transport_error => +$message >>. + =back =head1 NODE MESSAGES @@ -564,15 +628,15 @@ =head1 AnyEvent::MP vs. Distributed Erlang -AnyEvent::MP got lots of its ideas from distributed erlang (erlang node -== aemp node, erlang process == aemp port), so many of the documents and -programming techniques employed by erlang apply to AnyEvent::MP. Here is a +AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node +== aemp node, Erlang process == aemp port), so many of the documents and +programming techniques employed by Erlang apply to AnyEvent::MP. Here is a sample: - http://www.erlang.se/doc/programming_rules.shtml - http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4 - http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6 - http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5 + http://www.Erlang.se/doc/programming_rules.shtml + http://Erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4 + http://Erlang.org/download/Erlang-book-part1.pdf # chapters 5 and 6 + http://Erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5 Despite the similarities, there are also some important differences: @@ -593,11 +657,11 @@ needs a queue. AEMP is event based, queuing messages would serve no useful purpose. -(But see L for a more erlang-like process model on top of AEMP). +(But see L for a more Erlang-like process model on top of AEMP). =item * Erlang sends are synchronous, AEMP sends are asynchronous. -Sending messages in erlang is synchronous and blocks the process. AEMP +Sending messages in Erlang is synchronous and blocks the process. AEMP sends are immediate, connection establishment is handled in the background. @@ -610,10 +674,10 @@ AEMP guarantees correct ordering, and the guarantee that there are no holes in the message sequence. -=item * In erlang, processes can be declared dead and later be found to be +=item * In Erlang, processes can be declared dead and later be found to be alive. -In erlang it can happen that a monitored process is declared dead and +In Erlang it can happen that a monitored process is declared dead and linked processes get killed, but later it turns out that the process is still alive - and can receive messages. @@ -623,7 +687,7 @@ =item * Erlang can send messages to the wrong port, AEMP does not. -In erlang it is quite possible that a node that restarts reuses a process +In Erlang it is quite possible that a node that restarts reuses a process ID known to other nodes for a completely different process, causing messages destined for that process to end up in an unrelated process. @@ -639,7 +703,7 @@ =item * The AEMP protocol is optimised for both text-based and binary communications. -The AEMP protocol, unlike the erlang protocol, supports both +The AEMP protocol, unlike the Erlang protocol, supports both language-independent text-only protocols (good for debugging) and binary, language-specific serialisers (e.g. Storable). @@ -647,6 +711,21 @@ with a minimum of work while gracefully degrading fucntionality to make the protocol simple. +=item * AEMP has more flexible monitoring options than Erlang. + +In Erlang, you can chose to receive I exit signals as messages +or I, there is no in-between, so monitoring single processes is +difficult to implement. Monitoring in AEMP is more flexible than in +Erlang, as one can choose between automatic kill, exit message or callback +on a per-process basis. + +=item * Erlang has different semantics for monitoring and linking, AEMP has the same. + +Monitoring in Erlang is not an indicator of process death/crashes, +as linking is (except linking is unreliable in Erlang). In AEMP, the +semantics of monitoring and linking are identical, linking is simply +two-way monitoring with automatic kill. + =back =head1 SEE ALSO