--- AnyEvent-MP/MP.pm 2009/08/05 22:40:51 1.33 +++ AnyEvent-MP/MP.pm 2009/08/13 01:46:10 1.46 @@ -10,21 +10,45 @@ NODE # returns this node's noderef NODE $port # returns the noderef of the port + $SELF # receiving/own port id in rcv callbacks + + # ports are message endpoints + + # sending messages snd $port, type => data...; + snd $port, @msg; + snd @msg_with_first_element_being_a_port; - $SELF # receiving/own port id in rcv callbacks + # miniports + my $miniport = port { my @msg = @_; 0 }; - rcv $port, smartmatch => $cb->($port, @msg); + # full ports + my $port = port; + rcv $port, smartmatch => $cb->(@msg); + rcv $port, ping => sub { snd $_[0], "pong"; 0 }; + rcv $port, pong => sub { warn "pong received\n"; 0 }; - # examples: - rcv $port2, ping => sub { snd $_[0], "pong"; 0 }; - rcv $port1, pong => sub { warn "pong received\n" }; - snd $port2, ping => $port1; + # remote ports + my $port = spawn $node, $initfunc, @initdata; # more, smarter, matches (_any_ is exported by this module) rcv $port, [child_died => $pid] => sub { ... rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3 + # monitoring + mon $port, $cb->(@msg) # callback is invoked on death + mon $port, $otherport # kill otherport on abnormal death + mon $port, $otherport, @msg # send message on death + +=head1 CURRENT STATUS + + AnyEvent::MP - stable API, should work + AnyEvent::MP::Intro - outdated + AnyEvent::MP::Kernel - WIP + AnyEvent::MP::Transport - mostly stable + + stay tuned. + =head1 DESCRIPTION This module (-family) implements a simple message passing framework. @@ -37,7 +61,7 @@ At the moment, this module family is severly broken and underdocumented, so do not use. This was uploaded mainly to reserve the CPAN namespace - -stay tuned! The basic API should be finished, however. +stay tuned! 
=head1 CONCEPTS @@ -92,7 +116,7 @@ package AnyEvent::MP; -use AnyEvent::MP::Base; +use AnyEvent::MP::Kernel; use common::sense; @@ -102,11 +126,12 @@ use base "Exporter"; -our $VERSION = '0.1'; +our $VERSION = $AnyEvent::MP::Kernel::VERSION; + our @EXPORT = qw( NODE $NODE *SELF node_of _any_ resolve_node initialise_node - snd rcv mon kil reg psub + snd rcv mon kil reg psub spawn port ); @@ -129,6 +154,70 @@ Extracts and returns the noderef from a portid or a noderef. +=item initialise_node $noderef, $seednode, $seednode... + +=item initialise_node "slave/", $master, $master... + +Before a node can talk to other nodes on the network it has to initialise +itself - the minimum a node needs to know is its own name, and optionally +it should know the noderefs of some other nodes in the network. + +This function initialises a node - it must be called exactly once (or +never) before calling other AnyEvent::MP functions. + +All arguments are noderefs, which can be either resolved or unresolved. + +There are two types of networked nodes, public nodes and slave nodes: + +=over 4 + +=item public nodes + +For public nodes, C<$noderef> must either be a (possibly unresolved) +noderef, in which case it will be resolved, or C<undef> (or missing), in +which case the noderef will be guessed. + +Afterwards, the node will bind itself on all endpoints and try to connect +to all additional C<$seednodes> that are specified. Seednodes are optional +and can be used to quickly bootstrap the node into an existing network. + +=item slave nodes + +When the C<$noderef> is the special string C<slave/>, then the node will +become a slave node. Slave nodes cannot be contacted from outside and will +route most of their traffic to the master node that they attach to. + +At least one additional noderef is required: The node will try to connect +to all of them and will become a slave attached to the first node it can +successfully connect to. 
+ +=back + +This function will block until all nodes have been resolved and, for slave +nodes, until it has successfully established a connection to a master +server. + +Example: become a public node listening on the default node. + + initialise_node; + +Example: become a public node, and try to contact some well-known master +servers to become part of the network. + + initialise_node undef, "master1", "master2"; + +Example: become a public node listening on port C<4041>. + + initialise_node 4041; + +Example: become a public node, only visible on localhost port 4044. + + initialise_node "localhost:4044"; + +Example: become a slave node to any of the specified master servers. + + initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net"; + =item $cv = resolve_node $noderef Takes an unresolved node reference that may contain hostnames and @@ -235,8 +324,10 @@ =item reg $port, $name -Registers the given port under the name C<$name>. If the name already -exists it is replaced. +=item reg $name + +Registers the given port (or C<$SELF> if missing) under the name +C<$name>. If the name already exists it is replaced. A port can only be registered under one well known name. @@ -245,9 +336,9 @@ =cut sub reg(@) { - my ($port, $name) = @_; + my $port = @_ > 1 ? shift : $SELF || Carp::croak 'reg: called with one argument only, but $SELF not set,'; - $REG{$name} = $port; + $REG{$_[0]} = $port; } =item rcv $port, $callback->(@msg) @@ -262,7 +353,7 @@ =item rcv $port, [$smartmatch...] => $callback->(@msg), ... Register callbacks to be called on matching messages on the given full -port (after converting it to one if required). +port (after converting it to one if required) and return the port. The callback has to return a true value when its work is done, after which is will be removed, or a false value in which case it will stay @@ -271,7 +362,7 @@ The global C<$SELF> (exported by this module) contains C<$port> while executing the callback. 
-Runtime errors wdurign callback execution will result in the port being +Runtime errors during callback execution will result in the port being C<kil>ed. If the match is an array reference, then it will be matched against the @@ -285,6 +376,22 @@ element is a string identifying the message. The one-string-only match is also the most efficient match (by far). +Example: create a port and bind receivers on it in one go. + + my $port = rcv port, + msg1 => sub { ...; 0 }, + msg2 => sub { ...; 0 }, + ; + +Example: create a port, bind receivers and send it in a message elsewhere +in one go: + + snd $otherport, reply => + rcv port, + msg1 => sub { ...; 0 }, + ... + ; + =cut sub rcv($@) { @@ -400,22 +507,46 @@ =item $guard = mon $port, $cb->(@reason) -=item $guard = mon $port, $otherport +=item $guard = mon $port, $rcvport + +=item $guard = mon $port -=item $guard = mon $port, $otherport, @msg +=item $guard = mon $port, $rcvport, @msg -Monitor the given port and do something when the port is killed. +Monitor the given port and do something when the port is killed or +messages to it were lost, and optionally return a guard that can be used +to stop monitoring again. + +C<mon> effectively guarantees that, in the absence of hardware failures, +after starting the monitor, either all messages sent to the port +will arrive, or the monitoring action will be invoked after possible +message loss has been detected. No messages will be lost "in between" +(after the first lost message no further messages will be received by the +port). After the monitoring action was invoked, further messages might get +delivered again. -In the first form, the callback is simply called with any number -of C<@reason> elements (no @reason means that the port was deleted +In the first form (callback), the callback is simply called with any +number of C<@reason> elements (no @reason means that the port was deleted "normally"). Note also that I<< the callback B<must> never die >>, so use C<eval> if unsure. 
-In the second form, the other port will be C<kil>'ed with C<@reason>, iff -a @reason was specified, i.e. on "normal" kils nothing happens, while -under all other conditions, the other port is killed with the same reason. - -In the last form, a message of the form C<@msg, @reason> will be C<snd>. +In the second form (another port given), the other port (C<$rcvport>) +will be C<kil>'ed with C<@reason>, iff a @reason was specified, i.e. on +"normal" kils nothing happens, while under all other conditions, the other +port is killed with the same reason. + +The third form (kill self) is the same as the second form, except that +C<$rcvport> defaults to C<$SELF>. + +In the last form (message), a message of the form C<@msg, @reason> will be +C<snd>. + +As a rule of thumb, monitoring requests should always monitor a port from +a local port (or callback). The reason is that kill messages might get +lost, just like any other message. Another less obvious reason is that +even monitoring requests can get lost (for example, when the connection +to the other node goes down permanently). When monitoring a port locally +these problems do not exist. Example: call a given callback when C<$port> is killed. @@ -423,9 +554,9 @@ Example: kill ourselves when C<$port> is killed abnormally. - mon $port, $self; + mon $port; -Example: send us a restart message another C<$port> is killed. +Example: send us a restart message when another C<$port> is killed. mon $port, $self => "restart"; @@ -436,7 +567,7 @@ my $node = $NODE{$noderef} || add_node $noderef; - my $cb = shift; + my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,'; unless (ref $cb) { if (@_) { @@ -477,19 +608,11 @@ sub mon_guard { my ($port, @refs) = @_; + #TODO: mon-less form? + mon $port, sub { 0 && @refs } } -=item lnk $port1, $port2 - -Link two ports. 
This is simply a shorthand for: - - mon $port1, $port2; - mon $port2, $port1; - -It means that if either one is killed abnormally, the other one gets -killed as well. - =item kil $port[, @reason] Kill the specified port with the given C<@reason>. @@ -506,73 +629,76 @@ Transport/communication errors are reported as C<< transport_error => $message >>. -=back - -=head1 FUNCTIONS FOR NODES - -=over 4 - -=item initialise_node $noderef, $seednode, $seednode... - -=item initialise_node "slave/", $master, $master... - -Initialises a node - must be called exactly once before calling other -AnyEvent::MP functions when talking to other nodes is required. - -All arguments are noderefs, which can be either resolved or unresolved. +=cut -There are two types of networked nodes, public nodes and slave nodes: +=item $port = spawn $node, $initfunc[, @initdata] -=over 4 +Creates a port on the node C<$node> (which can also be a port ID, in which +case it's the node where that port resides). -=item public nodes +The port ID of the newly created port is returned immediately, and it is +permissible to immediately start sending messages or monitor the port. -For public nodes, C<$noderef> must either be a (possibly unresolved) -noderef, in which case it will be resolved, or C<undef> (or missing), in -which case the noderef will be guessed. +After the port has been created, the init function is +called. This function must be a fully-qualified function name +(e.g. C<MyApp::Chat::Server::init>). To specify a function in the main +program, use C<::name>. -Afterwards, the node will bind itself on all endpoints and try to connect -to all additional C<$seednodes> that are specified. Seednodes are optional -and can be used to quickly bootstrap the node into an existing network. - -=item slave nodes +If the function doesn't exist, then the node tries to C<require> +the package, then the package above the package and so on (e.g. +C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function +exists or it runs out of package names. 
-When the C<$noderef> is the special string C, then the node will -become a slave node. Slave nodes cannot be contacted from outside and will -route most of their traffic to the master node that they attach to. +The init function is then called with the newly-created port as context +object (C<$SELF>) and the C<@initdata> values as arguments. -At least one additional noderef is required: The node will try to connect -to all of them and will become a slave attached to the first node it can -successfully connect to. +A common idiom is to pass your own port, monitor the spawned port, and +in the init function, monitor the original port. This two-way monitoring +ensures that both ports get cleaned up when there is a problem. -=back +Example: spawn a chat server port on C<$othernode>. -This function will block until all nodes have been resolved and, for slave -nodes, until it has successfully established a connection to a master -server. + # this node, executed from within a port context: + my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF; + mon $server; -Example: become a public node listening on the default node. + # init function on C<$othernode> + sub connect { + my ($srcport) = @_; - initialise_node; + mon $srcport; -Example: become a public node, and try to contact some well-known master -servers to become part of the network. + rcv $SELF, sub { + ... + }; + } - initialise_node undef, "master1", "master2"; +=cut -Example: become a public node listening on port C<4041>. +sub _spawn { + my $port = shift; + my $init = shift; - initialise_node 4041; + local $SELF = "$NODE#$port"; + eval { + &{ load_func $init } + }; + _self_die if $@; +} -Example: become a public node, only visible on localhost port 4044. +sub spawn(@) { + my ($noderef, undef) = split /#/, shift, 2; - initialise_node "locahost:4044"; + my $id = "$RUNIQ." . $ID++; -Example: become a slave node to any of the specified master servers. 
+ $_[0] =~ /::/ + or Carp::croak "spawn init function must be a fully-qualified name, caught"; - initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net"; + ($NODE{$noderef} || add_node $noderef) + ->send (["", "AnyEvent::MP::_spawn" => $id, @_]); -=cut + "$noderef#$id" +} =back @@ -624,15 +750,15 @@ =head1 AnyEvent::MP vs. Distributed Erlang -AnyEvent::MP got lots of its ideas from distributed erlang (erlang node -== aemp node, erlang process == aemp port), so many of the documents and -programming techniques employed by erlang apply to AnyEvent::MP. Here is a +AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node +== aemp node, Erlang process == aemp port), so many of the documents and +programming techniques employed by Erlang apply to AnyEvent::MP. Here is a sample: - http://www.erlang.se/doc/programming_rules.shtml - http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4 - http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6 - http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5 + http://www.erlang.se/doc/programming_rules.shtml + http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4 + http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6 + http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5 Despite the similarities, there are also some important differences: @@ -653,11 +779,11 @@ needs a queue. AEMP is event based, queuing messages would serve no useful purpose. -(But see L<Coro::MP> for a more erlang-like process model on top of AEMP). +(But see L<Coro::MP> for a more Erlang-like process model on top of AEMP). =item * Erlang sends are synchronous, AEMP sends are asynchronous. -Sending messages in erlang is synchronous and blocks the process. AEMP +Sending messages in Erlang is synchronous and blocks the process. AEMP sends are immediate, connection establishment is handled in the background. 
@@ -670,10 +796,10 @@ AEMP guarantees correct ordering, and the guarantee that there are no holes in the message sequence. -=item * In erlang, processes can be declared dead and later be found to be +=item * In Erlang, processes can be declared dead and later be found to be alive. -In erlang it can happen that a monitored process is declared dead and +In Erlang it can happen that a monitored process is declared dead and linked processes get killed, but later it turns out that the process is still alive - and can receive messages. @@ -683,7 +809,7 @@ =item * Erlang can send messages to the wrong port, AEMP does not. -In erlang it is quite possible that a node that restarts reuses a process +In Erlang it is quite possible that a node that restarts reuses a process ID known to other nodes for a completely different process, causing messages destined for that process to end up in an unrelated process. @@ -699,7 +825,7 @@ =item * The AEMP protocol is optimised for both text-based and binary communications. -The AEMP protocol, unlike the erlang protocol, supports both +The AEMP protocol, unlike the Erlang protocol, supports both language-independent text-only protocols (good for debugging) and binary, language-specific serialisers (e.g. Storable). @@ -707,6 +833,60 @@ with a minimum of work while gracefully degrading fucntionality to make the protocol simple. +=item * AEMP has more flexible monitoring options than Erlang. + +In Erlang, you can choose to receive I<all> exit signals as messages +or I<none>, there is no in-between, so monitoring single processes is +difficult to implement. Monitoring in AEMP is more flexible than in +Erlang, as one can choose between automatic kill, exit message or callback +on a per-process basis. + +=item * Erlang tries to hide remote/local connections, AEMP does not. + +Monitoring in Erlang is not an indicator of process death/crashes, +as linking is (except linking is unreliable in Erlang). 
+ +In AEMP, you don't "look up" registered port names or send to named ports +that might or might not be persistent. Instead, you normally spawn a port +on the remote node. The init function monitors you, and you monitor +the remote port. Since both monitors are local to the node, they are much +more reliable. + +This also saves round-trips and avoids sending messages to the wrong port +(hard to do in Erlang). + +=back + +=head1 RATIONALE + +=over 4 + +=item Why strings for ports and noderefs, why not objects? + +We considered "objects", but found that the actual number of methods +that can be called is very low. Since port IDs and noderefs travel over +the network frequently, the serialising/deserialising would add lots of +overhead, as well as having to keep a proxy object. + +Strings can easily be printed, easily serialised etc. and need no special +procedures to be "valid". + +=item Why favour JSON, why not a real serialising format such as Storable? + +In fact, any AnyEvent::MP node will happily accept Storable as framing +format, but currently there is no way to make a node use Storable by +default. + +The default framing protocol is JSON because a) JSON::XS is many times +faster for small messages and b) most importantly, after years of +experience we found that object serialisation is causing more problems +than it solves: Just like function calls, objects simply do not travel +easily over the network, mostly because they will always be a copy, so you +always have to re-think your design. + +Keeping your messages simple, concentrating on data structures rather than +objects, will keep your messages clean, tidy and efficient. + =back =head1 SEE ALSO