--- AnyEvent-MP/MP.pm 2012/03/04 14:28:44 1.128 +++ AnyEvent-MP/MP.pm 2012/03/22 20:07:31 1.139 @@ -37,9 +37,9 @@ kil $port, my_error => "everything is broken"; # error kill # monitoring - mon $localport, $cb->(@msg) # callback is invoked on death - mon $localport, $otherport # kill otherport on abnormal death - mon $localport, $otherport, @msg # send message on death + mon $port, $cb->(@msg) # callback is invoked on death + mon $port, $localport # kill localport on abnormal death + mon $port, $localport, @msg # send message on death # temporarily execute code in port context peval $port, sub { die "kill the port!" }; @@ -249,12 +249,17 @@ precedence over any values configured via the rc file. The default is for the rc file to override any options specified in the program. -=item secure => $pass->($nodeid) +=item secure => $pass->(@msg) In addition to specifying a boolean, you can specify a code reference that -is called for every remote execution attempt - the execution request is +is called for every code execution attempt - the execution request is granted iff the callback returns a true value. +Most of the time the callback should look only at +C<$AnyEvent::MP::Kernel::SRCNODE> to make a decision, and not at the +actual message (which can be about anything, and is mostly provided for +diagnostic purposes). + See F for more info. =back @@ -400,15 +405,16 @@ sub rcv($@); -sub _kilme { - die "received message on port without callback"; -} +my $KILME = sub { + (my $tag = substr $_[0], 0, 30) =~ s/([\x20-\x7e])/./g; + kil $SELF, unhandled_message => "no callback found for message '$tag'"; +}; sub port(;&) { my $id = $UNIQ . ++$ID; my $port = "$NODE#$id"; - rcv $port, shift || \&_kilme; + rcv $port, shift || $KILME; $port } @@ -423,7 +429,7 @@ executing the callback. Runtime errors during callback execution will result in the port being Ced. -The default callback received all messages not matched by a more specific +The default callback receives all messages not matched by a more specific C match. =item rcv $local_port, tag => $callback->(@msg_without_tag), ... @@ -597,43 +603,50 @@ } } -=item $guard = mon $port, $cb->(@reason) # call $cb when $port dies - =item $guard = mon $port, $rcvport # kill $rcvport when $port dies =item $guard = mon $port # kill $SELF when $port dies +=item $guard = mon $port, $cb->(@reason) # call $cb when $port dies + =item $guard = mon $port, $rcvport, @msg # send a message when $port dies Monitor the given port and do something when the port is killed or messages to it were lost, and optionally return a guard that can be used to stop monitoring again. -In the first form (callback), the callback is simply called with any -number of C<@reason> elements (no @reason means that the port was deleted -"normally"). Note also that I<< the callback B never die >>, so use -C if unsure. +The first two forms distinguish between "normal" and "abnormal" kil's: -In the second form (another port given), the other port (C<$rcvport>) -will be C'ed with C<@reason>, if a @reason was specified, i.e. on -"normal" kils nothing happens, while under all other conditions, the other -port is killed with the same reason. +In the first form (another port given), if the C<$port> is C'ed with +a non-empty reason, the other port (C<$rcvport>) will be kil'ed with the +same reason. That is, on "normal" kil's nothing happens, while under all +other conditions, the other port is killed with the same reason. -The third form (kill self) is the same as the second form, except that +The second form (kill self) is the same as the first form, except that C<$rvport> defaults to C<$SELF>. -In the last form (message), a message of the form C<@msg, @reason> will be -C. +The remaining forms don't distinguish between "normal" and "abnormal" kil's +- it's up to the callback or receiver to check whether the C<@reason> is +empty and act accordingly. + +In the third form (callback), the callback is simply called with any +number of C<@reason> elements (empty @reason means that the port was deleted +"normally"). Note also that I<< the callback B never die >>, so use +C if unsure. + +In the last form (message), a message of the form C<$rcvport, @msg, +@reason> will be C. Monitoring-actions are one-shot: once messages are lost (and a monitoring -alert was raised), they are removed and will not trigger again. +alert was raised), they are removed and will not trigger again, even if it +turns out that the port is still alive. -As a rule of thumb, monitoring requests should always monitor a port from -a local port (or callback). The reason is that kill messages might get -lost, just like any other message. Another less obvious reason is that -even monitoring requests can get lost (for example, when the connection -to the other node goes down permanently). When monitoring a port locally -these problems do not exist. +As a rule of thumb, monitoring requests should always monitor a remote +port locally (using a local C<$rcvport> or a callback). The reason is that +kill messages might get lost, just like any other message. Another less +obvious reason is that even monitoring requests can get lost (for example, +when the connection to the other node goes down permanently). When +monitoring a port locally these problems do not exist. C effectively guarantees that, in the absence of hardware failures, after starting the monitor, either all messages sent to the port will @@ -734,7 +747,17 @@ Transport/communication errors are reported as C<< transport_error => $message >>. -=cut +Common idioms: + + # silently remove yourself, do not kill linked ports + kil $SELF; + + # report a failure in some detail + kil $SELF, failure_mode_1 => "it failed with too high temperature"; + + # do not waste much time with killing, just die when something goes wrong + open my $fh, " with the @@ -894,11 +919,16 @@ =head1 DISTRIBUTED DATABASE AnyEvent::MP comes with a simple distributed database. The database will -be mirrored asynchronously at all global nodes. Other nodes bind to one of -the global nodes for their needs. +be mirrored asynchronously on all global nodes. Other nodes bind to one +of the global nodes for their needs. Every node has a "local database" +which contains all the values that are set locally. All local databases +are merged together to form the global database, which can be queried. + +The database structure is that of a two-level hash - the database hash +contains hashes which contain values, similarly to a perl hash of hashes, +i.e.: -The database consists of a two-level hash - a hash contains a hash which -contains values. + $DATABASE{$family}{$subkey} = $value The top level hash key is called "family", and the second-level hash key is called "subkey" or simply "key". @@ -913,7 +943,7 @@ The subkeys must be non-empty strings, with no further restrictions. The values should preferably be strings, but other perl scalars should -work as well (such as undef, arrays and hashes). +work as well (such as C, arrays and hashes). Every database entry is owned by one node - adding the same family/subkey combination on multiple nodes will not cause discomfort for AnyEvent::MP, @@ -927,40 +957,111 @@ db_set my_image_scalers => $port; And clients looking for an image scaler will want to get the -C keys: +C keys from time to time: + + db_keys my_image_scalers => sub { + @ports = @{ $_[0] }; + }; + +Or better yet, they want to monitor the database family, so they always +have a reasonable up-to-date copy: - db_keys "my_image_scalers" => 60 => sub { - #d##TODO# + db_mon my_image_scalers => sub { + @ports = keys %{ $_[0] }; + }; + +In general, you can set or delete single subkeys, but query and monitor +whole families only. + +If you feel the need to monitor or query a single subkey, try giving it +it's own family. =over -=item db_set $family => $subkey [=> $value] +=item $guard = db_set $family => $subkey [=> $value] Sets (or replaces) a key to the database - if C<$value> is omitted, C is used instead. -=item db_del $family => $subkey +When called in non-void context, C returns a guard that +automatically calls C when it is destroyed. + +=item db_del $family => $subkey... + +Deletes one or more subkeys from the database family. + +=item $guard = db_reg $family => $port => $value + +=item $guard = db_reg $family => $port + +=item $guard = db_reg $family + +Registers a port in the given family and optionally returns a guard to +remove it. + +This function basically does the same as: + + db_set $family => $port => $value + +Except that the port is monitored and automatically removed from the +database family when it is kil'ed. + +If C<$value> is missing, C is used. If C<$port> is missing, then +C<$SELF> is used. + +This function is most useful to register a port in some port group (which +is just another name for a database family), and have it removed when the +port is gone. This works best when the port is a local port. + +=cut + +sub db_reg($$;$) { + my $family = shift; + my $port = @_ ? shift : $SELF; + + my $clr = sub { db_del $family => $port }; + mon $port, $clr; + + db_set $family => $port => $_[0]; + + defined wantarray + and &Guard::guard ($clr) +} + +=item db_family $family => $cb->(\%familyhash) + +Queries the named database C<$family> and call the callback with the +family represented as a hash. You can keep and freely modify the hash. + +=item db_keys $family => $cb->(\@keys) -Deletes a key from the database. +Same as C, except it only queries the family I and passes +them as array reference to the callback. -=item $guard = db_reg $family => $subkey [=> $value] +=item db_values $family => $cb->(\@values) -Sets the key on the database and returns a guard. When the guard is -destroyed, the key is deleted from the database. If C<$value> is missing, -then C is used. +Same as C, except it only queries the family I and passes them +as array reference to the callback. -=item $guard = db_mon $family => $cb->($familyhash, \@subkeys...) +=item $guard = db_mon $family => $cb->($familyhash, \@added, \@changed, \@deleted) -Creates a monitor on the given database family. Each time a key is set or -or is deleted the callback is called with a hash containing the database -family and an arrayref with subkeys that have changed. +Creates a monitor on the given database family. Each time a key is set +or or is deleted the callback is called with a hash containing the +database family and three lists of added, changed and deleted subkeys, +respectively. If no keys have changed then the array reference might be +C or even missing. -Specifically, if one of the passed subkeys exists in the $familyhash, then -it is currently set to the value in the $familyhash. Otherwise, it has -been deleted. +If not called in void context, a guard object is returned that, when +destroyed, stops the monitor. -The first call will be with the current contents of the family and all -keys, as if they were just added. +The family hash reference and the key arrays belong to AnyEvent::MP and +B by the callback. When in doubt, make a +copy. + +As soon as possible after the monitoring starts, the callback will be +called with the intiial contents of the family, even if it is empty, +i.e. there will always be a timely call to the callback with the current +contents. It is possible that the callback is called with a change event even though the subkey is already present and the value has not changed. @@ -970,14 +1071,14 @@ Example: on every change to the family "mygroup", print out all keys. my $guard = db_mon mygroup => sub { - my ($family, $keys) = @_; + my ($family, $a, $c, $d) = @_; print "mygroup members: ", (join " ", keys %$family), "\n"; }; Exmaple: wait until the family "My::Module::workers" is non-empty. my $guard; $guard = db_mon My::Module::workers => sub { - my ($family, $keys) = @_; + my ($family, $a, $c, $d) = @_; return unless %$family; undef $guard; print "My::Module::workers now nonempty\n"; @@ -986,15 +1087,11 @@ Example: print all changes to the family "AnyRvent::Fantasy::Module". my $guard = db_mon AnyRvent::Fantasy::Module => sub { - my ($family, $keys) = @_; + my ($family, $a, $c, $d) = @_; - for (@$keys) { - print "$_: ", - (exists $family->{$_} - ? $family->{$_} - : "(deleted)"), - "\n"; - } + print "+$_=$family->{$_}\n" for @$a; + print "*$_=$family->{$_}\n" for @$c; + print "-$_=$family->{$_}\n" for @$d; }; =cut @@ -1164,6 +1261,111 @@ =back +=head1 PORTING FROM AnyEvent::MP VERSION 1.X + +AEMP version 2 has a few major incompatible changes compared to version 1: + +=over 4 + +=item AnyEvent::MP::Global no longer has group management functions. + +AnyEvent::MP now comes with a distributed database that is more +powerful. Its database families map closely to port groups, but the API +has changed (the functions are also now exported by AnyEvent::MP). Here is +a rough porting guide: + + grp_reg $group, $port # old + db_reg $group, $port # new + + $list = grp_get $group # old + db_keys $group, sub { my $list = shift } # new + + grp_mon $group, $cb->(\@ports, $add, $del) # old + db_mon $group, $cb->(\%ports, $add, $change, $del) # new + +C is a no-brainer (just replace by C), but C is +no longer instant, because the local node might not have a copy of the +group. You can either modify your code to allow for a callback, or use +C to keep an updated copy of the group: + + my $local_group_copy; + db_mon $group => sub { $local_group_copy = $_[0] }; + + # now "keys %$local_group_copy" always returns the most up-to-date + # list of ports in the group. + +C can be replaced by C with minor changes - C +passes a hash as first argument, and an extra C<$chg> argument that can be +ignored: + + db_mon $group => sub { + my ($ports, $add, $chg, $lde) = @_; + $ports = [keys %$ports]; + + # now $ports, $add and $del are the same as + # were originally passed by grp_mon. + ... + }; + +=item Nodes not longer connect to all other nodes. + +In AEMP 1.x, every node automatically loads the L +module, which in turn would create connections to all other nodes in the +network (helped by the seed nodes). + +In version 2.x, global nodes still connect to all other global nodes, but +other nodes don't - now every node either is a global node itself, or +attaches itself to another global node. + +If a node isn't a global node itself, then it attaches itself to one +of its seed nodes. If that seed node isn't a global node yet, it will +automatically be upgraded to a global node. + +So in many cases, nothing needs to be changed - one just has to make sure +that all seed nodes are meshed together with the other seed nodes (as with +AEMP 1.x), and other nodes specify them as seed nodes. This is most easily +achieved by specifying the same set of seed nodes for all nodes in the +network. + +Not opening a connection to every other node is usually an advantage, +except when you need the lower latency of an already established +connection. To ensure a node establishes a connection to another node, +you can monitor the node port (C), which will attempt to +create the connection (and notify you when the connection fails). + +=item Listener-less nodes (nodes without binds) are gone. + +And are not coming back, at least not in their old form. If no C +are specified for a node, AnyEvent::MP assumes a default of C<*:*>. + +There are vague plans to implement some form of routing domains, which +might or might not bring back listener-less nodes, but don't count on it. + +The fact that most connections are now optional somewhat mitigates this, +as a node can be effectively unreachable from the outside without any +problems, as long as it isn't a global node and only reaches out to other +nodes (as opposed to being contacted from other nodes). + +=item $AnyEvent::MP::Kernel::WARN has gone. + +AnyEvent has acquired a logging framework (L), and AEMP now +uses this, and so should your programs. + +Every module now documents what kinds of messages it generates, with +AnyEvent::MP acting as a catch all. + +On the positive side, this means that instead of setting +C, you can get away by setting C - +much less to type. + +=back + +=head1 LOGGING + +AnyEvent::MP does not normally log anything by itself, but sinc eit is the +root of the contetx hierarchy for AnyEvent::MP modules, it will receive +all log messages by submodules. + =head1 SEE ALSO L - a gentle introduction.