ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.99 by root, Fri Oct 2 14:12:16 2009 UTC vs.
Revision 1.107 by root, Wed Dec 30 07:52:46 2009 UTC

30 rcv $port, pong => sub { warn "pong received\n" }; 30 rcv $port, pong => sub { warn "pong received\n" };
31 31
32 # create a port on another node 32 # create a port on another node
33 my $port = spawn $node, $initfunc, @initdata; 33 my $port = spawn $node, $initfunc, @initdata;
34 34
35 # destroy a prot again
36 kil $port; # "normal" kill
37 kil $port, my_error => "everything is broken"; # error kill
38
35 # monitoring 39 # monitoring
36 mon $localport, $cb->(@msg) # callback is invoked on death 40 mon $localport, $cb->(@msg) # callback is invoked on death
37 mon $localport, $otherport # kill otherport on abnormal death 41 mon $localport, $otherport # kill otherport on abnormal death
38 mon $localport, $otherport, @msg # send message on death 42 mon $localport, $otherport, @msg # send message on death
43
44 # temporarily execute code in port context
45 peval $port, sub { die "kill the port!" };
46
47 # execute callbacks in $SELF port context
48 my $timer = AE::timer 1, 0, psub {
49 die "kill the port, delayed";
50 };
39 51
40=head1 CURRENT STATUS 52=head1 CURRENT STATUS
41 53
42 bin/aemp - stable. 54 bin/aemp - stable.
43 AnyEvent::MP - stable API, should work. 55 AnyEvent::MP - stable API, should work.
143 155
144use AE (); 156use AE ();
145 157
146use base "Exporter"; 158use base "Exporter";
147 159
148our $VERSION = 1.2; 160our $VERSION = 1.24;
149 161
150our @EXPORT = qw( 162our @EXPORT = qw(
151 NODE $NODE *SELF node_of after 163 NODE $NODE *SELF node_of after
152 configure 164 configure
153 snd rcv mon mon_guard kil psub spawn cal 165 snd rcv mon mon_guard kil psub peval spawn cal
154 port 166 port
155); 167);
156 168
157our $SELF; 169our $SELF;
158 170
371 msg1 => sub { ... }, 383 msg1 => sub { ... },
372 ... 384 ...
373 ; 385 ;
374 386
375Example: temporarily register a rcv callback for a tag matching some port 387Example: temporarily register a rcv callback for a tag matching some port
376(e.g. for a rpc reply) and unregister it after a message was received. 388(e.g. for an rpc reply) and unregister it after a message was received.
377 389
378 rcv $port, $otherport => sub { 390 rcv $port, $otherport => sub {
379 my @reply = @_; 391 my @reply = @_;
380 392
381 rcv $SELF, $otherport; 393 rcv $SELF, $otherport;
394 if (ref $_[0]) { 406 if (ref $_[0]) {
395 if (my $self = $PORT_DATA{$portid}) { 407 if (my $self = $PORT_DATA{$portid}) {
396 "AnyEvent::MP::Port" eq ref $self 408 "AnyEvent::MP::Port" eq ref $self
397 or Carp::croak "$port: rcv can only be called on message matching ports, caught"; 409 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
398 410
399 $self->[2] = shift; 411 $self->[0] = shift;
400 } else { 412 } else {
401 my $cb = shift; 413 my $cb = shift;
402 $PORT{$portid} = sub { 414 $PORT{$portid} = sub {
403 local $SELF = $port; 415 local $SELF = $port;
404 eval { &$cb }; _self_die if $@; 416 eval { &$cb }; _self_die if $@;
405 }; 417 };
406 } 418 }
407 } elsif (defined $_[0]) { 419 } elsif (defined $_[0]) {
408 my $self = $PORT_DATA{$portid} ||= do { 420 my $self = $PORT_DATA{$portid} ||= do {
409 my $self = bless [$PORT{$port} || sub { }, { }, $port], "AnyEvent::MP::Port"; 421 my $self = bless [$PORT{$portid} || sub { }, { }, $port], "AnyEvent::MP::Port";
410 422
411 $PORT{$portid} = sub { 423 $PORT{$portid} = sub {
412 local $SELF = $port; 424 local $SELF = $port;
413 425
414 if (my $cb = $self->[1]{$_[0]}) { 426 if (my $cb = $self->[1]{$_[0]}) {
436 } 448 }
437 449
438 $port 450 $port
439} 451}
440 452
453=item peval $port, $coderef[, @args]
454
455Evaluates the given C<$codref> within the contetx of C<$port>, that is,
456when the code throews an exception the C<$port> will be killed.
457
458Any remaining args will be passed to the callback. Any return values will
459be returned to the caller.
460
461This is useful when you temporarily want to execute code in the context of
462a port.
463
464Example: create a port and run some initialisation code in it's context.
465
466 my $port = port { ... };
467
468 peval $port, sub {
469 init
470 or die "unable to init";
471 };
472
473=cut
474
475sub peval($$) {
476 local $SELF = shift;
477 my $cb = shift;
478
479 if (wantarray) {
480 my @res = eval { &$cb };
481 _self_die if $@;
482 @res
483 } else {
484 my $res = eval { &$cb };
485 _self_die if $@;
486 $res
487 }
488}
489
441=item $closure = psub { BLOCK } 490=item $closure = psub { BLOCK }
442 491
443Remembers C<$SELF> and creates a closure out of the BLOCK. When the 492Remembers C<$SELF> and creates a closure out of the BLOCK. When the
444closure is executed, sets up the environment in the same way as in C<rcv> 493closure is executed, sets up the environment in the same way as in C<rcv>
445callbacks, i.e. runtime errors will cause the port to get C<kil>ed. 494callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
495
496The effect is basically as if it returned C<< sub { peval $SELF, sub {
497BLOCK } } >>.
446 498
447This is useful when you register callbacks from C<rcv> callbacks: 499This is useful when you register callbacks from C<rcv> callbacks:
448 500
449 rcv delayed_reply => sub { 501 rcv delayed_reply => sub {
450 my ($delay, @reply) = @_; 502 my ($delay, @reply) = @_;
598 650
599=item kil $port[, @reason] 651=item kil $port[, @reason]
600 652
601Kill the specified port with the given C<@reason>. 653Kill the specified port with the given C<@reason>.
602 654
603If no C<@reason> is specified, then the port is killed "normally" (ports 655If no C<@reason> is specified, then the port is killed "normally" -
604monitoring other ports will not necessarily die because a port dies 656monitor callback will be invoked, but the kil will not cause linked ports
605"normally"). 657(C<mon $mport, $lport> form) to get killed.
606 658
607Otherwise, linked ports get killed with the same reason (second form of 659If a C<@reason> is specified, then linked ports (C<mon $mport, $lport>
608C<mon>, see above). 660form) get killed with the same reason.
609 661
610Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks 662Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
611will be reported as reason C<< die => $@ >>. 663will be reported as reason C<< die => $@ >>.
612 664
613Transport/communication errors are reported as C<< transport_error => 665Transport/communication errors are reported as C<< transport_error =>
927L<AnyEvent::MP::Kernel> - more, lower-level, stuff. 979L<AnyEvent::MP::Kernel> - more, lower-level, stuff.
928 980
929L<AnyEvent::MP::Global> - network maintainance and port groups, to find 981L<AnyEvent::MP::Global> - network maintainance and port groups, to find
930your applications. 982your applications.
931 983
984L<AnyEvent::MP::DataConn> - establish data connections between nodes.
985
932L<AnyEvent::MP::LogCatcher> - simple service to display log messages from 986L<AnyEvent::MP::LogCatcher> - simple service to display log messages from
933all nodes. 987all nodes.
934 988
935L<AnyEvent>. 989L<AnyEvent>.
936 990

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines