ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP.pm
(Generate patch)

Comparing AnyEvent-MP/MP.pm (file contents):
Revision 1.49 by root, Thu Aug 13 15:29:58 2009 UTC vs.
Revision 1.50 by root, Fri Aug 14 14:01:05 2009 UTC

22 # sending messages 22 # sending messages
23 snd $port, type => data...; 23 snd $port, type => data...;
24 snd $port, @msg; 24 snd $port, @msg;
25 snd @msg_with_first_element_being_a_port; 25 snd @msg_with_first_element_being_a_port;
26 26
27 # creating/using miniports 27 # creating/using ports, the simple way
28 my $miniport = port { my @msg = @_; 0 }; 28 my $somple_port = port { my @msg = @_; 0 };
29 29
30 # creating/using full ports 30 # creating/using ports, type matching
31 my $port = port; 31 my $port = port;
32 rcv $port, smartmatch => $cb->(@msg);
33 rcv $port, ping => sub { snd $_[0], "pong"; 0 }; 32 rcv $port, ping => sub { snd $_[0], "pong"; 0 };
34 rcv $port, pong => sub { warn "pong received\n"; 0 }; 33 rcv $port, pong => sub { warn "pong received\n"; 0 };
35
36 # more, smarter, matches (_any_ is exported by this module)
37 rcv $port, [child_died => $pid] => sub { ...
38 rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3
39 34
40 # create a port on another node 35 # create a port on another node
41 my $port = spawn $node, $initfunc, @initdata; 36 my $port = spawn $node, $initfunc, @initdata;
42 37
43 # monitoring 38 # monitoring
303that Storable can serialise and deserialise is allowed, and for the local 298that Storable can serialise and deserialise is allowed, and for the local
304node, anything can be passed. 299node, anything can be passed.
305 300
306=item $local_port = port 301=item $local_port = port
307 302
308Create a new local port object that can be used either as a pattern 303Create a new local port object and returns its port ID. Initially it has
309matching port ("full port") or a single-callback port ("miniport"), 304no callbacks set and will throw an error when it receives messages.
310depending on how C<rcv> callbacks are bound to the object.
311 305
312=item $port = port { my @msg = @_; $finished } 306=item $local_port = port { my @msg = @_ }
313 307
314Creates a "miniport", that is, a very lightweight port without any pattern 308Creates a new local port, and returns its ID. Semantically the same as
315matching behind it, and returns its ID. Semantically the same as creating
316a port and calling C<rcv $port, $callback> on it. 309creating a port and calling C<rcv $port, $callback> on it.
317 310
318The block will be called for every message received on the port. When the 311The block will be called for every message received on the port, with the
319callback returns a true value its job is considered "done" and the port 312global variable C<$SELF> set to the port ID. Runtime errors will cause the
320will be destroyed. Otherwise it will stay alive. 313port to be C<kil>ed. The message will be passed as-is, no extra argument
314(i.e. no port ID) will be passed to the callback.
321 315
322The message will be passed as-is, no extra argument (i.e. no port id) will 316If you want to stop/destroy the port, simply C<kil> it:
323be passed to the callback.
324 317
325If you need the local port id in the callback, this works nicely: 318 my $port = port {
326 319 my @msg = @_;
327 my $port; $port = port { 320 ...
328 snd $otherport, reply => $port; 321 kil $SELF;
329 }; 322 };
330 323
331=cut 324=cut
332 325
333sub rcv($@); 326sub rcv($@);
327
328sub _kilme {
329 die "received message on port without callback";
330}
334 331
335sub port(;&) { 332sub port(;&) {
336 my $id = "$UNIQ." . $ID++; 333 my $id = "$UNIQ." . $ID++;
337 my $port = "$NODE#$id"; 334 my $port = "$NODE#$id";
338 335
339 if (@_) { 336 rcv $port, shift || \&_kilme;
340 rcv $port, shift;
341 } else {
342 $PORT{$id} = sub { }; # nop
343 }
344 337
345 $port 338 $port
346} 339}
347 340
348=item reg $port, $name
349
350=item reg $name
351
352Registers the given port (or C<$SELF><<< if missing) under the name
353C<$name>. If the name already exists it is replaced.
354
355A port can only be registered under one well known name.
356
357A port automatically becomes unregistered when it is killed.
358
359=cut
360
361sub reg(@) {
362 my $port = @_ > 1 ? shift : $SELF || Carp::croak 'reg: called with one argument only, but $SELF not set,';
363
364 $REG{$_[0]} = $port;
365}
366
367=item rcv $port, $callback->(@msg) 341=item rcv $local_port, $callback->(@msg)
368 342
369Replaces the callback on the specified miniport (after converting it to 343Replaces the default callback on the specified port. There is no way to
370one if required). 344remove the default callback: use C<sub { }> to disable it, or better
371 345C<kil> the port when it is no longer needed.
372=item rcv $port, tagstring => $callback->(@msg), ...
373
374=item rcv $port, $smartmatch => $callback->(@msg), ...
375
376=item rcv $port, [$smartmatch...] => $callback->(@msg), ...
377
378Register callbacks to be called on matching messages on the given full
379port (after converting it to one if required) and return the port.
380
381The callback has to return a true value when its work is done, after
382which is will be removed, or a false value in which case it will stay
383registered.
384 346
385The global C<$SELF> (exported by this module) contains C<$port> while 347The global C<$SELF> (exported by this module) contains C<$port> while
386executing the callback. 348executing the callback. Runtime errors during callback execution will
349result in the port being C<kil>ed.
387 350
388Runtime errors during callback execution will result in the port being 351The default callback received all messages not matched by a more specific
389C<kil>ed. 352C<tag> match.
390 353
391If the match is an array reference, then it will be matched against the 354=item rcv $local_port, tag => $callback->(@msg_without_tag), ...
392first elements of the message, otherwise only the first element is being
393matched.
394 355
395Any element in the match that is specified as C<_any_> (a function 356Register callbacks to be called on messages starting with the given tag on
396exported by this module) matches any single element of the message. 357the given port (and return the port), or unregister it (when C<$callback>
358is C<$undef>).
397 359
398While not required, it is highly recommended that the first matching 360The original message will be passed to the callback, after the first
399element is a string identifying the message. The one-string-only match is 361element (the tag) has been removed. The callback will use the same
400also the most efficient match (by far). 362environment as the default callback (see above).
401 363
402Example: create a port and bind receivers on it in one go. 364Example: create a port and bind receivers on it in one go.
403 365
404 my $port = rcv port, 366 my $port = rcv port,
405 msg1 => sub { ...; 0 }, 367 msg1 => sub { ... },
406 msg2 => sub { ...; 0 }, 368 msg2 => sub { ... },
407 ; 369 ;
408 370
409Example: create a port, bind receivers and send it in a message elsewhere 371Example: create a port, bind receivers and send it in a message elsewhere
410in one go: 372in one go:
411 373
412 snd $otherport, reply => 374 snd $otherport, reply =>
413 rcv port, 375 rcv port,
414 msg1 => sub { ...; 0 }, 376 msg1 => sub { ... },
415 ... 377 ...
416 ; 378 ;
417 379
418=cut 380=cut
419 381
422 my ($noderef, $portid) = split /#/, $port, 2; 384 my ($noderef, $portid) = split /#/, $port, 2;
423 385
424 ($NODE{$noderef} || add_node $noderef) == $NODE{""} 386 ($NODE{$noderef} || add_node $noderef) == $NODE{""}
425 or Carp::croak "$port: rcv can only be called on local ports, caught"; 387 or Carp::croak "$port: rcv can only be called on local ports, caught";
426 388
427 if (@_ == 1) { 389 while (@_) {
390 if (ref $_[0]) {
391 if (my $self = $PORT_DATA{$portid}) {
392 "AnyEvent::MP::Port" eq ref $self
393 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
394
395 $self->[2] = shift;
396 } else {
428 my $cb = shift; 397 my $cb = shift;
429 delete $PORT_DATA{$portid};
430 $PORT{$portid} = sub { 398 $PORT{$portid} = sub {
431 local $SELF = $port; 399 local $SELF = $port;
432 eval { 400 eval { &$cb }; _self_die if $@;
433 &$cb 401 };
434 and kil $port;
435 }; 402 }
436 _self_die if $@; 403 } elsif (defined $_[0]) {
437 };
438 } else {
439 my $self = $PORT_DATA{$portid} ||= do { 404 my $self = $PORT_DATA{$portid} ||= do {
440 my $self = bless { 405 my $self = bless [$PORT{$port} || sub { }, { }, $port], "AnyEvent::MP::Port";
441 id => $port,
442 }, "AnyEvent::MP::Port";
443 406
444 $PORT{$portid} = sub { 407 $PORT{$portid} = sub {
445 local $SELF = $port; 408 local $SELF = $port;
446 409
447 eval {
448 for (@{ $self->{rc0}{$_[0]} }) { 410 if (my $cb = $self->[1]{$_[0]}) {
449 $_ && &{$_->[0]} 411 shift;
450 && undef $_; 412 eval { &$cb }; _self_die if $@;
451 } 413 } else {
452
453 for (@{ $self->{rcv}{$_[0]} }) {
454 $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1]
455 && &{$_->[0]} 414 &{ $self->[0] };
456 && undef $_;
457 }
458
459 for (@{ $self->{any} }) {
460 $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1]
461 && &{$_->[0]}
462 && undef $_;
463 } 415 }
464 }; 416 };
465 _self_die if $@; 417
418 $self
466 }; 419 };
467 420
468 $self
469 };
470
471 "AnyEvent::MP::Port" eq ref $self 421 "AnyEvent::MP::Port" eq ref $self
472 or Carp::croak "$port: rcv can only be called on message matching ports, caught"; 422 or Carp::croak "$port: rcv can only be called on message matching ports, caught";
473 423
474 while (@_) {
475 my ($match, $cb) = splice @_, 0, 2; 424 my ($tag, $cb) = splice @_, 0, 2;
476 425
477 if (!ref $match) { 426 if (defined $cb) {
478 push @{ $self->{rc0}{$match} }, [$cb]; 427 $self->[1]{$tag} = $cb;
479 } elsif (("ARRAY" eq ref $match && !ref $match->[0])) {
480 my ($type, @match) = @$match;
481 @match
482 ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match]
483 : push @{ $self->{rc0}{$match->[0]} }, [$cb];
484 } else { 428 } else {
485 push @{ $self->{any} }, [$cb, $match]; 429 delete $self->[1]{$tag};
486 } 430 }
487 } 431 }
488 } 432 }
489 433
490 $port 434 $port

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines