--- AnyEvent-MP/MP.pm 2009/07/31 20:55:46 1.2 +++ AnyEvent-MP/MP.pm 2009/08/02 18:26:00 1.14 @@ -29,6 +29,10 @@ Despite its simplicity, you can securely message other processes running on the same or other hosts. +At the moment, this module family is severly brokena nd underdocumented, +so do not use. This was uploaded mainly to resreve the CPAN namespace - +stay tuned! + =head1 CONCEPTS =over 4 @@ -39,14 +43,10 @@ you can register C handlers with. All C handlers will receive messages they match, messages will not be queued. -=item port id - C - -A port id is always the node id, a hash-mark (C<#>) as separator, followed -by a port name. +=item port id - C -A port name can be a well known port (basically an identifier/bareword), -or a generated name, consisting of node id, a dot (C<.>), and an -identifier. +A port id is always the noderef, a hash-mark (C<#>) as separator, followed +by a port name (a printable string of unspecified format). =item node @@ -55,21 +55,18 @@ among other things. Initially, nodes are either private (single-process only) or hidden -(connected to a father node only). Only when they epxlicitly "go public" -can you send them messages form unrelated other nodes. - -Public nodes automatically connect to all other public nodes in a network -when they connect, creating a full mesh. +(connected to a master node only). Only when they epxlicitly "become +public" can you send them messages from unrelated other nodes. -=item node id - C, C, C +=item noderef - C, C, C -A node ID is a string that either uniquely identifies a given node (For +A noderef is a string that either uniquely identifies a given node (for private and hidden nodes), or contains a recipe on how to reach a given node (for public nodes). =back -=head1 FUNCTIONS +=head1 VARIABLES/FUNCTIONS =over 4 @@ -77,11 +74,8 @@ package AnyEvent::MP; -use AnyEvent::MP::Util (); -use AnyEvent::MP::Node; -use AnyEvent::MP::Transport; +use AnyEvent::MP::Base; -use utf8; use common::sense; use Carp (); @@ -90,191 +84,260 @@ use base "Exporter"; -our $VERSION = '0.0'; -our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_); +our $VERSION = '0.02'; +our @EXPORT = qw( + NODE $NODE $PORT snd rcv _any_ + create_port create_port_on + create_miniport + become_slave become_public +); + +=item NODE / $NODE + +The C function and the C<$NODE> variable contain the noderef of +the local node. The value is initialised by a call to C or +C, after which all local port identifiers become invalid. + +=item snd $portid, type => @data + +=item snd $portid, @msg + +Send the given message to the given port ID, which can identify either +a local or a remote port, and can be either a string or soemthignt hat +stringifies a sa port ID (such as a port object :). + +While the message can be about anything, it is highly recommended to use a +string as first element (a portid, or some word that indicates a request +type etc.). + +The message data effectively becomes read-only after a call to this +function: modifying any argument is not allowed and can cause many +problems. -our $DEFAULT_SECRET; -our $DEFAULT_PORT = "4040"; +The type of data you can transfer depends on the transport protocol: when +JSON is used, then only strings, numbers and arrays and hashes consisting +of those are allowed (no objects). When Storable is used, then anything +that Storable can serialise and deserialise is allowed, and for the local +node, anything can be passed. -our $CONNECT_INTERVAL = 5; # new connect every 5s, at least -our $CONNECT_TIMEOUT = 30; # includes handshake +=item $local_port = create_port -sub default_secret { - unless (defined $DEFAULT_SECRET) { - if (open my $fh, "<$ENV{HOME}/.aemp-secret") { - sysread $fh, $DEFAULT_SECRET, -s $fh; - } else { - $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32; +Create a new local port object. See the next section for allowed methods. + +=cut + +sub create_port { + my $id = "$AnyEvent::MP::Base::UNIQ." . ++$AnyEvent::MP::Base::ID; + + my $self = bless { + id => "$NODE#$id", + names => [$id], + }, "AnyEvent::MP::Port"; + + $AnyEvent::MP::Base::PORT{$id} = sub { + unshift @_, $self; + + for (@{ $self->{rc0}{$_[1]} }) { + $_ && &{$_->[0]} + && undef $_; } - } - $DEFAULT_SECRET + for (@{ $self->{rcv}{$_[1]} }) { + $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] + && &{$_->[0]} + && undef $_; + } + + for (@{ $self->{any} }) { + $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] + && &{$_->[0]} + && undef $_; + } + }; + + $self } -our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie -our $PUBLIC = 0; -our $NODE; -our $PORT; - -our %NODE; # node id to transport mapping, or "undef", for local node -our %PORT; # local ports -our %LISTENER; # local transports - -sub NODE() { $NODE } - -{ - use POSIX (); - my $nodename = (POSIX::uname)[1]; - $NODE = "$$\@$nodename"; +=item $portid = create_miniport { } + +Creates a "mini port", that is, a port without much #TODO + +=cut + +sub create_miniport(&) { + my $cb = shift; + my $id = "$AnyEvent::MP::Base::UNIQ." . ++$AnyEvent::MP::Base::ID; + + $AnyEvent::MP::Base::PORT{$id} = sub { +# unshift @_, "$NODE#$id"; + &$cb + and delete $AnyEvent::MP::Base::PORT{$id}; + }; + + "$NODE#$id" } -sub _ANY_() { 1 } -sub _any_() { \&_ANY_ } +package AnyEvent::MP::Port; -sub add_node { - my ($noderef) = @_; +=back - return $NODE{$noderef} - if exists $NODE{$noderef}; +=head1 METHODS FOR PORT OBJECTS - for (split /,/, $noderef) { - return $NODE{$noderef} = $NODE{$_} - if exists $NODE{$_}; - } +=over 4 - # for indirect sends, use a different class - my $node = new AnyEvent::MP::Node::Direct $noderef; +=item "$port" - $NODE{$_} = $node - for $noderef, split /,/, $noderef; +A port object stringifies to its port ID, so can be used directly for +C operations. - $node -} +=cut + +use overload + '""' => sub { $_[0]{id} }, + fallback => 1; + +=item $port->rcv (type => $callback->($port, @msg)) + +=item $port->rcv ($smartmatch => $callback->($port, @msg)) + +=item $port->rcv ([$smartmatch...] => $callback->($port, @msg)) + +Register a callback on the given port. + +The callback has to return a true value when its work is done, after +which is will be removed, or a false value in which case it will stay +registered. + +If the match is an array reference, then it will be matched against the +first elements of the message, otherwise only the first element is being +matched. + +Any element in the match that is specified as C<_any_> (a function +exported by this module) matches any single element of the message. -sub snd($@) { - my ($noderef, $port) = split /#/, shift, 2; +While not required, it is highly recommended that the first matching +element is a string identifying the message. The one-string-only match is +also the most efficient match (by far). - add_node $noderef - unless exists $NODE{$noderef}; +=cut + +sub rcv($@) { + my ($self, $match, $cb) = @_; - $NODE{$noderef}->send ([$port, [@_]]); + if (!ref $match) { + push @{ $self->{rc0}{$match} }, [$cb]; + } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { + my ($type, @match) = @$match; + @match + ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] + : push @{ $self->{rc0}{$match->[0]} }, [$cb]; + } else { + push @{ $self->{any} }, [$cb, $match]; + } } -sub _inject { - my ($port, $msg) = @{+shift}; +=item $port->register ($name) + +Registers the given port under the well known name C<$name>. If the name +already exists it is replaced. + +A port can only be registered under one well known name. - $port = $PORT{$port} - or return; +=cut + +sub register { + my ($self, $name) = @_; - use Data::Dumper; - warn Dumper $msg; + $self->{wkname} = $name; + $AnyEvent::MP::Base::WKP{$name} = "$self"; } -sub normalise_noderef($) { - my ($noderef) = @_; +=item $port->destroy - my $cv = AE::cv; - my @res; +Explicitly destroy/remove/nuke/vaporise the port. - $cv->begin (sub { - my %seen; - my @refs; - for (sort { $a->[0] <=> $b->[0] } @res) { - push @refs, $_->[1] unless $seen{$_->[1]}++ - } - shift->send (join ",", @refs); - }); +Ports are normally kept alive by there mere existance alone, and need to +be destroyed explicitly. - $noderef = $DEFAULT_PORT unless length $noderef; +=cut - my $idx; - for my $t (split /,/, $noderef) { - my $pri = ++$idx; - - #TODO: this should be outside normalise_noderef and in become_public - if ($t =~ /^\d*$/) { - my $nodename = (POSIX::uname)[1]; - - $cv->begin; - AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub { - for (@_) { - my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; - push @res, [ - $pri += 1e-5, - AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service - ]; - } - $cv->end; - }; - -# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename; -# -# for (@ipv4) { -# push @res, [ -# $pri, -# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT, -# ]; -# } - } else { - my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT" - or Carp::croak "$t: unparsable transport descriptor"; - - $cv->begin; - AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub { - for (@_) { - my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; - push @res, [ - $pri += 1e-5, - AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service - ]; - } - $cv->end; - } - } - } +sub destroy { + my ($self) = @_; - $cv->end; + delete $AnyEvent::MP::Base::WKP{ $self->{wkname} }; - $cv + delete $AnyEvent::MP::Base::PORT{$_} + for @{ $self->{names} }; } -sub become_public { - return if $PUBLIC; +=back - my $noderef = join ",", ref $_[0] ? @{+shift} : shift; - my @args = @_; +=head1 FUNCTIONS FOR NODES - $NODE = (normalise_noderef $noderef)->recv; +=over 4 - my $self = new AnyEvent::MP::Node::Self noderef => $NODE; +=item mon $noderef, $callback->($noderef, $status, $) - $NODE{""} = $self; # empty string == local node +Monitors the given noderef. - for my $t (split /,/, $NODE) { - $NODE{$t} = $self; +=item become_public endpoint... - my ($host, $port) = AnyEvent::Socket::parse_hostport $t; +Tells the node to become a public node, i.e. reachable from other nodes. - $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, - @args, - on_error => sub { - die "on_error<@_>\n";#d# - }, - on_connect => sub { - my ($tp) = @_; +If no arguments are given, or the first argument is C, then +AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the +local nodename resolves to. - $NODE{$tp->{remote_id}} = $_[0]; - }, - sub { - my ($tp) = @_; +Otherwise the first argument must be an array-reference with transport +endpoints ("ip:port", "hostname:port") or port numbers (in which case the +local nodename is used as hostname). The endpoints are all resolved and +will become the node reference. - $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp; - }, - ; - } +=cut - $PUBLIC = 1; -} +=back + +=head1 NODE MESSAGES + +Nodes understand the following messages sent to them. Many of them take +arguments called C<@reply>, which will simply be used to compose a reply +message - C<$reply[0]> is the port to reply to, C<$reply[1]> the type and +the remaining arguments are simply the message data. + +=over 4 + +=cut + +=item wkp => $name, @reply + +Replies with the port ID of the specified well-known port, or C. + +=item devnull => ... + +Generic data sink/CPU heat conversion. + +=item relay => $port, @msg + +Simply forwards the message to the given port. + +=item eval => $string[ @reply] + +Evaluates the given string. If C<@reply> is given, then a message of the +form C<@reply, $@, @evalres> is sent. + +Example: crash another node. + + snd $othernode, eval => "exit"; + +=item time => @reply + +Replies the the current node time to C<@reply>. + +Example: tell the current node to send the current time to C<$myport> in a +C message. + + snd $NODE, time => $myport, timereply => 1, 2; + # => snd $myport, timereply => 1, 2,