ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/DataConn.pm
Revision: 1.2
Committed: Sat Nov 7 02:36:31 2009 UTC (14 years, 7 months ago) by root
Branch: MAIN
Changes since 1.1: +111 -24 lines
Log Message:
bleh

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::DataConn - create socket connections between nodes
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::DataConn;
8
9 =head1 DESCRIPTION
10
11 This module can be used to create socket connections between the local and
12 a remote node in the aemp network. The socket can be used freely for any
13 purpose, and in most cases, this mechanism is a good way to transport big
14 chunks of binary data.
15
16 The connections created by this module use the same security mechanisms
17 as normal AEMP connections (secure authentication, optional use of TLS),
18 and in fact, use the same listening port as AEMP connections, so when two
19 nodes can reach each other, they can, with high probability, create a data
20 connection between them.
21
22 The protocol is, however, not the AEMP transport protocol, so this will
23 only work between nodes implementing the "aemp-dataconn" protocol.
24
25 =head1 FUNCTIONS
26
27 =over 4
28
29 =cut
30
31 package AnyEvent::MP::DataConn;
32
33 use common::sense;
34 use Carp ();
35 use POSIX ();
36
37 use AnyEvent ();
38 use AnyEvent::Util ();
39
40 use AnyEvent::MP;
41 use AnyEvent::MP::Kernel ();
42 use AnyEvent::MP::Global ();
43
# Counter used to build connection ids: connect_to pre-increments it (Perl's
# magic string increment, "a" -> "b" ... "z" -> "aa") and appends "@$NODE".
44 our $ID = "a";
# Per-connection state hashes, keyed by connection id. Entries are created by
# _accept and connect_to and removed again by their cleanup callbacks.
45 our %STATE;
46
# _accept $id, $timeout, [$initfunc, @initdata]
#
# Registers a pending data connection under $id in %STATE. The entry is
# dropped again when the $timeout timer fires or when one of its
# success/fail callbacks runs. Returns the newly registered state hash.
sub _accept {
   my ($id, $timeout, $initspec) = @_;

   # Removes the entry and empties the hash, which also releases the
   # timer and the callbacks stored inside it.
   my $release = sub {
      %{ delete $STATE{$id} } = ();
   };

   my $on_success = sub {
      # unpack the init spec before releasing the state, as the release
      # empties the very hash that holds this callback
      my ($func, @args) = @$initspec;
      $release->();
      # run the init function (looked up via load_func) with the init
      # data followed by whatever arguments this callback received
      (AnyEvent::MP::Kernel::load_func $func)->(@args, @_);
   };

   my $on_fail = sub {
      $release->();
      # nop?
   };

   my %pending = (
      id      => $id,
      to      => (AE::timer $timeout, 0, $release),
      success => $on_success,
      fail    => $on_fail,
   );

   $STATE{$id} = \%pending;
}
68
# _inject $conn, $error
#
# Debugging stub: detaches the handle stored under the "hdl" key of $conn
# and then dies with a message listing the arguments it was called with.
sub _inject {
   # @_ is left untouched, as the die message below interpolates it
   my ($connection, $reason) = @_;

   my $handle = delete $connection->{hdl};

   die "inject <@_>\n";#d#
}
75
# _connect $id
#
# Starts connecting to the listeners of the node recorded in the state
# registered under $id. Does nothing when no such state exists. When the
# node has no known listeners, the state's fail callback is invoked with
# an error message. Otherwise a repeating timer is stored in
# $state->{next}; each tick takes the next endpoint and starts an
# "aemp-dataconn" transport connection to it.
sub _connect {
   my ($id) = @_;

   my $state = $STATE{$id}
      or return;

   # Work on a private copy: the list in %AnyEvent::MP::Global::addr is
   # shared, so consuming it in place would drop the node's listeners
   # for everybody else. A missing entry is treated as "no listeners".
   my @endpoints = @{ $AnyEvent::MP::Global::addr{$state->{node}} || [] };

   @endpoints
      or return $state->{fail}("$state->{node}: no listeners found");

   # I love hardcoded constants
   # (first attempt immediately, then one further endpoint every 2 seconds)
   $state->{next} = AE::timer(0, 2, sub {
      # all endpoints tried: stop the timer by dropping it from the state
      my $endpoint = shift @endpoints
         or return delete $state->{next};

      # skip endpoints that cannot be parsed, the next tick tries another
      my ($host, $port) = AnyEvent::Socket::parse_hostport($endpoint)
         or return;

      my $transport; $transport = AnyEvent::MP::Transport::mp_connect(
         $host, $port,
         protocol => "aemp-dataconn",
         sub { $transport->destroy },
      );
   });
}
104
105 =item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle)
106
107 Creates a socket connection between the local node and the node C<$node>
108 (which can also be specified as a port).
109
110 When the connection could be successfully created, the C<$initfunc>
111 will be called with the given C<@initdata> on the remote node (similar
112 to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object
113 representing the remote connection end as additional argument.
114
115 Also, the callback given as last argument will be called with the
116 AnyEvent::Handle object for the local side.
117
118 The AnyEvent::Handle objects will be in a "quiescent" state - you could rip
119 out the file handle and forget about it, but it is recommended to use it,
120 as the security settings might have called for a TLS connection. If you
121 opt to use it, you at least have to set an C<on_error> callback.
122
123 In case of any error (timeout etc.), nothing will be called on
124 the remote side, and the local port will be C<kil>'ed with an C<<
125 AnyEvent::MP::DataConn => "error message" >> kill reason.
126
127 =cut
128
# connect_to $node, $timeout, $initspec, @localmsg
#
# Requests a data connection from the calling port ($SELF) to $node.
#
#   $node      - node (or port) to connect to; reduced to its node via node_of
#   $timeout   - seconds after which the attempt is abandoned
#   $initspec  - array ref forwarded unchanged to _accept on the remote node
#   @localmsg  - message prefix passed to snd, followed by the arguments of
#                the success callback, once the connection succeeded
#
# On timeout or failure the calling port is kil'ed with an
# AnyEvent::MP::DataConn => "message" reason.
sub connect_to($$$;@) {
   my ($node, $timeout, $initspec, @localmsg) = @_;
   my $port = $SELF;
   $node = node_of $node;

   # Reject local/local connections before any state or timer is registered,
   # so no entry lingers in %STATE and the timeout cannot kil the port again.
   if (AnyEvent::MP::Kernel::port_is_local($node)) {
      return kil $port, AnyEvent::MP::DataConn:: =>
         "connect_to does not yet support local/local connections, please bug me about it";
   }

   my $id = (++$ID) . "\@$NODE";

   # Drops the setup hook and the state entry. Emptying the hash also
   # releases the timer and the closures stored in it. Safe to call twice.
   my $cleanup = sub {
      delete $AnyEvent::MP::Global::ON_SETUP{$id};
      my $old = delete $STATE{$id}
         or return;
      %$old = ();
   };

   # damn, why do my simple state hashes resemble objects so quickly
   my $state = $STATE{$id} = {
      id      => $id, # the same id used as %STATE key and sent to the remote node
      node    => $node,
      port    => $port,
      to      => AE::timer($timeout, 0, sub {
         $cleanup->();
         kil $port, AnyEvent::MP::DataConn:: => "$node: unable to establish connection within $timeout seconds";
      }),
      success => sub {
         $cleanup->();
         snd @localmsg, @_;
      },
      fail    => sub {
         $cleanup->();
         kil $port, AnyEvent::MP::DataConn:: => $_[0];
      },
   };

   # ask the remote node to register the matching state under the same id
   AnyEvent::MP::Kernel::snd_to_func($node, AnyEvent::MP::DataConn::_accept:: => $id, $timeout, $initspec);

   $state->{wait} = sub {
      if (my $addr = $AnyEvent::MP::Global::addr{$node}) {
         delete $AnyEvent::MP::Global::ON_SETUP{$id};

         # continue connect
         if (@$addr) {
            # node has listeners, so connect
            _connect($id);
         } else {
            # no listeners, ask it to connect to us
            AnyEvent::MP::Kernel::snd_to_func($node, AnyEvent::MP::DataConn::_connect:: => $id);
         }
      } else {
         # wait for the next global setup handshake
         $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
      }
   };

   $state->{wait}->();
}
189
190 =back
191
192 =head1 SEE ALSO
193
194 L<AnyEvent::MP>.
195
196 =head1 AUTHOR
197
198 Marc Lehmann <schmorp@schmorp.de>
199 http://home.schmorp.de/
200
201 =cut
202
203 1
204