--- AnyEvent-MP/MP.pm 2009/07/31 20:55:46 1.2 +++ AnyEvent-MP/MP.pm 2009/08/01 10:02:33 1.6 @@ -29,6 +29,10 @@ Despite its simplicity, you can securely message other processes running on the same or other hosts. +At the moment, this module family is severly brokena nd underdocumented, +so do not use. This was uploaded mainly to resreve the CPAN namespace - +stay tuned! + =head1 CONCEPTS =over 4 @@ -39,14 +43,10 @@ you can register C handlers with. All C handlers will receive messages they match, messages will not be queued. -=item port id - C - -A port id is always the node id, a hash-mark (C<#>) as separator, followed -by a port name. +=item port id - C -A port name can be a well known port (basically an identifier/bareword), -or a generated name, consisting of node id, a dot (C<.>), and an -identifier. +A port id is always the noderef, a hash-mark (C<#>) as separator, followed +by a port name (a printable string of unspecified format). =item node @@ -55,21 +55,18 @@ among other things. Initially, nodes are either private (single-process only) or hidden -(connected to a father node only). Only when they epxlicitly "go public" -can you send them messages form unrelated other nodes. - -Public nodes automatically connect to all other public nodes in a network -when they connect, creating a full mesh. +(connected to a master node only). Only when they epxlicitly "become +public" can you send them messages from unrelated other nodes. -=item node id - C, C, C +=item noderef - C, C, C -A node ID is a string that either uniquely identifies a given node (For +A noderef is a string that either uniquely identifies a given node (for private and hidden nodes), or contains a recipe on how to reach a given node (for public nodes). =back -=head1 FUNCTIONS +=head1 VARIABLES/FUNCTIONS =over 4 @@ -90,7 +87,7 @@ use base "Exporter"; -our $VERSION = '0.0'; +our $VERSION = '0.01'; our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_); our $DEFAULT_SECRET; @@ -111,7 +108,16 @@ $DEFAULT_SECRET } +=item NODE / $NODE + +The C function and the C<$NODE> variable contain the noderef of +the local node. The value is initialised by a call to C or +C, after which all local port identifiers become invalid. + +=cut + our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie +our $ID = "a0"; our $PUBLIC = 0; our $NODE; our $PORT; @@ -151,13 +157,88 @@ $node } -sub snd($@) { +=item snd $portid, type => @data + +=item snd $portid, @msg + +Send the given message to the given port ID, which can identify either a +local or a remote port. + +While the message can be about anything, it is highly recommended to use +a constant string as first element. + +The message data effectively becomes read-only after a call to this +function: modifying any argument is not allowed and can cause many +problems. + +The type of data you can transfer depends on the transport protocol: when +JSON is used, then only strings, numbers and arrays and hashes consisting +of those are allowed (no objects). When Storable is used, then anything +that Storable can serialise and deserialise is allowed, and for the local +node, anything can be passed. + +=cut + +sub snd(@) { my ($noderef, $port) = split /#/, shift, 2; add_node $noderef unless exists $NODE{$noderef}; - $NODE{$noderef}->send ([$port, [@_]]); + $NODE{$noderef}->send (["$port", [@_]]); +} + +=item rcv $portid, type => $callback->(@msg) + +=item rcv $portid, $smartmatch => $callback->(@msg) + +=item rcv $portid, [$smartmatch...] => $callback->(@msg) + +Register a callback on the port identified by C<$portid>, which I be +a local port. + +The callback has to return a true value when its work is done, after +which is will be removed, or a false value in which case it will stay +registered. + +If the match is an array reference, then it will be matched against the +first elements of the message, otherwise only the first element is being +matched. + +Any element in the match that is specified as C<_any_> (a function +exported by this module) matches any single element of the message. + +While not required, it is highly recommended that the first matching +element is a string identifying the message. The one-string-only match is +also the most efficient match (by far). + +=cut + +sub rcv($@) { + my ($port, $match, $cb) = @_; + + my $port = $PORT{$port} + or do { + my ($noderef, $lport) = split /#/, $port; + "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef} + or Carp::croak "$port: can only rcv on local ports"; + + $PORT{$lport} + or Carp::croak "$port: port does not exist"; + + $PORT{$port} = $PORT{$lport} # also return + }; + + if (!ref $match) { + push @{ $port->{rc0}{$match} }, [$cb]; + } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { + my ($type, @match) = @$match; + @match + ? push @{ $port->{rcv}{$match->[0]} }, [$cb, \@match] + : push @{ $port->{rc0}{$match->[0]} }, [$cb]; + } else { + push @{ $port->{any} }, [$cb, $match]; + } } sub _inject { @@ -166,8 +247,24 @@ $port = $PORT{$port} or return; - use Data::Dumper; - warn Dumper $msg; + @_ = @$msg; + + for (@{ $port->{rc0}{$msg->[0]} }) { + $_ && &{$_->[0]} + && undef $_; + } + + for (@{ $port->{rcv}{$msg->[0]} }) { + $_ && [@_[1..$#{$_->[1]}]] ~~ $_->[1] + && &{$_->[0]} + && undef $_; + } + + for (@{ $port->{any} }) { + $_ && [@_[0..$#{$_->[1]}]] ~~ $_->[1] + && &{$_->[0]} + && undef $_; + } } sub normalise_noderef($) { @@ -246,12 +343,8 @@ $NODE = (normalise_noderef $noderef)->recv; - my $self = new AnyEvent::MP::Node::Self noderef => $NODE; - - $NODE{""} = $self; # empty string == local node - for my $t (split /,/, $NODE) { - $NODE{$t} = $self; + $NODE{$t} = $NODE{""}; my ($host, $port) = AnyEvent::Socket::parse_hostport $t; @@ -278,6 +371,75 @@ =back +=head1 NODE MESSAGES + +Nodes understand the following messages sent to them. Many of them take +arguments called C<@reply>, which will simply be used to compose a reply +message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and +the remaining arguments are simply the message data. + +=over 4 + +=cut + +############################################################################# +# self node code + +sub _new_port($) { + my ($name) = @_; + + my ($noderef, $portname) = split /#/, $name; + + $PORT{$name} = + $PORT{$portname} = { + names => [$name, $portname], + }; +} + +$NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE; +_new_port ""; + +=item relay => $port, @msg + +Simply forwards the message to the given port. + +=cut + +rcv "", relay => \&snd; + +=item eval => $string[ @reply] + +Evaluates the given string. If C<@reply> is given, then a message of the +form C<@reply, $@, @evalres> is sent. + +Example: crash another node. + + snd $othernode, eval => "exit"; + +=cut + +rcv "", eval => sub { + my (undef, $string, @reply) = @_; + my @res = eval $string; + snd @reply, "$@", @res if @reply; +}; + +=item time => @reply + +Replies the the current node time to C<@reply>. + +Example: tell the current node to send the current time to C<$myport> in a +C message. + + snd $NODE, time => $myport, timereply => 1, 2; + # => snd $myport, timereply => 1, 2,