ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/DataConn.pm
Revision: 1.8
Committed: Tue Dec 1 11:59:48 2009 UTC (14 years, 6 months ago) by root
Branch: MAIN
CVS Tags: rel-1_28, rel-1_29, rel-1_24, rel-1_26, rel-1_27, rel-1_30
Changes since 1.7: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::DataConn - create socket connections between nodes
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::DataConn;
8
9 =head1 DESCRIPTION
10
11 This module can be used to create socket connections between the local and
12 a remote node in the aemp network. The socket can be used freely for any
13 purpose, and in most cases, this mechanism is a good way to transport big
14 chunks of binary data.
15
16 The connections created by this module use the same security mechanisms
17 as normal AEMP connections (secure authentication, optional use of TLS),
18 and in fact, use the same listening port as AEMP connections, so when two
19 nodes can reach each other via the normal aemp protocol, they can create
20 data connections as well, no extra ports or firewall rules are required.
21
22 The protocol used is, however, not the AEMP transport protocol, so this
23 will only work between nodes implementing the "aemp-dataconn" protocol
24 extension.
25
26 =head1 FUNCTIONS
27
28 =over 4
29
30 =cut
31
32 package AnyEvent::MP::DataConn;
33
34 use common::sense;
35 use Carp ();
36 use POSIX ();
37
38 use AnyEvent ();
39 use AnyEvent::Util ();
40
41 use AnyEvent::MP;
42 use AnyEvent::MP::Kernel ();
43 use AnyEvent::MP::Global ();
44
45 our $ID = "a";
46 our %STATE;
47
# Another node tells us to await a data connection.
#
# Registers a state entry for the connection $id in %STATE, consisting of:
#   id   - the connection id
#   to   - a one-shot timer that fails the connection after $timeout seconds
#   done - the completion callback, called as done($hdl) on success
#          or done(undef, $error) on failure
#
# On success the function named $initfunc is resolved with load_func and
# called with (@initdata, $hdl); on failure $port is kil'ed with an
# "AnyEvent::MP::DataConn" => $error reason.
sub _expect {
    my ($id, $port, $timeout, $initfunc, @initdata) = @_;

    $STATE{$id} = {
        id   => $id,
        to   => AE::timer($timeout, 0, sub {
            # always pass an error message, so the kill reason is never undef
            $STATE{$id}{done}(
                undef,
                "data connection $id: no connection received within $timeout seconds",
            );
        }),
        done => sub {
            my ($hdl, $error) = @_;

            # unregister the state and empty the hash, which releases the
            # timer and the callbacks stored in it
            %{ delete $STATE{$id} } = ();

            if (defined $hdl) {
                (AnyEvent::MP::Kernel::load_func($initfunc))->(@initdata, $hdl);
            } else {
                kil($port, "AnyEvent::MP::DataConn" => $error);
            }
        },
    };
}
70
# Entry point called by the AEMP transport for dataconn connections.
#
#   $conn  - the transport connection object
#   $error - defined when the connection attempt failed
#
# Looks up the dataconn id from the local or remote greeting, destroys the
# transport object and hands the handle (or the error) to the "done"
# callback of the matching %STATE entry, if there is one.
sub _inject {
    my ($conn, $error) = @_;

    # only take the handle out of the connection when there was no error
    my $hdl;
    $hdl = delete $conn->{hdl}
        unless defined $error;

    my $id = $conn->{local_greeting}{dataconn_id}
          || $conn->{remote_greeting}{dataconn_id};
    return unless $id;

    $conn->destroy;

    # nobody is waiting for this connection (any more)
    my $state = $STATE{$id}
        or return;

    $state->{done}($hdl, $error);
}
83
# Actively connect to some other node.
#
#   $id   - id of a connection registered in %STATE (no-op when unknown)
#   $node - node id whose listening endpoints should be tried
#
# The endpoints known for $node are tried one after the other, a new one
# every two seconds, until the list is exhausted or the state has been
# cleared by its "done" callback. When no endpoints are known at all,
# "done" is called with an error immediately.
sub _connect {
    my ($id, $node) = @_;

    my $state = $STATE{$id}
        or return;

    # Iterate over a private copy: shifting from the global list itself would
    # consume it for every later connection attempt. The "|| []" keeps a
    # missing entry from dying under strictures.
    my @endpoints = @{ $AnyEvent::MP::Global::addr{$node} || [] };

    @endpoints
        or return $state->{done}(undef, "$node: no listeners found");

    # I love hardcoded constants ! (retry interval of two seconds)
    $state->{next} = AE::timer(0, 2, sub {
        # all endpoints tried: stop the retry timer
        my $endpoint = shift @endpoints
            or return delete $state->{next};

        # skip endpoints that cannot be parsed, the next tick tries the next one
        my ($host, $port) = AnyEvent::Socket::parse_hostport($endpoint)
            or return;

        my $transport; $transport = AnyEvent::MP::Transport::mp_connect(
            $host, $port,
            protocol       => "aemp-dataconn",
            local_greeting => { dataconn_id => $id },
            sub { $transport->destroy }, # TODO: destroys handshaked connections too early
        );
    });
}
112
113 =item AnyEvent::MP::DataConn::connect_to $node, $timeout, $initfunc, @initdata, $cb->($handle)
114
115 Creates a socket connection between the local node and the node C<$node>
116 (which can also be specified as a port). One of the nodes must have
117 listening ports ("binds").
118
119 When the connection could be successfully created, the C<$initfunc>
120 will be called with the given C<@initdata> on the remote node (similar
121 to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object
122 representing the remote connection end as additional argument.
123
124 Also, the callback given as last argument will be called with the
125 AnyEvent::Handle object for the local side.
126
127 The AnyEvent::Handle objects will be in a "quiescent" state - you could rip
128 out the file handle and forget about it, but it is recommended to use it,
129 as the security settings might have called for a TLS connection. If you
130 opt to use it, you at least have to set an C<on_error> callback.
131
132 In case of any error (timeout etc.), nothing will be called on
133 the remote side, and the local port will be C<kil>'ed with an C<<
134 AnyEvent::MP::DataConn => "error message" >> kill reason.
135
136 The timeout should be large enough to cover at least four network
137 round-trips and one message round-trip.
138
139 Example: on node1, establish a connection to node2 and send a line of text,
140 on node2, provide a receiver function.
141
142 # node1, code executes in some port context
143 AnyEvent::MP::DataConn::connect_to "node2", 5, "pkg::receiver", 1, sub {
144 my ($hdl) = @_;
145 warn "connection established, sending line.\n";
146 $hdl->push_write ("blabla\n")
147 };
148
149 # node2
150 sub pkg::receiver {
151 my ($one, $hdl) = @_;
152 warn "connection established, wait for a line...\n";
153
154 $hdl->push_read (line => sub {
155 warn "received a line: $_[1]\n";
156 undef $hdl;
157 });
158 }
159
160 =cut
161
# connect_to $node, $timeout, $initfunc, @initdata, $cb->($handle)
#
# Creates a socket connection between the local node and $node.
#
#   $node     - node (or port) to connect to, normalised with node_of
#   $timeout  - seconds after which the attempt is given up
#   $initfunc - name of the function called with (@initdata, $handle) for the
#               remote connection end
#   @initdata - extra arguments for $initfunc
#   $cb       - called with the AnyEvent::Handle for the local end
#
# Must be called in port context (croaks otherwise). On any error the
# calling port is kil'ed with an "AnyEvent::MP::DataConn" => $message reason.
sub connect_to($$$$@) {
    my $cb = pop;
    my ($node, $timeout, $initfunc, @initdata) = @_;

    my $port = $SELF
        or Carp::croak("AnyEvent::MP::DataConn::connect_to must be called in port context");

    $node = node_of($node);

    if (AnyEvent::MP::Kernel::port_is_local($node)) {
        # Both ends live in this process: a socketpair is all that is needed.
        # This is handled before any state or timeout timer is registered, as
        # nothing would ever clear them here and the timer would kil the port
        # after $timeout seconds despite the successful connection.
        require AnyEvent::Util;
        my ($fh1, $fh2) = AnyEvent::Util::portable_socketpair()
            or return kil($port, "AnyEvent::MP::DataConn" => "cannot create local socketpair: $!");

        require AnyEvent::Handle;
        my $hdl1 = AnyEvent::Handle->new(fh => $fh1);
        my $hdl2 = AnyEvent::Handle->new(fh => $fh2);

        (AnyEvent::MP::Kernel::load_func($initfunc))->(@initdata, $hdl2);
        return $cb->($hdl1);
    }

    my $id = (++$ID) . "\@$NODE";

    # damn, why do my simple state hashes resemble objects so quickly
    my $state = $STATE{$id} = {
        id   => $id, # must match the %STATE key, so do not increment $ID again
        to   => AE::timer($timeout, 0, sub {
            $STATE{$id}{done}(undef, "$node: unable to establish connection within $timeout seconds");
        }),
        done => sub {
            my ($hdl, $error) = @_;

            # unregister everything; emptying the hash releases the timers
            # and the callbacks stored in it
            delete $AnyEvent::MP::Global::ON_SETUP{$id};
            %{ delete $STATE{$id} } = ();

            if (defined $hdl) {
                $cb->($hdl);
            } else {
                kil($port, "AnyEvent::MP::DataConn" => $error);
            }
        },
    };

    # tell the other node to expect a connection with this id
    AnyEvent::MP::Kernel::snd_to_func($node,
        "AnyEvent::MP::DataConn::_expect" => $id, $port, $timeout, $initfunc, @initdata);

    $state->{wait} = sub {
        if (my $addr = $AnyEvent::MP::Global::addr{$node}) {
            delete $AnyEvent::MP::Global::ON_SETUP{$id};

            # continue connect
            if (@$addr) {
                # node has listeners, so connect
                _connect($id, $node);
            } else {
                # no listeners, ask it to connect to us
                AnyEvent::MP::Kernel::snd_to_func($node,
                    "AnyEvent::MP::DataConn::_connect" => $id, $NODE);
            }
        } else {
            # wait for the next global setup handshake
            # due to the round-trip at the beginning, this should never be necessary
            $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
        }
    };

    # we actually have to make sure that the connection arrives after the expect message, and
    # the easiest way to do this is to use an rpc call.
    AnyEvent::MP::Kernel::snd_on($node, port { $state->{wait}() });
}
235
236 =back
237
238 =head1 SEE ALSO
239
240 L<AnyEvent::MP>.
241
242 =head1 AUTHOR
243
244 Marc Lehmann <schmorp@schmorp.de>
245 http://home.schmorp.de/
246
247 =cut
248
249 1
250