--- AnyEvent-MP/MP.pm 2009/08/01 15:04:30 1.7 +++ AnyEvent-MP/MP.pm 2009/08/04 22:13:45 1.27 @@ -6,11 +6,14 @@ use AnyEvent::MP; - NODE # returns this node identifier - $NODE # contains this node identifier + $NODE # contains this node's noderef + NODE # returns this node's noderef + NODE $port # returns the noderef of the port snd $port, type => data...; + $SELF # receiving/own port id in rcv callbacks + rcv $port, smartmatch => $cb->($port, @msg); # examples: @@ -29,9 +32,12 @@ Despite its simplicity, you can securely message other processes running on the same or other hosts. -At the moment, this module family is severly brokena nd underdocumented, -so do not use. This was uploaded mainly to resreve the CPAN namespace - -stay tuned! +For an introduction to this module family, see the L +manual page. + +At the moment, this module family is severly broken and underdocumented, +so do not use. This was uploaded mainly to reserve the CPAN namespace - +stay tuned! The basic API should be finished, however. =head1 CONCEPTS @@ -74,11 +80,8 @@ package AnyEvent::MP; -use AnyEvent::MP::Util (); -use AnyEvent::MP::Node; -use AnyEvent::MP::Transport; +use AnyEvent::MP::Base; -use utf8; use common::sense; use Carp (); @@ -87,120 +90,280 @@ use base "Exporter"; -our $VERSION = '0.01'; -our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_); +our $VERSION = '0.1'; +our @EXPORT = qw( + NODE $NODE *SELF node_of _any_ + become_slave become_public + snd rcv mon kil reg psub + port +); + +our $SELF; + +sub _self_die() { + my $msg = $@; + $msg =~ s/\n+$// unless ref $msg; + kil $SELF, die => $msg; +} + +=item $thisnode = NODE / $NODE + +The C function returns, and the C<$NODE> variable contains +the noderef of the local node. The value is initialised by a call +to C or C, after which all local port +identifiers become invalid. + +=item $noderef = node_of $portid + +Extracts and returns the noderef from a portid or a noderef. + +=item $SELF + +Contains the current port id while executing C callbacks or C +blocks. + +=item SELF, %SELF, @SELF... + +Due to some quirks in how perl exports variables, it is impossible to +just export C<$SELF>, all the symbols called C are exported by this +module, but only C<$SELF> is currently used. + +=item snd $portid, type => @data + +=item snd $portid, @msg + +Send the given message to the given port ID, which can identify either +a local or a remote port, and can be either a string or soemthignt hat +stringifies a sa port ID (such as a port object :). + +While the message can be about anything, it is highly recommended to use a +string as first element (a portid, or some word that indicates a request +type etc.). + +The message data effectively becomes read-only after a call to this +function: modifying any argument is not allowed and can cause many +problems. + +The type of data you can transfer depends on the transport protocol: when +JSON is used, then only strings, numbers and arrays and hashes consisting +of those are allowed (no objects). When Storable is used, then anything +that Storable can serialise and deserialise is allowed, and for the local +node, anything can be passed. + +=item kil $portid[, @reason] + +Kill the specified port with the given C<@reason>. + +If no C<@reason> is specified, then the port is killed "normally" (linked +ports will not be kileld, or even notified). + +Otherwise, linked ports get killed with the same reason (second form of +C, see below). + +Runtime errors while evaluating C callbacks or inside C blocks +will be reported as reason C<< die => $@ >>. + +Transport/communication errors are reported as C<< transport_error => +$message >>. + +=item $guard = mon $portid, $cb->(@reason) + +=item $guard = mon $portid, $otherport + +=item $guard = mon $portid, $otherport, @msg + +Monitor the given port and do something when the port is killed. + +In the first form, the callback is simply called with any number +of C<@reason> elements (no @reason means that the port was deleted +"normally"). Note also that I<< the callback B never die >>, so use +C if unsure. + +In the second form, the other port will be C'ed with C<@reason>, iff +a @reason was specified, i.e. on "normal" kils nothing happens, while +under all other conditions, the other port is killed with the same reason. + +In the last form, a message of the form C<@msg, @reason> will be C. -our $DEFAULT_SECRET; -our $DEFAULT_PORT = "4040"; +Example: call a given callback when C<$port> is killed. -our $CONNECT_INTERVAL = 5; # new connect every 5s, at least -our $CONNECT_TIMEOUT = 30; # includes handshake + mon $port, sub { warn "port died because of <@_>\n" }; -sub default_secret { - unless (defined $DEFAULT_SECRET) { - if (open my $fh, "<$ENV{HOME}/.aemp-secret") { - sysread $fh, $DEFAULT_SECRET, -s $fh; +Example: kill ourselves when C<$port> is killed abnormally. + + mon $port, $self; + +Example: send us a restart message another C<$port> is killed. + + mon $port, $self => "restart"; + +=cut + +sub mon { + my ($noderef, $port, $cb) = ((split /#/, shift, 2), shift); + + my $node = $NODE{$noderef} || add_node $noderef; + + #TODO: ports must not be references + if (!ref $cb or "AnyEvent::MP::Port" eq ref $cb) { + if (@_) { + # send a kill info message + my (@msg) = ($cb, @_); + $cb = sub { snd @msg, @_ }; } else { - $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32; + # simply kill other port + my $port = $cb; + $cb = sub { kil $port, @_ if @_ }; } } - $DEFAULT_SECRET + $node->monitor ($port, $cb); + + defined wantarray + and AnyEvent::Util::guard { $node->unmonitor ($port, $cb) } } -=item NODE / $NODE +=item $guard = mon_guard $port, $ref, $ref... + +Monitors the given C<$port> and keeps the passed references. When the port +is killed, the references will be freed. -The C function and the C<$NODE> variable contain the noderef of -the local node. The value is initialised by a call to C or -C, after which all local port identifiers become invalid. +Optionally returns a guard that will stop the monitoring. + +This function is useful when you create e.g. timers or other watchers and +want to free them when the port gets killed: + + $port->rcv (start => sub { + my $timer; $timer = mon_guard $port, AE::timer 1, 1, sub { + undef $timer if 0.9 < rand; + }); + }); =cut -our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie -our $ID = "a0"; -our $PUBLIC = 0; -our $NODE; -our $PORT; - -our %NODE; # node id to transport mapping, or "undef", for local node -our %PORT; # local ports -our %LISTENER; # local transports - -sub NODE() { $NODE } - -{ - use POSIX (); - my $nodename = (POSIX::uname)[1]; - $NODE = "$$\@$nodename"; +sub mon_guard { + my ($port, @refs) = @_; + + mon $port, sub { 0 && @refs } } -sub _ANY_() { 1 } -sub _any_() { \&_ANY_ } +=item lnk $port1, $port2 -sub add_node { - my ($noderef) = @_; +Link two ports. This is simply a shorthand for: - return $NODE{$noderef} - if exists $NODE{$noderef}; + mon $port1, $port2; + mon $port2, $port1; - for (split /,/, $noderef) { - return $NODE{$noderef} = $NODE{$_} - if exists $NODE{$_}; - } +It means that if either one is killed abnormally, the other one gets +killed as well. - # for indirect sends, use a different class - my $node = new AnyEvent::MP::Node::Direct $noderef; +=item $local_port = port - $NODE{$_} = $node - for $noderef, split /,/, $noderef; +Create a new local port object that supports message matching. - $node -} +=item $portid = port { my @msg = @_; $finished } -=item snd $portid, type => @data +Creates a "mini port", that is, a very lightweight port without any +pattern matching behind it, and returns its ID. -=item snd $portid, @msg +The block will be called for every message received on the port. When the +callback returns a true value its job is considered "done" and the port +will be destroyed. Otherwise it will stay alive. -Send the given message to the given port ID, which can identify either a -local or a remote port. +The message will be passed as-is, no extra argument (i.e. no port id) will +be passed to the callback. -While the message can be about anything, it is highly recommended to use -a constant string as first element. +If you need the local port id in the callback, this works nicely: -The message data effectively becomes read-only after a call to this -function: modifying any argument is not allowed and can cause many -problems. - -The type of data you can transfer depends on the transport protocol: when -JSON is used, then only strings, numbers and arrays and hashes consisting -of those are allowed (no objects). When Storable is used, then anything -that Storable can serialise and deserialise is allowed, and for the local -node, anything can be passed. + my $port; $port = miniport { + snd $otherport, reply => $port; + }; =cut -sub snd(@) { - my ($noderef, $port) = split /#/, shift, 2; +sub port(;&) { + my $id = "$UNIQ." . $ID++; + my $port = "$NODE#$id"; + + if (@_) { + my $cb = shift; + $PORT{$id} = sub { + local $SELF = $port; + eval { + &$cb + and kil $id; + }; + _self_die if $@; + }; + } else { + my $self = bless { + id => "$NODE#$id", + }, "AnyEvent::MP::Port"; + + $PORT_DATA{$id} = $self; + $PORT{$id} = sub { + local $SELF = $port; + + eval { + for (@{ $self->{rc0}{$_[0]} }) { + $_ && &{$_->[0]} + && undef $_; + } + + for (@{ $self->{rcv}{$_[0]} }) { + $_ && [@_[1 .. @{$_->[1]}]] ~~ $_->[1] + && &{$_->[0]} + && undef $_; + } + + for (@{ $self->{any} }) { + $_ && [@_[0 .. $#{$_->[1]}]] ~~ $_->[1] + && &{$_->[0]} + && undef $_; + } + }; + _self_die if $@; + }; + } - add_node $noderef - unless exists $NODE{$noderef}; + $port +} + +=item reg $portid, $name + +Registers the given port under the name C<$name>. If the name already +exists it is replaced. + +A port can only be registered under one well known name. + +A port automatically becomes unregistered when it is killed. - $NODE{$noderef}->send (["$port", [@_]]); +=cut + +sub reg(@) { + my ($portid, $name) = @_; + + $REG{$name} = $portid; } -=item rcv $portid, type => $callback->(@msg) +=item rcv $portid, tagstring => $callback->(@msg), ... -=item rcv $portid, $smartmatch => $callback->(@msg) +=item rcv $portid, $smartmatch => $callback->(@msg), ... -=item rcv $portid, [$smartmatch...] => $callback->(@msg) +=item rcv $portid, [$smartmatch...] => $callback->(@msg), ... -Register a callback on the port identified by C<$portid>, which I be -a local port. +Register callbacks to be called on matching messages on the given port. The callback has to return a true value when its work is done, after which is will be removed, or a false value in which case it will stay registered. +The global C<$SELF> (exported by this module) contains C<$portid> while +executing the callback. + +Runtime errors wdurign callback execution will result in the port being +Ced. + If the match is an array reference, then it will be matched against the first elements of the message, otherwise only the first element is being matched. @@ -215,159 +378,91 @@ =cut sub rcv($@) { - my ($port, $match, $cb) = @_; - - my $port = $PORT{$port} - or do { - my ($noderef, $lport) = split /#/, $port; - "AnyEvent::MP::Node::Self" eq ref $NODE{$noderef} - or Carp::croak "$port: can only rcv on local ports"; - - $PORT{$lport} - or Carp::croak "$port: port does not exist"; - - $PORT{$port} = $PORT{$lport} # also return - }; - - if (!ref $match) { - push @{ $port->{rc0}{$match} }, [$cb]; - } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { - my ($type, @match) = @$match; - @match - ? push @{ $port->{rcv}{$match->[0]} }, [$cb, \@match] - : push @{ $port->{rc0}{$match->[0]} }, [$cb]; - } else { - push @{ $port->{any} }, [$cb, $match]; - } -} + my ($noderef, $port) = split /#/, shift, 2; -sub _inject { - my ($port, $msg) = @{+shift}; + ($NODE{$noderef} || add_node $noderef) == $NODE{""} + or Carp::croak "$noderef#$port: rcv can only be called on local ports, caught"; - $port = $PORT{$port} - or return; + my $self = $PORT_DATA{$port} + or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; - @_ = @$msg; + "AnyEvent::MP::Port" eq ref $self + or Carp::croak "$noderef#$port: rcv can only be called on message matching ports, caught"; - for (@{ $port->{rc0}{$msg->[0]} }) { - $_ && &{$_->[0]} - && undef $_; - } - - for (@{ $port->{rcv}{$msg->[0]} }) { - $_ && [@_[1..$#{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; - } - - for (@{ $port->{any} }) { - $_ && [@_[0..$#{$_->[1]}]] ~~ $_->[1] - && &{$_->[0]} - && undef $_; + while (@_) { + my ($match, $cb) = splice @_, 0, 2; + + if (!ref $match) { + push @{ $self->{rc0}{$match} }, [$cb]; + } elsif (("ARRAY" eq ref $match && !ref $match->[0])) { + my ($type, @match) = @$match; + @match + ? push @{ $self->{rcv}{$match->[0]} }, [$cb, \@match] + : push @{ $self->{rc0}{$match->[0]} }, [$cb]; + } else { + push @{ $self->{any} }, [$cb, $match]; + } } } -sub normalise_noderef($) { - my ($noderef) = @_; +=item $closure = psub { BLOCK } - my $cv = AE::cv; - my @res; +Remembers C<$SELF> and creates a closure out of the BLOCK. When the +closure is executed, sets up the environment in the same way as in C +callbacks, i.e. runtime errors will cause the port to get Ced. + +This is useful when you register callbacks from C callbacks: + + rcv delayed_reply => sub { + my ($delay, @reply) = @_; + my $timer = AE::timer $delay, 0, psub { + snd @reply, $SELF; + }; + }; - $cv->begin (sub { - my %seen; - my @refs; - for (sort { $a->[0] <=> $b->[0] } @res) { - push @refs, $_->[1] unless $seen{$_->[1]}++ - } - shift->send (join ",", @refs); - }); +=cut - $noderef = $DEFAULT_PORT unless length $noderef; +sub psub(&) { + my $cb = shift; - my $idx; - for my $t (split /,/, $noderef) { - my $pri = ++$idx; - - #TODO: this should be outside normalise_noderef and in become_public - if ($t =~ /^\d*$/) { - my $nodename = (POSIX::uname)[1]; - - $cv->begin; - AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub { - for (@_) { - my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; - push @res, [ - $pri += 1e-5, - AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service - ]; - } - $cv->end; - }; + my $port = $SELF + or Carp::croak "psub can only be called from within rcv or psub callbacks, not"; -# my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename; -# -# for (@ipv4) { -# push @res, [ -# $pri, -# AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT, -# ]; -# } - } else { - my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT" - or Carp::croak "$t: unparsable transport descriptor"; + sub { + local $SELF = $port; - $cv->begin; - AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub { - for (@_) { - my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; - push @res, [ - $pri += 1e-5, - AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service - ]; - } - $cv->end; - } + if (wantarray) { + my @res = eval { &$cb }; + _self_die if $@; + @res + } else { + my $res = eval { &$cb }; + _self_die if $@; + $res } } +} - $cv->end; +=back - $cv -} +=head1 FUNCTIONS FOR NODES -sub become_public { - return if $PUBLIC; +=over 4 - my $noderef = join ",", ref $_[0] ? @{+shift} : shift; - my @args = @_; +=item become_public endpoint... - $NODE = (normalise_noderef $noderef)->recv; +Tells the node to become a public node, i.e. reachable from other nodes. - for my $t (split /,/, $NODE) { - $NODE{$t} = $NODE{""}; - - my ($host, $port) = AnyEvent::Socket::parse_hostport $t; - - $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, - @args, - on_error => sub { - die "on_error<@_>\n";#d# - }, - on_connect => sub { - my ($tp) = @_; - - $NODE{$tp->{remote_id}} = $_[0]; - }, - sub { - my ($tp) = @_; - - $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp; - }, - ; - } +If no arguments are given, or the first argument is C, then +AnyEvent::MP tries to bind on port C<4040> on all IP addresses that the +local nodename resolves to. + +Otherwise the first argument must be an array-reference with transport +endpoints ("ip:port", "hostname:port") or port numbers (in which case the +local nodename is used as hostname). The endpoints are all resolved and +will become the node reference. - $PUBLIC = 1; -} +=cut =back @@ -382,39 +477,18 @@ =cut -############################################################################# -# self node code +=item lookup => $name, @reply -sub _new_port($) { - my ($name) = @_; - - my ($noderef, $portname) = split /#/, $name; - - $PORT{$name} = - $PORT{$portname} = { - names => [$name, $portname], - }; -} - -$NODE{""} = new AnyEvent::MP::Node::Self noderef => $NODE; -_new_port ""; +Replies with the port ID of the specified well-known port, or C. =item devnull => ... Generic data sink/CPU heat conversion. -=cut - -rcv "", devnull => sub { () }; - =item relay => $port, @msg Simply forwards the message to the given port. -=cut - -rcv "", relay => sub { \&snd; () }; - =item eval => $string[ @reply] Evaluates the given string. If C<@reply> is given, then a message of the @@ -424,15 +498,6 @@ snd $othernode, eval => "exit"; -=cut - -rcv "", eval => sub { - my (undef, $string, @reply) = @_; - my @res = eval $string; - snd @reply, "$@", @res if @reply; - () -}; - =item time => @reply Replies the the current node time to C<@reply>. @@ -443,9 +508,81 @@ snd $NODE, time => $myport, timereply => 1, 2; # => snd $myport, timereply => 1, 2,