--- AnyEvent-MP/MP.pm 2009/09/03 20:16:36 1.78 +++ AnyEvent-MP/MP.pm 2010/10/27 06:32:39 1.117 @@ -1,6 +1,6 @@ =head1 NAME -AnyEvent::MP - multi-processing/message-passing framework +AnyEvent::MP - erlang-style multi-processing/message-passing framework =head1 SYNOPSIS @@ -32,20 +32,30 @@ # create a port on another node my $port = spawn $node, $initfunc, @initdata; + # destroy a port again + kil $port; # "normal" kill + kil $port, my_error => "everything is broken"; # error kill + # monitoring - mon $port, $cb->(@msg) # callback is invoked on death - mon $port, $otherport # kill otherport on abnormal death - mon $port, $otherport, @msg # send message on death + mon $localport, $cb->(@msg) # callback is invoked on death + mon $localport, $otherport # kill otherport on abnormal death + mon $localport, $otherport, @msg # send message on death + + # temporarily execute code in port context + peval $port, sub { die "kill the port!" }; + + # execute callbacks in $SELF port context + my $timer = AE::timer 1, 0, psub { + die "kill the port, delayed"; + }; =head1 CURRENT STATUS bin/aemp - stable. AnyEvent::MP - stable API, should work. AnyEvent::MP::Intro - explains most concepts. - AnyEvent::MP::Kernel - mostly stable. - AnyEvent::MP::Global - stable API, protocol not yet final. - - stay tuned. + AnyEvent::MP::Kernel - mostly stable API. + AnyEvent::MP::Global - stable API. =head1 DESCRIPTION @@ -63,7 +73,8 @@ =item port -A port is something you can send messages to (with the C function). +Not to be confused with a TCP port, a "port" is something you can send +messages to (with the C function). Ports allow you to register C handlers that can match all or just some messages. Messages send to ports will not be queued, regardless of @@ -84,7 +95,7 @@ (no listening ports). Private nodes cannot talk to other private nodes currently. 
-=item node ID - C<[a-za-Z0-9_\-.:]+> +=item node ID - C<[A-Za-z0-9_\-.:]*> A node ID is a string that uniquely identifies the node within a network. Depending on the configuration used, node IDs can look like a @@ -98,20 +109,33 @@ endpoints - binds. Currently, only standard C specifications can be used, which specify TCP ports to listen on. -=item seeds - C +=item seed nodes When a node starts, it knows nothing about the network. To teach the node about the network it first has to contact some other node within the network. This node is called a seed. -Seeds are transport endpoint(s) of as many nodes as one wants. Those nodes -are expected to be long-running, and at least one of those should always -be available. When nodes run out of connections (e.g. due to a network -error), they try to re-establish connections to some seednodes again to -join the network. +Apart from the fact that other nodes know them as seed nodes and they have +to have fixed listening addresses, seed nodes are perfectly normal nodes - +any node can function as a seed node for others. + +In addition to discovering the network, seed nodes are also used to +maintain the network and to connect nodes that otherwise would have +trouble connecting. They form the backbone of an AnyEvent::MP network. + +Seed nodes are expected to be long-running, and at least one seed node +should always be available. They should also be relatively responsive - a +seed node that blocks for long periods will slow down everybody else. -Apart from being sued for seeding, seednodes are not special in any way - -every public node can be a seednode. +=item seeds - C + +Seeds are transport endpoint(s) (usually a hostname/IP address and a +TCP port) of nodes that should be used as seed nodes. + +The nodes listening on those endpoints are expected to be long-running, +and at least one of those should always be available. When nodes run out +of connections (e.g. 
due to a network error), they try to re-establish +connections to some seednodes again to join the network. =back @@ -133,12 +157,12 @@ use base "Exporter"; -our $VERSION = $AnyEvent::MP::Kernel::VERSION; +our $VERSION = 1.29; our @EXPORT = qw( NODE $NODE *SELF node_of after configure - snd rcv mon mon_guard kil reg psub spawn + snd rcv mon mon_guard kil psub peval spawn cal port ); @@ -169,6 +193,9 @@ to know is its own name, and optionally it should know the addresses of some other nodes in the network to discover other nodes. +The key/value pairs are basically the same ones as documented for the +F command line utility (sans the set/del prefix). + This function configures a node - it must be called exactly once (or never) before calling other AnyEvent::MP functions. @@ -217,7 +244,7 @@ =back -Example: become a distributed node using the locla node name as profile. +Example: become a distributed node using the local node name as profile. This should be the most common form of invocation for "daemon"-type nodes. configure @@ -361,7 +388,7 @@ ; Example: temporarily register a rcv callback for a tag matching some port -(e.g. for a rpc reply) and unregister it after a message was received. +(e.g. for an rpc reply) and unregister it after a message was received. 
rcv $port, $otherport => sub { my @reply = @_; @@ -384,7 +411,7 @@ "AnyEvent::MP::Port" eq ref $self or Carp::croak "$port: rcv can only be called on message matching ports, caught"; - $self->[2] = shift; + $self->[0] = shift; } else { my $cb = shift; $PORT{$portid} = sub { @@ -394,7 +421,7 @@ } } elsif (defined $_[0]) { my $self = $PORT_DATA{$portid} ||= do { - my $self = bless [$PORT{$port} || sub { }, { }, $port], "AnyEvent::MP::Port"; + my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port"; $PORT{$portid} = sub { local $SELF = $port; @@ -426,12 +453,52 @@ $port } +=item peval $port, $coderef[, @args] + +Evaluates the given C<$coderef> within the context of C<$port>, that is, +when the code throws an exception the C<$port> will be killed. + +Any remaining args will be passed to the callback. Any return values will +be returned to the caller. + +This is useful when you temporarily want to execute code in the context of +a port. + +Example: create a port and run some initialisation code in its context. + + my $port = port { ... }; + + peval $port, sub { + init + or die "unable to init"; + }; + +=cut + +sub peval($$) { + local $SELF = shift; + my $cb = shift; + + if (wantarray) { + my @res = eval { &$cb }; + _self_die if $@; + @res + } else { + my $res = eval { &$cb }; + _self_die if $@; + $res + } +} + =item $closure = psub { BLOCK } Remembers C<$SELF> and creates a closure out of the BLOCK. When the closure is executed, sets up the environment in the same way as in C<rcv> callbacks, i.e. runtime errors will cause the port to get C<kil>ed. +The effect is basically as if it returned C<< sub { peval $SELF, sub { +BLOCK }, @_ } >>. + This is useful when you register callbacks from C<rcv> callbacks: rcv delayed_reply => sub { @@ -476,17 +543,6 @@ messages to it were lost, and optionally return a guard that can be used to stop monitoring again. 
-C effectively guarantees that, in the absence of hardware failures, -after starting the monitor, either all messages sent to the port will -arrive, or the monitoring action will be invoked after possible message -loss has been detected. No messages will be lost "in between" (after -the first lost message no further messages will be received by the -port). After the monitoring action was invoked, further messages might get -delivered again. - -Note that monitoring-actions are one-shot: once messages are lost (and a -monitoring alert was raised), they are removed and will not trigger again. - In the first form (callback), the callback is simply called with any number of C<@reason> elements (no @reason means that the port was deleted "normally"). Note also that I<< the callback B never die >>, so use @@ -503,6 +559,9 @@ In the last form (message), a message of the form C<@msg, @reason> will be C. +Monitoring-actions are one-shot: once messages are lost (and a monitoring +alert was raised), they are removed and will not trigger again. + As a rule of thumb, monitoring requests should always monitor a port from a local port (or callback). The reason is that kill messages might get lost, just like any other message. Another less obvious reason is that @@ -510,6 +569,23 @@ to the other node goes down permanently). When monitoring a port locally these problems do not exist. +C effectively guarantees that, in the absence of hardware failures, +after starting the monitor, either all messages sent to the port will +arrive, or the monitoring action will be invoked after possible message +loss has been detected. No messages will be lost "in between" (after +the first lost message no further messages will be received by the +port). After the monitoring action was invoked, further messages might get +delivered again. + +Inter-host-connection timeouts and monitoring depend on the transport +used. 
The only transport currently implemented is TCP, and AnyEvent::MP +relies on TCP to detect node-downs (this can take 10-15 minutes on a +non-idle connection, and usually around two hours for idle connections). + +This means that monitoring is good for program errors and cleaning up +stuff eventually, but it is no replacement for a timeout when you need +to ensure some maximum latency. + Example: call a given callback when C<$port> is killed. mon $port, sub { warn "port died because of <@_>\n" }; @@ -546,7 +622,7 @@ $node->monitor ($port, $cb); defined wantarray - and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } + and ($cb += 0, AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }) } =item $guard = mon_guard $port, $ref, $ref... @@ -579,12 +655,12 @@ Kill the specified port with the given C<@reason>. -If no C<@reason> is specified, then the port is killed "normally" (ports -monitoring other ports will not necessarily die because a port dies -"normally"). +If no C<@reason> is specified, then the port is killed "normally" - +monitor callbacks will be invoked, but the kil will not cause linked ports +(C form) to get killed. -Otherwise, linked ports get killed with the same reason (second form of -C, see above). +If a C<@reason> is specified, then linked ports (C +form) get killed with the same reason. Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks will be reported as reason C<< die => $@ >>. @@ -613,13 +689,19 @@ exists or it runs out of package names. The init function is then called with the newly-created port as context -object (C<$SELF>) and the C<@initdata> values as arguments. +object (C<$SELF>) and the C<@initdata> values as arguments. It I<must> +call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise +the port might not get created. A common idiom is to pass a local port, immediately monitor the spawned port, and in the remote init function, immediately monitor the passed local port. 
This two-way monitoring ensures that both ports get cleaned up when there is a problem. +C guarantees that the C<$initfunc> has no visible effects on the +caller before C returns (by delaying invocation when spawn is +called for the local node). + Example: spawn a chat server port on C<$othernode>. # this node, executed from within a port context: @@ -643,6 +725,7 @@ my $port = shift; my $init = shift; + # rcv will create the actual port local $SELF = "$NODE#$port"; eval { &{ load_func $init } @@ -687,6 +770,58 @@ }; } +=item cal $port, @msg, $callback[, $timeout] + +A simple form of RPC - sends a message to the given C<$port> with the +given contents (C<@msg>), but adds a reply port to the message. + +The reply port is created temporarily just for the purpose of receiving +the reply, and will be Ced when no longer needed. + +A reply message sent to the port is passed to the C<$callback> as-is. + +If an optional time-out (in seconds) is given and it is not C, +then the callback will be called without any arguments after the time-out +elapsed and the port is Ced. + +If no time-out is given (or it is C), then the local port will +monitor the remote port instead, so it eventually gets cleaned-up. + +Currently this function returns the temporary port, but this "feature" +might go in future versions unless you can make a convincing case that +this is indeed useful for something. + +=cut + +sub cal(@) { + my $timeout = ref $_[-1] ? undef : pop; + my $cb = pop; + + my $port = port { + undef $timeout; + kil $SELF; + &$cb; + }; + + if (defined $timeout) { + $timeout = AE::timer $timeout, 0, sub { + undef $timeout; + kil $port; + $cb->(); + }; + } else { + mon $_[0], sub { + kil $port; + $cb->(); + }; + } + + push @_, $port; + &snd; + + $port +} + =back =head1 AnyEvent::MP vs. Distributed Erlang @@ -696,10 +831,10 @@ programming techniques employed by Erlang apply to AnyEvent::MP. 
Here is a sample: - http://www.Erlang.se/doc/programming_rules.shtml - http://Erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4 - http://Erlang.org/download/Erlang-book-part1.pdf # chapters 5 and 6 - http://Erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5 + http://www.erlang.se/doc/programming_rules.shtml + http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4 + http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6 + http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5 Despite the similarities, there are also some important differences: @@ -709,7 +844,8 @@ Erlang relies on special naming and DNS to work everywhere in the same way. AEMP relies on each node somehow knowing its own address(es) (e.g. by -configuration or DNS), but will otherwise discover other odes itself. +configuration or DNS), and possibly the addresses of some seed nodes, but +will otherwise discover other nodes (and their IDs) itself. =item * Erlang has a "remote ports are like local ports" philosophy, AEMP uses "local ports are like remote ports". @@ -744,14 +880,14 @@ =item * Erlang suffers from silent message loss, AEMP does not. -Erlang makes few guarantees on messages delivery - messages can get lost -without any of the processes realising it (i.e. you send messages a, b, -and c, and the other side only receives messages a and c). - -AEMP guarantees correct ordering, and the guarantee that after one message -is lost, all following ones sent to the same port are lost as well, until -monitoring raises an error, so there are no silent "holes" in the message -sequence. +Erlang implements few guarantees on messages delivery - messages can get +lost without any of the processes realising it (i.e. you send messages a, +b, and c, and the other side only receives messages a and c). 
+ +AEMP guarantees (modulo hardware errors) correct ordering, and the +guarantee that after one message is lost, all following ones sent to the +same port are lost as well, until monitoring raises an error, so there are +no silent "holes" in the message sequence. =item * Erlang can send messages to the wrong port, AEMP does not. @@ -818,8 +954,8 @@ Strings can easily be printed, easily serialised etc. and need no special procedures to be "valid". -And as a result, a miniport consists of a single closure stored in a -global hash - it can't become much cheaper. +And as a result, a port with just a default receiver consists of a single +code reference stored in a global hash - it can't become much cheaper. =item Why favour JSON, why not a real serialising format such as Storable? @@ -845,9 +981,14 @@ L - more, lower-level, stuff. -L - network maintainance and port groups, to find +L - network maintenance and port groups, to find your applications. +L - establish data connections between nodes. + +L - simple service to display log messages from +all nodes. + L. =head1 AUTHOR
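Reviewer note: the C<peval> and C<psub> semantics added in this revision (run code with C<$SELF> set to a port, kill the port when the code throws, pass return values through in the caller's context) can be tried out without a running node. The following is a minimal stand-alone sketch under stated assumptions, not the module's implementation: C<toy_kil>, C<toy_peval>, C<toy_psub>, the C<%alive> and C<%reason> tables and the port names are all hypothetical stand-ins, and the real C<_self_die> helper is not shown in this diff.

```perl
use strict;
use warnings;

# Toy stand-ins (hypothetical, NOT AnyEvent::MP internals): a table of
# live ports and the reason each killed port died with.
our $SELF;
my %alive = ("node#a" => 1, "node#b" => 1);
my %reason;

# Hypothetical kill helper: mark the port dead and remember why.
sub toy_kil {
    my ($port, @why) = @_;
    delete $alive{$port};
    $reason{$port} = [@why];
}

# Model of peval: run $cb with $SELF set to the port; an exception
# kills that port with reason (die => $@).
sub toy_peval {
    local $SELF = shift;
    my $cb = shift;

    if (wantarray) {
        my @res = eval { &$cb };    # &$cb passes the remaining @_ through
        toy_kil($SELF, die => $@) if $@;
        @res
    } else {
        my $res = eval { &$cb };
        toy_kil($SELF, die => $@) if $@;
        $res
    }
}

# Model of psub: remember $SELF now, re-enter that port context later.
sub toy_psub(&) {
    my ($block) = @_;
    my $port = $SELF;
    sub { toy_peval($port, $block, @_) }
}

# Normal return: values come back and the port survives.
my @sum = toy_peval("node#a", sub { $_[0] + $_[1] }, 2, 3);

# Exception: the port is killed instead of the caller dying.
toy_peval("node#b", sub { die "unable to init\n" });

# A closure created inside port "node#a" still sees that $SELF later.
my $later = toy_peval("node#a", sub { toy_psub { $SELF } });
my $seen  = $later->();
```

The sketch mirrors the list/scalar split of the C<peval> body shown in the hunk above, so that return values keep the caller's context. It models only the context handling, with no message passing, monitoring or networking.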