=head1 NAME AnyEvent::MP::DataConn - create socket connections between nodes =head1 SYNOPSIS use AnyEvent::MP::DataConn; =head1 DESCRIPTION This module can be used to create socket connections between the local and a remote node in the aemp network. The socket can be used freely for any purpose, and in most cases, this mechanism is a good way to transport big chunks of binary data. The connections created by this module use the same security mechanisms as normal AEMP connections (secure authentication, optional use of TLS), and in fact, use the same listening port as AEMP connections, so when two nodes can reach each other, they can, with high probability, creare a data connection between them. The protocol is, however, not the AEMP transport protocol, so this will only work between nodes implementing the "aemp-dataconn" protocol. =head1 FUNCTIONS =over 4 =cut package AnyEvent::MP::DataConn; use common::sense; use Carp (); use POSIX (); use AnyEvent (); use AnyEvent::Util (); use AnyEvent::MP; use AnyEvent::MP::Kernel (); use AnyEvent::MP::Global (); our $ID = "a"; our %STATE; sub _accept { my ($id, $timeout, $initspec) = @_; my $cleanup = sub { %{delete $STATE{$id}} = (); }; $STATE{$id} = { id => $id, to => (AE::timer $timeout, 0, $cleanup), success => sub { my ($func, @args) = @$initspec; $cleanup->(); (AnyEvent::MP::Kernel::load_func $func)->(@args, @_); }, fail => sub { $cleanup->(); # nop? }, }; } sub _inject { my ($conn, $error) = @_; my $hdl = delete $conn->{hdl}; die "inject <@_>\n";#d# } sub _connect { my ($id) = @_; my $state = $STATE{$id} or return; my $addr = $AnyEvent::MP::Global::addr{$state->{node}}; @$addr or return $state->{fail}("$state->{node}: no listeners found"); my %transport; # I love hardcoded constants $state->{next} = AE::timer 0, 2, sub { my $endpoint = shift @$addr or return delete $state->{next}; my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint or return; my $transport; $transport = AnyEvent::MP::Transport::mp_connect $host, $port, protocol => "aemp-dataconn", sub { $transport->destroy }, ; }; } =item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle) Creates a socket connection between the local node and the node C<$node> (which can also be specified as a port). When the connection could be successfully created, the C<$initfunc> will be called with the given C<@initdata> on the remote node (similar to C or C), and the C object representing the remote connection end as additional argument. Also, the callback given as last argument will be called with the AnyEvent::Handle object for the local side. The AnyEvent::Handle objects will be in a "quiescent" state - you could rip out the file handle and forget about it, but it is recommended to use it, as the security settings might have called for a TLS connection. If you opt to use it, you at least have to set an C callback. In case of any error (timeout etc.), nothing will be called on the remote side, and the local port will be C'ed with an C<< AnyEvent::MP::DataConn => "error message" >> kill reason. =cut sub connect_to($$$;@) { my ($node, $timeout, $initspec, @localmsg) = @_; my $port = $SELF; $node = node_of $node; my $id = (++$ID) . "\@$NODE"; my $cleanup = sub { delete $AnyEvent::MP::Global::ON_SETUP{$id}; %{delete $STATE{$id}} = (); }; # damn, why do my simple state hashes resemble objects so quickly my $state = $STATE{$id} = { id => (++$ID) . "\@$NODE", node => $node, port => $port, to => (AE::timer $timeout, 0, sub { $cleanup->(); kil $port, AnyEvent::MP::DataConn:: => "$node: unable to establish connection within $timeout seconds"; }), success => sub { $cleanup->(); snd @localmsg, @_; }, fail => sub { $cleanup->(); kil $port, AnyEvent::MP::DataConn:: => $_[0]; }, }; if (AnyEvent::MP::Kernel::port_is_local $node) { return kil $port, AnyEvent::MP::DataConn:: => "connect_to does not yet support local/local connections, please bug me about it"; } else { AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_accept:: => $id, $timeout, $initspec; $state->{wait} = sub { if (my $addr = $AnyEvent::MP::Global::addr{$node}) { delete $AnyEvent::MP::Global::ON_SETUP{$id}; warn "<@_ @$addr $addr>\n";#d# # continue connect if (@$addr) { # node has listeners, so connect _connect $id; } else { # no listeners, ask it to connect to us AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id; } } else { # wait for the next global setup handshake $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait}; }; }; $state->{wait}->(); } } =back =head1 SEE ALSO L. =head1 AUTHOR Marc Lehmann http://home.schmorp.de/ =cut 1