--- AnyEvent-MP/MP.pm 2009/08/07 16:47:23 1.37 +++ AnyEvent-MP/MP.pm 2009/08/08 00:22:16 1.40 @@ -10,16 +10,26 @@ NODE # returns this node's noderef NODE $port # returns the noderef of the port + $SELF # receiving/own port id in rcv callbacks + + # ports are message endpoints + + # sending messages snd $port, type => data...; + snd $port, @msg; + snd @msg_with_first_element_being_a_port; - $SELF # receiving/own port id in rcv callbacks + # miniports + my $miniport = port { my @msg = @_; 0 }; - rcv $port, smartmatch => $cb->($port, @msg); + # full ports + my $port = port; + rcv $port, smartmatch => $cb->(@msg); + rcv $port, ping => sub { snd $_[0], "pong"; 0 }; + rcv $port, pong => sub { warn "pong received\n"; 0 }; - # examples: - rcv $port2, ping => sub { snd $_[0], "pong"; 0 }; - rcv $port1, pong => sub { warn "pong received\n" }; - snd $port2, ping => $port1; + # remote ports + my $port = spawn $node, $initfunc, @initdata; # more, smarter, matches (_any_ is exported by this module) rcv $port, [child_died => $pid] => sub { ... @@ -111,7 +121,7 @@ our @EXPORT = qw( NODE $NODE *SELF node_of _any_ resolve_node initialise_node - snd rcv mon kil reg psub + snd rcv mon kil reg psub spawn port ); @@ -342,7 +352,7 @@ The global C<$SELF> (exported by this module) contains C<$port> while executing the callback. -Runtime errors wdurign callback execution will result in the port being +Runtime errors during callback execution will result in the port being Ced. If the match is an array reference, then it will be matched against the @@ -600,6 +610,77 @@ Transport/communication errors are reported as C<< transport_error => $message >>. +=cut + +=item $port = spawn $node, $initfunc[, @initdata] + +Creates a port on the node C<$node> (which can also be a port ID, in which +case it's the node where that port resides). + +The port ID of the newly created port is return immediately, and it is +permissible to immediately start sending messages or monitor the port. + +After the port has been created, the init function is +called. This function must be a fully-qualified function name +(e.g. C). To specify a function in the main +program, use C<::name>. + +If the function doesn't exist, then the node tries to C +the package, then the package above the package and so on (e.g. +C, C, C) until the function +exists or it runs out of package names. + +The init function is then called with the newly-created port as context +object (C<$SELF>) and the C<@initdata> values as arguments. + +A common idiom is to pass your own port, monitor the spawned port, and +in the init function, monitor the original port. This two-way monitoring +ensures that both ports get cleaned up when there is a problem. + +Example: spawn a chat server port on C<$othernode>. + + # this node, executed from within a port context: + my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF; + mon $server; + + # init function on C<$othernode> + sub connect { + my ($srcport) = @_; + + mon $srcport; + + rcv $SELF, sub { + ... + }; + } + +=cut + +sub _spawn { + my $port = shift; + my $init = shift; + + local $SELF = "$NODE#$port"; + eval { + &{ load_func $init } + }; + _self_die if $@; +} + +sub spawn(@) { + my ($noderef, undef) = split /#/, shift, 2; + + my $id = "$RUNIQ." . $ID++; + + $_[0] =~ /::/ + or Carp::croak "spawn init function must be a fully-qualified name, caught"; + + ($NODE{$noderef} || add_node $noderef) + ->send (["", "AnyEvent::MP::_spawn" => $id, @_]); + + "$noderef#$id" +} + =back =head1 NODE MESSAGES