1 |
=head1 NAME |
2 |
|
3 |
AnyEvent::MP::DataConn - create socket connections between nodes |
4 |
|
5 |
=head1 SYNOPSIS |
6 |
|
7 |
use AnyEvent::MP::DataConn; |
8 |
|
9 |
=head1 DESCRIPTION |
10 |
|
11 |
This module can be used to create socket connections between the local and |
12 |
a remote node in the aemp network. The socket can be used freely for any |
13 |
purpose, and in most cases, this mechanism is a good way to transport big |
14 |
chunks of binary data. |
15 |
|
16 |
The connections created by this module use the same security mechanisms |
17 |
as normal AEMP connections (secure authentication, optional use of TLS), |
18 |
and in fact, use the same listening port as AEMP connections, so when two |
19 |
nodes can reach each other, they can, with high probability, creare a data |
20 |
connection between them. |
21 |
|
22 |
The protocol is, however, not the AEMP transport protocol, so this will |
23 |
only work between nodes implementing the "aemp-dataconn" protocol. |
24 |
|
25 |
=head1 FUNCTIONS |
26 |
|
27 |
=over 4 |
28 |
|
29 |
=cut |
30 |
|
31 |
package AnyEvent::MP::DataConn; |
32 |
|
33 |
use common::sense; |
34 |
use Carp (); |
35 |
use POSIX (); |
36 |
|
37 |
use AnyEvent (); |
38 |
use AnyEvent::Util (); |
39 |
|
40 |
use AnyEvent::MP; |
41 |
use AnyEvent::MP::Kernel (); |
42 |
use AnyEvent::MP::Global (); |
43 |
|
44 |
our $ID = "a"; |
45 |
our %STATE; |
46 |
|
47 |
sub _accept { |
48 |
my ($id, $timeout, $initspec) = @_; |
49 |
|
50 |
my $cleanup = sub { |
51 |
%{delete $STATE{$id}} = (); |
52 |
}; |
53 |
|
54 |
$STATE{$id} = { |
55 |
id => $id, |
56 |
to => (AE::timer $timeout, 0, $cleanup), |
57 |
success => sub { |
58 |
my ($func, @args) = @$initspec; |
59 |
$cleanup->(); |
60 |
(AnyEvent::MP::Kernel::load_func $func)->(@args, @_); |
61 |
}, |
62 |
fail => sub { |
63 |
$cleanup->(); |
64 |
# nop? |
65 |
}, |
66 |
}; |
67 |
} |
68 |
|
69 |
sub _inject { |
70 |
my ($conn, $error) = @_; |
71 |
|
72 |
my $hdl = delete $conn->{hdl}; |
73 |
die "inject <@_>\n";#d# |
74 |
} |
75 |
|
76 |
sub _connect { |
77 |
my ($id) = @_; |
78 |
|
79 |
my $state = $STATE{$id} |
80 |
or return; |
81 |
|
82 |
my $addr = $AnyEvent::MP::Global::addr{$state->{node}}; |
83 |
|
84 |
@$addr |
85 |
or return $state->{fail}("$state->{node}: no listeners found"); |
86 |
|
87 |
my %transport; |
88 |
|
89 |
# I love hardcoded constants |
90 |
$state->{next} = AE::timer 0, 2, sub { |
91 |
my $endpoint = shift @$addr |
92 |
or return delete $state->{next}; |
93 |
|
94 |
my ($host, $port) = AnyEvent::Socket::parse_hostport $endpoint |
95 |
or return; |
96 |
|
97 |
my $transport; $transport = AnyEvent::MP::Transport::mp_connect |
98 |
$host, $port, |
99 |
protocol => "aemp-dataconn", |
100 |
sub { $transport->destroy }, |
101 |
; |
102 |
}; |
103 |
} |
104 |
|
105 |
=item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle) |
106 |
|
107 |
Creates a socket connection between the local node and the node C<$node> |
108 |
(which can also be specified as a port). |
109 |
|
110 |
When the connection could be successfully created, the C<$initfunc> |
111 |
will be called with the given C<@initdata> on the remote node (similar |
112 |
to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object |
113 |
representing the remote connection end as additional argument. |
114 |
|
115 |
Also, the callback given as last argument will be called with the |
116 |
AnyEvent::Handle object for the local side. |
117 |
|
118 |
The AnyEvent::Handle objects will be in a "quiescent" state - you could rip |
119 |
out the file handle and forget about it, but it is recommended to use it, |
120 |
as the security settings might have called for a TLS connection. If you |
121 |
opt to use it, you at least have to set an C<on_error> callback. |
122 |
|
123 |
In case of any error (timeout etc.), nothing will be called on |
124 |
the remote side, and the local port will be C<kil>'ed with an C<< |
125 |
AnyEvent::MP::DataConn => "error message" >> kill reason. |
126 |
|
127 |
=cut |
128 |
|
129 |
sub connect_to($$$;@) { |
130 |
my ($node, $timeout, $initspec, @localmsg) = @_; |
131 |
my $port = $SELF; |
132 |
$node = node_of $node; |
133 |
|
134 |
my $id = (++$ID) . "\@$NODE"; |
135 |
|
136 |
my $cleanup = sub { |
137 |
delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
138 |
%{delete $STATE{$id}} = (); |
139 |
}; |
140 |
|
141 |
# damn, why do my simple state hashes resemble objects so quickly |
142 |
my $state = $STATE{$id} = { |
143 |
id => (++$ID) . "\@$NODE", |
144 |
node => $node, |
145 |
port => $port, |
146 |
to => (AE::timer $timeout, 0, sub { |
147 |
$cleanup->(); |
148 |
kil $port, AnyEvent::MP::DataConn:: => "$node: unable to establish connection within $timeout seconds"; |
149 |
}), |
150 |
success => sub { |
151 |
$cleanup->(); |
152 |
snd @localmsg, @_; |
153 |
}, |
154 |
fail => sub { |
155 |
$cleanup->(); |
156 |
kil $port, AnyEvent::MP::DataConn:: => $_[0]; |
157 |
}, |
158 |
}; |
159 |
|
160 |
if (AnyEvent::MP::Kernel::port_is_local $node) { |
161 |
return kil $port, AnyEvent::MP::DataConn:: => |
162 |
"connect_to does not yet support local/local connections, please bug me about it"; |
163 |
|
164 |
} else { |
165 |
AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_accept:: => $id, $timeout, $initspec; |
166 |
|
167 |
$state->{wait} = sub { |
168 |
if (my $addr = $AnyEvent::MP::Global::addr{$node}) { |
169 |
delete $AnyEvent::MP::Global::ON_SETUP{$id}; |
170 |
|
171 |
warn "<@_ @$addr $addr>\n";#d# |
172 |
# continue connect |
173 |
if (@$addr) { |
174 |
# node has listeners, so connect |
175 |
_connect $id; |
176 |
} else { |
177 |
# no listeners, ask it to connect to us |
178 |
AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id; |
179 |
} |
180 |
} else { |
181 |
# wait for the next global setup handshake |
182 |
$AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait}; |
183 |
}; |
184 |
}; |
185 |
|
186 |
$state->{wait}->(); |
187 |
} |
188 |
} |
189 |
|
190 |
=back |
191 |
|
192 |
=head1 SEE ALSO |
193 |
|
194 |
L<AnyEvent::MP>. |
195 |
|
196 |
=head1 AUTHOR |
197 |
|
198 |
Marc Lehmann <schmorp@schmorp.de> |
199 |
http://home.schmorp.de/ |
200 |
|
201 |
=cut |
202 |
|
203 |
1 |
204 |
|