--- AnyEvent-MP/MP.pm 2009/08/02 18:08:38 1.11 +++ AnyEvent-MP/MP.pm 2009/08/04 14:10:51 1.21 @@ -30,7 +30,7 @@ on the same or other hosts. At the moment, this module family is severly brokena nd underdocumented, -so do not use. This was uploaded mainly to resreve the CPAN namespace - +so do not use. This was uploaded mainly to reserve the CPAN namespace - stay tuned! =head1 CONCEPTS @@ -86,9 +86,9 @@ our $VERSION = '0.02'; our @EXPORT = qw( - NODE $NODE $PORT snd rcv _any_ + NODE $NODE $PORT snd rcv mon kil _any_ create_port create_port_on - create_miniport + miniport become_slave become_public ); @@ -120,6 +120,67 @@ that Storable can serialise and deserialise is allowed, and for the local node, anything can be passed. +=item $guard = mon $portid, $cb->() + +=item $guard = mon $portid, $otherport + +=item $guard = mon $portid, $otherport, @msg + +Monitor the given port and call the given callback when the port is +destroyed or connection to it's node is lost. + +#TODO + +=cut + +sub mon { + my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift); + + my $node = AnyEvent::MP::Base::add_node $noderef; + + #TODO: ports must not be references + if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) { + if (@_) { + # send a kill info message + my (@msg) = ($cb, @_); + $cb = sub { snd @msg, @_ }; + } else { + # simply kill other port + my $port = $cb; + $cb = sub { kil $port, @_ }; + } + } + + $node->monitor ($port, $cb); + + defined wantarray + and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } +} + +=item $guard = mon_guard $port, $ref, $ref... + +Monitors the given C<$port> and keeps the passed references. When the port +is killed, the references will be freed. + +Optionally returns a guard that will stop the monitoring. + +This function is useful when you create e.g. timers or other watchers and +want to free them when the port gets killed: + + $port->rcv (start => sub { + my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub { + undef $timer if 0.9 < rand; + }); + }); + +=cut + +sub mon_guard { + my ($port, @refs) = @_; + + mon $port, sub { 0 && @refs } +} + =item $local_port = create_port Create a new local port object. See the next section for allowed methods. @@ -127,11 +188,10 @@ =cut sub create_port { - my $id = "$AnyEvent::MP::Base::UNIQ." . ++$AnyEvent::MP::Base::ID; + my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++; my $self = bless { id => "$NODE#$id", - names => [$id], }, "AnyEvent::MP::Port"; $AnyEvent::MP::Base::PORT{$id} = sub { @@ -158,19 +218,33 @@ $self } -=item $portid = create_miniport { } +=item $portid = miniport { my @msg = @_; $finished } + +Creates a "mini port", that is, a very lightweight port without any +pattern matching behind it, and returns its ID. + +The block will be called for every message received on the port. When the +callback returns a true value its job is considered "done" and the port +will be destroyed. Otherwise it will stay alive. + +The message will be passed as-is, no extra argument (i.e. no port id) will +be passed to the callback. -Creates a "mini port", that is, a port without much #TODO +If you need the local port id in the callback, this works nicely: + + my $port; $port = miniport { + snd $otherport, reply => $port; + }; =cut -sub create_miniport(&) { +sub miniport(&) { my $cb = shift; - my $id = "$AnyEvent::MP::Base::UNIQ." . ++$AnyEvent::MP::Base::ID; + my $id = "$AnyEvent::MP::Base::UNIQ." . $AnyEvent::MP::Base::ID++; $AnyEvent::MP::Base::PORT{$id} = sub { &$cb - and delete $AnyEvent::MP::Base::PORT{$id}; + and kil $id; }; "$NODE#$id" @@ -195,6 +269,8 @@ '""' => sub { $_[0]{id} }, fallback => 1; +sub TO_JSON { $_[0]{id} } + =item $port->rcv (type => $callback->($port, @msg)) =item $port->rcv ($smartmatch => $callback->($port, @msg)) @@ -255,7 +331,7 @@ Explicitly destroy/remove/nuke/vaporise the port. -Ports are normally kept alive by there mere existance alone, and need to +Ports are normally kept alive by their mere existance alone, and need to be destroyed explicitly. =cut @@ -265,8 +341,7 @@ delete $AnyEvent::MP::Base::WKP{ $self->{wkname} }; - delete $AnyEvent::MP::Base::PORT{$_} - for @{ $self->{names} }; + AnyEvent::MP::Base::kil $self->{id}; } =back