… | |
… | |
35 | # destroy a port again |
35 | # destroy a port again |
36 | kil $port; # "normal" kill |
36 | kil $port; # "normal" kill |
37 | kil $port, my_error => "everything is broken"; # error kill |
37 | kil $port, my_error => "everything is broken"; # error kill |
38 | |
38 | |
39 | # monitoring |
39 | # monitoring |
40 | mon $localport, $cb->(@msg) # callback is invoked on death |
40 | mon $port, $cb->(@msg) # callback is invoked on death |
41 | mon $localport, $otherport # kill otherport on abnormal death |
41 | mon $port, $localport # kill localport on abnormal death |
42 | mon $localport, $otherport, @msg # send message on death |
42 | mon $port, $localport, @msg # send message on death |
43 | |
43 | |
44 | # temporarily execute code in port context |
44 | # temporarily execute code in port context |
45 | peval $port, sub { die "kill the port!" }; |
45 | peval $port, sub { die "kill the port!" }; |
46 | |
46 | |
47 | # execute callbacks in $SELF port context |
47 | # execute callbacks in $SELF port context |
… | |
… | |
197 | NODE $NODE *SELF node_of after |
197 | NODE $NODE *SELF node_of after |
198 | configure |
198 | configure |
199 | snd rcv mon mon_guard kil psub peval spawn cal |
199 | snd rcv mon mon_guard kil psub peval spawn cal |
200 | port |
200 | port |
201 | db_set db_del db_reg |
201 | db_set db_del db_reg |
|
|
202 | db_mon db_family db_keys db_values |
202 | ); |
203 | ); |
203 | |
204 | |
204 | our $SELF; |
205 | our $SELF; |
205 | |
206 | |
206 | sub _self_die() { |
207 | sub _self_die() { |
… | |
… | |
833 | ref $action[0] |
834 | ref $action[0] |
834 | ? $action[0]() |
835 | ? $action[0]() |
835 | : snd @action; |
836 | : snd @action; |
836 | }; |
837 | }; |
837 | } |
838 | } |
|
|
839 | |
|
|
840 | #=item $cb2 = timeout $seconds, $cb[, @args] |
838 | |
841 | |
839 | =item cal $port, @msg, $callback[, $timeout] |
842 | =item cal $port, @msg, $callback[, $timeout] |
840 | |
843 | |
841 | A simple form of RPC - sends a message to the given C<$port> with the |
844 | A simple form of RPC - sends a message to the given C<$port> with the |
842 | given contents (C<@msg>), but adds a reply port to the message. |
845 | given contents (C<@msg>), but adds a reply port to the message. |
… | |
… | |
924 | pools. For example, a worker port for image scaling might do this: |
927 | pools. For example, a worker port for image scaling might do this: |
925 | |
928 | |
926 | db_set my_image_scalers => $port; |
929 | db_set my_image_scalers => $port; |
927 | |
930 | |
928 | And clients looking for an image scaler will want to get the |
931 | And clients looking for an image scaler will want to get the |
929 | C<my_image_scalers> keys: |
932 | C<my_image_scalers> keys from time to time: |
930 | |
933 | |
931 | db_keys "my_image_scalers" => 60 => sub { |
934 | db_keys my_image_scalers => sub { |
932 | #d##TODO# |
935 | @ports = @{ $_[0] }; |
|
|
936 | }; |
|
|
937 | |
|
|
938 | Or better yet, they want to monitor the database family, so they always |
|
|
939 | have a reasonable up-to-date copy: |
|
|
940 | |
|
|
941 | db_mon my_image_scalers => sub { |
|
|
942 | @ports = keys %{ $_[0] }; |
|
|
943 | }; |
|
|
944 | |
|
|
945 | In general, you can set or delete single subkeys, but query and monitor |
|
|
946 | whole families only. |
|
|
947 | |
|
|
948 | If you feel the need to monitor or query a single subkey, try giving it |
|
|
949 | it's own family. |
933 | |
950 | |
934 | =over |
951 | =over |
935 | |
952 | |
936 | =item db_set $family => $subkey [=> $value] |
953 | =item db_set $family => $subkey [=> $value] |
937 | |
954 | |
… | |
… | |
945 | =item $guard = db_reg $family => $subkey [=> $value] |
962 | =item $guard = db_reg $family => $subkey [=> $value] |
946 | |
963 | |
947 | Sets the key on the database and returns a guard. When the guard is |
964 | Sets the key on the database and returns a guard. When the guard is |
948 | destroyed, the key is deleted from the database. If C<$value> is missing, |
965 | destroyed, the key is deleted from the database. If C<$value> is missing, |
949 | then C<undef> is used. |
966 | then C<undef> is used. |
|
|
967 | |
|
|
968 | =item db_family $family => $cb->(\%familyhash) |
|
|
969 | |
|
|
970 | Queries the named database C<$family> and call the callback with the |
|
|
971 | family represented as a hash. You can keep and freely modify the hash. |
|
|
972 | |
|
|
973 | =item db_keys $family => $cb->(\@keys) |
|
|
974 | |
|
|
975 | Same as C<db_family>, except it only queries the family I<subkeys> and passes |
|
|
976 | them as array reference to the callback. |
|
|
977 | |
|
|
978 | =item db_values $family => $cb->(\@values) |
|
|
979 | |
|
|
980 | Same as C<db_family>, except it only queries the family I<values> and passes them |
|
|
981 | as array reference to the callback. |
|
|
982 | |
|
|
983 | =item $guard = db_mon $family => $cb->($familyhash, \@subkeys...) |
|
|
984 | |
|
|
985 | Creates a monitor on the given database family. Each time a key is set or |
|
|
986 | or is deleted the callback is called with a hash containing the database |
|
|
987 | family and an arrayref with subkeys that have changed. |
|
|
988 | |
|
|
989 | Specifically, if one of the passed subkeys exists in the $familyhash, then |
|
|
990 | it is currently set to the value in the $familyhash. Otherwise, it has |
|
|
991 | been deleted. |
|
|
992 | |
|
|
993 | The family hash reference belongs to AnyEvent::MP and B<must not be |
|
|
994 | modified or stored> by the callback. When in doubt, make a copy. |
|
|
995 | |
|
|
996 | The first call will be with the current contents of the family and all |
|
|
997 | keys, as if they were just added. |
|
|
998 | |
|
|
999 | It is possible that the callback is called with a change event even though |
|
|
1000 | the subkey is already present and the value has not changed. |
|
|
1001 | |
|
|
1002 | The monitoring stops when the guard object is destroyed. |
|
|
1003 | |
|
|
1004 | Example: on every change to the family "mygroup", print out all keys. |
|
|
1005 | |
|
|
1006 | my $guard = db_mon mygroup => sub { |
|
|
1007 | my ($family, $keys) = @_; |
|
|
1008 | print "mygroup members: ", (join " ", keys %$family), "\n"; |
|
|
1009 | }; |
|
|
1010 | |
|
|
1011 | Exmaple: wait until the family "My::Module::workers" is non-empty. |
|
|
1012 | |
|
|
1013 | my $guard; $guard = db_mon My::Module::workers => sub { |
|
|
1014 | my ($family, $keys) = @_; |
|
|
1015 | return unless %$family; |
|
|
1016 | undef $guard; |
|
|
1017 | print "My::Module::workers now nonempty\n"; |
|
|
1018 | }; |
|
|
1019 | |
|
|
1020 | Example: print all changes to the family "AnyRvent::Fantasy::Module". |
|
|
1021 | |
|
|
1022 | my $guard = db_mon AnyRvent::Fantasy::Module => sub { |
|
|
1023 | my ($family, $keys) = @_; |
|
|
1024 | |
|
|
1025 | for (@$keys) { |
|
|
1026 | print "$_: ", |
|
|
1027 | (exists $family->{$_} |
|
|
1028 | ? $family->{$_} |
|
|
1029 | : "(deleted)"), |
|
|
1030 | "\n"; |
|
|
1031 | } |
|
|
1032 | }; |
950 | |
1033 | |
951 | =cut |
1034 | =cut |
952 | |
1035 | |
953 | =back |
1036 | =back |
954 | |
1037 | |