--- AnyEvent-MP/MP.pm 2012/03/04 14:28:44 1.128 +++ AnyEvent-MP/MP.pm 2012/03/21 23:48:39 1.137 @@ -37,9 +37,9 @@ kil $port, my_error => "everything is broken"; # error kill # monitoring - mon $localport, $cb->(@msg) # callback is invoked on death - mon $localport, $otherport # kill otherport on abnormal death - mon $localport, $otherport, @msg # send message on death + mon $port, $cb->(@msg) # callback is invoked on death + mon $port, $localport # kill localport on abnormal death + mon $port, $localport, @msg # send message on death # temporarily execute code in port context peval $port, sub { die "kill the port!" }; @@ -249,12 +249,17 @@ precedence over any values configured via the rc file. The default is for the rc file to override any options specified in the program. -=item secure => $pass->($nodeid) +=item secure => $pass->(@msg) In addition to specifying a boolean, you can specify a code reference that -is called for every remote execution attempt - the execution request is +is called for every code execution attempt - the execution request is granted iff the callback returns a true value. +Most of the time the callback should look only at +C<$AnyEvent::MP::Kernel::SRCNODE> to make a decision, and not at the +actual message (which can be about anything, and is mostly provided for +diagnostic purposes). + See F for more info. =back @@ -400,15 +405,16 @@ sub rcv($@); -sub _kilme { - die "received message on port without callback"; -} +my $KILME = sub { + (my $tag = substr $_[0], 0, 30) =~ s/([\x20-\x7e])/./g; + kil $SELF, unhandled_message => "no callback found for message '$tag'"; +}; sub port(;&) { my $id = $UNIQ . ++$ID; my $port = "$NODE#$id"; - rcv $port, shift || \&_kilme; + rcv $port, shift || $KILME; $port } @@ -423,7 +429,7 @@ executing the callback. Runtime errors during callback execution will result in the port being Ced. -The default callback received all messages not matched by a more specific +The default callback receives all messages not matched by a more specific C match. =item rcv $local_port, tag => $callback->(@msg_without_tag), ... @@ -734,7 +740,17 @@ Transport/communication errors are reported as C<< transport_error => $message >>. -=cut +Common idioms: + + # silently remove yourself, do not kill linked ports + kil $SELF; + + # report a failure in some detail + kil $SELF, failure_mode_1 => "it failed with too high temperature"; + + # do not waste much time with killing, just die when something goes wrong + open my $fh, " with the @@ -894,11 +912,16 @@ =head1 DISTRIBUTED DATABASE AnyEvent::MP comes with a simple distributed database. The database will -be mirrored asynchronously at all global nodes. Other nodes bind to one of -the global nodes for their needs. +be mirrored asynchronously on all global nodes. Other nodes bind to one +of the global nodes for their needs. Every node has a "local database" +which contains all the values that are set locally. All local databases +are merged together to form the global database, which can be queried. + +The database structure is that of a two-level hash - the database hash +contains hashes which contain values, similarly to a perl hash of hashes, +i.e.: -The database consists of a two-level hash - a hash contains a hash which -contains values. + $DATABASE{$family}{$subkey} = $value The top level hash key is called "family", and the second-level hash key is called "subkey" or simply "key". @@ -913,7 +936,7 @@ The subkeys must be non-empty strings, with no further restrictions. The values should preferably be strings, but other perl scalars should -work as well (such as undef, arrays and hashes). +work as well (such as C, arrays and hashes). Every database entry is owned by one node - adding the same family/subkey combination on multiple nodes will not cause discomfort for AnyEvent::MP, @@ -927,40 +950,111 @@ db_set my_image_scalers => $port; And clients looking for an image scaler will want to get the -C keys: +C keys from time to time: + + db_keys my_image_scalers => sub { + @ports = @{ $_[0] }; + }; + +Or better yet, they want to monitor the database family, so they always +have a reasonable up-to-date copy: + + db_mon my_image_scalers => sub { + @ports = keys %{ $_[0] }; + }; - db_keys "my_image_scalers" => 60 => sub { - #d##TODO# +In general, you can set or delete single subkeys, but query and monitor +whole families only. + +If you feel the need to monitor or query a single subkey, try giving it +it's own family. =over -=item db_set $family => $subkey [=> $value] +=item $guard = db_set $family => $subkey [=> $value] Sets (or replaces) a key to the database - if C<$value> is omitted, C is used instead. -=item db_del $family => $subkey +When called in non-void context, C returns a guard that +automatically calls C when it is destroyed. + +=item db_del $family => $subkey... + +Deletes one or more subkeys from the database family. + +=item $guard = db_reg $family => $port => $value + +=item $guard = db_reg $family => $port + +=item $guard = db_reg $family + +Registers a port in the given family and optionally returns a guard to +remove it. + +This function basically does the same as: + + db_set $family => $port => $value + +Except that the port is monitored and automatically removed from the +database family when it is kil'ed. + +If C<$value> is missing, C is used. If C<$port> is missing, then +C<$SELF> is used. + +This function is most useful to register a port in some port group (which +is just another name for a database family), and have it removed when the +port is gone. This works best when the port is a local port. + +=cut + +sub db_reg($$;$) { + my $family = shift; + my $port = @_ ? shift : $SELF; + + my $clr = sub { db_del $family => $port }; + mon $port, $clr; + + db_set $family => $port => $_[0]; + + defined wantarray + and &Guard::guard ($clr) +} + +=item db_family $family => $cb->(\%familyhash) + +Queries the named database C<$family> and call the callback with the +family represented as a hash. You can keep and freely modify the hash. + +=item db_keys $family => $cb->(\@keys) + +Same as C, except it only queries the family I and passes +them as array reference to the callback. -Deletes a key from the database. +=item db_values $family => $cb->(\@values) -=item $guard = db_reg $family => $subkey [=> $value] +Same as C, except it only queries the family I and passes them +as array reference to the callback. -Sets the key on the database and returns a guard. When the guard is -destroyed, the key is deleted from the database. If C<$value> is missing, -then C is used. +=item $guard = db_mon $family => $cb->($familyhash, \@added, \@changed, \@deleted) -=item $guard = db_mon $family => $cb->($familyhash, \@subkeys...) +Creates a monitor on the given database family. Each time a key is set +or or is deleted the callback is called with a hash containing the +database family and three lists of added, changed and deleted subkeys, +respectively. If no keys have changed then the array reference might be +C or even missing. -Creates a monitor on the given database family. Each time a key is set or -or is deleted the callback is called with a hash containing the database -family and an arrayref with subkeys that have changed. +If not called in void context, a guard object is returned that, when +destroyed, stops the monitor. -Specifically, if one of the passed subkeys exists in the $familyhash, then -it is currently set to the value in the $familyhash. Otherwise, it has -been deleted. +The family hash reference and the key arrays belong to AnyEvent::MP and +B by the callback. When in doubt, make a +copy. -The first call will be with the current contents of the family and all -keys, as if they were just added. +As soon as possible after the monitoring starts, the callback will be +called with the intiial contents of the family, even if it is empty, +i.e. there will always be a timely call to the callback with the current +contents. It is possible that the callback is called with a change event even though the subkey is already present and the value has not changed. @@ -970,14 +1064,14 @@ Example: on every change to the family "mygroup", print out all keys. my $guard = db_mon mygroup => sub { - my ($family, $keys) = @_; + my ($family, $a, $c, $d) = @_; print "mygroup members: ", (join " ", keys %$family), "\n"; }; Exmaple: wait until the family "My::Module::workers" is non-empty. my $guard; $guard = db_mon My::Module::workers => sub { - my ($family, $keys) = @_; + my ($family, $a, $c, $d) = @_; return unless %$family; undef $guard; print "My::Module::workers now nonempty\n"; @@ -986,15 +1080,11 @@ Example: print all changes to the family "AnyRvent::Fantasy::Module". my $guard = db_mon AnyRvent::Fantasy::Module => sub { - my ($family, $keys) = @_; + my ($family, $a, $c, $d) = @_; - for (@$keys) { - print "$_: ", - (exists $family->{$_} - ? $family->{$_} - : "(deleted)"), - "\n"; - } + print "+$_=$family->{$_}\n" for @$a; + print "*$_=$family->{$_}\n" for @$c; + print "-$_=$family->{$_}\n" for @$d; }; =cut @@ -1164,6 +1254,87 @@ =back +=head1 PORTING FROM AnyEvent::MP VERSION 1.X + +AEMP version 2 has three major incompatible changes compared to version 1: + +=over 4 + +=item AnyEvent::MP::Global no longer has group management functions. + +AnyEvent::MP now comes with a distributed database that is more +powerful. It's database families map closely to ports, but the API has +minor differences: + + grp_reg $group, $port # old + db_reg $group, $port # new + + $list = grp_get $group # old + db_keys $group, sub { my $list = shift } # new + + grp_mon $group, $cb->(\@ports, $add, $del) # old + db_mon $group, $cb->(\%ports, $add, $change, $del) # new + +C is a no-brainer (just replace by C), but C +is no longer instant, because the local node might not have a copy of +the group. This can be partially remedied by using C to keep an +updated copy of the group: + + my $local_group_copy; + db_mon $group => sub { $local_group_copy = shift }; + + # no keys %$local_group_copy always returns the most up-to-date + # list of ports in the group. + +C can almost be replaced by C: + + db_mon $group => sub { + my ($ports, $add, $chg, $lde) = @_; + $ports = [keys %$ports]; + + # now $ports, $add and $del are the same as + # were originally passed by grp_mon. + ... + }; + +=item Nodes not longer connect to all other nodes. + +In AEMP 1.x, every node automatically loads the L +module, which in turn would create connections to all other nodes in the +network (helped by the seed nodes). + +In version 2.x, global nodes still connect to all other global nodes, but +other nodes don't - now every node either is a global node itself, or +attaches itself to another global node. + +If a node isn't a global node itself, then it attaches itself to one +of its seed nodes. If that seed node isn't a global node yet, it will +automatically be upgraded to a global node. + +So in many cases, nothing needs to be changed - one just has to make sure +that all seed nodes are meshed together with the other seed nodes (as with +AEMP 1.x), and other nodes specify them as seed nodes. + +Not opening a connection to every other node is usually an advantage, +except when you need the lower latency of an already established +connection. To ensure a node establishes a connection to another node, +you can monitor the node port (C), which will attempt to +create the connection (And notify you when the connection fails). + +=item Listener-less nodes are gone. + +And are not coming back, at least not in their old form. + +There are vague plans to implement some form of routing domains, which +might or might not bring back listener-less nodes, but don't count on it. + +The fact that most connections are now optional somewhat mitigates this, +as a node can be effectively unreachable from the outside without any +problems, as long as it isn't a global node and only reaches out to other +nodes (as opposed to being contacted from other nodes). + +=back + =head1 SEE ALSO L - a gentle introduction.