ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP.pm
(Generate patch)

Comparing cvsroot/AnyEvent-MP/MP.pm (file contents):
Revision 1.49 by root, Thu Aug 13 15:29:58 2009 UTC vs.
Revision 1.54 by root, Fri Aug 14 16:15:37 2009 UTC

22 # sending messages 22 # sending messages
23 snd $port, type => data...; 23 snd $port, type => data...;
24 snd $port, @msg; 24 snd $port, @msg;
25 snd @msg_with_first_element_being_a_port; 25 snd @msg_with_first_element_being_a_port;
26 26
27 # creating/using miniports 27 # creating/using ports, the simple way
28 my $miniport = port { my @msg = @_; 0 }; 28 my $simple_port = port { my @msg = @_; 0 };
29 29
30 # creating/using full ports 30 # creating/using ports, tagged message matching
31 my $port = port; 31 my $port = port;
32 rcv $port, smartmatch => $cb->(@msg);
33 rcv $port, ping => sub { snd $_[0], "pong"; 0 }; 32 rcv $port, ping => sub { snd $_[0], "pong"; 0 };
34 rcv $port, pong => sub { warn "pong received\n"; 0 }; 33 rcv $port, pong => sub { warn "pong received\n"; 0 };
35
36 # more, smarter, matches (_any_ is exported by this module)
37 rcv $port, [child_died => $pid] => sub { ...
38 rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
39 34
40 # create a port on another node 35 # create a port on another node
41 my $port = spawn $node, $initfunc, @initdata; 36 my $port = spawn $node, $initfunc, @initdata;
42 37
43 # monitoring 38 # monitoring
74 69
75=item port 70=item port
76 71
77A port is something you can send messages to (with the C<snd> function). 72A port is something you can send messages to (with the C<snd> function).
78 73
79Some ports allow you to register C<rcv> handlers that can match specific 74Ports allow you to register C<rcv> handlers that can match all or just
80messages. All C<rcv> handlers will receive messages they match, messages 75some messages. Messages will not be queued.
81will not be queued.
82 76
83=item port id - C<noderef#portname> 77=item port id - C<noderef#portname>
84 78
85A port id is normaly the concatenation of a noderef, a hash-mark (C<#>) as 79A port ID is the concatenation of a noderef, a hash-mark (C<#>) as
86separator, and a port name (a printable string of unspecified format). An 80separator, and a port name (a printable string of unspecified format). An
87exception is the the node port, whose ID is identical to its node 81exception is the the node port, whose ID is identical to its node
88reference. 82reference.
89 83
90=item node 84=item node
91 85
92A node is a single process containing at least one port - the node 86A node is a single process containing at least one port - the node port,
93port. You can send messages to node ports to find existing ports or to 87which provides nodes to manage each other remotely, and to create new
94create new ports, among other things. 88ports.
95 89
96Nodes are either private (single-process only), slaves (connected to a 90Nodes are either private (single-process only), slaves (connected to a
97master node only) or public nodes (connectable from unrelated nodes). 91master node only) or public nodes (connectable from unrelated nodes).
98 92
99=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id> 93=item noderef - C<host:port,host:port...>, C<id@noderef>, C<id>
148 kil $SELF, die => $msg; 142 kil $SELF, die => $msg;
149} 143}
150 144
151=item $thisnode = NODE / $NODE 145=item $thisnode = NODE / $NODE
152 146
153The C<NODE> function returns, and the C<$NODE> variable contains 147The C<NODE> function returns, and the C<$NODE> variable contains the
154the noderef of the local node. The value is initialised by a call 148noderef of the local node. The value is initialised by a call to
155to C<become_public> or C<become_slave>, after which all local port 149C<initialise_node>.
156identifiers become invalid.
157 150
158=item $noderef = node_of $port 151=item $noderef = node_of $port
159 152
160Extracts and returns the noderef from a portid or a noderef. 153Extracts and returns the noderef from a port ID or a noderef.
161 154
162=item initialise_node $noderef, $seednode, $seednode... 155=item initialise_node $noderef, $seednode, $seednode...
163 156
164=item initialise_node "slave/", $master, $master... 157=item initialise_node "slave/", $master, $master...
165 158
284=item snd $port, type => @data 277=item snd $port, type => @data
285 278
286=item snd $port, @msg 279=item snd $port, @msg
287 280
288Send the given message to the given port ID, which can identify either 281Send the given message to the given port ID, which can identify either
289a local or a remote port, and can be either a string or soemthignt hat 282a local or a remote port, and must be a port ID.
290stringifies a sa port ID (such as a port object :).
291 283
292While the message can be about anything, it is highly recommended to use a 284While the message can be about anything, it is highly recommended to use a
293string as first element (a portid, or some word that indicates a request 285string as first element (a port ID, or some word that indicates a request
294type etc.). 286type etc.).
295 287
296The message data effectively becomes read-only after a call to this 288The message data effectively becomes read-only after a call to this
297function: modifying any argument is not allowed and can cause many 289function: modifying any argument is not allowed and can cause many
298problems. 290problems.
303that Storable can serialise and deserialise is allowed, and for the local 295that Storable can serialise and deserialise is allowed, and for the local
304node, anything can be passed. 296node, anything can be passed.
305 297
306=item $local_port = port 298=item $local_port = port
307 299
308Create a new local port object that can be used either as a pattern 300Create a new local port object and returns its port ID. Initially it has
309matching port ("full port") or a single-callback port ("miniport"), 301no callbacks set and will throw an error when it receives messages.
310depending on how C<rcv> callbacks are bound to the object.
311 302
312=item $port = port { my @msg = @_; $finished } 303=item $local_port = port { my @msg = @_ }
313 304
314Creates a "miniport", that is, a very lightweight port without any pattern 305Creates a new local port, and returns its ID. Semantically the same as
315matching behind it, and returns its ID. Semantically the same as creating
316a port and calling C<rcv $port, $callback> on it. 306creating a port and calling C<rcv $port, $callback> on it.
317 307
318The block will be called for every message received on the port. When the 308The block will be called for every message received on the port, with the
319callback returns a true value its job is considered "done" and the port 309global variable C<$SELF> set to the port ID. Runtime errors will cause the
320will be destroyed. Otherwise it will stay alive. 310port to be C<kil>ed. The message will be passed as-is, no extra argument
311(i.e. no port ID) will be passed to the callback.
321 312
322The message will be passed as-is, no extra argument (i.e. no port id) will 313If you want to stop/destroy the port, simply C<kil> it:
323be passed to the callback.
324 314
325If you need the local port id in the callback, this works nicely: 315 my $port = port {
326 316 my @msg = @_;
327 my $port; $port = port { 317 ...
328 snd $otherport, reply => $port; 318 kil $SELF;
329 }; 319 };
330 320
331=cut 321=cut
332 322
333sub rcv($@); 323sub rcv($@);
324
325sub _kilme {
326 die "received message on port without callback";
327}
334 328
335sub port(;&) { 329sub port(;&) {
336 my $id = "$UNIQ." . $ID++; 330 my $id = "$UNIQ." . $ID++;
337 my $port = "$NODE#$id"; 331 my $port = "$NODE#$id";
338 332
339 if (@_) { 333 rcv $port, shift || \&_kilme;
340 rcv $port, shift;
341 } else {
342 $PORT{$id} = sub { }; # nop
343 }
344 334
345 $port 335 $port
346} 336}
347 337
348=item reg $port, $name
349
350=item reg $name
351
352Registers the given port (or C<$SELF><<< if missing) under the name
353C<$name>. If the name already exists it is replaced.
354
355A port can only be registered under one well known name.
356
357A port automatically becomes unregistered when it is killed.
358
359=cut
360
361sub reg(@) {
362 my $port = @_ > 1 ? shift : $SELF || Carp::croak 'reg: called with one argument only, but $SELF not set,';
363
364 $REG{$_[0]} = $port;
365}
366
367=item rcv $port, $callback->(@msg) 338=item rcv $local_port, $callback->(@msg)
368 339
369Replaces the callback on the specified miniport (after converting it to 340Replaces the default callback on the specified port. There is no way to
370one if required). 341remove the default callback: use C<sub { }> to disable it, or better
371 342C<kil> the port when it is no longer needed.
372=item rcv $port, tagstring => $callback->(@msg), ...
373
374=item rcv $port, $smartmatch => $callback->(@msg), ...
375
376=item rcv $port, [$smartmatch...] => $callback->(@msg), ...
377
378Register callbacks to be called on matching messages on the given full
379port (after converting it to one if required) and return the port.
380
381The callback has to return a true value when its work is done, after
382which is will be removed, or a false value in which case it will stay
383registered.
384 343
385The global C<$SELF> (exported by this module) contains C<$port> while 344The global C<$SELF> (exported by this module) contains C<$port> while
386executing the callback. 345executing the callback. Runtime errors during callback execution will
346result in the port being C<kil>ed.
387 347
388Runtime errors during callback execution will result in the port being 348The default callback received all messages not matched by a more specific
389C<kil>ed. 349C<tag> match.
390 350
391If the match is an array reference, then it will be matched against the 351=item rcv $local_port, tag => $callback->(@msg_without_tag), ...
392first elements of the message, otherwise only the first element is being
393matched.
394 352
395Any element in the match that is specified as C<_any_> (a function 353Register (or replace) callbacks to be called on messages starting with the
396exported by this module) matches any single element of the message. 354given tag on the given port (and return the port), or unregister it (when
355C<$callback> is C<$undef> or missing). There can only be one callback
356registered for each tag.
397 357
398While not required, it is highly recommended that the first matching 358The original message will be passed to the callback, after the first
399element is a string identifying the message. The one-string-only match is 359element (the tag) has been removed. The callback will use the same
400also the most efficient match (by far). 360environment as the default callback (see above).
401 361
402Example: create a port and bind receivers on it in one go. 362Example: create a port and bind receivers on it in one go.
403 363
404 my $port = rcv port, 364 my $port = rcv port,
405 msg1 => sub { ...; 0 }, 365 msg1 => sub { ... },
406 msg2 => sub { ...; 0 }, 366 msg2 => sub { ... },
407 ; 367 ;
408 368
409Example: create a port, bind receivers and send it in a message elsewhere 369Example: create a port, bind receivers and send it in a message elsewhere
410in one go: 370in one go:
411 371
412 snd $otherport, reply => 372 snd $otherport, reply =>
413 rcv port, 373 rcv port,
414 msg1 => sub { ...; 0 }, 374 msg1 => sub { ... },
415 ... 375 ...
416 ; 376 ;
377
378Example: temporarily register a rcv callback for a tag matching some port
379(e.g. for a rpc reply) and unregister it after a message was received.
380
381 rcv $port, $otherport => sub {
382 my @reply = @_;
383
384 rcv $SELF, $otherport;
385 };
417 386
418=cut 387=cut
419 388
420sub rcv($@) { 389sub rcv($@) {
421 my $port = shift; 390 my $port = shift;
422 my ($noderef, $portid) = split /#/, $port, 2; 391 my ($noderef, $portid) = split /#/, $port, 2;
423 392
424 ($NODE{$noderef} || add_node $noderef) == $NODE{""} 393 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
425 or Carp::croak "$port: rcv can only be called on local ports, caught"; 394 or Carp::croak "$port: rcv can only be called on local ports, caught";
426 395
427 if (@_ == 1) { 396 while (@_) {
397 if (ref $_[0]) {
398 if (my $self = $PORT_DATA{$portid}) {
399 "AnyEvent::MP::Port" eq ref $self
400 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
401
402 $self->[2] = shift;
403 } else {
428 my $cb = shift; 404 my $cb = shift;
429 delete $PORT_DATA{$portid};
430 $PORT{$portid} = sub { 405 $PORT{$portid} = sub {
431 local $SELF = $port; 406 local $SELF = $port;
432 eval { 407 eval { &$cb }; _self_die if $@;
433 &$cb 408 };
434 and kil $port;
435 }; 409 }
436 _self_die if $@; 410 } elsif (defined $_[0]) {
437 };
438 } else {
439 my $self = $PORT_DATA{$portid} ||= do { 411 my $self = $PORT_DATA{$portid} ||= do {
440 my $self = bless { 412 my $self = bless [$PORT{$port} || sub { }, { }, $port], "AnyEvent::MP::Port";
441 id => $port,
442 }, "AnyEvent::MP::Port";
443 413
444 $PORT{$portid} = sub { 414 $PORT{$portid} = sub {
445 local $SELF = $port; 415 local $SELF = $port;
446 416
447 eval {
448 for (@{ $self->{rc0}{$_[0]} }) { 417 if (my $cb = $self->[1]{$_[0]}) {
449 $_ && &{$_->[0]} 418 shift;
450 && undef $_; 419 eval { &$cb }; _self_die if $@;
451 } 420 } else {
452
453 for (@{ $self->{rcv}{$_[0]} }) {
454 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
455 && &{$_->[0]} 421 &{ $self->[0] };
456 && undef $_;
457 }
458
459 for (@{ $self->{any} }) {
460 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
461 && &{$_->[0]}
462 && undef $_;
463 } 422 }
464 }; 423 };
465 _self_die if $@; 424
425 $self
466 }; 426 };
467 427
468 $self
469 };
470
471 "AnyEvent::MP::Port" eq ref $self 428 "AnyEvent::MP::Port" eq ref $self
472 or Carp::croak "$port: rcv can only be called on message matching ports, caught"; 429 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
473 430
474 while (@_) {
475 my ($match, $cb) = splice @_, 0, 2; 431 my ($tag, $cb) = splice @_, 0, 2;
476 432
477 if (!ref $match) { 433 if (defined $cb) {
478 push @{ $self->{rc0}{$match} }, [$cb]; 434 $self->[1]{$tag} = $cb;
479 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
480 my ($type, @match) = @$match;
481 @match
482 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
483 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
484 } else { 435 } else {
485 push @{ $self->{any} }, [$cb, $match]; 436 delete $self->[1]{$tag};
486 } 437 }
487 } 438 }
488 } 439 }
489 440
490 $port 441 $port
794convenience functionality. 745convenience functionality.
795 746
796This means that AEMP requires a less tightly controlled environment at the 747This means that AEMP requires a less tightly controlled environment at the
797cost of longer node references and a slightly higher management overhead. 748cost of longer node references and a slightly higher management overhead.
798 749
750=item * Erlang has a "remote ports are like local ports" philosophy, AEMP
751uses "local ports are like remote ports".
752
753The failure modes for local ports are quite different (runtime errors
754only) then for remote ports - when a local port dies, you I<know> it dies,
755when a connection to another node dies, you know nothing about the other
756port.
757
758Erlang pretends remote ports are as reliable as local ports, even when
759they are not.
760
761AEMP encourages a "treat remote ports differently" philosophy, with local
762ports being the special case/exception, where transport errors cannot
763occur.
764
799=item * Erlang uses processes and a mailbox, AEMP does not queue. 765=item * Erlang uses processes and a mailbox, AEMP does not queue.
800 766
801Erlang uses processes that selctively receive messages, and therefore 767Erlang uses processes that selectively receive messages, and therefore
802needs a queue. AEMP is event based, queuing messages would serve no useful 768needs a queue. AEMP is event based, queuing messages would serve no
803purpose. 769useful purpose. For the same reason the pattern-matching abilities of
770AnyEvent::MP are more limited, as there is little need to be able to
771filter messages without dequeing them.
804 772
805(But see L<Coro::MP> for a more Erlang-like process model on top of AEMP). 773(But see L<Coro::MP> for a more Erlang-like process model on top of AEMP).
806 774
807=item * Erlang sends are synchronous, AEMP sends are asynchronous. 775=item * Erlang sends are synchronous, AEMP sends are asynchronous.
808 776
809Sending messages in Erlang is synchronous and blocks the process. AEMP 777Sending messages in Erlang is synchronous and blocks the process (and
810sends are immediate, connection establishment is handled in the 778so does not need a queue that can overflow). AEMP sends are immediate,
811background. 779connection establishment is handled in the background.
812 780
813=item * Erlang can silently lose messages, AEMP cannot. 781=item * Erlang suffers from silent message loss, AEMP does not.
814 782
815Erlang makes few guarantees on messages delivery - messages can get lost 783Erlang makes few guarantees on messages delivery - messages can get lost
816without any of the processes realising it (i.e. you send messages a, b, 784without any of the processes realising it (i.e. you send messages a, b,
817and c, and the other side only receives messages a and c). 785and c, and the other side only receives messages a and c).
818 786
830eventually be killed - it cannot happen that a node detects a port as dead 798eventually be killed - it cannot happen that a node detects a port as dead
831and then later sends messages to it, finding it is still alive. 799and then later sends messages to it, finding it is still alive.
832 800
833=item * Erlang can send messages to the wrong port, AEMP does not. 801=item * Erlang can send messages to the wrong port, AEMP does not.
834 802
835In Erlang it is quite possible that a node that restarts reuses a process 803In Erlang it is quite likely that a node that restarts reuses a process ID
836ID known to other nodes for a completely different process, causing 804known to other nodes for a completely different process, causing messages
837messages destined for that process to end up in an unrelated process. 805destined for that process to end up in an unrelated process.
838 806
839AEMP never reuses port IDs, so old messages or old port IDs floating 807AEMP never reuses port IDs, so old messages or old port IDs floating
840around in the network will not be sent to an unrelated port. 808around in the network will not be sent to an unrelated port.
841 809
842=item * Erlang uses unprotected connections, AEMP uses secure 810=item * Erlang uses unprotected connections, AEMP uses secure

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines