=head1 NAME AnyEvent::MP - multi-processing/message-passing framework =head1 SYNOPSIS use AnyEvent::MP; NODE # returns this node identifier $NODE # contains this node identifier snd $port, type => data...; rcv $port, smartmatch => $cb->($port, @msg); # examples: rcv $port2, ping => sub { snd $_[0], "pong"; 0 }; rcv $port1, pong => sub { warn "pong received\n" }; snd $port2, ping => $port1; # more, smarter, matches (_any_ is exported by this module) rcv $port, [child_died => $pid] => sub { ... rcv $port, [_any_, _any_, 3] => sub { .. $_[2] is 3 =head1 DESCRIPTION This module (-family) implements a simple message passing framework. Despite its simplicity, you can securely message other processes running on the same or other hosts. =head1 CONCEPTS =over 4 =item port A port is something you can send messages to with the C function, and you can register C handlers with. All C handlers will receive messages they match, messages will not be queued. =item port id - C A port id is always the node id, a hash-mark (C<#>) as separator, followed by a port name. A port name can be a well known port (basically an identifier/bareword), or a generated name, consisting of node id, a dot (C<.>), and an identifier. =item node A node is a single process containing at least one port - the node port. You can send messages to node ports to let them create new ports, among other things. Initially, nodes are either private (single-process only) or hidden (connected to a father node only). Only when they epxlicitly "go public" can you send them messages form unrelated other nodes. Public nodes automatically connect to all other public nodes in a network when they connect, creating a full mesh. =item node id - C, C, C A node ID is a string that either uniquely identifies a given node (For private and hidden nodes), or contains a recipe on how to reach a given node (for public nodes). =back =head1 FUNCTIONS =over 4 =cut package AnyEvent::MP; use AnyEvent::MP::Util (); use AnyEvent::MP::Node; use AnyEvent::MP::Transport; use utf8; use common::sense; use Carp (); use AE (); use base "Exporter"; our $VERSION = '0.0'; our @EXPORT = qw(NODE $NODE $PORT snd rcv _any_); our $DEFAULT_SECRET; our $DEFAULT_PORT = "4040"; our $CONNECT_INTERVAL = 5; # new connect every 5s, at least our $CONNECT_TIMEOUT = 30; # includes handshake sub default_secret { unless (defined $DEFAULT_SECRET) { if (open my $fh, "<$ENV{HOME}/.aemp-secret") { sysread $fh, $DEFAULT_SECRET, -s $fh; } else { $DEFAULT_SECRET = AnyEvent::MP::Util::nonce 32; } } $DEFAULT_SECRET } our $UNIQ = sprintf "%x.%x", $$, time; # per-process/node unique cookie our $PUBLIC = 0; our $NODE; our $PORT; our %NODE; # node id to transport mapping, or "undef", for local node our %PORT; # local ports our %LISTENER; # local transports sub NODE() { $NODE } { use POSIX (); my $nodename = (POSIX::uname)[1]; $NODE = "$$\@$nodename"; } sub _ANY_() { 1 } sub _any_() { \&_ANY_ } sub add_node { my ($noderef) = @_; return $NODE{$noderef} if exists $NODE{$noderef}; for (split /,/, $noderef) { return $NODE{$noderef} = $NODE{$_} if exists $NODE{$_}; } # for indirect sends, use a different class my $node = new AnyEvent::MP::Node::Direct $noderef; $NODE{$_} = $node for $noderef, split /,/, $noderef; $node } sub snd($@) { my ($noderef, $port) = split /#/, shift, 2; add_node $noderef unless exists $NODE{$noderef}; $NODE{$noderef}->send ([$port, [@_]]); } sub _inject { my ($port, $msg) = @{+shift}; $port = $PORT{$port} or return; use Data::Dumper; warn Dumper $msg; } sub normalise_noderef($) { my ($noderef) = @_; my $cv = AE::cv; my @res; $cv->begin (sub { my %seen; my @refs; for (sort { $a->[0] <=> $b->[0] } @res) { push @refs, $_->[1] unless $seen{$_->[1]}++ } shift->send (join ",", @refs); }); $noderef = $DEFAULT_PORT unless length $noderef; my $idx; for my $t (split /,/, $noderef) { my $pri = ++$idx; #TODO: this should be outside normalise_noderef and in become_public if ($t =~ /^\d*$/) { my $nodename = (POSIX::uname)[1]; $cv->begin; AnyEvent::Socket::resolve_sockaddr $nodename, $t || "aemp=$DEFAULT_PORT", "tcp", 0, undef, sub { for (@_) { my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; push @res, [ $pri += 1e-5, AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service ]; } $cv->end; }; # my (undef, undef, undef, undef, @ipv4) = gethostbyname $nodename; # # for (@ipv4) { # push @res, [ # $pri, # AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $_, $t || $DEFAULT_PORT, # ]; # } } else { my ($host, $port) = AnyEvent::Socket::parse_hostport $t, "aemp=$DEFAULT_PORT" or Carp::croak "$t: unparsable transport descriptor"; $cv->begin; AnyEvent::Socket::resolve_sockaddr $host, $port, "tcp", 0, undef, sub { for (@_) { my ($service, $host) = AnyEvent::Socket::unpack_sockaddr $_->[3]; push @res, [ $pri += 1e-5, AnyEvent::Socket::format_hostport AnyEvent::Socket::format_address $host, $service ]; } $cv->end; } } } $cv->end; $cv } sub become_public { return if $PUBLIC; my $noderef = join ",", ref $_[0] ? @{+shift} : shift; my @args = @_; $NODE = (normalise_noderef $noderef)->recv; my $self = new AnyEvent::MP::Node::Self noderef => $NODE; $NODE{""} = $self; # empty string == local node for my $t (split /,/, $NODE) { $NODE{$t} = $self; my ($host, $port) = AnyEvent::Socket::parse_hostport $t; $LISTENER{$t} = AnyEvent::MP::Transport::mp_server $host, $port, @args, on_error => sub { die "on_error<@_>\n";#d# }, on_connect => sub { my ($tp) = @_; $NODE{$tp->{remote_id}} = $_[0]; }, sub { my ($tp) = @_; $NODE{"$tp->{peerhost}:$tp->{peerport}"} = $tp; }, ; } $PUBLIC = 1; } =back =head1 SEE ALSO L. =head1 AUTHOR Marc Lehmann http://home.schmorp.de/ =cut 1