--- AnyEvent-MP/MP.pm 2009/08/01 10:02:33 1.6 +++ AnyEvent-MP/MP.pm 2009/08/02 18:08:38 1.11 @@ -74,11 +74,8 @@ package AnyEvent::MP; -use AnyEvent::MP::Util (); -use AnyEvent::MP::Node; -use AnyEvent::MP::Transport; +use AnyEvent::MP::Base; -use utf8; use common::sense; use Carp (); @@ -87,26 +84,13 @@ use base "Exporter"; -our $VERSION = '0.01'; -our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_); - -our $DEFAULT_SECRET; -our $DEFAULT_PORT = "4040"; - -our $CONNECT_INTERVAL = 5; # new connect every 5s, at least -our $CONNECT_TIMEOUT = 30; # includes handshake - -sub default_secret { - unless (defined $DEFAULT_SECRET) { - if (open my $fh, "<$ENV{HOME}/.aemp-secret") { - sysread $fh, $DEFAULT_SECRET, -s $fh; - } else { - $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32; - } - } - - $DEFAULT_SECRET -} +our $VERSION = '0.02'; +our @EXPORT = qw( + NODE $NODE $PORT snd rcv _any_ + create_port create_port_on + create_miniport + become_slave become_public +); =item NODE / $NODE @@ -114,58 +98,17 @@ the local node. The value is initialised by a call to C or C, after which all local port identifiers become invalid. -=cut - -our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie -our $ID = "a0"; -our $PUBLIC = 0; -our $NODE; -our $PORT; - -our %NODE; # node id to transport mapping, or "undef", for local node -our %PORT; # local ports -our %LISTENER; # local transports - -sub NODE() { $NODE } - -{ - use POSIX (); - my $nodename = (POSIX::uname)[1]; - $NODE = "$$\@$nodename"; -} - -sub _ANY_() { 1 } -sub _any_() { \&_ANY_ } - -sub add_node { - my ($noderef) = @_; - - return $NODE{$noderef} - if exists $NODE{$noderef}; - - for (split /,/, $noderef) { - return $NODE{$noderef} = $NODE{$_} - if exists $NODE{$_}; - } - - # for indirect sends, use a different class - my $node = new AnyEvent::MP::Node::Direct $noderef; - - $NODE{$_} = $node - for $noderef, split /,/, $noderef; - - $node -} - =item snd $portid, type => @data =item snd $portid, @msg -Send the given message to the given port ID, which can identify either a -local or a remote port. - -While the message can be about anything, it is highly recommended to use -a constant string as first element. +Send the given message to the given port ID, which can identify either +a local or a remote port, and can be either a string or soemthignt hat +stringifies a sa port ID (such as a port object :). + +While the message can be about anything, it is highly recommended to use a +string as first element (a portid, or some word that indicates a request +type etc.). The message data effectively becomes read-only after a call to this function: modifying any argument is not allowed and can cause many @@ -177,25 +120,88 @@ that Storable can serialise and deserialise is allowed, and for the local node, anything can be passed. +=item $local_port = create_port + +Create a new local port object. See the next section for allowed methods. + =cut -sub snd(@) { - my ($noderef, $port) = split /#/, shift, 2; +sub create_port { + my $id = "$AnyEvent::MP::Base::UNIQ." . ++$AnyEvent::MP::Base::ID; + + my $self = bless { + id => "$NODE#$id", + names => [$id], + }, "AnyEvent::MP::Port"; + + $AnyEvent::MP::Base::PORT{$id} = sub { + unshift @_, $self; + + for (@{ $self->{rc0}{$_[1]} }) { + $_ && &{$_->[0]} + && undef $_; + } + + for (@{ $self->{rcv}{$_[1]} }) { + $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] + && &{$_->[0]} + && undef $_; + } + + for (@{ $self->{any} }) { + $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] + && &{$_->[0]} + && undef $_; + } + }; + + $self +} + +=item $portid = create_miniport { } + +Creates a "mini port", that is, a port without much #TODO + +=cut - add_node $noderef - unless exists $NODE{$noderef}; +sub create_miniport(&) { + my $cb = shift; + my $id = "$AnyEvent::MP::Base::UNIQ." . ++$AnyEvent::MP::Base::ID; + + $AnyEvent::MP::Base::PORT{$id} = sub { + &$cb + and delete $AnyEvent::MP::Base::PORT{$id}; + }; - $NODE{$noderef}->send (["$port", [@_]]); + "$NODE#$id" } -=item rcv $portid, type => $callback->(@msg) +package AnyEvent::MP::Port; -=item rcv $portid, $smartmatch => $callback->(@msg) +=back -=item rcv $portid, [$smartmatch...] => $callback->(@msg) +=head1 METHODS FOR PORT OBJECTS + +=over 4 -Register a callback on the port identified by C<$portid>, which I be -a local port. +=item "$port" + +A port object stringifies to its port ID, so can be used directly for +C operations. + +=cut + +use overload + '""' => sub { $_[0]{id} }, + fallback => 1; + +=item $port->rcv (type => $callback->($port, @msg)) + +=item $port->rcv ($smartmatch => $callback->($port, @msg)) + +=item $port->rcv ([$smartmatch...] => $callback->($port, @msg)) + +Register a callback on the given port. The callback has to return a true value when its work is done, after which is will be removed, or a false value in which case it will stay @@ -215,159 +221,78 @@ =cut sub rcv($@) { - my ($port, $match, $cb) = @_; - - my $port = $PORT{$port} - or do { - my ($noderef, $lport) = split /#/, $port; - "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef} - or Carp::croak "$port: can only rcv on local ports"; - - $PORT{$lport} - or Carp::croak "$port: port does not exist"; - - $PORT{$port} = $PORT{$lport} # also return - }; + my ($self, $match, $cb) = @_; if (!ref $match) { - push @{ $port->{rc0}{$match} }, [$cb]; + push @{ $self->{rc0}{$match} }, [$cb]; } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { my ($type, @match) = @$match; @match - ? push @{ $port->{rcv}{$match->[0]} }, [$cb, \@match] - : push @{ $port->{rc0}{$match->[0]} }, [$cb]; + ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] + : push @{ $self->{rc0}{$match->[0]} }, [$cb]; } else { - push @{ $port->{any} }, [$cb, $match]; + push @{ $self->{any} }, [$cb, $match]; } } -sub _inject { - my ($port, $msg) = @{+shift}; +=item $port->register ($name) - $port = $PORT{$port} - or return; +Registers the given port under the well known name C<$name>. If the name +already exists it is replaced. - @_ = @$msg; +A port can only be registered under one well known name. - for (@{ $port->{rc0}{$msg->[0]} }) { - $_ && &{$_->[0]} - && undef $_; - } +=cut - for (@{ $port->{rcv}{$msg->[0]} }) { - $_ && [@_[1..$#{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; - } +sub register { + my ($self, $name) = @_; - for (@{ $port->{any} }) { - $_ && [@_[0..$#{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; - } + $self->{wkname} = $name; + $AnyEvent::MP::Base::WKP{$name} = "$self"; } -sub normalise_noderef($) { - my ($noderef) = @_; +=item $port->destroy - my $cv = AE::cv; - my @res; +Explicitly destroy/remove/nuke/vaporise the port. - $cv->begin (sub { - my %seen; - my @refs; - for (sort { $a->[0] <=> $b->[0] } @res) { - push @refs, $_->[1] unless $seen{$_->[1]}++ - } - shift->send (join ",", @refs); - }); +Ports are normally kept alive by there mere existance alone, and need to +be destroyed explicitly. - $noderef = $DEFAULT_PORT unless length $noderef; +=cut - my $idx; - for my $t (split /,/, $noderef) { - my $pri = ++$idx; - - #TODO: this should be outside normalise_noderef and in become_public - if ($t =~ /^\d*$/) { - my $nodename = (POSIX::uname)[1]; - - $cv->begin; - AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub { - for (@_) { - my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; - push @res, [ - $pri += 1e-5, - AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service - ]; - } - $cv->end; - }; - -# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename; -# -# for (@ipv4) { -# push @res, [ -# $pri, -# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT, -# ]; -# } - } else { - my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT" - or Carp::croak "$t: unparsable transport descriptor"; - - $cv->begin; - AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub { - for (@_) { - my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; - push @res, [ - $pri += 1e-5, - AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service - ]; - } - $cv->end; - } - } - } +sub destroy { + my ($self) = @_; - $cv->end; + delete $AnyEvent::MP::Base::WKP{ $self->{wkname} }; - $cv + delete $AnyEvent::MP::Base::PORT{$_} + for @{ $self->{names} }; } -sub become_public { - return if $PUBLIC; +=back - my $noderef = join ",", ref $_[0] ? @{+shift} : shift; - my @args = @_; +=head1 FUNCTIONS FOR NODES - $NODE = (normalise_noderef $noderef)->recv; +=over 4 - for my $t (split /,/, $NODE) { - $NODE{$t} = $NODE{""}; - - my ($host, $port) = AnyEvent::Socket::parse_hostport $t; - - $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, - @args, - on_error => sub { - die "on_error<@_>\n";#d# - }, - on_connect => sub { - my ($tp) = @_; - - $NODE{$tp->{remote_id}} = $_[0]; - }, - sub { - my ($tp) = @_; - - $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp; - }, - ; - } +=item mon $noderef, $callback->($noderef, $status, $) - $PUBLIC = 1; -} +Monitors the given noderef. + +=item become_public endpoint... + +Tells the node to become a public node, i.e. reachable from other nodes. + +If no arguments are given, or the first argument is C, then +AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the +local nodename resolves to. + +Otherwise the first argument must be an array-reference with transport +endpoints ("ip:port", "hostname:port") or port numbers (in which case the +local nodename is used as hostname). The endpoints are all resolved and +will become the node reference. + +=cut =back @@ -382,31 +307,18 @@ =cut -############################################################################# -# self node code - -sub _new_port($) { - my ($name) = @_; +=item wkp => $name, @reply - my ($noderef, $portname) = split /#/, $name; +Replies with the port ID of the specified well-known port, or C. - $PORT{$name} = - $PORT{$portname} = { - names => [$name, $portname], - }; -} +=item devnull => ... -$NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE; -_new_port ""; +Generic data sink/CPU heat conversion. =item relay => $port, @msg Simply forwards the message to the given port. -=cut - -rcv "", relay => \&snd; - =item eval => $string[ @reply] Evaluates the given string. If C<@reply> is given, then a message of the @@ -416,14 +328,6 @@ snd $othernode, eval => "exit"; -=cut - -rcv "", eval => sub { - my (undef, $string, @reply) = @_; - my @res = eval $string; - snd @reply, "$@", @res if @reply; -}; - =item time => @reply Replies the the current node time to C<@reply>. @@ -434,10 +338,6 @@ snd $NODE, time => $myport, timereply => 1, 2; # => snd $myport, timereply => 1, 2,