1 |
root |
1.1 |
=head1 NAME |
2 |
|
|
|
3 |
|
|
AnyEvent::MP::DataConn - create socket connections between nodes |
4 |
|
|
|
5 |
|
|
=head1 SYNOPSIS |
6 |
|
|
|
7 |
|
|
use AnyEvent::MP::DataConn; |
8 |
|
|
|
9 |
|
|
=head1 DESCRIPTION |
10 |
|
|
|
11 |
|
|
This module can be used to create socket connections between the local and |
12 |
|
|
a remote node in the aemp network. The socket can be used freely for any |
13 |
|
|
purpose, and in most cases, this mechanism is a good way to transport big |
14 |
|
|
chunks of binary data. |
15 |
|
|
|
16 |
|
|
The connections created by this module use the same security mechanisms |
17 |
|
|
as normal AEMP connections (secure authentication, optional use of TLS), |
18 |
|
|
and in fact, use the same listening port as AEMP connections, so when two |
19 |
root |
1.7 |
nodes can reach each other via the normal aemp protocol, they can create |
20 |
|
|
data connections as well, no extra ports or firewall rules are required. |
21 |
root |
1.1 |
|
22 |
root |
1.7 |
The protocol used is, however, not the AEMP transport protocol, so this |
23 |
root |
1.8 |
will only work between nodes implementing the "aemp-dataconn" protocol |
24 |
|
|
extension. |
25 |
root |
1.1 |
|
26 |
|
|
=head1 FUNCTIONS |
27 |
|
|
|
28 |
|
|
=over 4 |
29 |
|
|
|
30 |
|
|
=cut |
31 |
|
|
|
32 |
|
|
package AnyEvent::MP::DataConn; |
33 |
|
|
|
34 |
|
|
use common::sense; |
35 |
|
|
use Carp (); |
36 |
|
|
use POSIX (); |
37 |
|
|
|
38 |
|
|
use AnyEvent (); |
39 |
|
|
use AnyEvent::Util (); |
40 |
|
|
|
41 |
|
|
use AnyEvent::MP; |
42 |
root |
1.2 |
use AnyEvent::MP::Kernel (); |
43 |
|
|
|
44 |
|
|
our $ID = "a"; |
45 |
|
|
our %STATE; |
46 |
|
|
|
47 |
root |
1.3 |
# another node tells us to await a connection |
48 |
|
|
sub _expect { |
49 |
root |
1.5 |
my ($id, $port, $timeout, $initfunc, @initdata) = @_; |
50 |
root |
1.2 |
|
51 |
root |
1.3 |
$STATE{$id} = { |
52 |
|
|
id => $id, |
53 |
|
|
to => (AE::timer $timeout, 0, sub { |
54 |
root |
1.4 |
$STATE{$id}{done}(undef); |
55 |
root |
1.3 |
}), |
56 |
|
|
done => sub { |
57 |
|
|
my ($hdl, $error) = @_; |
58 |
|
|
|
59 |
|
|
%{delete $STATE{$id}} = (); |
60 |
root |
1.2 |
|
61 |
root |
1.3 |
if (defined $hdl) { |
62 |
root |
1.5 |
(AnyEvent::MP::Kernel::load_func $initfunc)->(@initdata, $hdl); |
63 |
root |
1.4 |
} else { |
64 |
|
|
kil $port, AnyEvent::MP::DataConn:: => $error; |
65 |
root |
1.3 |
} |
66 |
root |
1.2 |
}, |
67 |
|
|
}; |
68 |
|
|
} |
69 |
root |
1.1 |
|
70 |
root |
1.3 |
# AEMP::Transport call for dataconn-connections |
71 |
root |
1.1 |
sub _inject { |
72 |
|
|
my ($conn, $error) = @_; |
73 |
root |
1.2 |
|
74 |
root |
1.3 |
my $hdl = defined $error ? undef : delete $conn->{hdl}; |
75 |
|
|
my $id = $conn->{local_greeting}{dataconn_id} || $conn->{remote_greeting}{dataconn_id} |
76 |
|
|
or return; |
77 |
|
|
|
78 |
|
|
$conn->destroy; |
79 |
|
|
|
80 |
root |
1.4 |
($STATE{$id} or return)->{done}($hdl, $error); |
81 |
root |
1.1 |
} |
82 |
|
|
|
83 |
root |
1.3 |
# actively connect to some other node |
84 |
root |
1.2 |
sub _connect { |
85 |
root |
1.3 |
my ($id, $node) = @_; |
86 |
root |
1.2 |
|
87 |
|
|
my $state = $STATE{$id} |
88 |
|
|
or return; |
89 |
|
|
|
90 |
root |
1.3 |
my $addr = $AnyEvent::MP::Global::addr{$node}; |
91 |
root |
1.2 |
|
92 |
|
|
@$addr |
93 |
root |
1.4 |
or return $state->{done}(undef, "$node: no listeners found"); |
94 |
root |
1.2 |
|
95 |
root |
1.3 |
# I love hardcoded constants ! |
96 |
root |
1.2 |
$state->{next} = AE::timer 0, 2, sub { |
97 |
|
|
my $endpoint = shift @$addr |
98 |
|
|
or return delete $state->{next}; |
99 |
|
|
|
100 |
|
|
my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint |
101 |
|
|
or return; |
102 |
|
|
|
103 |
|
|
my $transport; $transport = AnyEvent::MP::Transport::mp_connect |
104 |
|
|
$host, $port, |
105 |
|
|
protocol => "aemp-dataconn", |
106 |
root |
1.3 |
local_greeting => { dataconn_id => $id }, |
107 |
root |
1.9 |
sub { $transport->destroy }, #TODO: destroys handshaked connections too early |
108 |
root |
1.2 |
; |
109 |
|
|
}; |
110 |
|
|
} |
111 |
|
|
|
112 |
root |
1.5 |
=item AnyEvent::MP::DataConn::connect_to $node, $timeout, $initfunc, @initdata, $cb->($handle) |
113 |
root |
1.1 |
|
114 |
|
|
Creates a socket connection between the local node and the node C<$node> |
115 |
root |
1.4 |
(which can also be specified as a port). One of the nodes must have |
116 |
root |
1.9 |
listeners ("binds"). |
117 |
root |
1.1 |
|
118 |
|
|
When the connection could be successfully created, the C<$initfunc> |
119 |
|
|
will be called with the given C<@initdata> on the remote node (similar |
120 |
|
|
to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object |
121 |
root |
1.2 |
representing the remote connection end as additional argument. |
122 |
root |
1.1 |
|
123 |
root |
1.2 |
Also, the callback given as last argument will be called with the |
124 |
|
|
AnyEvent::Handle object for the local side. |
125 |
root |
1.1 |
|
126 |
root |
1.2 |
The AnyEvent::Handle objects will be in a "quiescent" state - you could rip |
127 |
root |
1.1 |
out the file handle and forget about it, but it is recommended to use it, |
128 |
root |
1.2 |
as the security settings might have called for a TLS connection. If you |
129 |
root |
1.1 |
opt to use it, you at least have to set an C<on_error> callback. |
130 |
|
|
|
131 |
root |
1.2 |
In case of any error (timeout etc.), nothing will be called on |
132 |
|
|
the remote side, and the local port will be C<kil>'ed with an C<< |
133 |
root |
1.1 |
AnyEvent::MP::DataConn => "error message" >> kill reason. |
134 |
|
|
|
135 |
root |
1.4 |
The timeout should be large enough to cover at least four network |
136 |
|
|
round-trips and one message round-trip. |
137 |
|
|
|
138 |
|
|
Example: on node1, establish a connection to node2 and send a line of text, |
139 |
root |
1.10 |
on node2, provide a receiver function. |
140 |
root |
1.4 |
|
141 |
|
|
# node1, code executes in some port context |
142 |
root |
1.5 |
AnyEvent::MP::DataConn::connect_to "node2", 5, "pkg::receiver", 1, sub { |
143 |
root |
1.4 |
my ($hdl) = @_; |
144 |
|
|
warn "connection established, sending line.\n" |
145 |
|
|
$hdl->push_write ("blabla\n") |
146 |
|
|
}; |
147 |
|
|
|
148 |
|
|
# node2 |
149 |
|
|
sub pkg::receiver { |
150 |
|
|
my ($one, $hdl) = @_; |
151 |
|
|
warn "connection established, wait for a line...\n" |
152 |
|
|
|
153 |
|
|
$hdl->push_read (line => sub { |
154 |
|
|
warn "received a line: $_[1]\n"; |
155 |
|
|
undef $hdl; |
156 |
|
|
}); |
157 |
|
|
} |
158 |
|
|
|
159 |
root |
1.1 |
=cut |
160 |
|
|
|
161 |
root |
1.5 |
sub connect_to($$$$@) { |
162 |
|
|
my $cb = pop; |
163 |
|
|
my ($node, $timeout, $initfunc, @initdata) = @_; |
164 |
root |
1.6 |
|
165 |
|
|
my $port = $SELF |
166 |
|
|
or Carp::croak "AnyEvent::MP::DataConn::connect_to must be called in port context"; |
167 |
|
|
|
168 |
root |
1.1 |
$node = node_of $node; |
169 |
|
|
|
170 |
root |
1.2 |
my $id = (++$ID) . "\@$NODE"; |
171 |
root |
1.1 |
|
172 |
root |
1.2 |
# damn, why do my simple state hashes resemble objects so quickly |
173 |
|
|
my $state = $STATE{$id} = { |
174 |
root |
1.3 |
id => (++$ID) . "\@$NODE", |
175 |
|
|
to => (AE::timer $timeout, 0, sub { |
176 |
root |
1.4 |
$STATE{$id}{done}(undef, "$node: unable to establish connection within $timeout seconds"); |
177 |
root |
1.2 |
}), |
178 |
root |
1.3 |
done => sub { |
179 |
|
|
my ($hdl, $error) = @_; |
180 |
|
|
|
181 |
|
|
delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
182 |
|
|
%{delete $STATE{$id}} = (); |
183 |
|
|
|
184 |
|
|
if (defined $hdl) { |
185 |
root |
1.4 |
$cb->($hdl); |
186 |
root |
1.3 |
} else { |
187 |
|
|
kil $port, AnyEvent::MP::DataConn:: => $error; |
188 |
|
|
} |
189 |
root |
1.2 |
}, |
190 |
root |
1.1 |
}; |
191 |
|
|
|
192 |
root |
1.2 |
if (AnyEvent::MP::Kernel::port_is_local $node) { |
193 |
root |
1.6 |
# teh sucks |
194 |
|
|
|
195 |
|
|
require AnyEvent::Util; |
196 |
|
|
my ($fh1, $fh2) = AnyEvent::Util::portable_socketpair () |
197 |
|
|
or return kil $port, AnyEvent::MP::DataConn:: => "cannot create local socketpair: $!"; |
198 |
|
|
|
199 |
|
|
use AnyEvent::Handle; |
200 |
|
|
my $hdl1 = new AnyEvent::Handle fh => $fh1; |
201 |
|
|
my $hdl2 = new AnyEvent::Handle fh => $fh2; |
202 |
|
|
|
203 |
|
|
(AnyEvent::MP::Kernel::load_func $initfunc)->(@initdata, $hdl2); |
204 |
|
|
$cb->($hdl1); |
205 |
root |
1.2 |
|
206 |
root |
1.1 |
} else { |
207 |
root |
1.5 |
AnyEvent::MP::Kernel::snd_to_func $node, |
208 |
|
|
AnyEvent::MP::DataConn::_expect:: => $id, $port, $timeout, $initfunc, @initdata; |
209 |
root |
1.2 |
|
210 |
|
|
$state->{wait} = sub { |
211 |
|
|
if (my $addr = $AnyEvent::MP::Global::addr{$node}) { |
212 |
|
|
delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
213 |
|
|
|
214 |
|
|
# continue connect |
215 |
|
|
if (@$addr) { |
216 |
|
|
# node has listeners, so connect |
217 |
root |
1.3 |
_connect $id, $node; |
218 |
root |
1.2 |
} else { |
219 |
|
|
# no listeners, ask it to connect to us |
220 |
root |
1.4 |
AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id, $NODE; |
221 |
root |
1.2 |
} |
222 |
|
|
} else { |
223 |
|
|
# wait for the next global setup handshake |
224 |
root |
1.4 |
# due to the round-trip at the beginning, this should never be necessary |
225 |
root |
1.2 |
$AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait}; |
226 |
|
|
}; |
227 |
root |
1.1 |
}; |
228 |
root |
1.2 |
|
229 |
root |
1.4 |
# we actually have to make sure that the connection arrives after the expect message, and |
230 |
|
|
# the easiest way to do this is to use an rpc call. |
231 |
|
|
AnyEvent::MP::Kernel::snd_on $node, port { $state->{wait}() }; |
232 |
root |
1.1 |
} |
233 |
|
|
} |
234 |
|
|
|
235 |
|
|
=back |
236 |
|
|
|
237 |
|
|
=head1 SEE ALSO |
238 |
|
|
|
239 |
|
|
L<AnyEvent::MP>. |
240 |
|
|
|
241 |
|
|
=head1 AUTHOR |
242 |
|
|
|
243 |
|
|
Marc Lehmann <schmorp@schmorp.de> |
244 |
|
|
http://home.schmorp.de/ |
245 |
|
|
|
246 |
|
|
=cut |
247 |
|
|
|
248 |
|
|
1 |
249 |
|
|
|