--- AnyEvent-MP/MP.pm 2009/08/31 11:11:27 1.74 +++ AnyEvent-MP/MP.pm 2012/02/26 10:29:59 1.119 @@ -1,21 +1,20 @@ =head1 NAME -AnyEvent::MP - multi-processing/message-passing framework +AnyEvent::MP - erlang-style multi-processing/message-passing framework =head1 SYNOPSIS use AnyEvent::MP; - $NODE # contains this node's noderef - NODE # returns this node's noderef - NODE $port # returns the noderef of the port + $NODE # contains this node's node ID + NODE # returns this node's node ID $SELF # receiving/own port id in rcv callbacks # initialise the node so it can send/receive messages configure; - # ports are message endpoints + # ports are message destinations # sending messages snd $port, type => data...; @@ -33,20 +32,30 @@ # create a port on another node my $port = spawn $node, $initfunc, @initdata; + # destroy a port again + kil $port; # "normal" kill + kil $port, my_error => "everything is broken"; # error kill + # monitoring - mon $port, $cb->(@msg) # callback is invoked on death - mon $port, $otherport # kill otherport on abnormal death - mon $port, $otherport, @msg # send message on death + mon $localport, $cb->(@msg) # callback is invoked on death + mon $localport, $otherport # kill otherport on abnormal death + mon $localport, $otherport, @msg # send message on death + + # temporarily execute code in port context + peval $port, sub { die "kill the port!" }; + + # execute callbacks in $SELF port context + my $timer = AE::timer 1, 0, psub { + die "kill the port, delayed"; + }; =head1 CURRENT STATUS bin/aemp - stable. AnyEvent::MP - stable API, should work. - AnyEvent::MP::Intro - uptodate, but incomplete. - AnyEvent::MP::Kernel - mostly stable. - AnyEvent::MP::Global - stable API, protocol not yet final. - - stay tuned. + AnyEvent::MP::Intro - explains most concepts. + AnyEvent::MP::Kernel - mostly stable API. + AnyEvent::MP::Global - stable API. =head1 DESCRIPTION @@ -58,20 +67,21 @@ For an introduction to this module family, see the L manual page and the examples under F. -At the moment, this module family is a bit underdocumented. - =head1 CONCEPTS =over 4 =item port -A port is something you can send messages to (with the C function). +Not to be confused with a TCP port, a "port" is something you can send +messages to (with the C function). Ports allow you to register C handlers that can match all or just some messages. Messages send to ports will not be queued, regardless of anything was listening for them or not. +Ports are represented by (printable) strings called "port IDs". + =item port ID - C A port ID is the concatenation of a node ID, a hash-mark (C<#>) as @@ -85,36 +95,77 @@ Nodes are either public (have one or more listening ports) or private (no listening ports). Private nodes cannot talk to other private nodes -currently. +currently, but all nodes can talk to public nodes. -=item node ID - C<[a-za-Z0-9_\-.:]+> +Nodes is represented by (printable) strings called "node IDs". + +=item node ID - C<[A-Za-z0-9_\-.:]*> A node ID is a string that uniquely identifies the node within a network. Depending on the configuration used, node IDs can look like a hostname, a hostname and a port, or a random string. AnyEvent::MP itself -doesn't interpret node IDs in any way. +doesn't interpret node IDs in any way except to uniquely identify a node. =item binds - C Nodes can only talk to each other by creating some kind of connection to each other. To do this, nodes should listen on one or more local transport -endpoints - binds. Currently, only standard C specifications can -be used, which specify TCP ports to listen on. - -=item seeds - C +endpoints - binds. -When a node starts, it knows nothing about the network. To teach the node -about the network it first has to contact some other node within the -network. This node is called a seed. - -Seeds are transport endpoint(s) of as many nodes as one wants. Those nodes -are expected to be long-running, and at least one of those should always -be available. When nodes run out of connections (e.g. due to a network -error), they try to re-establish connections to some seednodes again to -join the network. - -Apart from being sued for seeding, seednodes are not special in any way - -every public node can be a seednode. +Currently, only standard C specifications can be used, which +specify TCP ports to listen on. So a bind is basically just a tcp socket +in listening mode thta accepts conenctions form other nodes. + +=item seed nodes + +When a node starts, it knows nothing about the network it is in - it +needs to connect to at least one other node that is already in the +network. These other nodes are called "seed nodes". + +Seed nodes themselves are not special - they are seed nodes only because +some other node I them as such, but any node can be used as seed +node for other nodes, and eahc node cna use a different set of seed nodes. + +In addition to discovering the network, seed nodes are also used to +maintain the network - all nodes using the same seed node form are part of +the same network. If a network is split into multiple subnets because e.g. +the network link between the parts goes down, then using the same seed +nodes for all nodes ensures that eventually the subnets get merged again. + +Seed nodes are expected to be long-running, and at least one seed node +should always be available. They should also be relatively responsive - a +seed node that blocks for long periods will slow down everybody else. + +For small networks, it's best if every node uses the same set of seed +nodes. For large networks, it can be useful to specify "regional" seed +nodes for most nodes in an area, and use all seed nodes as seed nodes for +each other. What's important is that all seed nodes connections form a +complete graph, so that the network cannot split into separate subnets +forever. + +Seed nodes are represented by seed IDs. + +=item seed IDs - C + +Seed IDs are transport endpoint(s) (usually a hostname/IP address and a +TCP port) of nodes that should be used as seed nodes. + +=item global nodes + +An AEMP network needs a discovery service - nodes need to know how to +connect to other nodes they only know by name. In addition, AEMP offers a +distributed "group database", which maps group names to a list of strings +- for example, to register worker ports. + +A network needs at least one global node to work, and allows every node to +be a global node. + +Any node that loads the L module becomes a global +node and tries to keep connections to all other nodes. So while it can +make sense to make every node "global" in small networks, it usually makes +sense to only make seed nodes into global nodes in large networks (nodes +keep connections to seed nodes and global nodes, so makign them the same +reduces overhead). =back @@ -136,12 +187,12 @@ use base "Exporter"; -our $VERSION = $AnyEvent::MP::Kernel::VERSION; +our $VERSION = '1.30'; our @EXPORT = qw( NODE $NODE *SELF node_of after configure - snd rcv mon mon_guard kil reg psub spawn + snd rcv mon mon_guard kil psub peval spawn cal port ); @@ -163,6 +214,8 @@ Extracts and returns the node ID from a port ID or a node ID. +=item configure $profile, key => value... + =item configure key => value... Before a node can talk to other nodes on the network (i.e. enter @@ -170,6 +223,9 @@ to know is its own name, and optionally it should know the addresses of some other nodes in the network to discover other nodes. +The key/value pairs are basically the same ones as documented for the +F command line utility (sans the set/del prefix). + This function configures a node - it must be called exactly once (or never) before calling other AnyEvent::MP functions. @@ -179,12 +235,12 @@ The function first looks up a profile in the aemp configuration (see the L commandline utility). The profile name can be specified via the -named C parameter. If it is missing, then the nodename (F) will be used as profile name. +named C parameter or can simply be the first parameter). If it is +missing, then the nodename (F) will be used as profile name. The profile data is then gathered as follows: -First, all remaining key => value pairs (all of which are conviniently +First, all remaining key => value pairs (all of which are conveniently undocumented at the moment) will be interpreted as configuration data. Then they will be overwritten by any values specified in the global default configuration (see the F utility), then the chain of @@ -212,13 +268,13 @@ =item step 3, connect to seed nodes -As the last step, the seeds list from the profile is passed to the +As the last step, the seed ID list from the profile is passed to the L module, which will then use it to keep connectivity with at least one node at any point in time. =back -Example: become a distributed node using the locla node name as profile. +Example: become a distributed node using the local node name as profile. This should be the most common form of invocation for "daemon"-type nodes. configure @@ -362,7 +418,7 @@ ; Example: temporarily register a rcv callback for a tag matching some port -(e.g. for a rpc reply) and unregister it after a message was received. +(e.g. for an rpc reply) and unregister it after a message was received. rcv $port, $otherport => sub { my @reply = @_; @@ -374,9 +430,9 @@ sub rcv($@) { my $port = shift; - my ($noderef, $portid) = split /#/, $port, 2; + my ($nodeid, $portid) = split /#/, $port, 2; - $NODE{$noderef} == $NODE{""} + $NODE{$nodeid} == $NODE{""} or Carp::croak "$port: rcv can only be called on local ports, caught"; while (@_) { @@ -385,7 +441,7 @@ "AnyEvent::MP::Port" eq ref $self or Carp::croak "$port: rcv can only be called on message matching ports, caught"; - $self->[2] = shift; + $self->[0] = shift; } else { my $cb = shift; $PORT{$portid} = sub { @@ -395,7 +451,7 @@ } } elsif (defined $_[0]) { my $self = $PORT_DATA{$portid} ||= do { - my $self = bless [$PORT{$port} || sub { }, { }, $port], "AnyEvent::MP::Port"; + my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port"; $PORT{$portid} = sub { local $SELF = $port; @@ -427,12 +483,52 @@ $port } +=item peval $port, $coderef[, @args] + +Evaluates the given C<$codref> within the contetx of C<$port>, that is, +when the code throews an exception the C<$port> will be killed. + +Any remaining args will be passed to the callback. Any return values will +be returned to the caller. + +This is useful when you temporarily want to execute code in the context of +a port. + +Example: create a port and run some initialisation code in it's context. + + my $port = port { ... }; + + peval $port, sub { + init + or die "unable to init"; + }; + +=cut + +sub peval($$) { + local $SELF = shift; + my $cb = shift; + + if (wantarray) { + my @res = eval { &$cb }; + _self_die if $@; + @res + } else { + my $res = eval { &$cb }; + _self_die if $@; + $res + } +} + =item $closure = psub { BLOCK } Remembers C<$SELF> and creates a closure out of the BLOCK. When the closure is executed, sets up the environment in the same way as in C callbacks, i.e. runtime errors will cause the port to get Ced. +The effect is basically as if it returned C<< sub { peval $SELF, sub { +BLOCK }, @_ } >>. + This is useful when you register callbacks from C callbacks: rcv delayed_reply => sub { @@ -477,24 +573,13 @@ messages to it were lost, and optionally return a guard that can be used to stop monitoring again. -C effectively guarantees that, in the absence of hardware failures, -after starting the monitor, either all messages sent to the port will -arrive, or the monitoring action will be invoked after possible message -loss has been detected. No messages will be lost "in between" (after -the first lost message no further messages will be received by the -port). After the monitoring action was invoked, further messages might get -delivered again. - -Note that monitoring-actions are one-shot: once messages are lost (and a -monitoring alert was raised), they are removed and will not trigger again. - In the first form (callback), the callback is simply called with any number of C<@reason> elements (no @reason means that the port was deleted "normally"). Note also that I<< the callback B never die >>, so use C if unsure. In the second form (another port given), the other port (C<$rcvport>) -will be C'ed with C<@reason>, iff a @reason was specified, i.e. on +will be C'ed with C<@reason>, if a @reason was specified, i.e. on "normal" kils nothing happens, while under all other conditions, the other port is killed with the same reason. @@ -504,13 +589,33 @@ In the last form (message), a message of the form C<@msg, @reason> will be C. +Monitoring-actions are one-shot: once messages are lost (and a monitoring +alert was raised), they are removed and will not trigger again. + As a rule of thumb, monitoring requests should always monitor a port from a local port (or callback). The reason is that kill messages might get lost, just like any other message. Another less obvious reason is that -even monitoring requests can get lost (for exmaple, when the connection +even monitoring requests can get lost (for example, when the connection to the other node goes down permanently). When monitoring a port locally these problems do not exist. +C effectively guarantees that, in the absence of hardware failures, +after starting the monitor, either all messages sent to the port will +arrive, or the monitoring action will be invoked after possible message +loss has been detected. No messages will be lost "in between" (after +the first lost message no further messages will be received by the +port). After the monitoring action was invoked, further messages might get +delivered again. + +Inter-host-connection timeouts and monitoring depend on the transport +used. The only transport currently implemented is TCP, and AnyEvent::MP +relies on TCP to detect node-downs (this can take 10-15 minutes on a +non-idle connection, and usually around two hours for idle connections). + +This means that monitoring is good for program errors and cleaning up +stuff eventually, but they are no replacement for a timeout when you need +to ensure some maximum latency. + Example: call a given callback when C<$port> is killed. mon $port, sub { warn "port died because of <@_>\n" }; @@ -526,9 +631,9 @@ =cut sub mon { - my ($noderef, $port) = split /#/, shift, 2; + my ($nodeid, $port) = split /#/, shift, 2; - my $node = $NODE{$noderef} || add_node $noderef; + my $node = $NODE{$nodeid} || add_node $nodeid; my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,'; @@ -547,7 +652,7 @@ $node->monitor ($port, $cb); defined wantarray - and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } + and ($cb += 0, AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }) } =item $guard = mon_guard $port, $ref, $ref... @@ -580,12 +685,12 @@ Kill the specified port with the given C<@reason>. -If no C<@reason> is specified, then the port is killed "normally" (ports -monitoring other ports will not necessarily die because a port dies -"normally"). +If no C<@reason> is specified, then the port is killed "normally" - +monitor callback will be invoked, but the kil will not cause linked ports +(C form) to get killed. -Otherwise, linked ports get killed with the same reason (second form of -C, see above). +If a C<@reason> is specified, then linked ports (C +form) get killed with the same reason. Runtime errors while evaluating C callbacks or inside C blocks will be reported as reason C<< die => $@ >>. @@ -614,13 +719,19 @@ exists or it runs out of package names. The init function is then called with the newly-created port as context -object (C<$SELF>) and the C<@initdata> values as arguments. +object (C<$SELF>) and the C<@initdata> values as arguments. It I +call one of the C functions to set callbacks on C<$SELF>, otherwise +the port might not get created. A common idiom is to pass a local port, immediately monitor the spawned port, and in the remote init function, immediately monitor the passed local port. This two-way monitoring ensures that both ports get cleaned up when there is a problem. +C guarantees that the C<$initfunc> has no visible effects on the +caller before C returns (by delaying invocation when spawn is +called for the local node). + Example: spawn a chat server port on C<$othernode>. # this node, executed from within a port context: @@ -644,6 +755,7 @@ my $port = shift; my $init = shift; + # rcv will create the actual port local $SELF = "$NODE#$port"; eval { &{ load_func $init } @@ -652,16 +764,16 @@ } sub spawn(@) { - my ($noderef, undef) = split /#/, shift, 2; + my ($nodeid, undef) = split /#/, shift, 2; my $id = "$RUNIQ." . $ID++; $_[0] =~ /::/ or Carp::croak "spawn init function must be a fully-qualified name, caught"; - snd_to_func $noderef, "AnyEvent::MP::_spawn" => $id, @_; + snd_to_func $nodeid, "AnyEvent::MP::_spawn" => $id, @_; - "$noderef#$id" + "$nodeid#$id" } =item after $timeout, @msg @@ -688,6 +800,58 @@ }; } +=item cal $port, @msg, $callback[, $timeout] + +A simple form of RPC - sends a message to the given C<$port> with the +given contents (C<@msg>), but adds a reply port to the message. + +The reply port is created temporarily just for the purpose of receiving +the reply, and will be Ced when no longer needed. + +A reply message sent to the port is passed to the C<$callback> as-is. + +If an optional time-out (in seconds) is given and it is not C, +then the callback will be called without any arguments after the time-out +elapsed and the port is Ced. + +If no time-out is given (or it is C), then the local port will +monitor the remote port instead, so it eventually gets cleaned-up. + +Currently this function returns the temporary port, but this "feature" +might go in future versions unless you can make a convincing case that +this is indeed useful for something. + +=cut + +sub cal(@) { + my $timeout = ref $_[-1] ? undef : pop; + my $cb = pop; + + my $port = port { + undef $timeout; + kil $SELF; + &$cb; + }; + + if (defined $timeout) { + $timeout = AE::timer $timeout, 0, sub { + undef $timeout; + kil $port; + $cb->(); + }; + } else { + mon $_[0], sub { + kil $port; + $cb->(); + }; + } + + push @_, $port; + &snd; + + $port +} + =back =head1 AnyEvent::MP vs. Distributed Erlang @@ -697,10 +861,10 @@ programming techniques employed by Erlang apply to AnyEvent::MP. Here is a sample: - http://www.Erlang.se/doc/programming_rules.shtml - http://Erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4 - http://Erlang.org/download/Erlang-book-part1.pdf # chapters 5 and 6 - http://Erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5 + http://www.erlang.se/doc/programming_rules.shtml + http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4 + http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6 + http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5 Despite the similarities, there are also some important differences: @@ -710,7 +874,8 @@ Erlang relies on special naming and DNS to work everywhere in the same way. AEMP relies on each node somehow knowing its own address(es) (e.g. by -configuraiton or DNS), but will otherwise discover other odes itself. +configuration or DNS), and possibly the addresses of some seed nodes, but +will otherwise discover other nodes (and their IDs) itself. =item * Erlang has a "remote ports are like local ports" philosophy, AEMP uses "local ports are like remote ports". @@ -729,38 +894,49 @@ =item * Erlang uses processes and a mailbox, AEMP does not queue. -Erlang uses processes that selectively receive messages, and therefore -needs a queue. AEMP is event based, queuing messages would serve no -useful purpose. For the same reason the pattern-matching abilities of -AnyEvent::MP are more limited, as there is little need to be able to -filter messages without dequeing them. +Erlang uses processes that selectively receive messages out of order, and +therefore needs a queue. AEMP is event based, queuing messages would serve +no useful purpose. For the same reason the pattern-matching abilities +of AnyEvent::MP are more limited, as there is little need to be able to +filter messages without dequeuing them. -(But see L for a more Erlang-like process model on top of AEMP). +This is not a philosophical difference, but simply stems from AnyEvent::MP +being event-based, while Erlang is process-based. + +You cna have a look at L for a more Erlang-like process model on +top of AEMP and Coro threads. =item * Erlang sends are synchronous, AEMP sends are asynchronous. -Sending messages in Erlang is synchronous and blocks the process (and -so does not need a queue that can overflow). AEMP sends are immediate, -connection establishment is handled in the background. +Sending messages in Erlang is synchronous and blocks the process until +a conenction has been established and the message sent (and so does not +need a queue that can overflow). AEMP sends return immediately, connection +establishment is handled in the background. =item * Erlang suffers from silent message loss, AEMP does not. -Erlang makes few guarantees on messages delivery - messages can get lost -without any of the processes realising it (i.e. you send messages a, b, -and c, and the other side only receives messages a and c). - -AEMP guarantees correct ordering, and the guarantee that after one message -is lost, all following ones sent to the same port are lost as well, until -monitoring raises an error, so there are no silent "holes" in the message -sequence. +Erlang implements few guarantees on messages delivery - messages can get +lost without any of the processes realising it (i.e. you send messages a, +b, and c, and the other side only receives messages a and c). + +AEMP guarantees (modulo hardware errors) correct ordering, and the +guarantee that after one message is lost, all following ones sent to the +same port are lost as well, until monitoring raises an error, so there are +no silent "holes" in the message sequence. + +If you want your software to be very reliable, you have to cope with +corrupted and even out-of-order messages in both Erlang and AEMP. AEMP +simply tries to work better in common error cases, such as when a network +link goes down. =item * Erlang can send messages to the wrong port, AEMP does not. -In Erlang it is quite likely that a node that restarts reuses a process ID -known to other nodes for a completely different process, causing messages -destined for that process to end up in an unrelated process. +In Erlang it is quite likely that a node that restarts reuses an Erlang +process ID known to other nodes for a completely different process, +causing messages destined for that process to end up in an unrelated +process. -AEMP never reuses port IDs, so old messages or old port IDs floating +AEMP does not reuse port IDs, so old messages or old port IDs floating around in the network will not be sent to an unrelated port. =item * Erlang uses unprotected connections, AEMP uses secure @@ -773,7 +949,7 @@ communications. The AEMP protocol, unlike the Erlang protocol, supports both programming -language independent text-only protocols (good for debugging) and binary, +language independent text-only protocols (good for debugging), and binary, language-specific serialisers (e.g. Storable). By default, unless TLS is used, the protocol is actually completely text-based. @@ -783,11 +959,12 @@ =item * AEMP has more flexible monitoring options than Erlang. -In Erlang, you can chose to receive I exit signals as messages -or I, there is no in-between, so monitoring single processes is -difficult to implement. Monitoring in AEMP is more flexible than in -Erlang, as one can choose between automatic kill, exit message or callback -on a per-process basis. +In Erlang, you can chose to receive I exit signals as messages or +I, there is no in-between, so monitoring single Erlang processes is +difficult to implement. + +Monitoring in AEMP is more flexible than in Erlang, as one can choose +between automatic kill, exit message or callback on a per-port basis. =item * Erlang tries to hide remote/local connections, AEMP does not. @@ -819,8 +996,8 @@ Strings can easily be printed, easily serialised etc. and need no special procedures to be "valid". -And as a result, a miniport consists of a single closure stored in a -global hash - it can't become much cheaper. +And as a result, a port with just a default receiver consists of a single +code reference stored in a global hash - it can't become much cheaper. =item Why favour JSON, why not a real serialising format such as Storable? @@ -846,9 +1023,14 @@ L - more, lower-level, stuff. -L - network maintainance and port groups, to find +L - network maintenance and port groups, to find your applications. +L - establish data connections between nodes. + +L - simple service to display log messages from +all nodes. + L. =head1 AUTHOR