--- AnyEvent-MP/MP.pm 2009/12/30 13:37:53 1.108 +++ AnyEvent-MP/MP.pm 2012/03/21 15:22:16 1.136 @@ -32,14 +32,14 @@ # create a port on another node my $port = spawn $node, $initfunc, @initdata; - # destroy a prot again + # destroy a port again kil $port; # "normal" kill kil $port, my_error => "everything is broken"; # error kill # monitoring - mon $localport, $cb->(@msg) # callback is invoked on death - mon $localport, $otherport # kill otherport on abnormal death - mon $localport, $otherport, @msg # send message on death + mon $port, $cb->(@msg) # callback is invoked on death + mon $port, $localport # kill localport on abnormal death + mon $port, $localport, @msg # send message on death # temporarily execute code in port context peval $port, sub { die "kill the port!" }; @@ -80,10 +80,13 @@ some messages. Messages send to ports will not be queued, regardless of anything was listening for them or not. +Ports are represented by (printable) strings called "port IDs". + =item port ID - C -A port ID is the concatenation of a node ID, a hash-mark (C<#>) as -separator, and a port name (a printable string of unspecified format). +A port ID is the concatenation of a node ID, a hash-mark (C<#>) +as separator, and a port name (a printable string of unspecified +format created by AnyEvent::MP). =item node @@ -93,49 +96,77 @@ Nodes are either public (have one or more listening ports) or private (no listening ports). Private nodes cannot talk to other private nodes -currently. +currently, but all nodes can talk to public nodes. + +Nodes is represented by (printable) strings called "node IDs". -=item node ID - C<[A-Z_][a-zA-Z0-9_\-.:]*> +=item node ID - C<[A-Za-z0-9_\-.:]*> A node ID is a string that uniquely identifies the node within a network. Depending on the configuration used, node IDs can look like a hostname, a hostname and a port, or a random string. AnyEvent::MP itself -doesn't interpret node IDs in any way. +doesn't interpret node IDs in any way except to uniquely identify a node. =item binds - C Nodes can only talk to each other by creating some kind of connection to each other. To do this, nodes should listen on one or more local transport -endpoints - binds. Currently, only standard C specifications can -be used, which specify TCP ports to listen on. +endpoints - binds. + +Currently, only standard C specifications can be used, which +specify TCP ports to listen on. So a bind is basically just a tcp socket +in listening mode thta accepts conenctions form other nodes. =item seed nodes -When a node starts, it knows nothing about the network. To teach the node -about the network it first has to contact some other node within the -network. This node is called a seed. - -Apart from the fact that other nodes know them as seed nodes and they have -to have fixed listening addresses, seed nodes are perfectly normal nodes - -any node can function as a seed node for others. +When a node starts, it knows nothing about the network it is in - it +needs to connect to at least one other node that is already in the +network. These other nodes are called "seed nodes". + +Seed nodes themselves are not special - they are seed nodes only because +some other node I them as such, but any node can be used as seed +node for other nodes, and eahc node cna use a different set of seed nodes. In addition to discovering the network, seed nodes are also used to -maintain the network and to connect nodes that otherwise would have -trouble connecting. They form the backbone of an AnyEvent::MP network. +maintain the network - all nodes using the same seed node form are part of +the same network. If a network is split into multiple subnets because e.g. +the network link between the parts goes down, then using the same seed +nodes for all nodes ensures that eventually the subnets get merged again. Seed nodes are expected to be long-running, and at least one seed node should always be available. They should also be relatively responsive - a seed node that blocks for long periods will slow down everybody else. -=item seeds - C +For small networks, it's best if every node uses the same set of seed +nodes. For large networks, it can be useful to specify "regional" seed +nodes for most nodes in an area, and use all seed nodes as seed nodes for +each other. What's important is that all seed nodes connections form a +complete graph, so that the network cannot split into separate subnets +forever. -Seeds are transport endpoint(s) (usually a hostname/IP address and a +Seed nodes are represented by seed IDs. + +=item seed IDs - C + +Seed IDs are transport endpoint(s) (usually a hostname/IP address and a TCP port) of nodes that should be used as seed nodes. -The nodes listening on those endpoints are expected to be long-running, -and at least one of those should always be available. When nodes run out -of connections (e.g. due to a network error), they try to re-establish -connections to some seednodes again to join the network. +=item global nodes + +An AEMP network needs a discovery service - nodes need to know how to +connect to other nodes they only know by name. In addition, AEMP offers a +distributed "group database", which maps group names to a list of strings +- for example, to register worker ports. + +A network needs at least one global node to work, and allows every node to +be a global node. + +Any node that loads the L module becomes a global +node and tries to keep connections to all other nodes. So while it can +make sense to make every node "global" in small networks, it usually makes +sense to only make seed nodes into global nodes in large networks (nodes +keep connections to seed nodes and global nodes, so makign them the same +reduces overhead). =back @@ -147,23 +178,28 @@ package AnyEvent::MP; +use AnyEvent::MP::Config (); use AnyEvent::MP::Kernel; +use AnyEvent::MP::Kernel qw(%NODE %PORT %PORT_DATA $UNIQ $RUNIQ $ID); use common::sense; use Carp (); use AE (); +use Guard (); use base "Exporter"; -our $VERSION = 1.24; +our $VERSION = $AnyEvent::MP::Config::VERSION; our @EXPORT = qw( NODE $NODE *SELF node_of after configure snd rcv mon mon_guard kil psub peval spawn cal port + db_set db_del db_reg + db_mon db_family db_keys db_values ); our $SELF; @@ -193,12 +229,41 @@ to know is its own name, and optionally it should know the addresses of some other nodes in the network to discover other nodes. -The key/value pairs are basically the same ones as documented for the -F command line utility (sans the set/del prefix). - This function configures a node - it must be called exactly once (or never) before calling other AnyEvent::MP functions. +The key/value pairs are basically the same ones as documented for the +F command line utility (sans the set/del prefix), with these additions: + +=over 4 + +=item norc => $boolean (default false) + +If true, then the rc file (e.g. F<~/.perl-anyevent-mp>) will I +be consulted - all configuraiton options must be specified in the +C call. + +=item force => $boolean (default false) + +IF true, then the values specified in the C will take +precedence over any values configured via the rc file. The default is for +the rc file to override any options specified in the program. + +=item secure => $pass->(@msg) + +In addition to specifying a boolean, you can specify a code reference that +is called for every code execution attempt - the execution request is +granted iff the callback returns a true value. + +Most of the time the callback should look only at +C<$AnyEvent::MP::Kernel::SRCNODE> to make a decision, and not at the +actual message (which can be about anything, and is mostly provided for +diagnostic purposes). + +See F for more info. + +=back + =over 4 =item step 1, gathering configuration from profiles @@ -221,8 +286,14 @@ and can only be used to specify defaults. If the profile specifies a node ID, then this will become the node ID of -this process. If not, then the profile name will be used as node ID. The -special node ID of C will be replaced by a random node ID. +this process. If not, then the profile name will be used as node ID, with +a unique randoms tring (C) appended. + +The node ID can contain some C<%> sequences that are expanded: C<%n> +is expanded to the local nodename, C<%u> is replaced by a random +strign to make the node unique. For example, the F commandline +utility uses C as nodename, which might expand to +C. =item step 2, bind listener sockets @@ -238,7 +309,7 @@ =item step 3, connect to seed nodes -As the last step, the seeds list from the profile is passed to the +As the last step, the seed ID list from the profile is passed to the L module, which will then use it to keep connectivity with at least one node at any point in time. @@ -249,17 +320,17 @@ configure -Example: become an anonymous node. This form is often used for commandline -clients. +Example: become a semi-anonymous node. This form is often used for +commandline clients. - configure nodeid => "anon/"; + configure nodeid => "myscript/%n/%u"; -Example: configure a node using a profile called seed, which si suitable +Example: configure a node using a profile called seed, which is suitable for a seed node as it binds on all local addresses on a fixed port (4040, customary for aemp). # use the aemp commandline utility - # aemp profile seed nodeid anon/ binds '*:4040' + # aemp profile seed binds '*:4040' # then use it configure profile => "seed"; @@ -334,15 +405,16 @@ sub rcv($@); -sub _kilme { - die "received message on port without callback"; -} +my $KILME = sub { + (my $tag = substr $_[0], 0, 30) =~ s/([\x20-\x7e])/./g; + kil $SELF, unhandled_message => "no callback found for message '$tag'"; +}; sub port(;&) { - my $id = "$UNIQ." . $ID++; + my $id = $UNIQ . ++$ID; my $port = "$NODE#$id"; - rcv $port, shift || \&_kilme; + rcv $port, shift || $KILME; $port } @@ -357,7 +429,7 @@ executing the callback. Runtime errors during callback execution will result in the port being Ced. -The default callback received all messages not matched by a more specific +The default callback receives all messages not matched by a more specific C match. =item rcv $local_port, tag => $callback->(@msg_without_tag), ... @@ -497,7 +569,7 @@ callbacks, i.e. runtime errors will cause the port to get Ced. The effect is basically as if it returned C<< sub { peval $SELF, sub { -BLOCK } } >>. +BLOCK }, @_ } >>. This is useful when you register callbacks from C callbacks: @@ -622,7 +694,7 @@ $node->monitor ($port, $cb); defined wantarray - and ($cb += 0, AnyEvent::Util::guard { $node->unmonitor ($port, $cb) }) + and ($cb += 0, Guard::guard { $node->unmonitor ($port, $cb) }) } =item $guard = mon_guard $port, $ref, $ref... @@ -668,7 +740,17 @@ Transport/communication errors are reported as C<< transport_error => $message >>. -=cut +Common idioms: + + # silently remove yourself, do not kill linked ports + kil $SELF; + + # report a failure in some detail + kil $SELF, failure_mode_1 => "it failed with too high temperature"; + + # do not waste much time with killing, just die when something goes wrong + open my $fh, " with the @@ -824,6 +909,153 @@ =back +=head1 DISTRIBUTED DATABASE + +AnyEvent::MP comes with a simple distributed database. The database will +be mirrored asynchronously on all global nodes. Other nodes bind to one +of the global nodes for their needs. Every node has a "local database" +which contains all the values that are set locally. All local databases +are merged together to form the global database, which can be queried. + +The database structure is that of a two-level hash - the database hash +contains hashes which contain values, similarly to a perl hash of hashes, +i.e.: + + $DATABASE{$family}{$subkey} = $value + +The top level hash key is called "family", and the second-level hash key +is called "subkey" or simply "key". + +The family must be alphanumeric, i.e. start with a letter and consist +of letters, digits, underscores and colons (C<[A-Za-z][A-Za-z0-9_:]*>, +pretty much like Perl module names. + +As the family namespace is global, it is recommended to prefix family names +with the name of the application or module using it. + +The subkeys must be non-empty strings, with no further restrictions. + +The values should preferably be strings, but other perl scalars should +work as well (such as C, arrays and hashes). + +Every database entry is owned by one node - adding the same family/subkey +combination on multiple nodes will not cause discomfort for AnyEvent::MP, +but the result might be nondeterministic, i.e. the key might have +different values on different nodes. + +Different subkeys in the same family can be owned by different nodes +without problems, and in fact, this is the common method to create worker +pools. For example, a worker port for image scaling might do this: + + db_set my_image_scalers => $port; + +And clients looking for an image scaler will want to get the +C keys from time to time: + + db_keys my_image_scalers => sub { + @ports = @{ $_[0] }; + }; + +Or better yet, they want to monitor the database family, so they always +have a reasonable up-to-date copy: + + db_mon my_image_scalers => sub { + @ports = keys %{ $_[0] }; + }; + +In general, you can set or delete single subkeys, but query and monitor +whole families only. + +If you feel the need to monitor or query a single subkey, try giving it +it's own family. + +=over + +=item db_set $family => $subkey [=> $value] + +Sets (or replaces) a key to the database - if C<$value> is omitted, +C is used instead. + +=item db_del $family => $subkey... + +Deletes one or more subkeys from the database family. + +=item $guard = db_reg $family => $subkey [=> $value] + +Sets the key on the database and returns a guard. When the guard is +destroyed, the key is deleted from the database. If C<$value> is missing, +then C is used. + +=item db_family $family => $cb->(\%familyhash) + +Queries the named database C<$family> and call the callback with the +family represented as a hash. You can keep and freely modify the hash. + +=item db_keys $family => $cb->(\@keys) + +Same as C, except it only queries the family I and passes +them as array reference to the callback. + +=item db_values $family => $cb->(\@values) + +Same as C, except it only queries the family I and passes them +as array reference to the callback. + +=item $guard = db_mon $family => $cb->($familyhash, \@added, \@changed, \@deleted) + +Creates a monitor on the given database family. Each time a key is set +or or is deleted the callback is called with a hash containing the +database family and three lists of added, changed and deleted subkeys, +respectively. If no keys have changed then the array reference might be +C or even missing. + +If not called in void context, a guard object is returned that, when +destroyed, stops the monitor. + +The family hash reference and the key arrays belong to AnyEvent::MP and +B by the callback. When in doubt, make a +copy. + +As soon as possible after the monitoring starts, the callback will be +called with the intiial contents of the family, even if it is empty, +i.e. there will always be a timely call to the callback with the current +contents. + +It is possible that the callback is called with a change event even though +the subkey is already present and the value has not changed. + +The monitoring stops when the guard object is destroyed. + +Example: on every change to the family "mygroup", print out all keys. + + my $guard = db_mon mygroup => sub { + my ($family, $a, $c, $d) = @_; + print "mygroup members: ", (join " ", keys %$family), "\n"; + }; + +Exmaple: wait until the family "My::Module::workers" is non-empty. + + my $guard; $guard = db_mon My::Module::workers => sub { + my ($family, $a, $c, $d) = @_; + return unless %$family; + undef $guard; + print "My::Module::workers now nonempty\n"; + }; + +Example: print all changes to the family "AnyRvent::Fantasy::Module". + + my $guard = db_mon AnyRvent::Fantasy::Module => sub { + my ($family, $a, $c, $d) = @_; + + print "+$_=$family->{$_}\n" for @$a; + print "*$_=$family->{$_}\n" for @$c; + print "-$_=$family->{$_}\n" for @$d; + }; + +=cut + +=back + =head1 AnyEvent::MP vs. Distributed Erlang AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node @@ -864,19 +1096,24 @@ =item * Erlang uses processes and a mailbox, AEMP does not queue. -Erlang uses processes that selectively receive messages, and therefore -needs a queue. AEMP is event based, queuing messages would serve no -useful purpose. For the same reason the pattern-matching abilities of -AnyEvent::MP are more limited, as there is little need to be able to +Erlang uses processes that selectively receive messages out of order, and +therefore needs a queue. AEMP is event based, queuing messages would serve +no useful purpose. For the same reason the pattern-matching abilities +of AnyEvent::MP are more limited, as there is little need to be able to filter messages without dequeuing them. -(But see L for a more Erlang-like process model on top of AEMP). +This is not a philosophical difference, but simply stems from AnyEvent::MP +being event-based, while Erlang is process-based. + +You cna have a look at L for a more Erlang-like process model on +top of AEMP and Coro threads. =item * Erlang sends are synchronous, AEMP sends are asynchronous. -Sending messages in Erlang is synchronous and blocks the process (and -so does not need a queue that can overflow). AEMP sends are immediate, -connection establishment is handled in the background. +Sending messages in Erlang is synchronous and blocks the process until +a conenction has been established and the message sent (and so does not +need a queue that can overflow). AEMP sends return immediately, connection +establishment is handled in the background. =item * Erlang suffers from silent message loss, AEMP does not. @@ -884,18 +1121,24 @@ lost without any of the processes realising it (i.e. you send messages a, b, and c, and the other side only receives messages a and c). -AEMP guarantees correct ordering, and the guarantee that after one message -is lost, all following ones sent to the same port are lost as well, until -monitoring raises an error, so there are no silent "holes" in the message -sequence. +AEMP guarantees (modulo hardware errors) correct ordering, and the +guarantee that after one message is lost, all following ones sent to the +same port are lost as well, until monitoring raises an error, so there are +no silent "holes" in the message sequence. + +If you want your software to be very reliable, you have to cope with +corrupted and even out-of-order messages in both Erlang and AEMP. AEMP +simply tries to work better in common error cases, such as when a network +link goes down. =item * Erlang can send messages to the wrong port, AEMP does not. -In Erlang it is quite likely that a node that restarts reuses a process ID -known to other nodes for a completely different process, causing messages -destined for that process to end up in an unrelated process. +In Erlang it is quite likely that a node that restarts reuses an Erlang +process ID known to other nodes for a completely different process, +causing messages destined for that process to end up in an unrelated +process. -AEMP never reuses port IDs, so old messages or old port IDs floating +AEMP does not reuse port IDs, so old messages or old port IDs floating around in the network will not be sent to an unrelated port. =item * Erlang uses unprotected connections, AEMP uses secure @@ -908,7 +1151,7 @@ communications. The AEMP protocol, unlike the Erlang protocol, supports both programming -language independent text-only protocols (good for debugging) and binary, +language independent text-only protocols (good for debugging), and binary, language-specific serialisers (e.g. Storable). By default, unless TLS is used, the protocol is actually completely text-based. @@ -918,11 +1161,12 @@ =item * AEMP has more flexible monitoring options than Erlang. -In Erlang, you can chose to receive I exit signals as messages -or I, there is no in-between, so monitoring single processes is -difficult to implement. Monitoring in AEMP is more flexible than in -Erlang, as one can choose between automatic kill, exit message or callback -on a per-process basis. +In Erlang, you can chose to receive I exit signals as messages or +I, there is no in-between, so monitoring single Erlang processes is +difficult to implement. + +Monitoring in AEMP is more flexible than in Erlang, as one can choose +between automatic kill, exit message or callback on a per-port basis. =item * Erlang tries to hide remote/local connections, AEMP does not. @@ -954,8 +1198,8 @@ Strings can easily be printed, easily serialised etc. and need no special procedures to be "valid". -And as a result, a miniport consists of a single closure stored in a -global hash - it can't become much cheaper. +And as a result, a port with just a default receiver consists of a single +code reference stored in a global hash - it can't become much cheaper. =item Why favour JSON, why not a real serialising format such as Storable? @@ -981,7 +1225,7 @@ L - more, lower-level, stuff. -L - network maintainance and port groups, to find +L - network maintenance and port groups, to find your applications. L - establish data connections between nodes.