--- AnyEvent-MP/MP.pm 2009/08/04 23:35:51 1.30 +++ AnyEvent-MP/MP.pm 2009/08/05 22:40:51 1.33 @@ -105,8 +105,7 @@ our $VERSION = '0.1'; our @EXPORT = qw( NODE $NODE *SELF node_of _any_ - resolve_node - become_slave become_public + resolve_node initialise_node snd rcv mon kil reg psub port ); @@ -126,7 +125,7 @@ to C or C, after which all local port identifiers become invalid. -=item $noderef = node_of $portid +=item $noderef = node_of $port Extracts and returns the noderef from a portid or a noderef. @@ -170,9 +169,9 @@ just export C<$SELF>, all the symbols called C are exported by this module, but only C<$SELF> is currently used. -=item snd $portid, type => @data +=item snd $port, type => @data -=item snd $portid, @msg +=item snd $port, @msg Send the given message to the given port ID, which can identify either a local or a remote port, and can be either a string or soemthignt hat @@ -192,27 +191,218 @@ that Storable can serialise and deserialise is allowed, and for the local node, anything can be passed. -=item kil $portid[, @reason] +=item $local_port = port -Kill the specified port with the given C<@reason>. +Create a new local port object that can be used either as a pattern +matching port ("full port") or a single-callback port ("miniport"), +depending on how C callbacks are bound to the object. + +=item $port = port { my @msg = @_; $finished } + +Creates a "miniport", that is, a very lightweight port without any pattern +matching behind it, and returns its ID. Semantically the same as creating +a port and calling C on it. -If no C<@reason> is specified, then the port is killed "normally" (linked -ports will not be kileld, or even notified). +The block will be called for every message received on the port. When the +callback returns a true value its job is considered "done" and the port +will be destroyed. Otherwise it will stay alive. -Otherwise, linked ports get killed with the same reason (second form of -C, see below). +The message will be passed as-is, no extra argument (i.e. no port id) will +be passed to the callback. -Runtime errors while evaluating C callbacks or inside C blocks -will be reported as reason C<< die => $@ >>. +If you need the local port id in the callback, this works nicely: -Transport/communication errors are reported as C<< transport_error => -$message >>. + my $port; $port = port { + snd $otherport, reply => $port; + }; + +=cut + +sub rcv($@); + +sub port(;&) { + my $id = "$UNIQ." . $ID++; + my $port = "$NODE#$id"; + + if (@_) { + rcv $port, shift; + } else { + $PORT{$id} = sub { }; # nop + } + + $port +} + +=item reg $port, $name -=item $guard = mon $portid, $cb->(@reason) +Registers the given port under the name C<$name>. If the name already +exists it is replaced. -=item $guard = mon $portid, $otherport +A port can only be registered under one well known name. -=item $guard = mon $portid, $otherport, @msg +A port automatically becomes unregistered when it is killed. + +=cut + +sub reg(@) { + my ($port, $name) = @_; + + $REG{$name} = $port; +} + +=item rcv $port, $callback->(@msg) + +Replaces the callback on the specified miniport (after converting it to +one if required). + +=item rcv $port, tagstring => $callback->(@msg), ... + +=item rcv $port, $smartmatch => $callback->(@msg), ... + +=item rcv $port, [$smartmatch...] => $callback->(@msg), ... + +Register callbacks to be called on matching messages on the given full +port (after converting it to one if required). + +The callback has to return a true value when its work is done, after +which is will be removed, or a false value in which case it will stay +registered. + +The global C<$SELF> (exported by this module) contains C<$port> while +executing the callback. + +Runtime errors wdurign callback execution will result in the port being +Ced. + +If the match is an array reference, then it will be matched against the +first elements of the message, otherwise only the first element is being +matched. + +Any element in the match that is specified as C<_any_> (a function +exported by this module) matches any single element of the message. + +While not required, it is highly recommended that the first matching +element is a string identifying the message. The one-string-only match is +also the most efficient match (by far). + +=cut + +sub rcv($@) { + my $port = shift; + my ($noderef, $portid) = split /#/, $port, 2; + + ($NODE{$noderef} || add_node $noderef) == $NODE{""} + or Carp::croak "$port: rcv can only be called on local ports, caught"; + + if (@_ == 1) { + my $cb = shift; + delete $PORT_DATA{$portid}; + $PORT{$portid} = sub { + local $SELF = $port; + eval { + &$cb + and kil $port; + }; + _self_die if $@; + }; + } else { + my $self = $PORT_DATA{$portid} ||= do { + my $self = bless { + id => $port, + }, "AnyEvent::MP::Port"; + + $PORT{$portid} = sub { + local $SELF = $port; + + eval { + for (@{ $self->{rc0}{$_[0]} }) { + $_ && &{$_->[0]} + && undef $_; + } + + for (@{ $self->{rcv}{$_[0]} }) { + $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] + && &{$_->[0]} + && undef $_; + } + + for (@{ $self->{any} }) { + $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] + && &{$_->[0]} + && undef $_; + } + }; + _self_die if $@; + }; + + $self + }; + + "AnyEvent::MP::Port" eq ref $self + or Carp::croak "$port: rcv can only be called on message matching ports, caught"; + + while (@_) { + my ($match, $cb) = splice @_, 0, 2; + + if (!ref $match) { + push @{ $self->{rc0}{$match} }, [$cb]; + } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { + my ($type, @match) = @$match; + @match + ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] + : push @{ $self->{rc0}{$match->[0]} }, [$cb]; + } else { + push @{ $self->{any} }, [$cb, $match]; + } + } + } + + $port +} + +=item $closure = psub { BLOCK } + +Remembers C<$SELF> and creates a closure out of the BLOCK. When the +closure is executed, sets up the environment in the same way as in C +callbacks, i.e. runtime errors will cause the port to get Ced. + +This is useful when you register callbacks from C callbacks: + + rcv delayed_reply => sub { + my ($delay, @reply) = @_; + my $timer = AE::timer $delay, 0, psub { + snd @reply, $SELF; + }; + }; + +=cut + +sub psub(&) { + my $cb = shift; + + my $port = $SELF + or Carp::croak "psub can only be called from within rcv or psub callbacks, not"; + + sub { + local $SELF = $port; + + if (wantarray) { + my @res = eval { &$cb }; + _self_die if $@; + @res + } else { + my $res = eval { &$cb }; + _self_die if $@; + $res + } + } +} + +=item $guard = mon $port, $cb->(@reason) + +=item $guard = mon $port, $otherport + +=item $guard = mon $port, $otherport, @msg Monitor the given port and do something when the port is killed. @@ -300,208 +490,87 @@ It means that if either one is killed abnormally, the other one gets killed as well. -=item $local_port = port - -Create a new local port object that supports message matching. - -=item $portid = port { my @msg = @_; $finished } - -Creates a "mini port", that is, a very lightweight port without any -pattern matching behind it, and returns its ID. +=item kil $port[, @reason] -The block will be called for every message received on the port. When the -callback returns a true value its job is considered "done" and the port -will be destroyed. Otherwise it will stay alive. - -The message will be passed as-is, no extra argument (i.e. no port id) will -be passed to the callback. - -If you need the local port id in the callback, this works nicely: - - my $port; $port = miniport { - snd $otherport, reply => $port; - }; - -=cut - -sub port(;&) { - my $id = "$UNIQ." . $ID++; - my $port = "$NODE#$id"; - - if (@_) { - my $cb = shift; - $PORT{$id} = sub { - local $SELF = $port; - eval { - &$cb - and kil $id; - }; - _self_die if $@; - }; - } else { - my $self = bless { - id => "$NODE#$id", - }, "AnyEvent::MP::Port"; - - $PORT_DATA{$id} = $self; - $PORT{$id} = sub { - local $SELF = $port; - - eval { - for (@{ $self->{rc0}{$_[0]} }) { - $_ && &{$_->[0]} - && undef $_; - } - - for (@{ $self->{rcv}{$_[0]} }) { - $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; - } - - for (@{ $self->{any} }) { - $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; - } - }; - _self_die if $@; - }; - } - - $port -} - -=item reg $portid, $name - -Registers the given port under the name C<$name>. If the name already -exists it is replaced. - -A port can only be registered under one well known name. - -A port automatically becomes unregistered when it is killed. - -=cut - -sub reg(@) { - my ($portid, $name) = @_; - - $REG{$name} = $portid; -} - -=item rcv $portid, tagstring => $callback->(@msg), ... +Kill the specified port with the given C<@reason>. -=item rcv $portid, $smartmatch => $callback->(@msg), ... +If no C<@reason> is specified, then the port is killed "normally" (linked +ports will not be kileld, or even notified). -=item rcv $portid, [$smartmatch...] => $callback->(@msg), ... +Otherwise, linked ports get killed with the same reason (second form of +C, see below). -Register callbacks to be called on matching messages on the given port. +Runtime errors while evaluating C callbacks or inside C blocks +will be reported as reason C<< die => $@ >>. -The callback has to return a true value when its work is done, after -which is will be removed, or a false value in which case it will stay -registered. +Transport/communication errors are reported as C<< transport_error => +$message >>. -The global C<$SELF> (exported by this module) contains C<$portid> while -executing the callback. +=back -Runtime errors wdurign callback execution will result in the port being -Ced. +=head1 FUNCTIONS FOR NODES -If the match is an array reference, then it will be matched against the -first elements of the message, otherwise only the first element is being -matched. +=over 4 -Any element in the match that is specified as C<_any_> (a function -exported by this module) matches any single element of the message. +=item initialise_node $noderef, $seednode, $seednode... -While not required, it is highly recommended that the first matching -element is a string identifying the message. The one-string-only match is -also the most efficient match (by far). +=item initialise_node "slave/", $master, $master... -=cut +Initialises a node - must be called exactly once before calling other +AnyEvent::MP functions when talking to other nodes is required. -sub rcv($@) { - my ($noderef, $port) = split /#/, shift, 2; +All arguments are noderefs, which can be either resolved or unresolved. - ($NODE{$noderef} || add_node $noderef) == $NODE{""} - or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught"; +There are two types of networked nodes, public nodes and slave nodes: - my $self = $PORT_DATA{$port} - or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; +=over 4 - "AnyEvent::MP::Port" eq ref $self - or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; +=item public nodes - while (@_) { - my ($match, $cb) = splice @_, 0, 2; - - if (!ref $match) { - push @{ $self->{rc0}{$match} }, [$cb]; - } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { - my ($type, @match) = @$match; - @match - ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] - : push @{ $self->{rc0}{$match->[0]} }, [$cb]; - } else { - push @{ $self->{any} }, [$cb, $match]; - } - } -} +For public nodes, C<$noderef> must either be a (possibly unresolved) +noderef, in which case it will be resolved, or C (or missing), in +which case the noderef will be guessed. -=item $closure = psub { BLOCK } +Afterwards, the node will bind itself on all endpoints and try to connect +to all additional C<$seednodes> that are specified. Seednodes are optional +and can be used to quickly bootstrap the node into an existing network. -Remembers C<$SELF> and creates a closure out of the BLOCK. When the -closure is executed, sets up the environment in the same way as in C -callbacks, i.e. runtime errors will cause the port to get Ced. +=item slave nodes -This is useful when you register callbacks from C callbacks: +When the C<$noderef> is the special string C, then the node will +become a slave node. Slave nodes cannot be contacted from outside and will +route most of their traffic to the master node that they attach to. - rcv delayed_reply => sub { - my ($delay, @reply) = @_; - my $timer = AE::timer $delay, 0, psub { - snd @reply, $SELF; - }; - }; +At least one additional noderef is required: The node will try to connect +to all of them and will become a slave attached to the first node it can +successfully connect to. -=cut +=back -sub psub(&) { - my $cb = shift; +This function will block until all nodes have been resolved and, for slave +nodes, until it has successfully established a connection to a master +server. - my $port = $SELF - or Carp::croak "psub can only be called from within rcv or psub callbacks, not"; +Example: become a public node listening on the default node. - sub { - local $SELF = $port; + initialise_node; - if (wantarray) { - my @res = eval { &$cb }; - _self_die if $@; - @res - } else { - my $res = eval { &$cb }; - _self_die if $@; - $res - } - } -} +Example: become a public node, and try to contact some well-known master +servers to become part of the network. -=back + initialise_node undef, "master1", "master2"; -=head1 FUNCTIONS FOR NODES +Example: become a public node listening on port C<4041>. -=over 4 + initialise_node 4041; -=item become_public $noderef +Example: become a public node, only visible on localhost port 4044. -Tells the node to become a public node, i.e. reachable from other nodes. + initialise_node "locahost:4044"; -The first argument is the (unresolved) node reference of the local node -(if missing then the empty string is used). +Example: become a slave node to any of the specified master servers. -It is quite common to not specify anything, in which case the local node -tries to listen on the default port, or to only specify a port number, in -which case AnyEvent::MP tries to guess the local addresses. + initialise_node "slave/", "master1", "192.168.13.17", "mp.example.net"; =cut