ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.35 by root, Thu Aug 6 10:21:48 2009 UTC vs.
Revision 1.48 by root, Thu Aug 13 02:59:42 2009 UTC

8 8
9 $NODE # contains this node's noderef 9 $NODE # contains this node's noderef
10 NODE # returns this node's noderef 10 NODE # returns this node's noderef
11 NODE $port # returns the noderef of the port 11 NODE $port # returns the noderef of the port
12 12
13 $SELF # receiving/own port id in rcv callbacks
14
15 # initialise the node so it can send/receive messages
16 initialise_node; # -OR-
17 initialise_node "localhost:4040"; # -OR-
18 initialise_node "slave/", "localhost:4040"
19
20 # ports are message endpoints
21
22 # sending messages
13 snd $port, type => data...; 23 snd $port, type => data...;
24 snd $port, @msg;
25 snd @msg_with_first_element_being_a_port;
14 26
15 $SELF # receiving/own port id in rcv callbacks 27 # creating/using miniports
28 my $miniport = port { my @msg = @_; 0 };
16 29
30 # creating/using full ports
31 my $port = port;
17 rcv $port, smartmatch => $cb->($port, @msg); 32 rcv $port, smartmatch => $cb->(@msg);
18
19 # examples:
20 rcv $port2, ping => sub { snd $_[0], "pong"; 0 }; 33 rcv $port, ping => sub { snd $_[0], "pong"; 0 };
21 rcv $port1, pong => sub { warn "pong received\n" }; 34 rcv $port, pong => sub { warn "pong received\n"; 0 };
22 snd $port2, ping => $port1;
23 35
24 # more, smarter, matches (_any_ is exported by this module) 36 # more, smarter, matches (_any_ is exported by this module)
25 rcv $port, [child_died => $pid] => sub { ... 37 rcv $port, [child_died => $pid] => sub { ...
26 rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3 38 rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
27 39
28 # linking two ports, so they both crash together 40 # create a port on another node
29 lnk $port1, $port2; 41 my $port = spawn $node, $initfunc, @initdata;
30 42
31 # monitoring 43 # monitoring
32 mon $port, $cb->(@msg) # callback is invoked on death 44 mon $port, $cb->(@msg) # callback is invoked on death
33 mon $port, $otherport # kill otherport on abnormal death 45 mon $port, $otherport # kill otherport on abnormal death
34 mon $port, $otherport, @msg # send message on death 46 mon $port, $otherport, @msg # send message on death
35 47
48=head1 CURRENT STATUS
49
50 AnyEvent::MP - stable API, should work
51 AnyEvent::MP::Intro - outdated
52 AnyEvent::MP::Kernel - WIP
53 AnyEvent::MP::Transport - mostly stable
54
55 stay tuned.
56
36=head1 DESCRIPTION 57=head1 DESCRIPTION
37 58
38This module (-family) implements a simple message passing framework. 59This module (-family) implements a simple message passing framework.
39 60
40Despite its simplicity, you can securely message other processes running 61Despite its simplicity, you can securely message other processes running
43For an introduction to this module family, see the L<AnyEvent::MP::Intro> 64For an introduction to this module family, see the L<AnyEvent::MP::Intro>
44manual page. 65manual page.
45 66
46At the moment, this module family is severly broken and underdocumented, 67At the moment, this module family is severly broken and underdocumented,
47so do not use. This was uploaded mainly to reserve the CPAN namespace - 68so do not use. This was uploaded mainly to reserve the CPAN namespace -
48stay tuned! The basic API should be finished, however. 69stay tuned!
49 70
50=head1 CONCEPTS 71=head1 CONCEPTS
51 72
52=over 4 73=over 4
53 74
98 119
99=cut 120=cut
100 121
101package AnyEvent::MP; 122package AnyEvent::MP;
102 123
103use AnyEvent::MP::Base; 124use AnyEvent::MP::Kernel;
104 125
105use common::sense; 126use common::sense;
106 127
107use Carp (); 128use Carp ();
108 129
109use AE (); 130use AE ();
110 131
111use base "Exporter"; 132use base "Exporter";
112 133
113our $VERSION = '0.1'; 134our $VERSION = $AnyEvent::MP::Kernel::VERSION;
135
114our @EXPORT = qw( 136our @EXPORT = qw(
115 NODE $NODE *SELF node_of _any_ 137 NODE $NODE *SELF node_of _any_
116 resolve_node initialise_node 138 resolve_node initialise_node
117 snd rcv mon kil reg psub 139 snd rcv mon kil reg psub spawn
118 port 140 port
119); 141);
120 142
121our $SELF; 143our $SELF;
122 144
305 $port 327 $port
306} 328}
307 329
308=item reg $port, $name 330=item reg $port, $name
309 331
310Registers the given port under the name C<$name>. If the name already 332=item reg $name
311exists it is replaced. 333
334Registers the given port (or C<$SELF><<< if missing) under the name
335C<$name>. If the name already exists it is replaced.
312 336
313A port can only be registered under one well known name. 337A port can only be registered under one well known name.
314 338
315A port automatically becomes unregistered when it is killed. 339A port automatically becomes unregistered when it is killed.
316 340
317=cut 341=cut
318 342
319sub reg(@) { 343sub reg(@) {
320 my ($port, $name) = @_; 344 my $port = @_ > 1 ? shift : $SELF || Carp::croak 'reg: called with one argument only, but $SELF not set,';
321 345
322 $REG{$name} = $port; 346 $REG{$_[0]} = $port;
323} 347}
324 348
325=item rcv $port, $callback->(@msg) 349=item rcv $port, $callback->(@msg)
326 350
327Replaces the callback on the specified miniport (after converting it to 351Replaces the callback on the specified miniport (after converting it to
332=item rcv $port, $smartmatch => $callback->(@msg), ... 356=item rcv $port, $smartmatch => $callback->(@msg), ...
333 357
334=item rcv $port, [$smartmatch...] => $callback->(@msg), ... 358=item rcv $port, [$smartmatch...] => $callback->(@msg), ...
335 359
336Register callbacks to be called on matching messages on the given full 360Register callbacks to be called on matching messages on the given full
337port (after converting it to one if required). 361port (after converting it to one if required) and return the port.
338 362
339The callback has to return a true value when its work is done, after 363The callback has to return a true value when its work is done, after
340which is will be removed, or a false value in which case it will stay 364which is will be removed, or a false value in which case it will stay
341registered. 365registered.
342 366
343The global C<$SELF> (exported by this module) contains C<$port> while 367The global C<$SELF> (exported by this module) contains C<$port> while
344executing the callback. 368executing the callback.
345 369
346Runtime errors wdurign callback execution will result in the port being 370Runtime errors during callback execution will result in the port being
347C<kil>ed. 371C<kil>ed.
348 372
349If the match is an array reference, then it will be matched against the 373If the match is an array reference, then it will be matched against the
350first elements of the message, otherwise only the first element is being 374first elements of the message, otherwise only the first element is being
351matched. 375matched.
354exported by this module) matches any single element of the message. 378exported by this module) matches any single element of the message.
355 379
356While not required, it is highly recommended that the first matching 380While not required, it is highly recommended that the first matching
357element is a string identifying the message. The one-string-only match is 381element is a string identifying the message. The one-string-only match is
358also the most efficient match (by far). 382also the most efficient match (by far).
383
384Example: create a port and bind receivers on it in one go.
385
386 my $port = rcv port,
387 msg1 => sub { ...; 0 },
388 msg2 => sub { ...; 0 },
389 ;
390
391Example: create a port, bind receivers and send it in a message elsewhere
392in one go:
393
394 snd $otherport, reply =>
395 rcv port,
396 msg1 => sub { ...; 0 },
397 ...
398 ;
359 399
360=cut 400=cut
361 401
362sub rcv($@) { 402sub rcv($@) {
363 my $port = shift; 403 my $port = shift;
470 } 510 }
471} 511}
472 512
473=item $guard = mon $port, $cb->(@reason) 513=item $guard = mon $port, $cb->(@reason)
474 514
475=item $guard = mon $port, $otherport 515=item $guard = mon $port, $rcvport
476 516
517=item $guard = mon $port
518
477=item $guard = mon $port, $otherport, @msg 519=item $guard = mon $port, $rcvport, @msg
478 520
479Monitor the given port and do something when the port is killed. 521Monitor the given port and do something when the port is killed or
522messages to it were lost, and optionally return a guard that can be used
523to stop monitoring again.
480 524
525C<mon> effectively guarantees that, in the absence of hardware failures,
526that after starting the monitor, either all messages sent to the port
527will arrive, or the monitoring action will be invoked after possible
528message loss has been detected. No messages will be lost "in between"
529(after the first lost message no further messages will be received by the
530port). After the monitoring action was invoked, further messages might get
531delivered again.
532
481In the first form, the callback is simply called with any number 533In the first form (callback), the callback is simply called with any
482of C<@reason> elements (no @reason means that the port was deleted 534number of C<@reason> elements (no @reason means that the port was deleted
483"normally"). Note also that I<< the callback B<must> never die >>, so use 535"normally"). Note also that I<< the callback B<must> never die >>, so use
484C<eval> if unsure. 536C<eval> if unsure.
485 537
486In the second form, the other port will be C<kil>'ed with C<@reason>, iff 538In the second form (another port given), the other port (C<$rcvport>)
487a @reason was specified, i.e. on "normal" kils nothing happens, while 539will be C<kil>'ed with C<@reason>, iff a @reason was specified, i.e. on
488under all other conditions, the other port is killed with the same reason. 540"normal" kils nothing happens, while under all other conditions, the other
541port is killed with the same reason.
489 542
543The third form (kill self) is the same as the second form, except that
544C<$rvport> defaults to C<$SELF>.
545
490In the last form, a message of the form C<@msg, @reason> will be C<snd>. 546In the last form (message), a message of the form C<@msg, @reason> will be
547C<snd>.
548
549As a rule of thumb, monitoring requests should always monitor a port from
550a local port (or callback). The reason is that kill messages might get
551lost, just like any other message. Another less obvious reason is that
552even monitoring requests can get lost (for exmaple, when the connection
553to the other node goes down permanently). When monitoring a port locally
554these problems do not exist.
491 555
492Example: call a given callback when C<$port> is killed. 556Example: call a given callback when C<$port> is killed.
493 557
494 mon $port, sub { warn "port died because of <@_>\n" }; 558 mon $port, sub { warn "port died because of <@_>\n" };
495 559
496Example: kill ourselves when C<$port> is killed abnormally. 560Example: kill ourselves when C<$port> is killed abnormally.
497 561
498 mon $port, $self; 562 mon $port;
499 563
500Example: send us a restart message another C<$port> is killed. 564Example: send us a restart message when another C<$port> is killed.
501 565
502 mon $port, $self => "restart"; 566 mon $port, $self => "restart";
503 567
504=cut 568=cut
505 569
506sub mon { 570sub mon {
507 my ($noderef, $port) = split /#/, shift, 2; 571 my ($noderef, $port) = split /#/, shift, 2;
508 572
509 my $node = $NODE{$noderef} || add_node $noderef; 573 my $node = $NODE{$noderef} || add_node $noderef;
510 574
511 my $cb = shift; 575 my $cb = @_ ? shift : $SELF || Carp::croak 'mon: called with one argument only, but $SELF not set,';
512 576
513 unless (ref $cb) { 577 unless (ref $cb) {
514 if (@_) { 578 if (@_) {
515 # send a kill info message 579 # send a kill info message
516 my (@msg) = ($cb, @_); 580 my (@msg) = ($cb, @_);
547=cut 611=cut
548 612
549sub mon_guard { 613sub mon_guard {
550 my ($port, @refs) = @_; 614 my ($port, @refs) = @_;
551 615
616 #TODO: mon-less form?
617
552 mon $port, sub { 0 && @refs } 618 mon $port, sub { 0 && @refs }
553} 619}
554 620
555=item lnk $port1, $port2
556
557Link two ports. This is simply a shorthand for:
558
559 mon $port1, $port2;
560 mon $port2, $port1;
561
562It means that if either one is killed abnormally, the other one gets
563killed as well.
564
565=item kil $port[, @reason] 621=item kil $port[, @reason]
566 622
567Kill the specified port with the given C<@reason>. 623Kill the specified port with the given C<@reason>.
568 624
569If no C<@reason> is specified, then the port is killed "normally" (linked 625If no C<@reason> is specified, then the port is killed "normally" (linked
575Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks 631Runtime errors while evaluating C<rcv> callbacks or inside C<psub> blocks
576will be reported as reason C<< die => $@ >>. 632will be reported as reason C<< die => $@ >>.
577 633
578Transport/communication errors are reported as C<< transport_error => 634Transport/communication errors are reported as C<< transport_error =>
579$message >>. 635$message >>.
636
637=cut
638
639=item $port = spawn $node, $initfunc[, @initdata]
640
641Creates a port on the node C<$node> (which can also be a port ID, in which
642case it's the node where that port resides).
643
644The port ID of the newly created port is return immediately, and it is
645permissible to immediately start sending messages or monitor the port.
646
647After the port has been created, the init function is
648called. This function must be a fully-qualified function name
649(e.g. C<MyApp::Chat::Server::init>). To specify a function in the main
650program, use C<::name>.
651
652If the function doesn't exist, then the node tries to C<require>
653the package, then the package above the package and so on (e.g.
654C<MyApp::Chat::Server>, C<MyApp::Chat>, C<MyApp>) until the function
655exists or it runs out of package names.
656
657The init function is then called with the newly-created port as context
658object (C<$SELF>) and the C<@initdata> values as arguments.
659
660A common idiom is to pass your own port, monitor the spawned port, and
661in the init function, monitor the original port. This two-way monitoring
662ensures that both ports get cleaned up when there is a problem.
663
664Example: spawn a chat server port on C<$othernode>.
665
666 # this node, executed from within a port context:
667 my $server = spawn $othernode, "MyApp::Chat::Server::connect", $SELF;
668 mon $server;
669
670 # init function on C<$othernode>
671 sub connect {
672 my ($srcport) = @_;
673
674 mon $srcport;
675
676 rcv $SELF, sub {
677 ...
678 };
679 }
680
681=cut
682
683sub _spawn {
684 my $port = shift;
685 my $init = shift;
686
687 local $SELF = "$NODE#$port";
688 eval {
689 &{ load_func $init }
690 };
691 _self_die if $@;
692}
693
694sub spawn(@) {
695 my ($noderef, undef) = split /#/, shift, 2;
696
697 my $id = "$RUNIQ." . $ID++;
698
699 $_[0] =~ /::/
700 or Carp::croak "spawn init function must be a fully-qualified name, caught";
701
702 ($NODE{$noderef} || add_node $noderef)
703 ->send (["", "AnyEvent::MP::_spawn" => $id, @_]);
704
705 "$noderef#$id"
706}
580 707
581=back 708=back
582 709
583=head1 NODE MESSAGES 710=head1 NODE MESSAGES
584 711
717or I<none>, there is no in-between, so monitoring single processes is 844or I<none>, there is no in-between, so monitoring single processes is
718difficult to implement. Monitoring in AEMP is more flexible than in 845difficult to implement. Monitoring in AEMP is more flexible than in
719Erlang, as one can choose between automatic kill, exit message or callback 846Erlang, as one can choose between automatic kill, exit message or callback
720on a per-process basis. 847on a per-process basis.
721 848
722=item * Erlang has different semantics for monitoring and linking, AEMP has the same. 849=item * Erlang tries to hide remote/local connections, AEMP does not.
723 850
724Monitoring in Erlang is not an indicator of process death/crashes, 851Monitoring in Erlang is not an indicator of process death/crashes,
725as linking is (except linking is unreliable in Erlang). In AEMP, the 852as linking is (except linking is unreliable in Erlang).
726semantics of monitoring and linking are identical, linking is simply 853
727two-way monitoring with automatic kill. 854In AEMP, you don't "look up" registered port names or send to named ports
855that might or might not be persistent. Instead, you normally spawn a port
856on the remote node. The init function monitors the you, and you monitor
857the remote port. Since both monitors are local to the node, they are much
858more reliable.
859
860This also saves round-trips and avoids sending messages to the wrong port
861(hard to do in Erlang).
862
863=back
864
865=head1 RATIONALE
866
867=over 4
868
869=item Why strings for ports and noderefs, why not objects?
870
871We considered "objects", but found that the actual number of methods
872thatc an be called are very low. Since port IDs and noderefs travel over
873the network frequently, the serialising/deserialising would add lots of
874overhead, as well as having to keep a proxy object.
875
876Strings can easily be printed, easily serialised etc. and need no special
877procedures to be "valid".
878
879And a a miniport consists of a single closure stored in a global hash - it
880can't become much cheaper.
881
882=item Why favour JSON, why not real serialising format such as Storable?
883
884In fact, any AnyEvent::MP node will happily accept Storable as framing
885format, but currently there is no way to make a node use Storable by
886default.
887
888The default framing protocol is JSON because a) JSON::XS is many times
889faster for small messages and b) most importantly, after years of
890experience we found that object serialisation is causing more problems
891than it gains: Just like function calls, objects simply do not travel
892easily over the network, mostly because they will always be a copy, so you
893always have to re-think your design.
894
895Keeping your messages simple, concentrating on data structures rather than
896objects, will keep your messages clean, tidy and efficient.
728 897
729=back 898=back
730 899
731=head1 SEE ALSO 900=head1 SEE ALSO
732 901

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines