ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.37 by root, Fri Aug 7 16:47:23 2009 UTC vs.
Revision 1.42 by root, Sun Aug 9 00:41:49 2009 UTC

8 8
9 $NODE # contains this node's noderef 9 $NODE # contains this node's noderef
10 NODE # returns this node's noderef 10 NODE # returns this node's noderef
11 NODE $port # returns the noderef of the port 11 NODE $port # returns the noderef of the port
12 12
13 $SELF # receiving/own port id in rcv callbacks
14
15 # ports are message endpoints
16
17 # sending messages
13 snd $port, type => data...; 18 snd $port, type => data...;
19 snd $port, @msg;
20 snd @msg_with_first_element_being_a_port;
14 21
15 $SELF # receiving/own port id in rcv callbacks 22 # miniports
23 my $miniport = port { my @msg = @_; 0 };
16 24
25 # full ports
26 my $port = port;
17 rcv $port, smartmatch => $cb->($port, @msg); 27 rcv $port, smartmatch => $cb->(@msg);
18
19 # examples:
20 rcv $port2, ping => sub { snd $_[0], "pong"; 0 }; 28 rcv $port, ping => sub { snd $_[0], "pong"; 0 };
21 rcv $port1, pong => sub { warn "pong received\n" }; 29 rcv $port, pong => sub { warn "pong received\n"; 0 };
22 snd $port2, ping => $port1; 30
31 # remote ports
32 my $port = spawn $node, $initfunc, @initdata;
23 33
24 # more, smarter, matches (_any_ is exported by this module) 34 # more, smarter, matches (_any_ is exported by this module)
25 rcv $port, [child_died => $pid] => sub { ... 35 rcv $port, [child_died => $pid] => sub { ...
26 rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3 36 rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
27 37
109 119
110our $VERSION = '0.1'; 120our $VERSION = '0.1';
111our @EXPORT = qw( 121our @EXPORT = qw(
112 NODE $NODE *SELF node_of _any_ 122 NODE $NODE *SELF node_of _any_
113 resolve_node initialise_node 123 resolve_node initialise_node
114 snd rcv mon kil reg psub 124 snd rcv mon kil reg psub spawn
115 port 125 port
116); 126);
117 127
118our $SELF; 128our $SELF;
119 129
340registered. 350registered.
341 351
342The global C<$SELF> (exported by this module) contains C<$port> while 352The global C<$SELF> (exported by this module) contains C<$port> while
343executing the callback. 353executing the callback.
344 354
345Runtime errors wdurign callback execution will result in the port being 355Runtime errors during callback execution will result in the port being
346C<kil>ed. 356C<kil>ed.
347 357
348If the match is an array reference, then it will be matched against the 358If the match is an array reference, then it will be matched against the
349first elements of the message, otherwise only the first element is being 359first elements of the message, otherwise only the first element is being
350matched. 360matched.
491 501
492=item $guard = mon $port 502=item $guard = mon $port
493 503
494=item $guard = mon $port, $rcvport, @msg 504=item $guard = mon $port, $rcvport, @msg
495 505
496Monitor the given port and do something when the port is killed, and 506Monitor the given port and do something when the port is killed or
497optionally return a guard that can be used to stop monitoring again. 507messages to it were lost, and optionally return a guard that can be used
508to stop monitoring again.
509
510C<mon> effectively guarantees that, in the absence of hardware failures,
511that after starting the monitor, either all messages sent to the port
512will arrive, or the monitoring action will be invoked after possible
513message loss has been detected. No messages will be lost "in between"
514(after the first lost message no further messages will be received by the
515port). After the monitoring action was invoked, further messages might get
516delivered again.
498 517
499In the first form (callback), the callback is simply called with any 518In the first form (callback), the callback is simply called with any
500number of C<@reason> elements (no @reason means that the port was deleted 519number of C<@reason> elements (no @reason means that the port was deleted
501"normally"). Note also that I<< the callback B<must> never die >>, so use 520"normally"). Note also that I<< the callback B<must> never die >>, so use
502C<eval> if unsure. 521C<eval> if unsure.
536sub mon { 555sub mon {
537 my ($noderef, $port) = split /#/, shift, 2; 556 my ($noderef, $port) = split /#/, shift, 2;
538 557
539 my $node = $NODE{$noderef} || add_node $noderef; 558 my $node = $NODE{$noderef} || add_node $noderef;
540 559
541 my $cb = @_ ? $_[0] : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,'; 560 my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';
542 561
543 unless (ref $cb) { 562 unless (ref $cb) {
544 if (@_) { 563 if (@_) {
545 # send a kill info message 564 # send a kill info message
546 my (@msg) = @_; 565 my (@msg) = ($cb, @_);
547 $cb = sub { snd @msg, @_ }; 566 $cb = sub { snd @msg, @_ };
548 } else { 567 } else {
549 # simply kill other port 568 # simply kill other port
550 my $port = $cb; 569 my $port = $cb;
551 $cb = sub { kil $port, @_ if @_ }; 570 $cb = sub { kil $port, @_ if @_ };
598will be reported as reason C<< die => $@ >>. 617will be reported as reason C<< die => $@ >>.
599 618
600Transport/communication errors are reported as C<< transport_error => 619Transport/communication errors are reported as C<< transport_error =>
601$message >>. 620$message >>.
602 621
622=cut
623
624=item $port = spawn $node, $initfunc[, @initdata]
625
626Creates a port on the node C<$node> (which can also be a port ID, in which
627case it's the node where that port resides).
628
629The port ID of the newly created port is return immediately, and it is
630permissible to immediately start sending messages or monitor the port.
631
632After the port has been created, the init function is
633called. This function must be a fully-qualified function name
634(e.g. C<MyApp::Chat::Server::init>). To specify a function in the main
635program, use C<::name>.
636
637If the function doesn't exist, then the node tries to C<require>
638the package, then the package above the package and so on (e.g.
639C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
640exists or it runs out of package names.
641
642The init function is then called with the newly-created port as context
643object (C<$SELF>) and the C<@initdata> values as arguments.
644
645A common idiom is to pass your own port, monitor the spawned port, and
646in the init function, monitor the original port. This two-way monitoring
647ensures that both ports get cleaned up when there is a problem.
648
649Example: spawn a chat server port on C<$othernode>.
650
651 # this node, executed from within a port context:
652 my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
653 mon $server;
654
655 # init function on C<$othernode>
656 sub connect {
657 my ($srcport) = @_;
658
659 mon $srcport;
660
661 rcv $SELF, sub {
662 ...
663 };
664 }
665
666=cut
667
668sub _spawn {
669 my $port = shift;
670 my $init = shift;
671
672 local $SELF = "$NODE#$port";
673 eval {
674 &{ load_func $init }
675 };
676 _self_die if $@;
677}
678
679sub spawn(@) {
680 my ($noderef, undef) = split /#/, shift, 2;
681
682 my $id = "$RUNIQ." . $ID++;
683
684 $_[0] =~ /::/
685 or Carp::croak "spawn init function must be a fully-qualified name, caught";
686
687 ($NODE{$noderef} || add_node $noderef)
688 ->send (["", "AnyEvent::MP::_spawn" => $id, @_]);
689
690 "$noderef#$id"
691}
692
603=back 693=back
604 694
605=head1 NODE MESSAGES 695=head1 NODE MESSAGES
606 696
607Nodes understand the following messages sent to them. Many of them take 697Nodes understand the following messages sent to them. Many of them take

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines