ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.49 by root, Thu Aug 13 15:29:58 2009 UTC vs.
Revision 1.58 by root, Sun Aug 16 02:55:16 2009 UTC

22 # sending messages 22 # sending messages
23 snd $port, type => data...; 23 snd $port, type => data...;
24 snd $port, @msg; 24 snd $port, @msg;
25 snd @msg_with_first_element_being_a_port; 25 snd @msg_with_first_element_being_a_port;
26 26
27 # creating/using miniports 27 # creating/using ports, the simple way
28 my $miniport = port { my @msg = @_; 0 }; 28 my $simple_port = port { my @msg = @_; 0 };
29 29
30 # creating/using full ports 30 # creating/using ports, tagged message matching
31 my $port = port; 31 my $port = port;
32 rcv $port, smartmatch => $cb->(@msg);
33 rcv $port, ping => sub { snd $_[0], "pong"; 0 }; 32 rcv $port, ping => sub { snd $_[0], "pong"; 0 };
34 rcv $port, pong => sub { warn "pong received\n"; 0 }; 33 rcv $port, pong => sub { warn "pong received\n"; 0 };
35
36 # more, smarter, matches (_any_ is exported by this module)
37 rcv $port, [child_died => $pid] => sub { ...
38 rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
39 34
40 # create a port on another node 35 # create a port on another node
41 my $port = spawn $node, $initfunc, @initdata; 36 my $port = spawn $node, $initfunc, @initdata;
42 37
43 # monitoring 38 # monitoring
74 69
75=item port 70=item port
76 71
77A port is something you can send messages to (with the C<snd> function). 72A port is something you can send messages to (with the C<snd> function).
78 73
79Some ports allow you to register C<rcv> handlers that can match specific 74Ports allow you to register C<rcv> handlers that can match all or just
80messages. All C<rcv> handlers will receive messages they match, messages 75some messages. Messages will not be queued.
81will not be queued.
82 76
83=item port id - C<noderef#portname> 77=item port id - C<noderef#portname>
84 78
85A port id is normaly the concatenation of a noderef, a hash-mark (C<#>) as 79A port ID is the concatenation of a noderef, a hash-mark (C<#>) as
86separator, and a port name (a printable string of unspecified format). An 80separator, and a port name (a printable string of unspecified format). An
87exception is the the node port, whose ID is identical to its node 81exception is the the node port, whose ID is identical to its node
88reference. 82reference.
89 83
90=item node 84=item node
91 85
92A node is a single process containing at least one port - the node 86A node is a single process containing at least one port - the node port,
93port. You can send messages to node ports to find existing ports or to 87which provides nodes to manage each other remotely, and to create new
94create new ports, among other things. 88ports.
95 89
96Nodes are either private (single-process only), slaves (connected to a 90Nodes are either private (single-process only), slaves (connected to a
97master node only) or public nodes (connectable from unrelated nodes). 91master node only) or public nodes (connectable from unrelated nodes).
98 92
99=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id> 93=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>
148 kil $SELF, die => $msg; 142 kil $SELF, die => $msg;
149} 143}
150 144
151=item $thisnode = NODE / $NODE 145=item $thisnode = NODE / $NODE
152 146
153The C<NODE> function returns, and the C<$NODE> variable contains 147The C<NODE> function returns, and the C<$NODE> variable contains the
154the noderef of the local node. The value is initialised by a call 148noderef of the local node. The value is initialised by a call to
155to C<become_public> or C<become_slave>, after which all local port 149C<initialise_node>.
156identifiers become invalid.
157 150
158=item $noderef = node_of $port 151=item $noderef = node_of $port
159 152
160Extracts and returns the noderef from a portid or a noderef. 153Extracts and returns the noderef from a port ID or a noderef.
161 154
162=item initialise_node $noderef, $seednode, $seednode... 155=item initialise_node $noderef, $seednode, $seednode...
163 156
164=item initialise_node "slave/", $master, $master... 157=item initialise_node "slave/", $master, $master...
165 158
204At least one additional noderef is required (either by specifying it 197At least one additional noderef is required (either by specifying it
205directly or because it is part of the configuration profile): The node 198directly or because it is part of the configuration profile): The node
206will try to connect to all of them and will become a slave attached to the 199will try to connect to all of them and will become a slave attached to the
207first node it can successfully connect to. 200first node it can successfully connect to.
208 201
202Note that slave nodes cannot change their name, and consequently, their
203master, so if the master goes down, the slave node will not function well
204anymore until it can re-establish conenciton to its master. This makes
205slave nodes unsuitable for long-term nodes or fault-tolerant networks.
206
209=back 207=back
210 208
211This function will block until all nodes have been resolved and, for slave 209This function will block until all nodes have been resolved and, for slave
212nodes, until it has successfully established a connection to a master 210nodes, until it has successfully established a connection to a master
213server. 211server.
214 212
213All the seednodes will also be specially marked to automatically retry
214connecting to them infinitely.
215
215Example: become a public node listening on the guessed noderef, or the one 216Example: become a public node listening on the guessed noderef, or the one
216specified via C<aemp> for the current node. This should be the most common 217specified via C<aemp> for the current node. This should be the most common
217form of invocation for "daemon"-type nodes. 218form of invocation for "daemon"-type nodes.
218 219
219 initialise_node; 220 initialise_node;
284=item snd $port, type => @data 285=item snd $port, type => @data
285 286
286=item snd $port, @msg 287=item snd $port, @msg
287 288
288Send the given message to the given port ID, which can identify either 289Send the given message to the given port ID, which can identify either
289a local or a remote port, and can be either a string or soemthignt hat 290a local or a remote port, and must be a port ID.
290stringifies a sa port ID (such as a port object :).
291 291
292While the message can be about anything, it is highly recommended to use a 292While the message can be about anything, it is highly recommended to use a
293string as first element (a portid, or some word that indicates a request 293string as first element (a port ID, or some word that indicates a request
294type etc.). 294type etc.).
295 295
296The message data effectively becomes read-only after a call to this 296The message data effectively becomes read-only after a call to this
297function: modifying any argument is not allowed and can cause many 297function: modifying any argument is not allowed and can cause many
298problems. 298problems.
303that Storable can serialise and deserialise is allowed, and for the local 303that Storable can serialise and deserialise is allowed, and for the local
304node, anything can be passed. 304node, anything can be passed.
305 305
306=item $local_port = port 306=item $local_port = port
307 307
308Create a new local port object that can be used either as a pattern 308Create a new local port object and returns its port ID. Initially it has
309matching port ("full port") or a single-callback port ("miniport"), 309no callbacks set and will throw an error when it receives messages.
310depending on how C<rcv> callbacks are bound to the object.
311 310
312=item $port = port { my @msg = @_; $finished } 311=item $local_port = port { my @msg = @_ }
313 312
314Creates a "miniport", that is, a very lightweight port without any pattern 313Creates a new local port, and returns its ID. Semantically the same as
315matching behind it, and returns its ID. Semantically the same as creating
316a port and calling C<rcv $port, $callback> on it. 314creating a port and calling C<rcv $port, $callback> on it.
317 315
318The block will be called for every message received on the port. When the 316The block will be called for every message received on the port, with the
319callback returns a true value its job is considered "done" and the port 317global variable C<$SELF> set to the port ID. Runtime errors will cause the
320will be destroyed. Otherwise it will stay alive. 318port to be C<kil>ed. The message will be passed as-is, no extra argument
319(i.e. no port ID) will be passed to the callback.
321 320
322The message will be passed as-is, no extra argument (i.e. no port id) will 321If you want to stop/destroy the port, simply C<kil> it:
323be passed to the callback.
324 322
325If you need the local port id in the callback, this works nicely: 323 my $port = port {
326 324 my @msg = @_;
327 my $port; $port = port { 325 ...
328 snd $otherport, reply => $port; 326 kil $SELF;
329 }; 327 };
330 328
331=cut 329=cut
332 330
333sub rcv($@); 331sub rcv($@);
332
333sub _kilme {
334 die "received message on port without callback";
335}
334 336
335sub port(;&) { 337sub port(;&) {
336 my $id = "$UNIQ." . $ID++; 338 my $id = "$UNIQ." . $ID++;
337 my $port = "$NODE#$id"; 339 my $port = "$NODE#$id";
338 340
339 if (@_) { 341 rcv $port, shift || \&_kilme;
340 rcv $port, shift;
341 } else {
342 $PORT{$id} = sub { }; # nop
343 }
344 342
345 $port 343 $port
346} 344}
347 345
348=item reg $port, $name
349
350=item reg $name
351
352Registers the given port (or C<$SELF><<< if missing) under the name
353C<$name>. If the name already exists it is replaced.
354
355A port can only be registered under one well known name.
356
357A port automatically becomes unregistered when it is killed.
358
359=cut
360
361sub reg(@) {
362 my $port = @_ > 1 ? shift : $SELF || Carp::croak 'reg: called with one argument only, but $SELF not set,';
363
364 $REG{$_[0]} = $port;
365}
366
367=item rcv $port, $callback->(@msg) 346=item rcv $local_port, $callback->(@msg)
368 347
369Replaces the callback on the specified miniport (after converting it to 348Replaces the default callback on the specified port. There is no way to
370one if required). 349remove the default callback: use C<sub { }> to disable it, or better
371 350C<kil> the port when it is no longer needed.
372=item rcv $port, tagstring => $callback->(@msg), ...
373
374=item rcv $port, $smartmatch => $callback->(@msg), ...
375
376=item rcv $port, [$smartmatch...] => $callback->(@msg), ...
377
378Register callbacks to be called on matching messages on the given full
379port (after converting it to one if required) and return the port.
380
381The callback has to return a true value when its work is done, after
382which is will be removed, or a false value in which case it will stay
383registered.
384 351
385The global C<$SELF> (exported by this module) contains C<$port> while 352The global C<$SELF> (exported by this module) contains C<$port> while
386executing the callback. 353executing the callback. Runtime errors during callback execution will
354result in the port being C<kil>ed.
387 355
388Runtime errors during callback execution will result in the port being 356The default callback received all messages not matched by a more specific
389C<kil>ed. 357C<tag> match.
390 358
391If the match is an array reference, then it will be matched against the 359=item rcv $local_port, tag => $callback->(@msg_without_tag), ...
392first elements of the message, otherwise only the first element is being
393matched.
394 360
395Any element in the match that is specified as C<_any_> (a function 361Register (or replace) callbacks to be called on messages starting with the
396exported by this module) matches any single element of the message. 362given tag on the given port (and return the port), or unregister it (when
363C<$callback> is C<$undef> or missing). There can only be one callback
364registered for each tag.
397 365
398While not required, it is highly recommended that the first matching 366The original message will be passed to the callback, after the first
399element is a string identifying the message. The one-string-only match is 367element (the tag) has been removed. The callback will use the same
400also the most efficient match (by far). 368environment as the default callback (see above).
401 369
402Example: create a port and bind receivers on it in one go. 370Example: create a port and bind receivers on it in one go.
403 371
404 my $port = rcv port, 372 my $port = rcv port,
405 msg1 => sub { ...; 0 }, 373 msg1 => sub { ... },
406 msg2 => sub { ...; 0 }, 374 msg2 => sub { ... },
407 ; 375 ;
408 376
409Example: create a port, bind receivers and send it in a message elsewhere 377Example: create a port, bind receivers and send it in a message elsewhere
410in one go: 378in one go:
411 379
412 snd $otherport, reply => 380 snd $otherport, reply =>
413 rcv port, 381 rcv port,
414 msg1 => sub { ...; 0 }, 382 msg1 => sub { ... },
415 ... 383 ...
416 ; 384 ;
385
386Example: temporarily register a rcv callback for a tag matching some port
387(e.g. for a rpc reply) and unregister it after a message was received.
388
389 rcv $port, $otherport => sub {
390 my @reply = @_;
391
392 rcv $SELF, $otherport;
393 };
417 394
418=cut 395=cut
419 396
420sub rcv($@) { 397sub rcv($@) {
421 my $port = shift; 398 my $port = shift;
422 my ($noderef, $portid) = split /#/, $port, 2; 399 my ($noderef, $portid) = split /#/, $port, 2;
423 400
424 ($NODE{$noderef} || add_node $noderef) == $NODE{""} 401 $NODE{$noderef} == $NODE{""}
425 or Carp::croak "$port: rcv can only be called on local ports, caught"; 402 or Carp::croak "$port: rcv can only be called on local ports, caught";
426 403
427 if (@_ == 1) { 404 while (@_) {
405 if (ref $_[0]) {
406 if (my $self = $PORT_DATA{$portid}) {
407 "AnyEvent::MP::Port" eq ref $self
408 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
409
410 $self->[2] = shift;
411 } else {
428 my $cb = shift; 412 my $cb = shift;
429 delete $PORT_DATA{$portid};
430 $PORT{$portid} = sub { 413 $PORT{$portid} = sub {
431 local $SELF = $port; 414 local $SELF = $port;
432 eval { 415 eval { &$cb }; _self_die if $@;
433 &$cb 416 };
434 and kil $port;
435 }; 417 }
436 _self_die if $@; 418 } elsif (defined $_[0]) {
437 };
438 } else {
439 my $self = $PORT_DATA{$portid} ||= do { 419 my $self = $PORT_DATA{$portid} ||= do {
440 my $self = bless { 420 my $self = bless [$PORT{$port} || sub { }, { }, $port], "AnyEvent::MP::Port";
441 id => $port,
442 }, "AnyEvent::MP::Port";
443 421
444 $PORT{$portid} = sub { 422 $PORT{$portid} = sub {
445 local $SELF = $port; 423 local $SELF = $port;
446 424
447 eval {
448 for (@{ $self->{rc0}{$_[0]} }) { 425 if (my $cb = $self->[1]{$_[0]}) {
449 $_ && &{$_->[0]} 426 shift;
450 && undef $_; 427 eval { &$cb }; _self_die if $@;
451 } 428 } else {
452
453 for (@{ $self->{rcv}{$_[0]} }) {
454 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
455 && &{$_->[0]} 429 &{ $self->[0] };
456 && undef $_;
457 }
458
459 for (@{ $self->{any} }) {
460 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
461 && &{$_->[0]}
462 && undef $_;
463 } 430 }
464 }; 431 };
465 _self_die if $@; 432
433 $self
466 }; 434 };
467 435
468 $self
469 };
470
471 "AnyEvent::MP::Port" eq ref $self 436 "AnyEvent::MP::Port" eq ref $self
472 or Carp::croak "$port: rcv can only be called on message matching ports, caught"; 437 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
473 438
474 while (@_) {
475 my ($match, $cb) = splice @_, 0, 2; 439 my ($tag, $cb) = splice @_, 0, 2;
476 440
477 if (!ref $match) { 441 if (defined $cb) {
478 push @{ $self->{rc0}{$match} }, [$cb]; 442 $self->[1]{$tag} = $cb;
479 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
480 my ($type, @match) = @$match;
481 @match
482 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
483 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
484 } else { 443 } else {
485 push @{ $self->{any} }, [$cb, $match]; 444 delete $self->[1]{$tag};
486 } 445 }
487 } 446 }
488 } 447 }
489 448
490 $port 449 $port
546message loss has been detected. No messages will be lost "in between" 505message loss has been detected. No messages will be lost "in between"
547(after the first lost message no further messages will be received by the 506(after the first lost message no further messages will be received by the
548port). After the monitoring action was invoked, further messages might get 507port). After the monitoring action was invoked, further messages might get
549delivered again. 508delivered again.
550 509
510Note that monitoring-actions are one-shot: once released, they are removed
511and will not trigger again.
512
551In the first form (callback), the callback is simply called with any 513In the first form (callback), the callback is simply called with any
552number of C<@reason> elements (no @reason means that the port was deleted 514number of C<@reason> elements (no @reason means that the port was deleted
553"normally"). Note also that I<< the callback B<must> never die >>, so use 515"normally"). Note also that I<< the callback B<must> never die >>, so use
554C<eval> if unsure. 516C<eval> if unsure.
555 517
715 my $id = "$RUNIQ." . $ID++; 677 my $id = "$RUNIQ." . $ID++;
716 678
717 $_[0] =~ /::/ 679 $_[0] =~ /::/
718 or Carp::croak "spawn init function must be a fully-qualified name, caught"; 680 or Carp::croak "spawn init function must be a fully-qualified name, caught";
719 681
720 ($NODE{$noderef} || add_node $noderef) 682 snd_to_func $noderef, "AnyEvent::MP::_spawn" => $id, @_;
721 ->send (["", "AnyEvent::MP::_spawn" => $id, @_]);
722 683
723 "$noderef#$id" 684 "$noderef#$id"
724} 685}
725
726=back
727
728=head1 NODE MESSAGES
729
730Nodes understand the following messages sent to them. Many of them take
731arguments called C<@reply>, which will simply be used to compose a reply
732message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and
733the remaining arguments are simply the message data.
734
735While other messages exist, they are not public and subject to change.
736
737=over 4
738
739=cut
740
741=item lookup => $name, @reply
742
743Replies with the port ID of the specified well-known port, or C<undef>.
744
745=item devnull => ...
746
747Generic data sink/CPU heat conversion.
748
749=item relay => $port, @msg
750
751Simply forwards the message to the given port.
752
753=item eval => $string[ @reply]
754
755Evaluates the given string. If C<@reply> is given, then a message of the
756form C<@reply, $@, @evalres> is sent.
757
758Example: crash another node.
759
760 snd $othernode, eval => "exit";
761
762=item time => @reply
763
764Replies the the current node time to C<@reply>.
765
766Example: tell the current node to send the current time to C<$myport> in a
767C<timereply> message.
768
769 snd $NODE, time => $myport, timereply => 1, 2;
770 # => snd $myport, timereply => 1, 2, <time>
771 686
772=back 687=back
773 688
774=head1 AnyEvent::MP vs. Distributed Erlang 689=head1 AnyEvent::MP vs. Distributed Erlang
775 690
794convenience functionality. 709convenience functionality.
795 710
796This means that AEMP requires a less tightly controlled environment at the 711This means that AEMP requires a less tightly controlled environment at the
797cost of longer node references and a slightly higher management overhead. 712cost of longer node references and a slightly higher management overhead.
798 713
714=item * Erlang has a "remote ports are like local ports" philosophy, AEMP
715uses "local ports are like remote ports".
716
717The failure modes for local ports are quite different (runtime errors
718only) then for remote ports - when a local port dies, you I<know> it dies,
719when a connection to another node dies, you know nothing about the other
720port.
721
722Erlang pretends remote ports are as reliable as local ports, even when
723they are not.
724
725AEMP encourages a "treat remote ports differently" philosophy, with local
726ports being the special case/exception, where transport errors cannot
727occur.
728
799=item * Erlang uses processes and a mailbox, AEMP does not queue. 729=item * Erlang uses processes and a mailbox, AEMP does not queue.
800 730
801Erlang uses processes that selctively receive messages, and therefore 731Erlang uses processes that selectively receive messages, and therefore
802needs a queue. AEMP is event based, queuing messages would serve no useful 732needs a queue. AEMP is event based, queuing messages would serve no
803purpose. 733useful purpose. For the same reason the pattern-matching abilities of
734AnyEvent::MP are more limited, as there is little need to be able to
735filter messages without dequeing them.
804 736
805(But see L<Coro::MP> for a more Erlang-like process model on top of AEMP). 737(But see L<Coro::MP> for a more Erlang-like process model on top of AEMP).
806 738
807=item * Erlang sends are synchronous, AEMP sends are asynchronous. 739=item * Erlang sends are synchronous, AEMP sends are asynchronous.
808 740
809Sending messages in Erlang is synchronous and blocks the process. AEMP 741Sending messages in Erlang is synchronous and blocks the process (and
810sends are immediate, connection establishment is handled in the 742so does not need a queue that can overflow). AEMP sends are immediate,
811background. 743connection establishment is handled in the background.
812 744
813=item * Erlang can silently lose messages, AEMP cannot. 745=item * Erlang suffers from silent message loss, AEMP does not.
814 746
815Erlang makes few guarantees on messages delivery - messages can get lost 747Erlang makes few guarantees on messages delivery - messages can get lost
816without any of the processes realising it (i.e. you send messages a, b, 748without any of the processes realising it (i.e. you send messages a, b,
817and c, and the other side only receives messages a and c). 749and c, and the other side only receives messages a and c).
818 750
830eventually be killed - it cannot happen that a node detects a port as dead 762eventually be killed - it cannot happen that a node detects a port as dead
831and then later sends messages to it, finding it is still alive. 763and then later sends messages to it, finding it is still alive.
832 764
833=item * Erlang can send messages to the wrong port, AEMP does not. 765=item * Erlang can send messages to the wrong port, AEMP does not.
834 766
835In Erlang it is quite possible that a node that restarts reuses a process 767In Erlang it is quite likely that a node that restarts reuses a process ID
836ID known to other nodes for a completely different process, causing 768known to other nodes for a completely different process, causing messages
837messages destined for that process to end up in an unrelated process. 769destined for that process to end up in an unrelated process.
838 770
839AEMP never reuses port IDs, so old messages or old port IDs floating 771AEMP never reuses port IDs, so old messages or old port IDs floating
840around in the network will not be sent to an unrelated port. 772around in the network will not be sent to an unrelated port.
841 773
842=item * Erlang uses unprotected connections, AEMP uses secure 774=item * Erlang uses unprotected connections, AEMP uses secure

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines