ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/DataConn.pm
Revision: 1.3
Committed: Sun Nov 8 01:15:25 2009 UTC (14 years, 6 months ago) by root
Branch: MAIN
Changes since 1.2: +49 -46 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::DataConn - create socket connections between nodes
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::DataConn;
8
9 =head1 DESCRIPTION
10
11 This module can be used to create socket connections between the local and
12 a remote node in the aemp network. The socket can be used freely for any
13 purpose, and in most cases, this mechanism is a good way to transport big
14 chunks of binary data.
15
16 The connections created by this module use the same security mechanisms
17 as normal AEMP connections (secure authentication, optional use of TLS),
18 and in fact, use the same listening port as AEMP connections, so when two
19 nodes can reach each other, they can, with high probability, create a data
20 connection between them.
21
22 The protocol is, however, not the AEMP transport protocol, so this will
23 only work between nodes implementing the "aemp-dataconn" protocol.
24
25 =head1 FUNCTIONS
26
27 =over 4
28
29 =cut
30
31 package AnyEvent::MP::DataConn;
32
33 use common::sense;
34 use Carp ();
35 use POSIX ();
36
37 use AnyEvent ();
38 use AnyEvent::Util ();
39
40 use AnyEvent::MP;
41 use AnyEvent::MP::Kernel ();
42 use AnyEvent::MP::Global ();
43
44 our $ID = "a"; # connection id counter: connect_to bumps it with ++ and appends "\@$NODE"
45 our %STATE; # connection id => state hash (id, to, done, plus next/wait while connecting)
46
# another node tells us to await a connection
#
# Registers a state entry for connection $id in %STATE. The entry holds
# a one-shot timer (fires after $timeout seconds) and a "done" callback.
# $initspec is an array reference [$func, @args]; once a handle arrives,
# $func is resolved via AnyEvent::MP::Kernel::load_func and called with
# @args followed by the handle.
sub _expect {
   my ($id, $timeout, $initspec) = @_;

   my %entry = (id => $id);

   # timeout: complete without a handle, which only tears the state down
   $entry{to} = AE::timer($timeout, 0, sub {
      $STATE{$id}{done}->(undef);
   });

   $entry{done} = sub {
      my ($handle, $error) = @_;

      # unregister and empty the state hash, dropping the timer with it
      %{ delete $STATE{$id} } = ();

      if (defined $handle) {
         my ($initfunc, @initargs) = @$initspec;
         AnyEvent::MP::Kernel::load_func($initfunc)->(@initargs, $handle);
      }
   };

   $STATE{$id} = \%entry;
}
68
# AEMP::Transport call for dataconn-connections
#
# $conn is the transport object, $error is defined when the connection
# failed. On success the handle is taken out of the transport so it can
# be handed to the waiting "done" callback; on failure the callback gets
# undef plus the error. Connections without a dataconn_id in either
# greeting, or without a matching %STATE entry, are silently ignored.
sub _inject {
   my ($conn, $error) = @_;

   # only steal the handle from the transport when there was no error
   my $hdl;
   $hdl = delete $conn->{hdl}
      unless defined $error;

   # the id sits in our greeting if we connected, in the peer's otherwise
   my $id = $conn->{local_greeting}{dataconn_id}
         || $conn->{remote_greeting}{dataconn_id};
   return unless $id;

   $conn->destroy;

   my $state = $STATE{$id}
      or return;

   $state->{done}->($hdl, $error);
}
81
# actively connect to some other node
#
# Tries the listening endpoints known for $node one after the other,
# starting a new attempt every two seconds, on behalf of the connection
# registered under $id in %STATE. Does nothing when no such state exists
# (any more). When no endpoints are known, the connection is completed
# with an error through the state's "done" callback.
sub _connect {
   my ($id, $node) = @_;

   my $state = $STATE{$id}
      or return;

   # Work on a private copy: the endpoints are consumed one by one below,
   # and shifting them off the shared array would empty the global
   # listener table for every later lookup of this node.
   my @endpoints = @{ $AnyEvent::MP::Global::addr{$node} || [] };

   # "done" is the only completion callback a state hash has
   @endpoints
      or return $state->{done}->(undef, "$node: no listeners found");

   # I love hardcoded constants !
   $state->{next} = AE::timer(0, 2, sub {
      # out of endpoints: stop retrying, the state's timeout does the rest
      my $endpoint = shift @endpoints
         or return delete $state->{next};

      # skip endpoints that cannot be parsed, the next tick tries the next one
      my ($host, $port) = AnyEvent::Socket::parse_hostport($endpoint)
         or return;

      my $transport; $transport = AnyEvent::MP::Transport::mp_connect(
         $host, $port,
         protocol       => "aemp-dataconn",
         local_greeting => { dataconn_id => $id },
         sub { $transport->destroy }, #TODO: destroys handshaked connections too early
      );
   });
}
110
111 =item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle)
112
113 Creates a socket connection between the local node and the node C<$node>
114 (which can also be specified as a port).
115
116 When the connection could be successfully created, the C<$initfunc>
117 will be called with the given C<@initdata> on the remote node (similar
118 to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object
119 representing the remote connection end as additional argument.
120
121 Also, the callback given as last argument will be called with the
122 AnyEvent::Handle object for the local side.
123
124 The AnyEvent::Handle objects will be in a "quiescent" state - you could rip
125 out the file handle and forget about it, but it is recommended to use it,
126 as the security settings might have called for a TLS connection. If you
127 opt to use it, you at least have to set an C<on_error> callback.
128
129 In case of any error (timeout etc.), nothing will be called on
130 the remote side, and the local port will be C<kil>'ed with an C<<
131 AnyEvent::MP::DataConn => "error message" >> kill reason.
132
133 =cut
134
# Sets up a data connection from the current port ($SELF) to $node.
#
#   $node      - node id or port id (reduced to its node via node_of)
#   $timeout   - seconds to wait for the connection to be established
#   $initspec  - [$initfunc, @initdata], forwarded to _expect on the remote node
#   @localmsg  - on success, passed to snd with the local handle appended
#
# On any failure the calling port is kil'ed with an
# AnyEvent::MP::DataConn:: => "error message" reason.
sub connect_to($$$;@) {
   my ($node, $timeout, $initspec, @localmsg) = @_;
   my $port = $SELF;
   $node = node_of($node);

   # reject unsupported local/local connections before any state or timer
   # is registered, so nothing lingers and the port is only killed once
   if (AnyEvent::MP::Kernel::port_is_local($node)) {
      return kil($port, AnyEvent::MP::DataConn:: =>
         "connect_to does not yet support local/local connections, please bug me about it");
   }

   my $id = (++$ID) . "\@$NODE";

   # damn, why do my simple state hashes resemble objects so quickly
   my $state = $STATE{$id} = {
      id   => $id, # same value as the %STATE key
      to   => AE::timer($timeout, 0, sub {
         $STATE{$id}{done}->(undef, "$node: unable to establish connection within $timeout seconds");
      }),
      done => sub {
         my ($hdl, $error) = @_;

         # unregister everything; emptying the hash also drops the timers
         # and the closures that refer back to the state
         delete $AnyEvent::MP::Global::ON_SETUP{$id};
         %{ delete $STATE{$id} } = ();

         if (defined $hdl) {
            snd(@localmsg, $hdl);
         } else {
            kil($port, AnyEvent::MP::DataConn:: => $error);
         }
      },
   };

   # tell the remote node to await the connection
   AnyEvent::MP::Kernel::snd_to_func($node, AnyEvent::MP::DataConn::_expect:: => $id, $timeout, $initspec);

   $state->{wait} = sub {
      if (my $addr = $AnyEvent::MP::Global::addr{$node}) {
         delete $AnyEvent::MP::Global::ON_SETUP{$id};

         # continue connect
         if (@$addr) {
            # node has listeners, so connect
            _connect($id, $node);
         } else {
            # no listeners, ask it to connect to us - so it must be
            # given our node id, not its own
            AnyEvent::MP::Kernel::snd_to_func($node, AnyEvent::MP::DataConn::_connect:: => $id, $NODE);
         }
      } else {
         # wait for the next global setup handshake
         $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
      }
   };

   $state->{wait}->();
}
192
193 =back
194
195 =head1 SEE ALSO
196
197 L<AnyEvent::MP>.
198
199 =head1 AUTHOR
200
201 Marc Lehmann <schmorp@schmorp.de>
202 http://home.schmorp.de/
203
204 =cut
205
206 1
207