--- AnyEvent-MP/MP.pm 2009/08/04 23:35:51 1.30 +++ AnyEvent-MP/MP.pm 2009/08/05 19:58:46 1.32 @@ -105,8 +105,7 @@ our $VERSION = '0.1'; our @EXPORT = qw( NODE $NODE *SELF node_of _any_ - resolve_node - become_slave become_public + resolve_node initialise_node snd rcv mon kil reg psub port ); @@ -192,117 +191,11 @@ that Storable can serialise and deserialise is allowed, and for the local node, anything can be passed. -=item kil $portid[, @reason] - -Kill the specified port with the given C<@reason>. - -If no C<@reason> is specified, then the port is killed "normally" (linked -ports will not be kileld, or even notified). - -Otherwise, linked ports get killed with the same reason (second form of -C, see below). - -Runtime errors while evaluating C callbacks or inside C blocks -will be reported as reason C<< die => $@ >>. - -Transport/communication errors are reported as C<< transport_error => -$message >>. - -=item $guard = mon $portid, $cb->(@reason) - -=item $guard = mon $portid, $otherport - -=item $guard = mon $portid, $otherport, @msg - -Monitor the given port and do something when the port is killed. - -In the first form, the callback is simply called with any number -of C<@reason> elements (no @reason means that the port was deleted -"normally"). Note also that I<< the callback B never die >>, so use -C if unsure. - -In the second form, the other port will be C'ed with C<@reason>, iff -a @reason was specified, i.e. on "normal" kils nothing happens, while -under all other conditions, the other port is killed with the same reason. - -In the last form, a message of the form C<@msg, @reason> will be C. - -Example: call a given callback when C<$port> is killed. - - mon $port, sub { warn "port died because of <@_>\n" }; - -Example: kill ourselves when C<$port> is killed abnormally. - - mon $port, $self; - -Example: send us a restart message another C<$port> is killed. - - mon $port, $self => "restart"; - -=cut - -sub mon { - my ($noderef, $port) = split /#/, shift, 2; - - my $node = $NODE{$noderef} || add_node $noderef; - - my $cb = shift; - - unless (ref $cb) { - if (@_) { - # send a kill info message - my (@msg) = ($cb, @_); - $cb = sub { snd @msg, @_ }; - } else { - # simply kill other port - my $port = $cb; - $cb = sub { kil $port, @_ if @_ }; - } - } - - $node->monitor ($port, $cb); - - defined wantarray - and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } -} - -=item $guard = mon_guard $port, $ref, $ref... - -Monitors the given C<$port> and keeps the passed references. When the port -is killed, the references will be freed. - -Optionally returns a guard that will stop the monitoring. - -This function is useful when you create e.g. timers or other watchers and -want to free them when the port gets killed: - - $port->rcv (start => sub { - my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub { - undef $timer if 0.9 < rand; - }); - }); - -=cut - -sub mon_guard { - my ($port, @refs) = @_; - - mon $port, sub { 0 && @refs } -} - -=item lnk $port1, $port2 - -Link two ports. This is simply a shorthand for: - - mon $port1, $port2; - mon $port2, $port1; - -It means that if either one is killed abnormally, the other one gets -killed as well. - =item $local_port = port -Create a new local port object that supports message matching. +Create a new local port object that can be used either as a pattern +matching port ("full port") or a single-callback port ("miniport"), +depending on how C callbacks are bound to the object. =item $portid = port { my @msg = @_; $finished } @@ -318,7 +211,7 @@ If you need the local port id in the callback, this works nicely: - my $port; $port = miniport { + my $port; $port = port { snd $otherport, reply => $port; }; @@ -389,13 +282,19 @@ $REG{$name} = $portid; } +=item rcv $portid, $callback->(@msg) + +Replaces the callback on the specified miniport (or newly created port +object, see C). Full ports are configured with the following calls: + =item rcv $portid, tagstring => $callback->(@msg), ... =item rcv $portid, $smartmatch => $callback->(@msg), ... =item rcv $portid, [$smartmatch...] => $callback->(@msg), ... -Register callbacks to be called on matching messages on the given port. +Register callbacks to be called on matching messages on the given full +port (or newly created port). The callback has to return a true value when its work is done, after which is will be removed, or a false value in which case it will stay @@ -421,7 +320,8 @@ =cut sub rcv($@) { - my ($noderef, $port) = split /#/, shift, 2; + my $portid = shift; + my ($noderef, $port) = split /#/, $port, 2; ($NODE{$noderef} || add_node $noderef) == $NODE{""} or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught"; @@ -446,6 +346,8 @@ push @{ $self->{any} }, [$cb, $match]; } } + + $portid } =item $closure = psub { BLOCK } @@ -486,6 +388,114 @@ } } +=item $guard = mon $portid, $cb->(@reason) + +=item $guard = mon $portid, $otherport + +=item $guard = mon $portid, $otherport, @msg + +Monitor the given port and do something when the port is killed. + +In the first form, the callback is simply called with any number +of C<@reason> elements (no @reason means that the port was deleted +"normally"). Note also that I<< the callback B never die >>, so use +C if unsure. + +In the second form, the other port will be C'ed with C<@reason>, iff +a @reason was specified, i.e. on "normal" kils nothing happens, while +under all other conditions, the other port is killed with the same reason. + +In the last form, a message of the form C<@msg, @reason> will be C. + +Example: call a given callback when C<$port> is killed. + + mon $port, sub { warn "port died because of <@_>\n" }; + +Example: kill ourselves when C<$port> is killed abnormally. + + mon $port, $self; + +Example: send us a restart message another C<$port> is killed. + + mon $port, $self => "restart"; + +=cut + +sub mon { + my ($noderef, $port) = split /#/, shift, 2; + + my $node = $NODE{$noderef} || add_node $noderef; + + my $cb = shift; + + unless (ref $cb) { + if (@_) { + # send a kill info message + my (@msg) = ($cb, @_); + $cb = sub { snd @msg, @_ }; + } else { + # simply kill other port + my $port = $cb; + $cb = sub { kil $port, @_ if @_ }; + } + } + + $node->monitor ($port, $cb); + + defined wantarray + and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } +} + +=item $guard = mon_guard $port, $ref, $ref... + +Monitors the given C<$port> and keeps the passed references. When the port +is killed, the references will be freed. + +Optionally returns a guard that will stop the monitoring. + +This function is useful when you create e.g. timers or other watchers and +want to free them when the port gets killed: + + $port->rcv (start => sub { + my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub { + undef $timer if 0.9 < rand; + }); + }); + +=cut + +sub mon_guard { + my ($port, @refs) = @_; + + mon $port, sub { 0 && @refs } +} + +=item lnk $port1, $port2 + +Link two ports. This is simply a shorthand for: + + mon $port1, $port2; + mon $port2, $port1; + +It means that if either one is killed abnormally, the other one gets +killed as well. + +=item kil $portid[, @reason] + +Kill the specified port with the given C<@reason>. + +If no C<@reason> is specified, then the port is killed "normally" (linked +ports will not be kileld, or even notified). + +Otherwise, linked ports get killed with the same reason (second form of +C, see below). + +Runtime errors while evaluating C callbacks or inside C blocks +will be reported as reason C<< die => $@ >>. + +Transport/communication errors are reported as C<< transport_error => +$message >>. + =back =head1 FUNCTIONS FOR NODES