ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.33 by root, Wed Aug 5 22:40:51 2009 UTC vs.
Revision 1.41 by root, Sat Aug 8 21:56:29 2009 UTC

8 8
9 $NODE # contains this node's noderef 9 $NODE # contains this node's noderef
10 NODE # returns this node's noderef 10 NODE # returns this node's noderef
11 NODE $port # returns the noderef of the port 11 NODE $port # returns the noderef of the port
12 12
13 $SELF # receiving/own port id in rcv callbacks
14
15 # ports are message endpoints
16
17 # sending messages
13 snd $port, type => data...; 18 snd $port, type => data...;
19 snd $port, @msg;
20 snd @msg_with_first_element_being_a_port;
14 21
15 $SELF # receiving/own port id in rcv callbacks 22 # miniports
23 my $miniport = port { my @msg = @_; 0 };
16 24
25 # full ports
26 my $port = port;
17 rcv $port, smartmatch => $cb->($port, @msg); 27 rcv $port, smartmatch => $cb->(@msg);
18
19 # examples:
20 rcv $port2, ping => sub { snd $_[0], "pong"; 0 }; 28 rcv $port, ping => sub { snd $_[0], "pong"; 0 };
21 rcv $port1, pong => sub { warn "pong received\n" }; 29 rcv $port, pong => sub { warn "pong received\n"; 0 };
22 snd $port2, ping => $port1; 30
31 # remote ports
32 my $port = spawn $node, $initfunc, @initdata;
23 33
24 # more, smarter, matches (_any_ is exported by this module) 34 # more, smarter, matches (_any_ is exported by this module)
25 rcv $port, [child_died => $pid] => sub { ... 35 rcv $port, [child_died => $pid] => sub { ...
26 rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3 36 rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
37
38 # monitoring
39 mon $port, $cb->(@msg) # callback is invoked on death
40 mon $port, $otherport # kill otherport on abnormal death
41 mon $port, $otherport, @msg # send message on death
27 42
28=head1 DESCRIPTION 43=head1 DESCRIPTION
29 44
30This module (-family) implements a simple message passing framework. 45This module (-family) implements a simple message passing framework.
31 46
104 119
105our $VERSION = '0.1'; 120our $VERSION = '0.1';
106our @EXPORT = qw( 121our @EXPORT = qw(
107 NODE $NODE *SELF node_of _any_ 122 NODE $NODE *SELF node_of _any_
108 resolve_node initialise_node 123 resolve_node initialise_node
109 snd rcv mon kil reg psub 124 snd rcv mon kil reg psub spawn
110 port 125 port
111); 126);
112 127
113our $SELF; 128our $SELF;
114 129
127 142
128=item $noderef = node_of $port 143=item $noderef = node_of $port
129 144
130Extracts and returns the noderef from a portid or a noderef. 145Extracts and returns the noderef from a portid or a noderef.
131 146
147=item initialise_node $noderef, $seednode, $seednode...
148
149=item initialise_node "slave/", $master, $master...
150
151Before a node can talk to other nodes on the network it has to initialise
152itself - the minimum a node needs to know is it's own name, and optionally
153it should know the noderefs of some other nodes in the network.
154
155This function initialises a node - it must be called exactly once (or
156never) before calling other AnyEvent::MP functions.
157
158All arguments are noderefs, which can be either resolved or unresolved.
159
160There are two types of networked nodes, public nodes and slave nodes:
161
162=over 4
163
164=item public nodes
165
166For public nodes, C<$noderef> must either be a (possibly unresolved)
167noderef, in which case it will be resolved, or C<undef> (or missing), in
168which case the noderef will be guessed.
169
170Afterwards, the node will bind itself on all endpoints and try to connect
171to all additional C<$seednodes> that are specified. Seednodes are optional
172and can be used to quickly bootstrap the node into an existing network.
173
174=item slave nodes
175
176When the C<$noderef> is the special string C<slave/>, then the node will
177become a slave node. Slave nodes cannot be contacted from outside and will
178route most of their traffic to the master node that they attach to.
179
180At least one additional noderef is required: The node will try to connect
181to all of them and will become a slave attached to the first node it can
182successfully connect to.
183
184=back
185
186This function will block until all nodes have been resolved and, for slave
187nodes, until it has successfully established a connection to a master
188server.
189
190Example: become a public node listening on the default node.
191
192 initialise_node;
193
194Example: become a public node, and try to contact some well-known master
195servers to become part of the network.
196
197 initialise_node undef, "master1", "master2";
198
199Example: become a public node listening on port C<4041>.
200
201 initialise_node 4041;
202
203Example: become a public node, only visible on localhost port 4044.
204
205 initialise_node "locahost:4044";
206
207Example: become a slave node to any of the specified master servers.
208
209 initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net";
210
132=item $cv = resolve_node $noderef 211=item $cv = resolve_node $noderef
133 212
134Takes an unresolved node reference that may contain hostnames and 213Takes an unresolved node reference that may contain hostnames and
135abbreviated IDs, resolves all of them and returns a resolved node 214abbreviated IDs, resolves all of them and returns a resolved node
136reference. 215reference.
233 $port 312 $port
234} 313}
235 314
236=item reg $port, $name 315=item reg $port, $name
237 316
238Registers the given port under the name C<$name>. If the name already 317=item reg $name
239exists it is replaced. 318
319Registers the given port (or C<$SELF><<< if missing) under the name
320C<$name>. If the name already exists it is replaced.
240 321
241A port can only be registered under one well known name. 322A port can only be registered under one well known name.
242 323
243A port automatically becomes unregistered when it is killed. 324A port automatically becomes unregistered when it is killed.
244 325
245=cut 326=cut
246 327
247sub reg(@) { 328sub reg(@) {
248 my ($port, $name) = @_; 329 my $port = @_ > 1 ? shift : $SELF || Carp::croak 'reg: called with one argument only, but $SELF not set,';
249 330
250 $REG{$name} = $port; 331 $REG{$_[0]} = $port;
251} 332}
252 333
253=item rcv $port, $callback->(@msg) 334=item rcv $port, $callback->(@msg)
254 335
255Replaces the callback on the specified miniport (after converting it to 336Replaces the callback on the specified miniport (after converting it to
260=item rcv $port, $smartmatch => $callback->(@msg), ... 341=item rcv $port, $smartmatch => $callback->(@msg), ...
261 342
262=item rcv $port, [$smartmatch...] => $callback->(@msg), ... 343=item rcv $port, [$smartmatch...] => $callback->(@msg), ...
263 344
264Register callbacks to be called on matching messages on the given full 345Register callbacks to be called on matching messages on the given full
265port (after converting it to one if required). 346port (after converting it to one if required) and return the port.
266 347
267The callback has to return a true value when its work is done, after 348The callback has to return a true value when its work is done, after
268which is will be removed, or a false value in which case it will stay 349which is will be removed, or a false value in which case it will stay
269registered. 350registered.
270 351
271The global C<$SELF> (exported by this module) contains C<$port> while 352The global C<$SELF> (exported by this module) contains C<$port> while
272executing the callback. 353executing the callback.
273 354
274Runtime errors wdurign callback execution will result in the port being 355Runtime errors during callback execution will result in the port being
275C<kil>ed. 356C<kil>ed.
276 357
277If the match is an array reference, then it will be matched against the 358If the match is an array reference, then it will be matched against the
278first elements of the message, otherwise only the first element is being 359first elements of the message, otherwise only the first element is being
279matched. 360matched.
282exported by this module) matches any single element of the message. 363exported by this module) matches any single element of the message.
283 364
284While not required, it is highly recommended that the first matching 365While not required, it is highly recommended that the first matching
285element is a string identifying the message. The one-string-only match is 366element is a string identifying the message. The one-string-only match is
286also the most efficient match (by far). 367also the most efficient match (by far).
368
369Example: create a port and bind receivers on it in one go.
370
371 my $port = rcv port,
372 msg1 => sub { ...; 0 },
373 msg2 => sub { ...; 0 },
374 ;
375
376Example: create a port, bind receivers and send it in a message elsewhere
377in one go:
378
379 snd $otherport, reply =>
380 rcv port,
381 msg1 => sub { ...; 0 },
382 ...
383 ;
287 384
288=cut 385=cut
289 386
290sub rcv($@) { 387sub rcv($@) {
291 my $port = shift; 388 my $port = shift;
398 } 495 }
399} 496}
400 497
401=item $guard = mon $port, $cb->(@reason) 498=item $guard = mon $port, $cb->(@reason)
402 499
403=item $guard = mon $port, $otherport 500=item $guard = mon $port, $rcvport
404 501
502=item $guard = mon $port
503
405=item $guard = mon $port, $otherport, @msg 504=item $guard = mon $port, $rcvport, @msg
406 505
407Monitor the given port and do something when the port is killed. 506Monitor the given port and do something when the port is killed, and
507optionally return a guard that can be used to stop monitoring again.
408 508
409In the first form, the callback is simply called with any number 509In the first form (callback), the callback is simply called with any
410of C<@reason> elements (no @reason means that the port was deleted 510number of C<@reason> elements (no @reason means that the port was deleted
411"normally"). Note also that I<< the callback B<must> never die >>, so use 511"normally"). Note also that I<< the callback B<must> never die >>, so use
412C<eval> if unsure. 512C<eval> if unsure.
413 513
414In the second form, the other port will be C<kil>'ed with C<@reason>, iff 514In the second form (another port given), the other port (C<$rcvport)
415a @reason was specified, i.e. on "normal" kils nothing happens, while 515will be C<kil>'ed with C<@reason>, iff a @reason was specified, i.e. on
416under all other conditions, the other port is killed with the same reason. 516"normal" kils nothing happens, while under all other conditions, the other
517port is killed with the same reason.
417 518
519The third form (kill self) is the same as the second form, except that
520C<$rvport> defaults to C<$SELF>.
521
418In the last form, a message of the form C<@msg, @reason> will be C<snd>. 522In the last form (message), a message of the form C<@msg, @reason> will be
523C<snd>.
524
525As a rule of thumb, monitoring requests should always monitor a port from
526a local port (or callback). The reason is that kill messages might get
527lost, just like any other message. Another less obvious reason is that
528even monitoring requests can get lost (for exmaple, when the connection
529to the other node goes down permanently). When monitoring a port locally
530these problems do not exist.
419 531
420Example: call a given callback when C<$port> is killed. 532Example: call a given callback when C<$port> is killed.
421 533
422 mon $port, sub { warn "port died because of <@_>\n" }; 534 mon $port, sub { warn "port died because of <@_>\n" };
423 535
424Example: kill ourselves when C<$port> is killed abnormally. 536Example: kill ourselves when C<$port> is killed abnormally.
425 537
426 mon $port, $self; 538 mon $port;
427 539
428Example: send us a restart message another C<$port> is killed. 540Example: send us a restart message when another C<$port> is killed.
429 541
430 mon $port, $self => "restart"; 542 mon $port, $self => "restart";
431 543
432=cut 544=cut
433 545
434sub mon { 546sub mon {
435 my ($noderef, $port) = split /#/, shift, 2; 547 my ($noderef, $port) = split /#/, shift, 2;
436 548
437 my $node = $NODE{$noderef} || add_node $noderef; 549 my $node = $NODE{$noderef} || add_node $noderef;
438 550
439 my $cb = shift; 551 my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';
440 552
441 unless (ref $cb) { 553 unless (ref $cb) {
442 if (@_) { 554 if (@_) {
443 # send a kill info message 555 # send a kill info message
444 my (@msg) = ($cb, @_); 556 my (@msg) = ($cb, @_);
475=cut 587=cut
476 588
477sub mon_guard { 589sub mon_guard {
478 my ($port, @refs) = @_; 590 my ($port, @refs) = @_;
479 591
592 #TODO: mon-less form?
593
480 mon $port, sub { 0 && @refs } 594 mon $port, sub { 0 && @refs }
481} 595}
482 596
483=item lnk $port1, $port2
484
485Link two ports. This is simply a shorthand for:
486
487 mon $port1, $port2;
488 mon $port2, $port1;
489
490It means that if either one is killed abnormally, the other one gets
491killed as well.
492
493=item kil $port[, @reason] 597=item kil $port[, @reason]
494 598
495Kill the specified port with the given C<@reason>. 599Kill the specified port with the given C<@reason>.
496 600
497If no C<@reason> is specified, then the port is killed "normally" (linked 601If no C<@reason> is specified, then the port is killed "normally" (linked
504will be reported as reason C<< die => $@ >>. 608will be reported as reason C<< die => $@ >>.
505 609
506Transport/communication errors are reported as C<< transport_error => 610Transport/communication errors are reported as C<< transport_error =>
507$message >>. 611$message >>.
508 612
509=back
510
511=head1 FUNCTIONS FOR NODES
512
513=over 4
514
515=item initialise_node $noderef, $seednode, $seednode...
516
517=item initialise_node "slave/", $master, $master...
518
519Initialises a node - must be called exactly once before calling other
520AnyEvent::MP functions when talking to other nodes is required.
521
522All arguments are noderefs, which can be either resolved or unresolved.
523
524There are two types of networked nodes, public nodes and slave nodes:
525
526=over 4
527
528=item public nodes
529
530For public nodes, C<$noderef> must either be a (possibly unresolved)
531noderef, in which case it will be resolved, or C<undef> (or missing), in
532which case the noderef will be guessed.
533
534Afterwards, the node will bind itself on all endpoints and try to connect
535to all additional C<$seednodes> that are specified. Seednodes are optional
536and can be used to quickly bootstrap the node into an existing network.
537
538=item slave nodes
539
540When the C<$noderef> is the special string C<slave/>, then the node will
541become a slave node. Slave nodes cannot be contacted from outside and will
542route most of their traffic to the master node that they attach to.
543
544At least one additional noderef is required: The node will try to connect
545to all of them and will become a slave attached to the first node it can
546successfully connect to.
547
548=back
549
550This function will block until all nodes have been resolved and, for slave
551nodes, until it has successfully established a connection to a master
552server.
553
554Example: become a public node listening on the default node.
555
556 initialise_node;
557
558Example: become a public node, and try to contact some well-known master
559servers to become part of the network.
560
561 initialise_node undef, "master1", "master2";
562
563Example: become a public node listening on port C<4041>.
564
565 initialise_node 4041;
566
567Example: become a public node, only visible on localhost port 4044.
568
569 initialise_node "locahost:4044";
570
571Example: become a slave node to any of the specified master servers.
572
573 initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net";
574
575=cut 613=cut
614
615=item $port = spawn $node, $initfunc[, @initdata]
616
617Creates a port on the node C<$node> (which can also be a port ID, in which
618case it's the node where that port resides).
619
620The port ID of the newly created port is return immediately, and it is
621permissible to immediately start sending messages or monitor the port.
622
623After the port has been created, the init function is
624called. This function must be a fully-qualified function name
625(e.g. C<MyApp::Chat::Server::init>). To specify a function in the main
626program, use C<::name>.
627
628If the function doesn't exist, then the node tries to C<require>
629the package, then the package above the package and so on (e.g.
630C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
631exists or it runs out of package names.
632
633The init function is then called with the newly-created port as context
634object (C<$SELF>) and the C<@initdata> values as arguments.
635
636A common idiom is to pass your own port, monitor the spawned port, and
637in the init function, monitor the original port. This two-way monitoring
638ensures that both ports get cleaned up when there is a problem.
639
640Example: spawn a chat server port on C<$othernode>.
641
642 # this node, executed from within a port context:
643 my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
644 mon $server;
645
646 # init function on C<$othernode>
647 sub connect {
648 my ($srcport) = @_;
649
650 mon $srcport;
651
652 rcv $SELF, sub {
653 ...
654 };
655 }
656
657=cut
658
659sub _spawn {
660 my $port = shift;
661 my $init = shift;
662
663 local $SELF = "$NODE#$port";
664 eval {
665 &{ load_func $init }
666 };
667 _self_die if $@;
668}
669
670sub spawn(@) {
671 my ($noderef, undef) = split /#/, shift, 2;
672
673 my $id = "$RUNIQ." . $ID++;
674
675 $_[0] =~ /::/
676 or Carp::croak "spawn init function must be a fully-qualified name, caught";
677
678 ($NODE{$noderef} || add_node $noderef)
679 ->send (["", "AnyEvent::MP::_spawn" => $id, @_]);
680
681 "$noderef#$id"
682}
576 683
577=back 684=back
578 685
579=head1 NODE MESSAGES 686=head1 NODE MESSAGES
580 687
622 729
623=back 730=back
624 731
625=head1 AnyEvent::MP vs. Distributed Erlang 732=head1 AnyEvent::MP vs. Distributed Erlang
626 733
627AnyEvent::MP got lots of its ideas from distributed erlang (erlang node 734AnyEvent::MP got lots of its ideas from distributed Erlang (Erlang node
628== aemp node, erlang process == aemp port), so many of the documents and 735== aemp node, Erlang process == aemp port), so many of the documents and
629programming techniques employed by erlang apply to AnyEvent::MP. Here is a 736programming techniques employed by Erlang apply to AnyEvent::MP. Here is a
630sample: 737sample:
631 738
632 http://www.erlang.se/doc/programming_rules.shtml 739 http://www.Erlang.se/doc/programming_rules.shtml
633 http://erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4 740 http://Erlang.org/doc/getting_started/part_frame.html # chapters 3 and 4
634 http://erlang.org/download/erlang-book-part1.pdf # chapters 5 and 6 741 http://Erlang.org/download/Erlang-book-part1.pdf # chapters 5 and 6
635 http://erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5 742 http://Erlang.org/download/armstrong_thesis_2003.pdf # chapters 4 and 5
636 743
637Despite the similarities, there are also some important differences: 744Despite the similarities, there are also some important differences:
638 745
639=over 4 746=over 4
640 747
651 758
652Erlang uses processes that selctively receive messages, and therefore 759Erlang uses processes that selctively receive messages, and therefore
653needs a queue. AEMP is event based, queuing messages would serve no useful 760needs a queue. AEMP is event based, queuing messages would serve no useful
654purpose. 761purpose.
655 762
656(But see L<Coro::MP> for a more erlang-like process model on top of AEMP). 763(But see L<Coro::MP> for a more Erlang-like process model on top of AEMP).
657 764
658=item * Erlang sends are synchronous, AEMP sends are asynchronous. 765=item * Erlang sends are synchronous, AEMP sends are asynchronous.
659 766
660Sending messages in erlang is synchronous and blocks the process. AEMP 767Sending messages in Erlang is synchronous and blocks the process. AEMP
661sends are immediate, connection establishment is handled in the 768sends are immediate, connection establishment is handled in the
662background. 769background.
663 770
664=item * Erlang can silently lose messages, AEMP cannot. 771=item * Erlang can silently lose messages, AEMP cannot.
665 772
668and c, and the other side only receives messages a and c). 775and c, and the other side only receives messages a and c).
669 776
670AEMP guarantees correct ordering, and the guarantee that there are no 777AEMP guarantees correct ordering, and the guarantee that there are no
671holes in the message sequence. 778holes in the message sequence.
672 779
673=item * In erlang, processes can be declared dead and later be found to be 780=item * In Erlang, processes can be declared dead and later be found to be
674alive. 781alive.
675 782
676In erlang it can happen that a monitored process is declared dead and 783In Erlang it can happen that a monitored process is declared dead and
677linked processes get killed, but later it turns out that the process is 784linked processes get killed, but later it turns out that the process is
678still alive - and can receive messages. 785still alive - and can receive messages.
679 786
680In AEMP, when port monitoring detects a port as dead, then that port will 787In AEMP, when port monitoring detects a port as dead, then that port will
681eventually be killed - it cannot happen that a node detects a port as dead 788eventually be killed - it cannot happen that a node detects a port as dead
682and then later sends messages to it, finding it is still alive. 789and then later sends messages to it, finding it is still alive.
683 790
684=item * Erlang can send messages to the wrong port, AEMP does not. 791=item * Erlang can send messages to the wrong port, AEMP does not.
685 792
686In erlang it is quite possible that a node that restarts reuses a process 793In Erlang it is quite possible that a node that restarts reuses a process
687ID known to other nodes for a completely different process, causing 794ID known to other nodes for a completely different process, causing
688messages destined for that process to end up in an unrelated process. 795messages destined for that process to end up in an unrelated process.
689 796
690AEMP never reuses port IDs, so old messages or old port IDs floating 797AEMP never reuses port IDs, so old messages or old port IDs floating
691around in the network will not be sent to an unrelated port. 798around in the network will not be sent to an unrelated port.
697securely authenticate nodes. 804securely authenticate nodes.
698 805
699=item * The AEMP protocol is optimised for both text-based and binary 806=item * The AEMP protocol is optimised for both text-based and binary
700communications. 807communications.
701 808
702The AEMP protocol, unlike the erlang protocol, supports both 809The AEMP protocol, unlike the Erlang protocol, supports both
703language-independent text-only protocols (good for debugging) and binary, 810language-independent text-only protocols (good for debugging) and binary,
704language-specific serialisers (e.g. Storable). 811language-specific serialisers (e.g. Storable).
705 812
706It has also been carefully designed to be implementable in other languages 813It has also been carefully designed to be implementable in other languages
707with a minimum of work while gracefully degrading fucntionality to make the 814with a minimum of work while gracefully degrading fucntionality to make the
708protocol simple. 815protocol simple.
709 816
817=item * AEMP has more flexible monitoring options than Erlang.
818
819In Erlang, you can chose to receive I<all> exit signals as messages
820or I<none>, there is no in-between, so monitoring single processes is
821difficult to implement. Monitoring in AEMP is more flexible than in
822Erlang, as one can choose between automatic kill, exit message or callback
823on a per-process basis.
824
825=item * Erlang tries to hide remote/local connections, AEMP does not.
826
827Monitoring in Erlang is not an indicator of process death/crashes,
828as linking is (except linking is unreliable in Erlang).
829
830In AEMP, you don't "look up" registered port names or send to named ports
831that might or might not be persistent. Instead, you normally spawn a port
832on the remote node. The init function monitors the you, and you monitor
833the remote port. Since both monitors are local to the node, they are much
834more reliable.
835
836This also saves round-trips and avoids sending messages to the wrong port
837(hard to do in Erlang).
838
710=back 839=back
711 840
712=head1 SEE ALSO 841=head1 SEE ALSO
713 842
714L<AnyEvent>. 843L<AnyEvent>.

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines