ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/DataConn.pm
Revision: 1.2
Committed: Sat Nov 7 02:36:31 2009 UTC (14 years, 7 months ago) by root
Branch: MAIN
Changes since 1.1: +111 -24 lines
Log Message:
bleh

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::DataConn - create socket connections between nodes
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::DataConn;
8    
9     =head1 DESCRIPTION
10    
11     This module can be used to create socket connections between the local and
12     a remote node in the aemp network. The socket can be used freely for any
13     purpose, and in most cases, this mechanism is a good way to transport big
14     chunks of binary data.
15    
16     The connections created by this module use the same security mechanisms
17     as normal AEMP connections (secure authentication, optional use of TLS),
18     and in fact, use the same listening port as AEMP connections, so when two
19     nodes can reach each other, they can, with high probability, create a data
20     connection between them.
21    
22     The protocol is, however, not the AEMP transport protocol, so this will
23     only work between nodes implementing the "aemp-dataconn" protocol.
24    
25     =head1 FUNCTIONS
26    
27     =over 4
28    
29     =cut
30    
31     package AnyEvent::MP::DataConn;
32    
33     use common::sense;
34     use Carp ();
35     use POSIX ();
36    
37     use AnyEvent ();
38     use AnyEvent::Util ();
39    
40     use AnyEvent::MP;
41 root 1.2 use AnyEvent::MP::Kernel ();
42     use AnyEvent::MP::Global ();
43    
44     our $ID = "a";
45     our %STATE;
46    
# Registers the pending state for data connection $id in %STATE.
#
#   $id       - connection id; used as the key into %STATE
#   $timeout  - seconds after which the pending entry is discarded
#   $initspec - array ref of the form [$func, @args]
#
# The stored "success" callback resolves $func through
# AnyEvent::MP::Kernel::load_func and calls the result with @args followed
# by whatever arguments the callback itself was invoked with. The stored
# "fail" callback only discards the entry.
sub _accept {
   my ($id, $timeout, $initspec) = @_;

   # Emptying the hash (rather than merely dropping the %STATE slot) also
   # releases the timer and the callbacks that are stored inside it.
   my $discard = sub {
      %{delete $STATE{$id}} = ();
   };

   my $entry = {
      id      => $id,
      to      => AE::timer ($timeout, 0, $discard),
      success => sub {
         my ($func, @args) = @$initspec;
         $discard->();
         my $init = AnyEvent::MP::Kernel::load_func $func;
         $init->(@args, @_);
      },
      fail    => sub {
         $discard->();
         # nop?
      },
   };

   $STATE{$id} = $entry;
}
68 root 1.1
# Unfinished debugging stub: detaches the "hdl" entry from the connection
# hash and then unconditionally dies with a message that lists the
# arguments the function was called with.
#
#   $conn  - hash ref; its "hdl" key is removed
#   $error - currently unused
sub _inject {
   my $conn  = $_[0];
   my $error = $_[1];

   # the handle is taken out of the connection, but nothing uses it yet
   my $hdl = delete $$conn{hdl};

   die "inject <@_>\n";#d#
}
75    
# Starts the outgoing connection attempts for the data connection $id.
#
#   $id - key into %STATE; if no state is registered under it, nothing
#         happens and the function simply returns
#
# The listener addresses of the state's node are looked up in
# %AnyEvent::MP::Global::addr. If there are none, the state's "fail"
# callback is invoked. Otherwise a repeating timer (stored in
# $state->{next}) tries one endpoint per tick until the list is exhausted,
# at which point the timer removes itself.
sub _connect {
   my ($id) = @_;

   my $state = $STATE{$id}
      or return;

   my $addr = $AnyEvent::MP::Global::addr{$state->{node}};

   # Work on a private copy: the endpoints are consumed one by one below,
   # and shifting them off the shared array in %AnyEvent::MP::Global::addr
   # would leave the node without known listeners for every later attempt.
   my @endpoints = $addr ? @$addr : ();

   @endpoints
      or return $state->{fail}("$state->{node}: no listeners found");

   # I love hardcoded constants
   # (first attempt immediately, then one further endpoint every 2 seconds)
   $state->{next} = AE::timer 0, 2, sub {
      my $endpoint = shift @endpoints
         or return delete $state->{next};

      # unparsable endpoints are skipped; the next tick tries the next one
      my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
         or return;

      my $transport; $transport = AnyEvent::MP::Transport::mp_connect
         $host, $port,
         protocol => "aemp-dataconn",
         sub { $transport->destroy },
      ;
   };
}
104    
105     =item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle)
106 root 1.1
107     Creates a socket connection between the local node and the node C<$node>
108     (which can also be specified as a port).
109    
110     When the connection could be successfully created, the C<$initfunc>
111     will be called with the given C<@initdata> on the remote node (similar
112     to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object
113 root 1.2 representing the remote connection end as additional argument.
114 root 1.1
115 root 1.2 Also, the callback given as last argument will be called with the
116     AnyEvent::Handle object for the local side.
117 root 1.1
118 root 1.2 The AnyEvent::Handle objects will be in a "quiescent" state - you could rip
119 root 1.1 out the file handle and forget about it, but it is recommended to use it,
120 root 1.2 as the security settings might have called for a TLS connection. If you
121 root 1.1 opt to use it, you at least have to set an C<on_error> callback.
122    
123 root 1.2 In case of any error (timeout etc.), nothing will be called on
124     the remote side, and the local port will be C<kil>'ed with an C<<
125 root 1.1 AnyEvent::MP::DataConn => "error message" >> kill reason.
126    
127     =cut
128    
# Initiates a data connection from the current port ($SELF) to $node.
#
#   $node     - node (or port) to connect to; reduced to its node via node_of
#   $timeout  - seconds to wait before giving up
#   $initspec - passed on unchanged to _accept on the remote node
#   @localmsg - on success, sent locally via snd with the success
#               callback's arguments appended
#
# On timeout or failure the calling port is kil'ed with an
# "AnyEvent::MP::DataConn" reason. Connections to the local node are not
# supported and are rejected the same way.
sub connect_to($$$;@) {
   my ($node, $timeout, $initspec, @localmsg) = @_;
   my $port = $SELF;
   $node = node_of $node;

   # Reject local/local connections before any state or timer is
   # registered, so nothing is left behind in %STATE and the port is not
   # kil'ed a second time when the timeout would have expired.
   if (AnyEvent::MP::Kernel::port_is_local $node) {
      return kil $port, AnyEvent::MP::DataConn:: =>
         "connect_to does not yet support local/local connections, please bug me about it";
   }

   my $id = (++$ID) . "\@$NODE";

   # Drops everything registered for this connection. Emptying the state
   # hash releases the timer and the callbacks stored in it (the "wait"
   # callback refers back to the state). Safe to call more than once.
   my $cleanup = sub {
      delete $AnyEvent::MP::Global::ON_SETUP{$id};

      my $old = delete $STATE{$id}
         or return;

      %$old = ();
   };

   # damn, why do my simple state hashes resemble objects so quickly
   my $state = $STATE{$id} = {
      id      => $id, # must be the same id that keys %STATE and is sent to the peer
      node    => $node,
      port    => $port,
      to      => (AE::timer $timeout, 0, sub {
         $cleanup->();
         kil $port, AnyEvent::MP::DataConn:: => "$node: unable to establish connection within $timeout seconds";
      }),
      success => sub {
         $cleanup->();
         snd @localmsg, @_;
      },
      fail    => sub {
         $cleanup->();
         kil $port, AnyEvent::MP::DataConn:: => $_[0];
      },
   };

   # ask the remote node to set up its side of the connection
   AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_accept:: => $id, $timeout, $initspec;

   $state->{wait} = sub {
      if (my $addr = $AnyEvent::MP::Global::addr{$node}) {
         delete $AnyEvent::MP::Global::ON_SETUP{$id};

         warn "<@_ @$addr $addr>\n";#d#
         # continue connect
         if (@$addr) {
            # node has listeners, so connect
            _connect $id;
         } else {
            # no listeners, ask it to connect to us
            AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id;
         }
      } else {
         # wait for the next global setup handshake
         $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
      }
   };

   $state->{wait}->();
}
189    
190     =back
191    
192     =head1 SEE ALSO
193    
194     L<AnyEvent::MP>.
195    
196     =head1 AUTHOR
197    
198     Marc Lehmann <schmorp@schmorp.de>
199     http://home.schmorp.de/
200    
201     =cut
202    
203     1
204