--- AnyEvent-MP/MP.pm 2009/08/02 15:47:04 1.9 +++ AnyEvent-MP/MP.pm 2009/08/06 10:21:48 1.35 @@ -6,11 +6,14 @@ use AnyEvent::MP; - NODE # returns this node identifier - $NODE # contains this node identifier + $NODE # contains this node's noderef + NODE # returns this node's noderef + NODE $port # returns the noderef of the port snd $port, type => data...; + $SELF # receiving/own port id in rcv callbacks + rcv $port, smartmatch => $cb->($port, @msg); # examples: @@ -22,6 +25,14 @@ rcv $port, [child_died => $pid] => sub { ... rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3 + # linking two ports, so they both crash together + lnk $port1, $port2; + + # monitoring + mon $port, $cb->(@msg) # callback is invoked on death + mon $port, $otherport # kill otherport on abnormal death + mon $port, $otherport, @msg # send message on death + =head1 DESCRIPTION This module (-family) implements a simple message passing framework. @@ -29,9 +40,12 @@ Despite its simplicity, you can securely message other processes running on the same or other hosts. -At the moment, this module family is severly brokena nd underdocumented, -so do not use. This was uploaded mainly to resreve the CPAN namespace - -stay tuned! +For an introduction to this module family, see the L +manual page. + +At the moment, this module family is severely broken and underdocumented, +so do not use. This was uploaded mainly to reserve the CPAN namespace - +stay tuned! The basic API should be finished, however. =head1 CONCEPTS @@ -39,31 +53,43 @@ =item port -A port is something you can send messages to with the C function, and -you can register C handlers with. All C handlers will receive -messages they match, messages will not be queued. +A port is something you can send messages to (with the C function). + +Some ports allow you to register C handlers that can match specific +messages. All C handlers will receive messages they match, messages +will not be queued. 
=item port id - C -A port id is always the noderef, a hash-mark (C<#>) as separator, followed -by a port name (a printable string of unspecified format). +A port id is normally the concatenation of a noderef, a hash-mark (C<#>) as +separator, and a port name (a printable string of unspecified format). An +exception is the node port, whose ID is identical to its node +reference. =item node A node is a single process containing at least one port - the node -port. You can send messages to node ports to let them create new ports, -among other things. +port. You can send messages to node ports to find existing ports or to +create new ports, among other things. -Initially, nodes are either private (single-process only) or hidden -(connected to a master node only). Only when they epxlicitly "become -public" can you send them messages from unrelated other nodes. +Nodes are either private (single-process only), slaves (connected to a +master node only) or public nodes (connectable from unrelated nodes). =item noderef - C, C, C -A noderef is a string that either uniquely identifies a given node (for -private and hidden nodes), or contains a recipe on how to reach a given +A node reference is a string that either simply identifies the node (for +private and slave nodes), or contains a recipe on how to reach a given node (for public nodes). +This recipe is simply a comma-separated list of C pairs (for +TCP/IP, other protocols might look different). + +Node references come in two flavours: resolved (containing only numerical +addresses) or unresolved (where hostnames are used instead of addresses). + +Before using an unresolved node reference in a message you first have to +resolve it. 
+ =back =head1 VARIABLES/FUNCTIONS @@ -84,22 +110,140 @@ use base "Exporter"; -our $VERSION = '0.02'; +our $VERSION = '0.1'; our @EXPORT = qw( - NODE $NODE $PORT snd rcv _any_ - create_port create_port_on - become_slave become_public + NODE $NODE *SELF node_of _any_ + resolve_node initialise_node + snd rcv mon kil reg psub + port ); -=item NODE / $NODE +our $SELF; + +sub _self_die() { + my $msg = $@; + $msg =~ s/\n+$// unless ref $msg; + kil $SELF, die => $msg; +} + +=item $thisnode = NODE / $NODE + +The C function returns, and the C<$NODE> variable contains +the noderef of the local node. The value is initialised by a call +to C or C, after which all local port +identifiers become invalid. + +=item $noderef = node_of $port + +Extracts and returns the noderef from a portid or a noderef. + +=item initialise_node $noderef, $seednode, $seednode... + +=item initialise_node "slave/", $master, $master... + +Before a node can talk to other nodes on the network it has to initialise +itself - the minimum a node needs to know is its own name, and optionally +it should know the noderefs of some other nodes in the network. + +This function initialises a node - it must be called exactly once (or +never) before calling other AnyEvent::MP functions. + +All arguments are noderefs, which can be either resolved or unresolved. + +There are two types of networked nodes, public nodes and slave nodes: + +=over 4 + +=item public nodes + +For public nodes, C<$noderef> must either be a (possibly unresolved) +noderef, in which case it will be resolved, or C (or missing), in +which case the noderef will be guessed. + +Afterwards, the node will bind itself on all endpoints and try to connect +to all additional C<$seednodes> that are specified. Seednodes are optional +and can be used to quickly bootstrap the node into an existing network. + +=item slave nodes + +When the C<$noderef> is the special string C, then the node will +become a slave node. 
Slave nodes cannot be contacted from outside and will +route most of their traffic to the master node that they attach to. + +At least one additional noderef is required: The node will try to connect +to all of them and will become a slave attached to the first node it can +successfully connect to. + +=back + +This function will block until all nodes have been resolved and, for slave +nodes, until it has successfully established a connection to a master +server. + +Example: become a public node listening on the default node. + + initialise_node; + +Example: become a public node, and try to contact some well-known master +servers to become part of the network. + + initialise_node undef, "master1", "master2"; + +Example: become a public node listening on port C<4041>. + + initialise_node 4041; + +Example: become a public node, only visible on localhost port 4044. -The C function and the C<$NODE> variable contain the noderef of -the local node. The value is initialised by a call to C or -C, after which all local port identifiers become invalid. + initialise_node "localhost:4044"; -=item snd $portid, type => @data +Example: become a slave node to any of the specified master servers. -=item snd $portid, @msg + initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net"; + +=item $cv = resolve_node $noderef + +Takes an unresolved node reference that may contain hostnames and +abbreviated IDs, resolves all of them and returns a resolved node +reference. + +In addition to C pairs allowed in resolved noderefs, the +following forms are supported: + +=over 4 + +=item the empty string + +An empty-string component gets resolved as if the default port (4040) was +specified. + +=item naked port numbers (e.g. C<1234>) + +These are resolved by prepending the local nodename and a colon, to be +further resolved. + +=item hostnames (e.g. 
C, C) + +These are resolved by using AnyEvent::DNS to resolve them, optionally +looking up SRV records for the C port, if no port was +specified. + +=back + +=item $SELF + +Contains the current port id while executing C callbacks or C +blocks. + +=item SELF, %SELF, @SELF... + +Due to some quirks in how perl exports variables, it is impossible to +just export C<$SELF>, all the symbols called C are exported by this +module, but only C<$SELF> is currently used. + +=item snd $port, type => @data + +=item snd $port, @msg Send the given message to the given port ID, which can identify either a local or a remote port, and can be either a string or something that @@ -119,75 +263,89 @@ that Storable can serialise and deserialise is allowed, and for the local node, anything can be passed. -=item $local_port = create_port +=item $local_port = port -Create a new local port object. See the next section for allowed methods. +Create a new local port object that can be used either as a pattern +matching port ("full port") or a single-callback port ("miniport"), +depending on how C callbacks are bound to the object. -=cut +=item $port = port { my @msg = @_; $finished } -sub create_port { - my $id = "$AnyEvent::MP::Base::UNIQ." . ++$AnyEvent::MP::Base::ID; +Creates a "miniport", that is, a very lightweight port without any pattern +matching behind it, and returns its ID. Semantically the same as creating +a port and calling C on it. - my $self = bless { - id => "$NODE#$id", - names => [$id], - }, "AnyEvent::MP::Port"; - - $AnyEvent::MP::Base::PORT{$id} = sub { - unshift @_, $self; - - for (@{ $self->{rc0}{$_[1]} }) { - $_ && &{$_->[0]} - && undef $_; - } +The block will be called for every message received on the port. When the +callback returns a true value its job is considered "done" and the port +will be destroyed. Otherwise it will stay alive. - for (@{ $self->{rcv}{$_[1]} }) { - $_ && [@_[1 .. 
@{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; - } +The message will be passed as-is, no extra argument (i.e. no port id) will +be passed to the callback. - for (@{ $self->{any} }) { - $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; - } +If you need the local port id in the callback, this works nicely: + + my $port; $port = port { + snd $otherport, reply => $port; }; - $self -} +=cut -package AnyEvent::MP::Port; +sub rcv($@); -=back +sub port(;&) { + my $id = "$UNIQ." . $ID++; + my $port = "$NODE#$id"; + + if (@_) { + rcv $port, shift; + } else { + $PORT{$id} = sub { }; # nop + } -=head1 METHODS FOR PORT OBJECTS + $port +} -=over 4 +=item reg $port, $name -=item "$port" +Registers the given port under the name C<$name>. If the name already +exists it is replaced. -A port object stringifies to its port ID, so can be used directly for -C operations. +A port can only be registered under one well known name. + +A port automatically becomes unregistered when it is killed. =cut -use overload - '""' => sub { $_[0]{id} }, - fallback => 1; +sub reg(@) { + my ($port, $name) = @_; + + $REG{$name} = $port; +} + +=item rcv $port, $callback->(@msg) -=item $port->rcv (type => $callback->($port, @msg)) +Replaces the callback on the specified miniport (after converting it to +one if required). -=item $port->rcv ($smartmatch => $callback->($port, @msg)) +=item rcv $port, tagstring => $callback->(@msg), ... -=item $port->rcv ([$smartmatch...] => $callback->($port, @msg)) +=item rcv $port, $smartmatch => $callback->(@msg), ... -Register a callback on the given port. +=item rcv $port, [$smartmatch...] => $callback->(@msg), ... + +Register callbacks to be called on matching messages on the given full +port (after converting it to one if required). The callback has to return a true value when its work is done, after which it will be removed, or a false value in which case it will stay registered. 
+The global C<$SELF> (exported by this module) contains C<$port> while +executing the callback. + +Runtime errors during callback execution will result in the port being +Ced. + If the match is an array reference, then it will be matched against the first elements of the message, otherwise only the first element is being matched. @@ -202,78 +360,223 @@ =cut sub rcv($@) { - my ($self, $match, $cb) = @_; + my $port = shift; + my ($noderef, $portid) = split /#/, $port, 2; - if (!ref $match) { - push @{ $self->{rc0}{$match} }, [$cb]; - } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { - my ($type, @match) = @$match; - @match - ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] - : push @{ $self->{rc0}{$match->[0]} }, [$cb]; + ($NODE{$noderef} || add_node $noderef) == $NODE{""} + or Carp::croak "$port: rcv can only be called on local ports, caught"; + + if (@_ == 1) { + my $cb = shift; + delete $PORT_DATA{$portid}; + $PORT{$portid} = sub { + local $SELF = $port; + eval { + &$cb + and kil $port; + }; + _self_die if $@; + }; } else { - push @{ $self->{any} }, [$cb, $match]; + my $self = $PORT_DATA{$portid} ||= do { + my $self = bless { + id => $port, + }, "AnyEvent::MP::Port"; + + $PORT{$portid} = sub { + local $SELF = $port; + + eval { + for (@{ $self->{rc0}{$_[0]} }) { + $_ && &{$_->[0]} + && undef $_; + } + + for (@{ $self->{rcv}{$_[0]} }) { + $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] + && &{$_->[0]} + && undef $_; + } + + for (@{ $self->{any} }) { + $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] + && &{$_->[0]} + && undef $_; + } + }; + _self_die if $@; + }; + + $self + }; + + "AnyEvent::MP::Port" eq ref $self + or Carp::croak "$port: rcv can only be called on message matching ports, caught"; + + while (@_) { + my ($match, $cb) = splice @_, 0, 2; + + if (!ref $match) { + push @{ $self->{rc0}{$match} }, [$cb]; + } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { + my ($type, @match) = @$match; + @match + ? 
push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] + : push @{ $self->{rc0}{$match->[0]} }, [$cb]; + } else { + push @{ $self->{any} }, [$cb, $match]; + } + } } -} -=item $port->register ($name) + $port +} -Registers the given port under the well known name C<$name>. If the name -already exists it is replaced. +=item $closure = psub { BLOCK } -A port can only be registered under one well known name. +Remembers C<$SELF> and creates a closure out of the BLOCK. When the +closure is executed, sets up the environment in the same way as in C +callbacks, i.e. runtime errors will cause the port to get Ced. + +This is useful when you register callbacks from C callbacks: + + rcv delayed_reply => sub { + my ($delay, @reply) = @_; + my $timer = AE::timer $delay, 0, psub { + snd @reply, $SELF; + }; + }; =cut -sub register { - my ($self, $name) = @_; +sub psub(&) { + my $cb = shift; + + my $port = $SELF + or Carp::croak "psub can only be called from within rcv or psub callbacks, not"; - $self->{wkname} = $name; - $AnyEvent::MP::Base::WKP{$name} = "$self"; + sub { + local $SELF = $port; + + if (wantarray) { + my @res = eval { &$cb }; + _self_die if $@; + @res + } else { + my $res = eval { &$cb }; + _self_die if $@; + $res + } + } } -=item $port->destroy +=item $guard = mon $port, $cb->(@reason) + +=item $guard = mon $port, $otherport + +=item $guard = mon $port, $otherport, @msg + +Monitor the given port and do something when the port is killed. + +In the first form, the callback is simply called with any number +of C<@reason> elements (no @reason means that the port was deleted +"normally"). Note also that I<< the callback B never die >>, so use +C if unsure. + +In the second form, the other port will be C'ed with C<@reason>, iff +a @reason was specified, i.e. on "normal" kils nothing happens, while +under all other conditions, the other port is killed with the same reason. + +In the last form, a message of the form C<@msg, @reason> will be C. 
-Explicitly destroy/remove/nuke/vaporise the port. +Example: call a given callback when C<$port> is killed. -Ports are normally kept alive by there mere existance alone, and need to -be destroyed explicitly. + mon $port, sub { warn "port died because of <@_>\n" }; + +Example: kill ourselves when C<$port> is killed abnormally. + + mon $port, $self; + +Example: send us a restart message when C<$port> is killed. + + mon $port, $self => "restart"; =cut -sub destroy { - my ($self) = @_; +sub mon { + my ($noderef, $port) = split /#/, shift, 2; + + my $node = $NODE{$noderef} || add_node $noderef; - delete $AnyEvent::MP::Base::WKP{ $self->{wkname} }; + my $cb = shift; - delete $AnyEvent::MP::Base::PORT{$_} - for @{ $self->{names} }; + unless (ref $cb) { + if (@_) { + # send a kill info message + my (@msg) = ($cb, @_); + $cb = sub { snd @msg, @_ }; + } else { + # simply kill other port + my $port = $cb; + $cb = sub { kil $port, @_ if @_ }; + } + } + + $node->monitor ($port, $cb); + + defined wantarray + and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } } -=back +=item $guard = mon_guard $port, $ref, $ref... -=head1 FUNCTIONS FOR NODES +Monitors the given C<$port> and keeps the passed references. When the port +is killed, the references will be freed. -=over 4 +Optionally returns a guard that will stop the monitoring. -=item mon $noderef, $callback->($noderef, $status, $) +This function is useful when you create e.g. timers or other watchers and +want to free them when the port gets killed: -Monitors the given noderef. + $port->rcv (start => sub { + my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub { + undef $timer if 0.9 < rand; + }; + }); -=item become_public endpoint... +=cut -Tells the node to become a public node, i.e. reachable from other nodes. +sub mon_guard { + my ($port, @refs) = @_; -If no arguments are given, or the first argument is C, then -AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the -local nodename resolves to. 
+ mon $port, sub { 0 && @refs } +} -Otherwise the first argument must be an array-reference with transport -endpoints ("ip:port", "hostname:port") or port numbers (in which case the -local nodename is used as hostname). The endpoints are all resolved and -will become the node reference. +=item lnk $port1, $port2 -=cut +Link two ports. This is simply a shorthand for: + + mon $port1, $port2; + mon $port2, $port1; + +It means that if either one is killed abnormally, the other one gets +killed as well. + +=item kil $port[, @reason] + +Kill the specified port with the given C<@reason>. + +If no C<@reason> is specified, then the port is killed "normally" (linked +ports will not be killed, or even notified). + +Otherwise, linked ports get killed with the same reason (second form of +C, see below). + +Runtime errors while evaluating C callbacks or inside C blocks +will be reported as reason C<< die => $@ >>. + +Transport/communication errors are reported as C<< transport_error => +$message >>. =back @@ -284,11 +587,13 @@ message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and the remaining arguments are simply the message data. +While other messages exist, they are not public and subject to change. + =over 4 =cut -=item wkp => $name, @reply +=item lookup => $name, @reply Replies with the port ID of the specified well-known port, or C. @@ -321,6 +626,108 @@ =back +=head1 AnyEvent::MP vs. Distributed Erlang + +AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node +== aemp node, Erlang process == aemp port), so many of the documents and +programming techniques employed by Erlang apply to AnyEvent::MP. 
Here is a +sample: + + http://www.Erlang.se/doc/programming_rules.shtml + http://Erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4 + http://Erlang.org/download/Erlang-book-part1.pdf # chapters 5 and 6 + http://Erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5 + +Despite the similarities, there are also some important differences: + +=over 4 + +=item * Node references contain the recipe on how to contact them. + +Erlang relies on special naming and DNS to work everywhere in the +same way. AEMP relies on each node knowing its own address(es), with +convenience functionality. + +This means that AEMP requires a less tightly controlled environment at the +cost of longer node references and a slightly higher management overhead. + +=item * Erlang uses processes and a mailbox, AEMP does not queue. + +Erlang uses processes that selectively receive messages, and therefore +needs a queue. AEMP is event based, queuing messages would serve no useful +purpose. + +(But see L for a more Erlang-like process model on top of AEMP). + +=item * Erlang sends are synchronous, AEMP sends are asynchronous. + +Sending messages in Erlang is synchronous and blocks the process. AEMP +sends are immediate, connection establishment is handled in the +background. + +=item * Erlang can silently lose messages, AEMP cannot. + +Erlang makes few guarantees on message delivery - messages can get lost +without any of the processes realising it (i.e. you send messages a, b, +and c, and the other side only receives messages a and c). + +AEMP guarantees correct ordering, and guarantees that there are no +holes in the message sequence. + +=item * In Erlang, processes can be declared dead and later be found to be +alive. + +In Erlang it can happen that a monitored process is declared dead and +linked processes get killed, but later it turns out that the process is +still alive - and can receive messages. 
+ +In AEMP, when port monitoring detects a port as dead, then that port will +eventually be killed - it cannot happen that a node detects a port as dead +and then later sends messages to it, finding it is still alive. + +=item * Erlang can send messages to the wrong port, AEMP does not. + +In Erlang it is quite possible that a node that restarts reuses a process +ID known to other nodes for a completely different process, causing +messages destined for that process to end up in an unrelated process. + +AEMP never reuses port IDs, so old messages or old port IDs floating +around in the network will not be sent to an unrelated port. + +=item * Erlang uses unprotected connections, AEMP uses secure +authentication and can use TLS. + +AEMP can use a proven protocol - SSL/TLS - to protect connections and +securely authenticate nodes. + +=item * The AEMP protocol is optimised for both text-based and binary +communications. + +The AEMP protocol, unlike the Erlang protocol, supports both +language-independent text-only protocols (good for debugging) and binary, +language-specific serialisers (e.g. Storable). + +It has also been carefully designed to be implementable in other languages +with a minimum of work while gracefully degrading functionality to make the +protocol simple. + +=item * AEMP has more flexible monitoring options than Erlang. + +In Erlang, you can choose to receive I exit signals as messages +or I, there is no in-between, so monitoring single processes is +difficult to implement. Monitoring in AEMP is more flexible than in +Erlang, as one can choose between automatic kill, exit message or callback +on a per-process basis. + +=item * Erlang has different semantics for monitoring and linking, AEMP has the same. + +Monitoring in Erlang is not an indicator of process death/crashes, +as linking is (except linking is unreliable in Erlang). In AEMP, the +semantics of monitoring and linking are identical, linking is simply +two-way monitoring with automatic kill. 
+ +=back + =head1 SEE ALSO L.
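The documentation added by this diff describes C<lnk> as shorthand for two C<mon> calls, and says that a C<kil> without a reason leaves linked ports alone. The following is a minimal sketch of those semantics as a toy, single-process model in plain Perl. It does not use AnyEvent::MP, and every name in it (toy_port, toy_mon, toy_kil, toy_lnk, %ALIVE, %MON) is hypothetical and chosen only for illustration; it is not the module's implementation.

```perl
use strict;
use warnings;

# Toy model only: these names are hypothetical, not AnyEvent::MP API.
my %ALIVE;   # port id => 1 while the port exists
my %MON;     # port id => array of callbacks to run when it is killed

sub toy_port { my ($id) = @_; $ALIVE{$id} = 1; $id }

sub toy_kil {
    my ($port, @reason) = @_;
    return unless delete $ALIVE{$port};     # already dead: nothing to do
    my $cbs = delete $MON{$port};
    $_->(@reason) for @{ $cbs || [] };      # notify every monitor
}

sub toy_mon {
    my ($port, $cb) = @_;
    # A non-reference argument names another port, which is killed
    # only when a reason is given (an "abnormal" death).
    unless (ref $cb) {
        my $other = $cb;
        $cb = sub { toy_kil($other, @_) if @_ };
    }
    push @{ $MON{$port} }, $cb;
}

# Linking is modelled as two-way monitoring with automatic kill.
sub toy_lnk { my ($p1, $p2) = @_; toy_mon($p1, $p2); toy_mon($p2, $p1) }

my ($w, $x, $y, $z) = map { toy_port($_) } qw(w x y z);
toy_lnk($w, $x);
toy_lnk($y, $z);

toy_kil($w, die => "oops");   # abnormal: the linked port x is killed too
toy_kil($y);                  # normal: the linked port z survives

print join(",", sort keys %ALIVE), "\n";   # prints "z"
```

The early return in toy_kil is what keeps a pair of linked ports from killing each other in an endless loop in this model; how the real module handles that case is not shown in this diff.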