ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.49 by root, Thu Aug 13 15:29:58 2009 UTC vs.
Revision 1.52 by root, Fri Aug 14 15:13:20 2009 UTC

22 # sending messages 22 # sending messages
23 snd $port, type => data...; 23 snd $port, type => data...;
24 snd $port, @msg; 24 snd $port, @msg;
25 snd @msg_with_first_element_being_a_port; 25 snd @msg_with_first_element_being_a_port;
26 26
27 # creating/using miniports 27 # creating/using ports, the simple way
28 my $miniport = port { my @msg = @_; 0 }; 28 my $somple_port = port { my @msg = @_; 0 };
29 29
30 # creating/using full ports 30 # creating/using ports, tagged message matching
31 my $port = port; 31 my $port = port;
32 rcv $port, smartmatch => $cb->(@msg);
33 rcv $port, ping => sub { snd $_[0], "pong"; 0 }; 32 rcv $port, ping => sub { snd $_[0], "pong"; 0 };
34 rcv $port, pong => sub { warn "pong received\n"; 0 }; 33 rcv $port, pong => sub { warn "pong received\n"; 0 };
35
36 # more, smarter, matches (_any_ is exported by this module)
37 rcv $port, [child_died => $pid] => sub { ...
38 rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
39 34
40 # create a port on another node 35 # create a port on another node
41 my $port = spawn $node, $initfunc, @initdata; 36 my $port = spawn $node, $initfunc, @initdata;
42 37
43 # monitoring 38 # monitoring
148 kil $SELF, die => $msg; 143 kil $SELF, die => $msg;
149} 144}
150 145
151=item $thisnode = NODE / $NODE 146=item $thisnode = NODE / $NODE
152 147
153The C<NODE> function returns, and the C<$NODE> variable contains 148The C<NODE> function returns, and the C<$NODE> variable contains the
154the noderef of the local node. The value is initialised by a call 149noderef of the local node. The value is initialised by a call to
155to C<become_public> or C<become_slave>, after which all local port 150C<initialise_node>.
156identifiers become invalid.
157 151
158=item $noderef = node_of $port 152=item $noderef = node_of $port
159 153
160Extracts and returns the noderef from a portid or a noderef. 154Extracts and returns the noderef from a port ID or a noderef.
161 155
162=item initialise_node $noderef, $seednode, $seednode... 156=item initialise_node $noderef, $seednode, $seednode...
163 157
164=item initialise_node "slave/", $master, $master... 158=item initialise_node "slave/", $master, $master...
165 159
284=item snd $port, type => @data 278=item snd $port, type => @data
285 279
286=item snd $port, @msg 280=item snd $port, @msg
287 281
288Send the given message to the given port ID, which can identify either 282Send the given message to the given port ID, which can identify either
289a local or a remote port, and can be either a string or soemthignt hat 283a local or a remote port, and must be a port ID.
290stringifies a sa port ID (such as a port object :).
291 284
292While the message can be about anything, it is highly recommended to use a 285While the message can be about anything, it is highly recommended to use a
293string as first element (a portid, or some word that indicates a request 286string as first element (a port ID, or some word that indicates a request
294type etc.). 287type etc.).
295 288
296The message data effectively becomes read-only after a call to this 289The message data effectively becomes read-only after a call to this
297function: modifying any argument is not allowed and can cause many 290function: modifying any argument is not allowed and can cause many
298problems. 291problems.
303that Storable can serialise and deserialise is allowed, and for the local 296that Storable can serialise and deserialise is allowed, and for the local
304node, anything can be passed. 297node, anything can be passed.
305 298
306=item $local_port = port 299=item $local_port = port
307 300
308Create a new local port object that can be used either as a pattern 301Create a new local port object and returns its port ID. Initially it has
309matching port ("full port") or a single-callback port ("miniport"), 302no callbacks set and will throw an error when it receives messages.
310depending on how C<rcv> callbacks are bound to the object.
311 303
312=item $port = port { my @msg = @_; $finished } 304=item $local_port = port { my @msg = @_ }
313 305
314Creates a "miniport", that is, a very lightweight port without any pattern 306Creates a new local port, and returns its ID. Semantically the same as
315matching behind it, and returns its ID. Semantically the same as creating
316a port and calling C<rcv $port, $callback> on it. 307creating a port and calling C<rcv $port, $callback> on it.
317 308
318The block will be called for every message received on the port. When the 309The block will be called for every message received on the port, with the
319callback returns a true value its job is considered "done" and the port 310global variable C<$SELF> set to the port ID. Runtime errors will cause the
320will be destroyed. Otherwise it will stay alive. 311port to be C<kil>ed. The message will be passed as-is, no extra argument
312(i.e. no port ID) will be passed to the callback.
321 313
322The message will be passed as-is, no extra argument (i.e. no port id) will 314If you want to stop/destroy the port, simply C<kil> it:
323be passed to the callback.
324 315
325If you need the local port id in the callback, this works nicely: 316 my $port = port {
326 317 my @msg = @_;
327 my $port; $port = port { 318 ...
328 snd $otherport, reply => $port; 319 kil $SELF;
329 }; 320 };
330 321
331=cut 322=cut
332 323
333sub rcv($@); 324sub rcv($@);
325
326sub _kilme {
327 die "received message on port without callback";
328}
334 329
335sub port(;&) { 330sub port(;&) {
336 my $id = "$UNIQ." . $ID++; 331 my $id = "$UNIQ." . $ID++;
337 my $port = "$NODE#$id"; 332 my $port = "$NODE#$id";
338 333
339 if (@_) { 334 rcv $port, shift || \&_kilme;
340 rcv $port, shift;
341 } else {
342 $PORT{$id} = sub { }; # nop
343 }
344 335
345 $port 336 $port
346} 337}
347 338
348=item reg $port, $name
349
350=item reg $name
351
352Registers the given port (or C<$SELF><<< if missing) under the name
353C<$name>. If the name already exists it is replaced.
354
355A port can only be registered under one well known name.
356
357A port automatically becomes unregistered when it is killed.
358
359=cut
360
361sub reg(@) {
362 my $port = @_ > 1 ? shift : $SELF || Carp::croak 'reg: called with one argument only, but $SELF not set,';
363
364 $REG{$_[0]} = $port;
365}
366
367=item rcv $port, $callback->(@msg) 339=item rcv $local_port, $callback->(@msg)
368 340
369Replaces the callback on the specified miniport (after converting it to 341Replaces the default callback on the specified port. There is no way to
370one if required). 342remove the default callback: use C<sub { }> to disable it, or better
371 343C<kil> the port when it is no longer needed.
372=item rcv $port, tagstring => $callback->(@msg), ...
373
374=item rcv $port, $smartmatch => $callback->(@msg), ...
375
376=item rcv $port, [$smartmatch...] => $callback->(@msg), ...
377
378Register callbacks to be called on matching messages on the given full
379port (after converting it to one if required) and return the port.
380
381The callback has to return a true value when its work is done, after
382which is will be removed, or a false value in which case it will stay
383registered.
384 344
385The global C<$SELF> (exported by this module) contains C<$port> while 345The global C<$SELF> (exported by this module) contains C<$port> while
386executing the callback. 346executing the callback. Runtime errors during callback execution will
347result in the port being C<kil>ed.
387 348
388Runtime errors during callback execution will result in the port being 349The default callback received all messages not matched by a more specific
389C<kil>ed. 350C<tag> match.
390 351
391If the match is an array reference, then it will be matched against the 352=item rcv $local_port, tag => $callback->(@msg_without_tag), ...
392first elements of the message, otherwise only the first element is being
393matched.
394 353
395Any element in the match that is specified as C<_any_> (a function 354Register callbacks to be called on messages starting with the given tag on
396exported by this module) matches any single element of the message. 355the given port (and return the port), or unregister it (when C<$callback>
356is C<$undef>).
397 357
398While not required, it is highly recommended that the first matching 358The original message will be passed to the callback, after the first
399element is a string identifying the message. The one-string-only match is 359element (the tag) has been removed. The callback will use the same
400also the most efficient match (by far). 360environment as the default callback (see above).
401 361
402Example: create a port and bind receivers on it in one go. 362Example: create a port and bind receivers on it in one go.
403 363
404 my $port = rcv port, 364 my $port = rcv port,
405 msg1 => sub { ...; 0 }, 365 msg1 => sub { ... },
406 msg2 => sub { ...; 0 }, 366 msg2 => sub { ... },
407 ; 367 ;
408 368
409Example: create a port, bind receivers and send it in a message elsewhere 369Example: create a port, bind receivers and send it in a message elsewhere
410in one go: 370in one go:
411 371
412 snd $otherport, reply => 372 snd $otherport, reply =>
413 rcv port, 373 rcv port,
414 msg1 => sub { ...; 0 }, 374 msg1 => sub { ... },
415 ... 375 ...
416 ; 376 ;
417 377
418=cut 378=cut
419 379
422 my ($noderef, $portid) = split /#/, $port, 2; 382 my ($noderef, $portid) = split /#/, $port, 2;
423 383
424 ($NODE{$noderef} || add_node $noderef) == $NODE{""} 384 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
425 or Carp::croak "$port: rcv can only be called on local ports, caught"; 385 or Carp::croak "$port: rcv can only be called on local ports, caught";
426 386
427 if (@_ == 1) { 387 while (@_) {
388 if (ref $_[0]) {
389 if (my $self = $PORT_DATA{$portid}) {
390 "AnyEvent::MP::Port" eq ref $self
391 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
392
393 $self->[2] = shift;
394 } else {
428 my $cb = shift; 395 my $cb = shift;
429 delete $PORT_DATA{$portid};
430 $PORT{$portid} = sub { 396 $PORT{$portid} = sub {
431 local $SELF = $port; 397 local $SELF = $port;
432 eval { 398 eval { &$cb }; _self_die if $@;
433 &$cb 399 };
434 and kil $port;
435 }; 400 }
436 _self_die if $@; 401 } elsif (defined $_[0]) {
437 };
438 } else {
439 my $self = $PORT_DATA{$portid} ||= do { 402 my $self = $PORT_DATA{$portid} ||= do {
440 my $self = bless { 403 my $self = bless [$PORT{$port} || sub { }, { }, $port], "AnyEvent::MP::Port";
441 id => $port,
442 }, "AnyEvent::MP::Port";
443 404
444 $PORT{$portid} = sub { 405 $PORT{$portid} = sub {
445 local $SELF = $port; 406 local $SELF = $port;
446 407
447 eval {
448 for (@{ $self->{rc0}{$_[0]} }) { 408 if (my $cb = $self->[1]{$_[0]}) {
449 $_ && &{$_->[0]} 409 shift;
450 && undef $_; 410 eval { &$cb }; _self_die if $@;
451 } 411 } else {
452
453 for (@{ $self->{rcv}{$_[0]} }) {
454 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
455 && &{$_->[0]} 412 &{ $self->[0] };
456 && undef $_;
457 }
458
459 for (@{ $self->{any} }) {
460 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
461 && &{$_->[0]}
462 && undef $_;
463 } 413 }
464 }; 414 };
465 _self_die if $@; 415
416 $self
466 }; 417 };
467 418
468 $self
469 };
470
471 "AnyEvent::MP::Port" eq ref $self 419 "AnyEvent::MP::Port" eq ref $self
472 or Carp::croak "$port: rcv can only be called on message matching ports, caught"; 420 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
473 421
474 while (@_) {
475 my ($match, $cb) = splice @_, 0, 2; 422 my ($tag, $cb) = splice @_, 0, 2;
476 423
477 if (!ref $match) { 424 if (defined $cb) {
478 push @{ $self->{rc0}{$match} }, [$cb]; 425 $self->[1]{$tag} = $cb;
479 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
480 my ($type, @match) = @$match;
481 @match
482 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
483 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
484 } else { 426 } else {
485 push @{ $self->{any} }, [$cb, $match]; 427 delete $self->[1]{$tag};
486 } 428 }
487 } 429 }
488 } 430 }
489 431
490 $port 432 $port
794convenience functionality. 736convenience functionality.
795 737
796This means that AEMP requires a less tightly controlled environment at the 738This means that AEMP requires a less tightly controlled environment at the
797cost of longer node references and a slightly higher management overhead. 739cost of longer node references and a slightly higher management overhead.
798 740
741=item Erlang has a "remote ports are like local ports" philosophy, AEMP
742uses "local ports are like remote ports".
743
744The failure modes for local ports are quite different (runtime errors
745only) then for remote ports - when a local port dies, you I<know> it dies,
746when a connection to another node dies, you know nothing about the other
747port.
748
749Erlang pretends remote ports are as reliable as local ports, even when
750they are not.
751
752AEMP encourages a "treat remote ports differently" philosophy, with local
753ports being the special case/exception, where transport errors cannot
754occur.
755
799=item * Erlang uses processes and a mailbox, AEMP does not queue. 756=item * Erlang uses processes and a mailbox, AEMP does not queue.
800 757
801Erlang uses processes that selctively receive messages, and therefore 758Erlang uses processes that selectively receive messages, and therefore
802needs a queue. AEMP is event based, queuing messages would serve no useful 759needs a queue. AEMP is event based, queuing messages would serve no
803purpose. 760useful purpose. For the same reason the pattern-matching abilities of
761AnyEvent::MP are more limited, as there is little need to be able to
762filter messages without dequeing them.
804 763
805(But see L<Coro::MP> for a more Erlang-like process model on top of AEMP). 764(But see L<Coro::MP> for a more Erlang-like process model on top of AEMP).
806 765
807=item * Erlang sends are synchronous, AEMP sends are asynchronous. 766=item * Erlang sends are synchronous, AEMP sends are asynchronous.
808 767
809Sending messages in Erlang is synchronous and blocks the process. AEMP 768Sending messages in Erlang is synchronous and blocks the process (and
810sends are immediate, connection establishment is handled in the 769so does not need a queue that can overflow). AEMP sends are immediate,
811background. 770connection establishment is handled in the background.
812 771
813=item * Erlang can silently lose messages, AEMP cannot. 772=item * Erlang suffers from silent message loss, AEMP does not.
814 773
815Erlang makes few guarantees on messages delivery - messages can get lost 774Erlang makes few guarantees on messages delivery - messages can get lost
816without any of the processes realising it (i.e. you send messages a, b, 775without any of the processes realising it (i.e. you send messages a, b,
817and c, and the other side only receives messages a and c). 776and c, and the other side only receives messages a and c).
818 777
830eventually be killed - it cannot happen that a node detects a port as dead 789eventually be killed - it cannot happen that a node detects a port as dead
831and then later sends messages to it, finding it is still alive. 790and then later sends messages to it, finding it is still alive.
832 791
833=item * Erlang can send messages to the wrong port, AEMP does not. 792=item * Erlang can send messages to the wrong port, AEMP does not.
834 793
835In Erlang it is quite possible that a node that restarts reuses a process 794In Erlang it is quite likely that a node that restarts reuses a process ID
836ID known to other nodes for a completely different process, causing 795known to other nodes for a completely different process, causing messages
837messages destined for that process to end up in an unrelated process. 796destined for that process to end up in an unrelated process.
838 797
839AEMP never reuses port IDs, so old messages or old port IDs floating 798AEMP never reuses port IDs, so old messages or old port IDs floating
840around in the network will not be sent to an unrelated port. 799around in the network will not be sent to an unrelated port.
841 800
842=item * Erlang uses unprotected connections, AEMP uses secure 801=item * Erlang uses unprotected connections, AEMP uses secure

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines