ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/DataConn.pm
Revision: 1.10
Committed: Tue Aug 29 22:39:40 2017 UTC (6 years, 8 months ago) by root
Branch: MAIN
CVS Tags: rel-2_02, rel-2_01, rel-2_0, HEAD
Changes since 1.9: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::DataConn - create socket connections between nodes
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::DataConn;
8
9 =head1 DESCRIPTION
10
11 This module can be used to create socket connections between the local and
12 a remote node in the aemp network. The socket can be used freely for any
13 purpose, and in most cases, this mechanism is a good way to transport big
14 chunks of binary data.
15
16 The connections created by this module use the same security mechanisms
17 as normal AEMP connections (secure authentication, optional use of TLS),
18 and in fact, use the same listening port as AEMP connections, so when two
19 nodes can reach each other via the normal aemp protocol, they can create
20 data connections as well, no extra ports or firewall rules are required.
21
22 The protocol used is, however, not the AEMP transport protocol, so this
23 will only work between nodes implementing the "aemp-dataconn" protocol
24 extension.
25
26 =head1 FUNCTIONS
27
28 =over 4
29
30 =cut
31
32 package AnyEvent::MP::DataConn;
33
34 use common::sense;
35 use Carp ();
36 use POSIX ();
37
38 use AnyEvent ();
39 use AnyEvent::Util ();
40
41 use AnyEvent::MP;
42 use AnyEvent::MP::Kernel ();
43
44 our $ID = "a";
45 our %STATE;
46
47 # another node tells us to await a connection
48 sub _expect {
49 my ($id, $port, $timeout, $initfunc, @initdata) = @_;
50
51 $STATE{$id} = {
52 id => $id,
53 to => (AE::timer $timeout, 0, sub {
54 $STATE{$id}{done}(undef);
55 }),
56 done => sub {
57 my ($hdl, $error) = @_;
58
59 %{delete $STATE{$id}} = ();
60
61 if (defined $hdl) {
62 (AnyEvent::MP::Kernel::load_func $initfunc)->(@initdata, $hdl);
63 } else {
64 kil $port, AnyEvent::MP::DataConn:: => $error;
65 }
66 },
67 };
68 }
69
70 # AEMP::Transport call for dataconn-connections
71 sub _inject {
72 my ($conn, $error) = @_;
73
74 my $hdl = defined $error ? undef : delete $conn->{hdl};
75 my $id = $conn->{local_greeting}{dataconn_id} || $conn->{remote_greeting}{dataconn_id}
76 or return;
77
78 $conn->destroy;
79
80 ($STATE{$id} or return)->{done}($hdl, $error);
81 }
82
83 # actively connect to some other node
84 sub _connect {
85 my ($id, $node) = @_;
86
87 my $state = $STATE{$id}
88 or return;
89
90 my $addr = $AnyEvent::MP::Global::addr{$node};
91
92 @$addr
93 or return $state->{done}(undef, "$node: no listeners found");
94
95 # I love hardcoded constants !
96 $state->{next} = AE::timer 0, 2, sub {
97 my $endpoint = shift @$addr
98 or return delete $state->{next};
99
100 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
101 or return;
102
103 my $transport; $transport = AnyEvent::MP::Transport::mp_connect
104 $host, $port,
105 protocol => "aemp-dataconn",
106 local_greeting => { dataconn_id => $id },
107 sub { $transport->destroy }, #TODO: destroys handshaked connections too early
108 ;
109 };
110 }
111
112 =item AnyEvent::MP::DataConn::connect_to $node, $timeout, $initfunc, @initdata, $cb->($handle)
113
114 Creates a socket connection between the local node and the node C<$node>
115 (which can also be specified as a port). One of the nodes must have
116 listeners ("binds").
117
118 When the connection could be successfully created, the C<$initfunc>
119 will be called with the given C<@initdata> on the remote node (similar
120 to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object
121 representing the remote connection end as additional argument.
122
123 Also, the callback given as last argument will be called with the
124 AnyEvent::Handle object for the local side.
125
126 The AnyEvent::Handle objects will be in a "quiescent" state - you could rip
127 out the file handle and forget about it, but it is recommended to use it,
128 as the security settings might have called for a TLS connection. If you
129 opt to use it, you at least have to set an C<on_error> callback.
130
131 In case of any error (timeout etc.), nothing will be called on
132 the remote side, and the local port will be C<kil>'ed with an C<<
133 AnyEvent::MP::DataConn => "error message" >> kill reason.
134
135 The timeout should be large enough to cover at least four network
136 round-trips and one message round-trip.
137
138 Example: on node1, establish a connection to node2 and send a line of text,
139 on node2, provide a receiver function.
140
141 # node1, code executes in some port context
142 AnyEvent::MP::DataConn::connect_to "node2", 5, "pkg::receiver", 1, sub {
143 my ($hdl) = @_;
144 warn "connection established, sending line.\n"
145 $hdl->push_write ("blabla\n")
146 };
147
148 # node2
149 sub pkg::receiver {
150 my ($one, $hdl) = @_;
151 warn "connection established, wait for a line...\n"
152
153 $hdl->push_read (line => sub {
154 warn "received a line: $_[1]\n";
155 undef $hdl;
156 });
157 }
158
159 =cut
160
161 sub connect_to($$$$@) {
162 my $cb = pop;
163 my ($node, $timeout, $initfunc, @initdata) = @_;
164
165 my $port = $SELF
166 or Carp::croak "AnyEvent::MP::DataConn::connect_to must be called in port context";
167
168 $node = node_of $node;
169
170 my $id = (++$ID) . "\@$NODE";
171
172 # damn, why do my simple state hashes resemble objects so quickly
173 my $state = $STATE{$id} = {
174 id => (++$ID) . "\@$NODE",
175 to => (AE::timer $timeout, 0, sub {
176 $STATE{$id}{done}(undef, "$node: unable to establish connection within $timeout seconds");
177 }),
178 done => sub {
179 my ($hdl, $error) = @_;
180
181 delete $AnyEvent::MP::Global::ON_SETUP{$id};
182 %{delete $STATE{$id}} = ();
183
184 if (defined $hdl) {
185 $cb->($hdl);
186 } else {
187 kil $port, AnyEvent::MP::DataConn:: => $error;
188 }
189 },
190 };
191
192 if (AnyEvent::MP::Kernel::port_is_local $node) {
193 # teh sucks
194
195 require AnyEvent::Util;
196 my ($fh1, $fh2) = AnyEvent::Util::portable_socketpair ()
197 or return kil $port, AnyEvent::MP::DataConn:: => "cannot create local socketpair: $!";
198
199 use AnyEvent::Handle;
200 my $hdl1 = new AnyEvent::Handle fh => $fh1;
201 my $hdl2 = new AnyEvent::Handle fh => $fh2;
202
203 (AnyEvent::MP::Kernel::load_func $initfunc)->(@initdata, $hdl2);
204 $cb->($hdl1);
205
206 } else {
207 AnyEvent::MP::Kernel::snd_to_func $node,
208 AnyEvent::MP::DataConn::_expect:: => $id, $port, $timeout, $initfunc, @initdata;
209
210 $state->{wait} = sub {
211 if (my $addr = $AnyEvent::MP::Global::addr{$node}) {
212 delete $AnyEvent::MP::Global::ON_SETUP{$id};
213
214 # continue connect
215 if (@$addr) {
216 # node has listeners, so connect
217 _connect $id, $node;
218 } else {
219 # no listeners, ask it to connect to us
220 AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id, $NODE;
221 }
222 } else {
223 # wait for the next global setup handshake
224 # due to the round-trip at the beginning, this should never be necessary
225 $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
226 };
227 };
228
229 # we actually have to make sure that the connection arrives after the expect message, and
230 # the easiest way to do this is to use an rpc call.
231 AnyEvent::MP::Kernel::snd_on $node, port { $state->{wait}() };
232 }
233 }
234
235 =back
236
237 =head1 SEE ALSO
238
239 L<AnyEvent::MP>.
240
241 =head1 AUTHOR
242
243 Marc Lehmann <schmorp@schmorp.de>
244 http://home.schmorp.de/
245
246 =cut
247
248 1
249