ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/DataConn.pm
Revision: 1.5
Committed: Sun Nov 8 23:53:08 2009 UTC (14 years, 6 months ago) by root
Branch: MAIN
Changes since 1.4: +9 -8 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::DataConn - create socket connections between nodes
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::DataConn;
8
9 =head1 DESCRIPTION
10
11 This module can be used to create socket connections between the local and
12 a remote node in the aemp network. The socket can be used freely for any
13 purpose, and in most cases, this mechanism is a good way to transport big
14 chunks of binary data.
15
16 The connections created by this module use the same security mechanisms
17 as normal AEMP connections (secure authentication, optional use of TLS),
18 and in fact, use the same listening port as AEMP connections, so when two
19 nodes can reach each other, they can, with high probability, creare a data
20 connection between them.
21
22 The protocol is, however, not the AEMP transport protocol, so this will
23 only work between nodes implementing the "aemp-dataconn" protocol.
24
25 =head1 FUNCTIONS
26
27 =over 4
28
29 =cut
30
31 package AnyEvent::MP::DataConn;
32
33 use common::sense;
34 use Carp ();
35 use POSIX ();
36
37 use AnyEvent ();
38 use AnyEvent::Util ();
39
40 use AnyEvent::MP;
41 use AnyEvent::MP::Kernel ();
42 use AnyEvent::MP::Global ();
43
44 our $ID = "a";
45 our %STATE;
46
47 # another node tells us to await a connection
48 sub _expect {
49 my ($id, $port, $timeout, $initfunc, @initdata) = @_;
50
51 $STATE{$id} = {
52 id => $id,
53 to => (AE::timer $timeout, 0, sub {
54 $STATE{$id}{done}(undef);
55 }),
56 done => sub {
57 my ($hdl, $error) = @_;
58
59 %{delete $STATE{$id}} = ();
60
61 if (defined $hdl) {
62 (AnyEvent::MP::Kernel::load_func $initfunc)->(@initdata, $hdl);
63 } else {
64 kil $port, AnyEvent::MP::DataConn:: => $error;
65 }
66 },
67 };
68 }
69
70 # AEMP::Transport call for dataconn-connections
71 sub _inject {
72 my ($conn, $error) = @_;
73
74 my $hdl = defined $error ? undef : delete $conn->{hdl};
75 my $id = $conn->{local_greeting}{dataconn_id} || $conn->{remote_greeting}{dataconn_id}
76 or return;
77
78 $conn->destroy;
79
80 ($STATE{$id} or return)->{done}($hdl, $error);
81 }
82
83 # actively connect to some other node
84 sub _connect {
85 my ($id, $node) = @_;
86
87 my $state = $STATE{$id}
88 or return;
89
90 my $addr = $AnyEvent::MP::Global::addr{$node};
91
92 @$addr
93 or return $state->{done}(undef, "$node: no listeners found");
94
95 # I love hardcoded constants !
96 $state->{next} = AE::timer 0, 2, sub {
97 my $endpoint = shift @$addr
98 or return delete $state->{next};
99
100 my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
101 or return;
102
103 my $transport; $transport = AnyEvent::MP::Transport::mp_connect
104 $host, $port,
105 protocol => "aemp-dataconn",
106 local_greeting => { dataconn_id => $id },
107 sub { $transport->destroy }, #TODO: destroys handshaked conenctions too early
108 ;
109 };
110 }
111
112 =item AnyEvent::MP::DataConn::connect_to $node, $timeout, $initfunc, @initdata, $cb->($handle)
113
114 Creates a socket connection between the local node and the node C<$node>
115 (which can also be specified as a port). One of the nodes must have
116 listening ports ("binds").
117
118 When the connection could be successfully created, the C<$initfunc>
119 will be called with the given C<@initdata> on the remote node (similar
120 to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object
121 representing the remote connection end as additional argument.
122
123 Also, the callback given as last argument will be called with the
124 AnyEvent::Handle object for the local side.
125
126 The AnyEvent::Handle objects will be in a "quiescent" state - you could rip
127 out the file handle and forget about it, but it is recommended to use it,
128 as the security settings might have called for a TLS connection. If you
129 opt to use it, you at least have to set an C<on_error> callback.
130
131 In case of any error (timeout etc.), nothing will be called on
132 the remote side, and the local port will be C<kil>'ed with an C<<
133 AnyEvent::MP::DataConn => "error message" >> kill reason.
134
135 The timeout should be large enough to cover at least four network
136 round-trips and one message round-trip.
137
138 Example: on node1, establish a connection to node2 and send a line of text,
139 one node2, provide a receiver function.
140
141 # node1, code executes in some port context
142 AnyEvent::MP::DataConn::connect_to "node2", 5, "pkg::receiver", 1, sub {
143 my ($hdl) = @_;
144 warn "connection established, sending line.\n"
145 $hdl->push_write ("blabla\n")
146 };
147
148 # node2
149 sub pkg::receiver {
150 my ($one, $hdl) = @_;
151 warn "connection established, wait for a line...\n"
152
153 $hdl->push_read (line => sub {
154 warn "received a line: $_[1]\n";
155 undef $hdl;
156 });
157 }
158
159 =cut
160
161 sub connect_to($$$$@) {
162 my $cb = pop;
163 my ($node, $timeout, $initfunc, @initdata) = @_;
164 my $port = $SELF;
165 $node = node_of $node;
166
167 my $id = (++$ID) . "\@$NODE";
168
169 # damn, why do my simple state hashes resemble objects so quickly
170 my $state = $STATE{$id} = {
171 id => (++$ID) . "\@$NODE",
172 to => (AE::timer $timeout, 0, sub {
173 $STATE{$id}{done}(undef, "$node: unable to establish connection within $timeout seconds");
174 }),
175 done => sub {
176 my ($hdl, $error) = @_;
177
178 delete $AnyEvent::MP::Global::ON_SETUP{$id};
179 %{delete $STATE{$id}} = ();
180
181 if (defined $hdl) {
182 $cb->($hdl);
183 } else {
184 kil $port, AnyEvent::MP::DataConn:: => $error;
185 }
186 },
187 };
188
189 if (AnyEvent::MP::Kernel::port_is_local $node) {
190 return kil $port, AnyEvent::MP::DataConn:: =>
191 "connect_to does not yet support local/local connections, please bug me about it";
192
193 } else {
194 AnyEvent::MP::Kernel::snd_to_func $node,
195 AnyEvent::MP::DataConn::_expect:: => $id, $port, $timeout, $initfunc, @initdata;
196
197 $state->{wait} = sub {
198 if (my $addr = $AnyEvent::MP::Global::addr{$node}) {
199 delete $AnyEvent::MP::Global::ON_SETUP{$id};
200
201 # continue connect
202 if (@$addr) {
203 # node has listeners, so connect
204 _connect $id, $node;
205 } else {
206 # no listeners, ask it to connect to us
207 AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id, $NODE;
208 }
209 } else {
210 # wait for the next global setup handshake
211 # due to the round-trip at the beginning, this should never be necessary
212 $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
213 };
214 };
215
216 # we actually have to make sure that the connection arrives after the expect message, and
217 # the easiest way to do this is to use an rpc call.
218 AnyEvent::MP::Kernel::snd_on $node, port { $state->{wait}() };
219 }
220 }
221
222 =back
223
224 =head1 SEE ALSO
225
226 L<AnyEvent::MP>.
227
228 =head1 AUTHOR
229
230 Marc Lehmann <schmorp@schmorp.de>
231 http://home.schmorp.de/
232
233 =cut
234
235 1
236