
Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.75 by root, Mon Aug 31 13:18:06 2009 UTC vs.
Revision 1.101 by root, Tue Oct 6 01:31:22 2009 UTC

1=head1 NAME 1=head1 NAME
2 2
3AnyEvent::MP - multi-processing/message-passing framework 3AnyEvent::MP - erlang-style multi-processing/message-passing framework
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::MP; 7 use AnyEvent::MP;
8 8
30 rcv $port, pong => sub { warn "pong received\n" }; 30 rcv $port, pong => sub { warn "pong received\n" };
31 31
32 # create a port on another node 32 # create a port on another node
33 my $port = spawn $node, $initfunc, @initdata; 33 my $port = spawn $node, $initfunc, @initdata;
34 34
35 # destroy a port again
36 kil $port; # "normal" kill
37 kil $port, my_error => "everything is broken"; # error kill
38
35 # monitoring 39 # monitoring
36 mon $port, $cb->(@msg) # callback is invoked on death 40 mon $localport, $cb->(@msg) # callback is invoked on death
37 mon $port, $otherport # kill otherport on abnormal death 41 mon $localport, $otherport # kill otherport on abnormal death
38 mon $port, $otherport, @msg # send message on death 42 mon $localport, $otherport, @msg # send message on death
43
44 # temporarily execute code in port context
45 peval $port, sub { die "kill the port!" };
46
47 # execute callbacks in $SELF port context
48 my $timer = AE::timer 1, 0, psub {
49 die "kill the port, delayed";
50 };
39 51
40=head1 CURRENT STATUS 52=head1 CURRENT STATUS
41 53
42 bin/aemp - stable. 54 bin/aemp - stable.
43 AnyEvent::MP - stable API, should work. 55 AnyEvent::MP - stable API, should work.
44 AnyEvent::MP::Intro - uptodate, but incomplete. 56 AnyEvent::MP::Intro - explains most concepts.
45 AnyEvent::MP::Kernel - mostly stable. 57 AnyEvent::MP::Kernel - mostly stable API.
46 AnyEvent::MP::Global - stable API, protocol not yet final. 58 AnyEvent::MP::Global - stable API.
47
48 stay tuned.
49 59
50=head1 DESCRIPTION 60=head1 DESCRIPTION
51 61
52This module (-family) implements a simple message passing framework. 62This module (-family) implements a simple message passing framework.
53 63
55on the same or other hosts, and you can supervise entities remotely. 65on the same or other hosts, and you can supervise entities remotely.
56 66
57For an introduction to this module family, see the L<AnyEvent::MP::Intro> 67For an introduction to this module family, see the L<AnyEvent::MP::Intro>
58manual page and the examples under F<eg/>. 68manual page and the examples under F<eg/>.
59 69
60At the moment, this module family is a bit underdocumented.
61
62=head1 CONCEPTS 70=head1 CONCEPTS
63 71
64=over 4 72=over 4
65 73
66=item port 74=item port
67 75
68A port is something you can send messages to (with the C<snd> function). 76Not to be confused with a TCP port, a "port" is something you can send
77messages to (with the C<snd> function).
69 78
70Ports allow you to register C<rcv> handlers that can match all or just 79Ports allow you to register C<rcv> handlers that can match all or just
71some messages. Messages sent to ports will not be queued, regardless of 80some messages. Messages sent to ports will not be queued, regardless of
72whether anything was listening for them or not. 81whether anything was listening for them or not.
73 82
84 93
85Nodes are either public (have one or more listening ports) or private 94Nodes are either public (have one or more listening ports) or private
86(no listening ports). Private nodes cannot talk to other private nodes 95(no listening ports). Private nodes cannot talk to other private nodes
87currently. 96currently.
88 97
89=item node ID - C<[a-za-Z0-9_\-.:]+> 98=item node ID - C<[A-Z_][a-zA-Z0-9_\-.:]*>
90 99
91A node ID is a string that uniquely identifies the node within a 100A node ID is a string that uniquely identifies the node within a
92network. Depending on the configuration used, node IDs can look like a 101network. Depending on the configuration used, node IDs can look like a
93hostname, a hostname and a port, or a random string. AnyEvent::MP itself 102hostname, a hostname and a port, or a random string. AnyEvent::MP itself
94doesn't interpret node IDs in any way. 103doesn't interpret node IDs in any way.
98Nodes can only talk to each other by creating some kind of connection to 107Nodes can only talk to each other by creating some kind of connection to
99each other. To do this, nodes should listen on one or more local transport 108each other. To do this, nodes should listen on one or more local transport
100endpoints - binds. Currently, only standard C<ip:port> specifications can 109endpoints - binds. Currently, only standard C<ip:port> specifications can
101be used, which specify TCP ports to listen on. 110be used, which specify TCP ports to listen on.
102 111
103=item seeds - C<host:port> 112=item seed nodes
104 113
105When a node starts, it knows nothing about the network. To teach the node 114When a node starts, it knows nothing about the network. To teach the node
106about the network it first has to contact some other node within the 115about the network it first has to contact some other node within the
107network. This node is called a seed. 116network. This node is called a seed.
108 117
109Seeds are transport endpoint(s) of as many nodes as one wants. Those nodes 118Apart from the fact that other nodes know them as seed nodes and they have
119to have fixed listening addresses, seed nodes are perfectly normal nodes -
120any node can function as a seed node for others.
121
122In addition to discovering the network, seed nodes are also used to
123maintain the network and to connect nodes that otherwise would have
124trouble connecting. They form the backbone of an AnyEvent::MP network.
125
110are expected to be long-running, and at least one of those should always 126Seed nodes are expected to be long-running, and at least one seed node
111be available. When nodes run out of connections (e.g. due to a network 127should always be available. They should also be relatively responsive - a
112error), they try to re-establish connections to some seednodes again to 128seed node that blocks for long periods will slow down everybody else.
113join the network.
114 129
115Apart from being sued for seeding, seednodes are not special in any way - 130=item seeds - C<host:port>
116every public node can be a seednode. 131
132Seeds are transport endpoint(s) (usually a hostname/IP address and a
133TCP port) of nodes that should be used as seed nodes.
134
135The nodes listening on those endpoints are expected to be long-running,
136and at least one of those should always be available. When nodes run out
137of connections (e.g. due to a network error), they try to re-establish
138connections to some seednodes again to join the network.
117 139
118=back 140=back
119 141
120=head1 VARIABLES/FUNCTIONS 142=head1 VARIABLES/FUNCTIONS
121 143
133 155
134use AE (); 156use AE ();
135 157
136use base "Exporter"; 158use base "Exporter";
137 159
138our $VERSION = $AnyEvent::MP::Kernel::VERSION; 160our $VERSION = 1.21;
139 161
140our @EXPORT = qw( 162our @EXPORT = qw(
141 NODE $NODE *SELF node_of after 163 NODE $NODE *SELF node_of after
142 configure 164 configure
143 snd rcv mon mon_guard kil reg psub spawn 165 snd rcv mon mon_guard kil psub peval spawn cal
144 port 166 port
145); 167);
146 168
147our $SELF; 169our $SELF;
148 170
160 182
161=item $nodeid = node_of $port 183=item $nodeid = node_of $port
162 184
163Extracts and returns the node ID from a port ID or a node ID. 185Extracts and returns the node ID from a port ID or a node ID.
164 186
187=item configure $profile, key => value...
188
165=item configure key => value... 189=item configure key => value...
166 190
167Before a node can talk to other nodes on the network (i.e. enter 191Before a node can talk to other nodes on the network (i.e. enter
168"distributed mode") it has to configure itself - the minimum a node needs 192"distributed mode") it has to configure itself - the minimum a node needs
169to know is its own name, and optionally it should know the addresses of 193to know is its own name, and optionally it should know the addresses of
176 200
177=item step 1, gathering configuration from profiles 201=item step 1, gathering configuration from profiles
178 202
179The function first looks up a profile in the aemp configuration (see the 203The function first looks up a profile in the aemp configuration (see the
180L<aemp> commandline utility). The profile name can be specified via the 204L<aemp> commandline utility). The profile name can be specified via the
181named C<profile> parameter. If it is missing, then the nodename (F<uname 205named C<profile> parameter (or can simply be the first parameter). If it is
182-n>) will be used as profile name. 206missing, then the nodename (F<uname -n>) will be used as profile name.
183 207
184The profile data is then gathered as follows: 208The profile data is then gathered as follows:
185 209
186First, all remaining key => value pairs (all of which are conviniently 210First, all remaining key => value pairs (all of which are conveniently
187undocumented at the moment) will be interpreted as configuration 211undocumented at the moment) will be interpreted as configuration
188data. Then they will be overwritten by any values specified in the global 212data. Then they will be overwritten by any values specified in the global
189default configuration (see the F<aemp> utility), then the chain of 213default configuration (see the F<aemp> utility), then the chain of
190profiles chosen by the profile name (and any C<parent> attributes). 214profiles chosen by the profile name (and any C<parent> attributes).
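
The merge order described above can be sketched in plain Perl. This is only an illustration: C<resolve_config> is a hypothetical helper, not part of AnyEvent::MP, the key names are placeholders, and the assumption that a child profile overrides its C<parent> is not stated in the text.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::MP: merges configuration
# sources in the order the text describes (later sources win).
sub resolve_config {
    my ($profiles, $global, $name, %args) = @_;

    # Follow "parent" attributes from the chosen profile upwards.
    # Assumption: parents are applied first, so the child profile wins.
    my (@chain, %seen);
    while (defined $name && $profiles->{$name} && !$seen{$name}++) {
        unshift @chain, $profiles->{$name};
        $name = $profiles->{$name}{parent};
    }

    my %cfg = %args;                 # 1. key => value pairs passed in
    %cfg = (%cfg, %$global);         # 2. overwritten by the global defaults
    %cfg = (%cfg, %$_) for @chain;   # 3. overwritten by the profile chain
    delete $cfg{parent};             # bookkeeping only, not a setting

    return \%cfg;
}

# Placeholder profile data for illustration.
my %profiles = (
    base   => { binds => "*:4040", timeout => 30 },
    worker => { parent => "base", timeout => 10 },
);

my $cfg = resolve_config(
    \%profiles, { seeds => "seed1:4040" }, "worker",
    timeout => 60, label => "worker1",
);
```

Note that under this order a value passed directly as an argument acts as a fallback only: any profile that sets the same key replaces it.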
191 215
215L<AnyEvent::MP::Global> module, which will then use it to keep 239L<AnyEvent::MP::Global> module, which will then use it to keep
216connectivity with at least one node at any point in time. 240connectivity with at least one node at any point in time.
217 241
218=back 242=back
219 243
220Example: become a distributed node using the locla node name as profile. 244Example: become a distributed node using the local node name as profile.
221This should be the most common form of invocation for "daemon"-type nodes. 245This should be the most common form of invocation for "daemon"-type nodes.
222 246
223 configure 247 configure
224 248
225Example: become an anonymous node. This form is often used for commandline 249Example: become an anonymous node. This form is often used for commandline
424 } 448 }
425 449
426 $port 450 $port
427} 451}
428 452
453=item peval $port, $coderef[, @args]
454
455Evaluates the given C<$coderef> within the context of C<$port>, that is,
456when the code throws an exception the C<$port> will be killed.
457
458Any remaining args will be passed to the callback. Any return values will
459be returned to the caller.
460
461This is useful when you temporarily want to execute code in the context of
462a port.
463
464Example: create a port and run some initialisation code in its context.
465
466 my $port = port { ... };
467
468 peval $port, sub {
469 init
470 or die "unable to init";
471 };
472
473=cut
474
475sub peval($$) {
476 local $SELF = shift;
477 my $cb = shift;
478
479 if (wantarray) {
480 my @res = eval { &$cb };
481 _self_die if $@;
482 @res
483 } else {
484 my $res = eval { &$cb };
485 _self_die if $@;
486 $res
487 }
488}
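
The context-preserving pattern used by C<peval> can be tried without any ports. The following is a minimal stand-alone sketch: C<guarded_call> and its C<$on_error> handler are hypothetical stand-ins for C<peval> and the internal C<_self_die>, and only mirror the list/scalar handling shown in the added code.

```perl
use strict;
use warnings;

# Hypothetical stand-in for peval: runs $cb with @args, preserves the
# caller's list or scalar context, and hands any exception to $on_error.
sub guarded_call {
    my ($on_error, $cb, @args) = @_;

    if (wantarray) {
        my @res = eval { $cb->(@args) };
        if ($@) { $on_error->($@); return }
        return @res;
    } else {
        my $res = eval { $cb->(@args) };
        if ($@) { $on_error->($@); return }
        return $res;
    }
}

my @errors;
my $on_error = sub { push @errors, $_[0] };   # a real port would be killed here

my @list   = guarded_call($on_error, sub { map { $_ * 2 } @_ }, 1, 2, 3);
my $scalar = guarded_call($on_error, sub { "ok" });
my $failed = guarded_call($on_error, sub { die "unable to init\n" });
```

Branching on C<wantarray> matters because a callback may return different values in list and scalar context; evaluating it once in the caller's own context keeps that behaviour intact.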
489
429=item $closure = psub { BLOCK } 490=item $closure = psub { BLOCK }
430 491
431Remembers C<$SELF> and creates a closure out of the BLOCK. When the 492Remembers C<$SELF> and creates a closure out of the BLOCK. When the
432closure is executed, sets up the environment in the same way as in C<rcv> 493closure is executed, sets up the environment in the same way as in C<rcv>
433callbacks, i.e. runtime errors will cause the port to get C<kil>ed. 494callbacks, i.e. runtime errors will cause the port to get C<kil>ed.
495
496The effect is basically as if it returned C<< sub { peval $SELF, sub {
497BLOCK } } >>.
434 498
435This is useful when you register callbacks from C<rcv> callbacks: 499This is useful when you register callbacks from C<rcv> callbacks:
436 500
437 rcv delayed_reply => sub { 501 rcv delayed_reply => sub {
438 my ($delay, @reply) = @_; 502 my ($delay, @reply) = @_;
474 538
475Monitor the given port and do something when the port is killed or 539Monitor the given port and do something when the port is killed or
476messages to it were lost, and optionally return a guard that can be used 540messages to it were lost, and optionally return a guard that can be used
477to stop monitoring again. 541to stop monitoring again.
478 542
543In the first form (callback), the callback is simply called with any
544number of C<@reason> elements (no @reason means that the port was deleted
545"normally"). Note also that I<< the callback B<must> never die >>, so use
546C<eval> if unsure.
547
548In the second form (another port given), the other port (C<$rcvport>)
549will be C<kil>'ed with C<@reason>, if a @reason was specified, i.e. on
550"normal" kils nothing happens, while under all other conditions, the other
551port is killed with the same reason.
552
553The third form (kill self) is the same as the second form, except that
554C<$rcvport> defaults to C<$SELF>.
555
556In the last form (message), a message of the form C<@msg, @reason> will be
557C<snd>.
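
The four forms can be summarised as a small decision table. The sketch below is an assumption-laden illustration in plain Perl: C<mon_action> is a hypothetical function that only reports which action the text describes for a given death, and it does not use or reflect the module's internals.

```perl
use strict;
use warnings;

# Hypothetical decision table for the four mon forms. It returns a
# description of the action instead of touching any real port.
# $reason is an array reference; an empty one means a "normal" kill.
sub mon_action {
    my ($self, $reason, @action) = @_;

    if (ref($action[0]) eq "CODE") {
        return [call => @$reason];                  # callback form
    }
    if (@action > 1) {
        my ($rcvport, @msg) = @action;
        return [snd => $rcvport, @msg, @$reason];   # message form
    }

    # Port form, or kill-self form when no port was given.
    my $rcvport = @action ? $action[0] : $self;
    return @$reason ? [kil => $rcvport, @$reason] : ["nothing"];
}

my $normal   = mon_action("n#self", [], "n#other");
my $abnormal = mon_action("n#self", ["my_error", "broken"], "n#other");
my $selfkill = mon_action("n#self", ["my_error"]);
my $message  = mon_action("n#self", ["my_error"], "n#other", "died");
my $callback = mon_action("n#self", [], sub { });
```

Only the port forms distinguish normal from abnormal death; the callback and message forms fire in both cases and leave the interpretation of C<@reason> to the receiver.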
558
559Monitoring-actions are one-shot: once messages are lost (and a monitoring
560alert was raised), they are removed and will not trigger again.
561
562As a rule of thumb, monitoring requests should always monitor a port from
563a local port (or callback). The reason is that kill messages might get
564lost, just like any other message. Another less obvious reason is that
565even monitoring requests can get lost (for example, when the connection
566to the other node goes down permanently). When monitoring a port locally
567these problems do not exist.
568
479C<mon> effectively guarantees that, in the absence of hardware failures, 569C<mon> effectively guarantees that, in the absence of hardware failures,
480after starting the monitor, either all messages sent to the port will 570after starting the monitor, either all messages sent to the port will
481arrive, or the monitoring action will be invoked after possible message 571arrive, or the monitoring action will be invoked after possible message
482loss has been detected. No messages will be lost "in between" (after 572loss has been detected. No messages will be lost "in between" (after
483the first lost message no further messages will be received by the 573the first lost message no further messages will be received by the
484port). After the monitoring action was invoked, further messages might get 574port). After the monitoring action was invoked, further messages might get
485delivered again. 575delivered again.
486 576
487Note that monitoring-actions are one-shot: once messages are lost (and a 577Inter-host-connection timeouts and monitoring depend on the transport
488monitoring alert was raised), they are removed and will not trigger again. 578used. The only transport currently implemented is TCP, and AnyEvent::MP
579relies on TCP to detect node-downs (this can take 10-15 minutes on a
580non-idle connection, and usually around two hours for idle connections).
489 581
490In the first form (callback), the callback is simply called with any 582This means that monitoring is good for program errors and cleaning up
491number of C<@reason> elements (no @reason means that the port was deleted 583stuff eventually, but it is no replacement for a timeout when you need
492"normally"). Note also that I<< the callback B<must> never die >>, so use 584to ensure some maximum latency.
493C<eval> if unsure.
494
495In the second form (another port given), the other port (C<$rcvport>)
496will be C<kil>'ed with C<@reason>, iff a @reason was specified, i.e. on
497"normal" kils nothing happens, while under all other conditions, the other
498port is killed with the same reason.
499
500The third form (kill self) is the same as the second form, except that
501C<$rvport> defaults to C<$SELF>.
502
503In the last form (message), a message of the form C<@msg, @reason> will be
504C<snd>.
505
506As a rule of thumb, monitoring requests should always monitor a port from
507a local port (or callback). The reason is that kill messages might get
508lost, just like any other message. Another less obvious reason is that
509even monitoring requests can get lost (for exmaple, when the connection
510to the other node goes down permanently). When monitoring a port locally
511these problems do not exist.
512 585
513Example: call a given callback when C<$port> is killed. 586Example: call a given callback when C<$port> is killed.
514 587
515 mon $port, sub { warn "port died because of <@_>\n" }; 588 mon $port, sub { warn "port died because of <@_>\n" };
516 589
544 } 617 }
545 618
546 $node->monitor ($port, $cb); 619 $node->monitor ($port, $cb);
547 620
548 defined wantarray 621 defined wantarray
549 and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } 622 and ($cb += 0, AnyEvent::Util::guard { $node->unmonitor ($port, $cb) })
550} 623}
551 624
552=item $guard = mon_guard $port, $ref, $ref... 625=item $guard = mon_guard $port, $ref, $ref...
553 626
554Monitors the given C<$port> and keeps the passed references. When the port 627Monitors the given C<$port> and keeps the passed references. When the port
611the package, then the package above the package and so on (e.g. 684the package, then the package above the package and so on (e.g.
612C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function 685C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
613exists or it runs out of package names. 686exists or it runs out of package names.
614 687
615The init function is then called with the newly-created port as context 688The init function is then called with the newly-created port as context
616object (C<$SELF>) and the C<@initdata> values as arguments. 689object (C<$SELF>) and the C<@initdata> values as arguments. It I<must>
690call one of the C<rcv> functions to set callbacks on C<$SELF>, otherwise
691the port might not get created.
617 692
618A common idiom is to pass a local port, immediately monitor the spawned 693A common idiom is to pass a local port, immediately monitor the spawned
619port, and in the remote init function, immediately monitor the passed 694port, and in the remote init function, immediately monitor the passed
620local port. This two-way monitoring ensures that both ports get cleaned up 695local port. This two-way monitoring ensures that both ports get cleaned up
621when there is a problem. 696when there is a problem.
622 697
698C<spawn> guarantees that the C<$initfunc> has no visible effects on the
699caller before C<spawn> returns (by delaying invocation when spawn is
700called for the local node).
701
623Example: spawn a chat server port on C<$othernode>. 702Example: spawn a chat server port on C<$othernode>.
624 703
625 # this node, executed from within a port context: 704 # this node, executed from within a port context:
626 my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF; 705 my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
627 mon $server; 706 mon $server;
641 720
642sub _spawn { 721sub _spawn {
643 my $port = shift; 722 my $port = shift;
644 my $init = shift; 723 my $init = shift;
645 724
725 # rcv will create the actual port
646 local $SELF = "$NODE#$port"; 726 local $SELF = "$NODE#$port";
647 eval { 727 eval {
648 &{ load_func $init } 728 &{ load_func $init }
649 }; 729 };
650 _self_die if $@; 730 _self_die if $@;
685 ? $action[0]() 765 ? $action[0]()
686 : snd @action; 766 : snd @action;
687 }; 767 };
688} 768}
689 769
770=item cal $port, @msg, $callback[, $timeout]
771
772A simple form of RPC - sends a message to the given C<$port> with the
773given contents (C<@msg>), but adds a reply port to the message.
774
775The reply port is created temporarily just for the purpose of receiving
776the reply, and will be C<kil>ed when no longer needed.
777
778A reply message sent to the port is passed to the C<$callback> as-is.
779
780If an optional time-out (in seconds) is given and it is not C<undef>,
781then the callback will be called without any arguments after the time-out
782elapsed and the port is C<kil>ed.
783
784If no time-out is given (or it is C<undef>), then the local port will
785monitor the remote port instead, so it eventually gets cleaned-up.
786
787Currently this function returns the temporary port, but this "feature"
788might go away in future versions unless you can make a convincing case that
789this is indeed useful for something.
790
791=cut
792
793sub cal(@) {
794 my $timeout = ref $_[-1] ? undef : pop;
795 my $cb = pop;
796
797 my $port = port {
798 undef $timeout;
799 kil $SELF;
800 &$cb;
801 };
802
803 if (defined $timeout) {
804 $timeout = AE::timer $timeout, 0, sub {
805 undef $timeout;
806 kil $port;
807 $cb->();
808 };
809 } else {
810 mon $_[0], sub {
811 kil $port;
812 $cb->();
813 };
814 }
815
816 push @_, $port;
817 &snd;
818
819 $port
820}
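
The way C<cal> separates the optional trailing time-out from the callback can be shown in isolation. This is a minimal sketch: C<split_cal_args> is a hypothetical helper that mirrors only the first two statements of the added C<cal> code, and the port ID strings are made up.

```perl
use strict;
use warnings;

# Hypothetical helper mirroring cal's argument handling: if the last
# argument is a reference it is the callback and no time-out was given,
# otherwise the last argument is the time-out and the callback precedes it.
sub split_cal_args {
    my @args    = @_;
    my $timeout = ref $args[-1] ? undef : pop @args;
    my $cb      = pop @args;
    my ($port, @msg) = @args;

    return ($port, \@msg, $cb, $timeout);
}

my $cb = sub { };

# No time-out: the callback is the last argument.
my ($port1, $msg1, $cb1, $timeout1) = split_cal_args("n#p", "add", 1, 2, $cb);

# Time-out of five seconds after the callback.
my ($port2, $msg2, $cb2, $timeout2) = split_cal_args("n#p", "add", 1, 2, $cb, 5);

# An explicit undef behaves like no time-out at all.
my ($port3, $msg3, $cb3, $timeout3) = split_cal_args("n#p", "ping", $cb, undef);
```

A consequence of this convention is that the time-out must be a plain number or C<undef>; a reference in the last position is always taken to be the callback.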
821
690=back 822=back
691 823
692=head1 AnyEvent::MP vs. Distributed Erlang 824=head1 AnyEvent::MP vs. Distributed Erlang
693 825
694AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node 826AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node
695== aemp node, Erlang process == aemp port), so many of the documents and 827== aemp node, Erlang process == aemp port), so many of the documents and
696programming techniques employed by Erlang apply to AnyEvent::MP. Here is a 828programming techniques employed by Erlang apply to AnyEvent::MP. Here is a
697sample: 829sample:
698 830
699 http://www.Erlang.se/doc/programming_rules.shtml 831 http://www.erlang.se/doc/programming_rules.shtml
700 http://Erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4 832 http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
701 http://Erlang.org/download/Erlang-book-part1.pdf # chapters 5 and 6 833 http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6
702 http://Erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5 834 http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5
703 835
704Despite the similarities, there are also some important differences: 836Despite the similarities, there are also some important differences:
705 837
706=over 4 838=over 4
707 839
708=item * Node IDs are arbitrary strings in AEMP. 840=item * Node IDs are arbitrary strings in AEMP.
709 841
710Erlang relies on special naming and DNS to work everywhere in the same 842Erlang relies on special naming and DNS to work everywhere in the same
711way. AEMP relies on each node somehow knowing its own address(es) (e.g. by 843way. AEMP relies on each node somehow knowing its own address(es) (e.g. by
712configuraiton or DNS), but will otherwise discover other odes itself. 844configuration or DNS), and possibly the addresses of some seed nodes, but
845will otherwise discover other nodes (and their IDs) itself.
713 846
714=item * Erlang has a "remote ports are like local ports" philosophy, AEMP 847=item * Erlang has a "remote ports are like local ports" philosophy, AEMP
715uses "local ports are like remote ports". 848uses "local ports are like remote ports".
716 849
717The failure modes for local ports are quite different (runtime errors 850The failure modes for local ports are quite different (runtime errors
730 863
731Erlang uses processes that selectively receive messages, and therefore 864Erlang uses processes that selectively receive messages, and therefore
732needs a queue. AEMP is event based, queuing messages would serve no 865needs a queue. AEMP is event based, queuing messages would serve no
733useful purpose. For the same reason the pattern-matching abilities of 866useful purpose. For the same reason the pattern-matching abilities of
734AnyEvent::MP are more limited, as there is little need to be able to 867AnyEvent::MP are more limited, as there is little need to be able to
735filter messages without dequeing them. 868filter messages without dequeuing them.
736 869
737(But see L<Coro::MP> for a more Erlang-like process model on top of AEMP). 870(But see L<Coro::MP> for a more Erlang-like process model on top of AEMP).
738 871
739=item * Erlang sends are synchronous, AEMP sends are asynchronous. 872=item * Erlang sends are synchronous, AEMP sends are asynchronous.
740 873
742so does not need a queue that can overflow). AEMP sends are immediate, 875so does not need a queue that can overflow). AEMP sends are immediate,
743connection establishment is handled in the background. 876connection establishment is handled in the background.
744 877
745=item * Erlang suffers from silent message loss, AEMP does not. 878=item * Erlang suffers from silent message loss, AEMP does not.
746 879
747Erlang makes few guarantees on messages delivery - messages can get lost 880Erlang implements few guarantees on messages delivery - messages can get
748without any of the processes realising it (i.e. you send messages a, b, 881lost without any of the processes realising it (i.e. you send messages a,
749and c, and the other side only receives messages a and c). 882b, and c, and the other side only receives messages a and c).
750 883
751AEMP guarantees correct ordering, and the guarantee that after one message 884AEMP guarantees correct ordering, and the guarantee that after one message
752is lost, all following ones sent to the same port are lost as well, until 885is lost, all following ones sent to the same port are lost as well, until
753monitoring raises an error, so there are no silent "holes" in the message 886monitoring raises an error, so there are no silent "holes" in the message
754sequence. 887sequence.
846L<AnyEvent::MP::Kernel> - more, lower-level, stuff. 979L<AnyEvent::MP::Kernel> - more, lower-level, stuff.
847 980
848L<AnyEvent::MP::Global> - network maintenance and port groups, to find 981L<AnyEvent::MP::Global> - network maintenance and port groups, to find
849your applications. 982your applications.
850 983
984L<AnyEvent::MP::LogCatcher> - simple service to display log messages from
985all nodes.
986
851L<AnyEvent>. 987L<AnyEvent>.
852 988
853=head1 AUTHOR 989=head1 AUTHOR
854 990
855 Marc Lehmann <schmorp@schmorp.de> 991 Marc Lehmann <schmorp@schmorp.de>
