ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/DataConn.pm
Revision: 1.4
Committed: Sun Nov 8 23:49:40 2009 UTC (14 years, 6 months ago) by root
Branch: MAIN
Changes since 1.3: +41 -13 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::DataConn - create socket connections between nodes
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::DataConn;
8
9 =head1 DESCRIPTION
10
11 This module can be used to create socket connections between the local and
12 a remote node in the aemp network. The socket can be used freely for any
13 purpose, and in most cases, this mechanism is a good way to transport big
14 chunks of binary data.
15
16 The connections created by this module use the same security mechanisms
17 as normal AEMP connections (secure authentication, optional use of TLS),
18 and in fact, use the same listening port as AEMP connections, so when two
19 nodes can reach each other, they can, with high probability, create a data
20 connection between them.
21
22 The protocol is, however, not the AEMP transport protocol, so this will
23 only work between nodes implementing the "aemp-dataconn" protocol.
24
25 =head1 FUNCTIONS
26
27 =over 4
28
29 =cut
30
31 package AnyEvent::MP::DataConn;
32
33 use common::sense;
34 use Carp ();
35 use POSIX ();
36
37 use AnyEvent ();
38 use AnyEvent::Util ();
39
40 use AnyEvent::MP;
41 use AnyEvent::MP::Kernel ();
42 use AnyEvent::MP::Global ();
43
44 our $ID = "a"; # connection id counter: connect_to pre-increments it (++$ID) and appends "@$NODE" to form an id
45 our %STATE; # pending connections: connection id => state hash (id, to = timeout timer, done = completion callback, ...)
46
47 # another node tells us to await a connection
# Registers state for a data connection that another node announced.
#
#   $id       - connection id, used as the key into %STATE
#   $port     - port that is kil'ed when the connection cannot be established
#   $timeout  - seconds to wait for the connection before giving up
#   $initspec - array ref [$funcname, @args]; on success the function is
#               loaded and called as $func->(@args, $hdl)
#
# The registered "done" callback is invoked with ($hdl, $error): by _inject
# once a transport with a matching dataconn_id shows up, or by the timeout
# timer below with an undefined handle.
sub _expect {
   my ($id, $port, $timeout, $initspec) = @_;

   $STATE{$id} = {
      id   => $id,
      # one-shot timer; it lives inside the state hash, so it goes away as
      # soon as "done" empties that hash
      to   => (AE::timer $timeout, 0, sub {
         # pass a real message so the kil reason is not undefined
         $STATE{$id}{done}(undef, "$id: no data connection arrived within $timeout seconds");
      }),
      done => sub {
         my ($hdl, $error) = @_;

         # remove the entry and empty the hash itself, dropping the timer
         # and callback stored in it
         %{delete $STATE{$id}} = ();

         if (defined $hdl) {
            # success: hand the handle to the requested init function
            my ($func, @args) = @$initspec;
            (AnyEvent::MP::Kernel::load_func $func)->(@args, $hdl);
         } else {
            # failure: report it by killing the associated port
            kil $port, AnyEvent::MP::DataConn:: => $error;
         }
      },
   };
}
70
71 # AEMP::Transport call for dataconn-connections
# Called by the transport layer for "aemp-dataconn" connections.
#
#   $conn  - transport object; its {hdl}, {local_greeting} and
#            {remote_greeting} members are inspected
#   $error - defined when the connection attempt failed
#
# Takes the handle out of the transport (only when there is no error), looks
# up the dataconn id from either greeting, destroys the transport and passes
# ($hdl, $error) to the "done" callback registered for that id. Connections
# without an id, or with an id that has no state, are ignored.
sub _inject {
   my ($conn, $error) = @_;

   # the handle is only taken over when the transport reported no error
   my $hdl;
   $hdl = delete $conn->{hdl}
      unless defined $error;

   # whichever side supplied the id wins, the local greeting is checked first
   my $id = $conn->{local_greeting}{dataconn_id} || $conn->{remote_greeting}{dataconn_id};
   return unless $id;

   $conn->destroy;

   # the state may already be gone (e.g. after a timeout)
   my $state = $STATE{$id};
   return unless $state;

   $state->{done}($hdl, $error);
}
83
84 # actively connect to some other node
# Actively connects to another node for the data connection $id.
#
#   $id   - connection id; nothing happens unless %STATE has an entry for it
#   $node - node whose listening endpoints are looked up in
#           %AnyEvent::MP::Global::addr
#
# When the node has no known endpoints, the state's "done" callback is
# invoked with an error. Otherwise a repeating timer tries one endpoint per
# tick; the timer removes itself once the list is exhausted.
sub _connect {
   my ($id, $node) = @_;

   my $state = $STATE{$id}
      or return;

   # iterate over a private copy: shifting entries off the array stored in
   # the global address table would remove them for every later user of
   # that table. A missing entry is treated like an empty list.
   my @endpoints = @{ $AnyEvent::MP::Global::addr{$node} || [] };

   @endpoints
      or return $state->{done}(undef, "$node: no listeners found");

   # I love hardcoded constants !
   # (first attempt immediately, then one further endpoint every 2 seconds)
   $state->{next} = AE::timer 0, 2, sub {
      my $endpoint = shift @endpoints
         or return delete $state->{next};

      # unparsable endpoints are skipped, the next tick tries the next one
      my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint
         or return;

      my $transport; $transport = AnyEvent::MP::Transport::mp_connect
         $host, $port,
         protocol       => "aemp-dataconn",
         local_greeting => { dataconn_id => $id },
         sub { $transport->destroy }, #TODO: destroys handshaked connections too early
      ;
   };
}
112
113 =item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle)
114
115 Creates a socket connection between the local node and the node C<$node>
116 (which can also be specified as a port). One of the nodes must have
117 listening ports ("binds").
118
119 When the connection could be successfully created, the C<$initfunc>
120 will be called with the given C<@initdata> on the remote node (similar
121 to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object
122 representing the remote connection end as additional argument.
123
124 Also, the callback given as last argument will be called with the
125 AnyEvent::Handle object for the local side.
126
127 The AnyEvent::Handle objects will be in a "quiescent" state - you could rip
128 out the file handle and forget about it, but it is recommended to use it,
129 as the security settings might have called for a TLS connection. If you
130 opt to use it, you at least have to set an C<on_error> callback.
131
132 In case of any error (timeout etc.), nothing will be called on
133 the remote side, and the local port will be C<kil>'ed with an C<<
134 AnyEvent::MP::DataConn => "error message" >> kill reason.
135
136 The timeout should be large enough to cover at least four network
137 round-trips and one message round-trip.
138
139 Example: on node1, establish a connection to node2 and send a line of text,
140 on node2, provide a receiver function.
141
142 # node1, code executes in some port context
143 AnyEvent::MP::DataConn::connect_to "node2", 5, ["pkg::receiver", 1], sub {
144 my ($hdl) = @_;
145 warn "connection established, sending line.\n";
146 $hdl->push_write ("blabla\n")
147 };
148
149 # node2
150 sub pkg::receiver {
151 my ($one, $hdl) = @_;
152 warn "connection established, wait for a line...\n";
153
154 $hdl->push_read (line => sub {
155 warn "received a line: $_[1]\n";
156 undef $hdl;
157 });
158 }
159
160 =cut
161
# Establishes a data connection between the local node and $node.
#
#   $node     - node (or port, it is mapped through node_of) to connect to
#   $timeout  - seconds after which the attempt is abandoned
#   $initspec - array ref [$initfunc, @initdata], forwarded to _expect on
#               the remote node
#   $cb       - called as $cb->($hdl) with the local handle on success
#
# On any failure the port that was current ($SELF) when connect_to was
# called is kil'ed with an AnyEvent::MP::DataConn => "message" reason.
sub connect_to($$$;@) {
   my ($node, $timeout, $initspec, $cb) = @_;
   my $port = $SELF;
   $node = node_of $node;

   # reject unsupported requests before any state or timer is registered,
   # so nothing lingers in %STATE and the port is only kil'ed once
   if (AnyEvent::MP::Kernel::port_is_local $node) {
      return kil $port, AnyEvent::MP::DataConn:: =>
         "connect_to does not yet support local/local connections, please bug me about it";
   }

   my $id = (++$ID) . "\@$NODE";

   # damn, why do my simple state hashes resemble objects so quickly
   my $state = $STATE{$id} = {
      id   => $id, # same value as the %STATE key
      to   => (AE::timer $timeout, 0, sub {
         $STATE{$id}{done}(undef, "$node: unable to establish connection within $timeout seconds");
      }),
      done => sub {
         my ($hdl, $error) = @_;

         # stop waiting for a global setup handshake, then remove the entry
         # and empty the hash itself (this drops the timers and the
         # closures stored in it, including the one referencing $state)
         delete $AnyEvent::MP::Global::ON_SETUP{$id};
         %{delete $STATE{$id}} = ();

         if (defined $hdl) {
            $cb->($hdl);
         } else {
            kil $port, AnyEvent::MP::DataConn:: => $error;
         }
      },
   };

   # tell the remote node to expect a connection with this id
   AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_expect:: => $id, $port, $timeout, $initspec;

   $state->{wait} = sub {
      if (my $addr = $AnyEvent::MP::Global::addr{$node}) {
         delete $AnyEvent::MP::Global::ON_SETUP{$id};

         # continue connect
         if (@$addr) {
            # node has listeners, so connect
            _connect $id, $node;
         } else {
            # no listeners, ask it to connect to us
            AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id, $NODE;
         }
      } else {
         # wait for the next global setup handshake
         # due to the round-trip at the beginning, this should never be necessary
         $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
      }
   };

   # we actually have to make sure that the connection arrives after the expect message, and
   # the easiest way to do this is to use an rpc call.
   AnyEvent::MP::Kernel::snd_on $node, port { $state->{wait}() };
}
220
221 =back
222
223 =head1 SEE ALSO
224
225 L<AnyEvent::MP>.
226
227 =head1 AUTHOR
228
229 Marc Lehmann <schmorp@schmorp.de>
230 http://home.schmorp.de/
231
232 =cut
233
234 1
235