ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/DataConn.pm
Revision: 1.9
Committed: Fri Mar 23 00:38:14 2012 UTC (12 years, 2 months ago) by root
Branch: MAIN
Changes since 1.8: +2 -3 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::DataConn - create socket connections between nodes
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::DataConn;
8    
9     =head1 DESCRIPTION
10    
11     This module can be used to create socket connections between the local and
12     a remote node in the aemp network. The socket can be used freely for any
13     purpose, and in most cases, this mechanism is a good way to transport big
14     chunks of binary data.
15    
16     The connections created by this module use the same security mechanisms
17     as normal AEMP connections (secure authentication, optional use of TLS),
18     and in fact, use the same listening port as AEMP connections, so when two
19 root 1.7 nodes can reach each other via the normal aemp protocol, they can create
20     data connections as well, no extra ports or firewall rules are required.
21 root 1.1
22 root 1.7 The protocol used is, however, not the AEMP transport protocol, so this
23 root 1.8 will only work between nodes implementing the "aemp-dataconn" protocol
24     extension.
25 root 1.1
26     =head1 FUNCTIONS
27    
28     =over 4
29    
30     =cut
31    
32     package AnyEvent::MP::DataConn;
33    
34     use common::sense;
35     use Carp ();
36     use POSIX ();
37    
38     use AnyEvent ();
39     use AnyEvent::Util ();
40    
41     use AnyEvent::MP;
42 root 1.2 use AnyEvent::MP::Kernel ();
43    
# Per-connection bookkeeping, keyed by connection id; entries are created by
# _expect and connect_to and removed again by their "done" callbacks.
our %STATE;

# Seed for generating connection ids; connect_to advances it with ++, which
# for a string like "a" yields "b", "c", ... (Perl's magic string increment).
our $ID = "a";
46    
# Another node tells us to await a connection (connect_to sends this request
# via snd_to_func).
#
#   $id        - connection id chosen by the requesting node
#   $port      - port that is kil'ed when no connection could be established
#   $timeout   - seconds to wait for the connection before giving up
#   $initfunc  - name of the function to call once the connection exists
#   @initdata  - arguments passed to $initfunc, followed by the handle
#
# Registers a state entry in %STATE{$id}; _inject later calls its "done"
# callback with the connection handle (or with undef and an error).
sub _expect {
   my ($id, $port, $timeout, $initfunc, @initdata) = @_;

   $STATE{$id} = {
      id   => $id,
      # give up after $timeout seconds, and report an actual reason so the
      # resulting kill reason is not undefined
      to   => (AE::timer $timeout, 0, sub {
         $STATE{$id}{done}(undef, "$id: no data connection arrived within $timeout seconds");
      }),
      done => sub {
         my ($hdl, $error) = @_;

         # remove the entry and empty it, which also cancels the timer
         %{delete $STATE{$id}} = ();

         if (defined $hdl) {
            (AnyEvent::MP::Kernel::load_func $initfunc)->(@initdata, $hdl);
         } else {
            kil $port, AnyEvent::MP::DataConn:: => $error;
         }
      },
   };
}
69 root 1.1
# AEMP::Transport call for dataconn-connections.
#
#   $conn   - the transport object; its {hdl} member is taken over on success
#   $error  - when defined, the connection failed and no handle is taken
#
# Looks up the connection id in the local or remote greeting, destroys the
# transport object and hands the handle (or the error) to the "done"
# callback registered in %STATE for that id. Does nothing further when no id
# or no matching state entry exists.
sub _inject {
   my ($conn, $error) = @_;

   # only take ownership of the handle when there was no error
   my $hdl;
   $hdl = delete $conn->{hdl}
      unless defined $error;

   my $id = $conn->{local_greeting}{dataconn_id}
         || $conn->{remote_greeting}{dataconn_id};

   return
      unless $id;

   $conn->destroy;

   my $state = $STATE{$id}
      or return;

   $state->{done}($hdl, $error);
}
82    
# Actively connect to some other node.
#
#   $id    - connection id, must have a state entry in %STATE
#   $node  - node whose listener addresses are looked up in
#            %AnyEvent::MP::Global::addr
#
# Tries the node's endpoints one after the other, starting a new attempt
# every two seconds, until the list is exhausted. Returns without doing
# anything when no state exists for $id, and reports an error through the
# state's "done" callback when the node has no known listeners.
sub _connect {
   my ($id, $node) = @_;

   my $state = $STATE{$id}
      or return;

   # work on a private copy: the array in %AnyEvent::MP::Global::addr is
   # shared, and shifting entries off it would empty the node's listener
   # list for everybody else. a missing entry is treated like an empty list.
   my @endpoints = @{ $AnyEvent::MP::Global::addr{$node} || [] };

   @endpoints
      or return $state->{done}(undef, "$node: no listeners found");

   # I love hardcoded constants !
   $state->{next} = AE::timer 0, 2, sub {
      # no endpoints left: stop the retry timer
      my $endpoint = shift @endpoints
         or return delete $state->{next};

      # skip unparsable endpoints, the next tick tries the following one
      my ($host, $port) = AnyEvent::Socket::parse_hostport ($endpoint)
         or return;

      my $transport; $transport = AnyEvent::MP::Transport::mp_connect (
         $host, $port,
         protocol       => "aemp-dataconn",
         local_greeting => { dataconn_id => $id },
         sub { $transport->destroy }, #TODO: destroys handshaked connections too early
      );
   };
}
111    
112 root 1.5 =item AnyEvent::MP::DataConn::connect_to $node, $timeout, $initfunc, @initdata, $cb->($handle)
113 root 1.1
114     Creates a socket connection between the local node and the node C<$node>
115 root 1.4 (which can also be specified as a port). One of the nodes must have
116 root 1.9 listeners ("binds").
117 root 1.1
118     When the connection could be successfully created, the C<$initfunc>
119     will be called with the given C<@initdata> on the remote node (similar
120     to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object
121 root 1.2 representing the remote connection end as additional argument.
122 root 1.1
123 root 1.2 Also, the callback given as last argument will be called with the
124     AnyEvent::Handle object for the local side.
125 root 1.1
126 root 1.2 The AnyEvent::Handle objects will be in a "quiescent" state - you could rip
127 root 1.1 out the file handle and forget about it, but it is recommended to use it,
128 root 1.2 as the security settings might have called for a TLS connection. If you
129 root 1.1 opt to use it, you at least have to set an C<on_error> callback.
130    
131 root 1.2 In case of any error (timeout etc.), nothing will be called on
132     the remote side, and the local port will be C<kil>'ed with an C<<
133 root 1.1 AnyEvent::MP::DataConn => "error message" >> kill reason.
134    
135 root 1.4 The timeout should be large enough to cover at least four network
136     round-trips and one message round-trip.
137    
138     Example: on node1, establish a connection to node2 and send a line of text,
139     on node2, provide a receiver function.
140    
141     # node1, code executes in some port context
142 root 1.5 AnyEvent::MP::DataConn::connect_to "node2", 5, "pkg::receiver", 1, sub {
143 root 1.4 my ($hdl) = @_;
144     warn "connection established, sending line.\n";
145     $hdl->push_write ("blabla\n")
146     };
147    
148     # node2
149     sub pkg::receiver {
150     my ($one, $hdl) = @_;
151     warn "connection established, wait for a line...\n";
152    
153     $hdl->push_read (line => sub {
154     warn "received a line: $_[1]\n";
155     undef $hdl;
156     });
157     }
158    
159 root 1.1 =cut
160    
# connect_to $node, $timeout, $initfunc, @initdata, $cb->($handle)
#
# Creates a data connection between the local node and $node.
#
#   $node      - target node (or a port, which is mapped with node_of)
#   $timeout   - seconds after which the attempt is abandoned
#   $initfunc  - name of the function called on the other side with
#                (@initdata, $handle)
#   @initdata  - extra arguments for $initfunc
#   $cb        - called with the local AnyEvent::Handle on success
#
# Must be called in port context (croaks otherwise). On any error the
# calling port is kil'ed with AnyEvent::MP::DataConn => "error message".
sub connect_to($$$$@) {
   my $cb = pop;
   my ($node, $timeout, $initfunc, @initdata) = @_;

   my $port = $SELF
      or Carp::croak "AnyEvent::MP::DataConn::connect_to must be called in port context";

   $node = node_of $node;

   my $id = (++$ID) . "\@$NODE";

   # damn, why do my simple state hashes resemble objects so quickly
   my $state = $STATE{$id} = {
      # must be the very key used in %STATE, so do not increment $ID again
      id   => $id,
      to   => (AE::timer $timeout, 0, sub {
         $STATE{$id}{done}(undef, "$node: unable to establish connection within $timeout seconds");
      }),
      done => sub {
         my ($hdl, $error) = @_;

         # tear down all bookkeeping; emptying the hash cancels the timers
         delete $AnyEvent::MP::Global::ON_SETUP{$id};
         %{delete $STATE{$id}} = ();

         if (defined $hdl) {
            $cb->($hdl);
         } else {
            kil $port, AnyEvent::MP::DataConn:: => $error;
         }
      },
   };

   if (AnyEvent::MP::Kernel::port_is_local $node) {
      # teh sucks

      # both ends live on this node: use a socketpair. always finish through
      # $state->{done}, otherwise the state entry and its timeout timer
      # survive and the port gets kil'ed later despite a working connection.
      require AnyEvent::Util;
      my ($fh1, $fh2) = AnyEvent::Util::portable_socketpair ()
         or return $state->{done}(undef, "cannot create local socketpair: $!");

      use AnyEvent::Handle;
      my $hdl1 = AnyEvent::Handle->new (fh => $fh1);
      my $hdl2 = AnyEvent::Handle->new (fh => $fh2);

      (AnyEvent::MP::Kernel::load_func $initfunc)->(@initdata, $hdl2);
      $state->{done}($hdl1);

   } else {
      # tell the other node to expect our connection
      AnyEvent::MP::Kernel::snd_to_func $node,
         AnyEvent::MP::DataConn::_expect:: => $id, $port, $timeout, $initfunc, @initdata;

      $state->{wait} = sub {
         if (my $addr = $AnyEvent::MP::Global::addr{$node}) {
            delete $AnyEvent::MP::Global::ON_SETUP{$id};

            # continue connect
            if (@$addr) {
               # node has listeners, so connect
               _connect $id, $node;
            } else {
               # no listeners, ask it to connect to us
               AnyEvent::MP::Kernel::snd_to_func $node, AnyEvent::MP::DataConn::_connect:: => $id, $NODE;
            }
         } else {
            # wait for the next global setup handshake
            # due to the round-trip at the beginning, this should never be necessary
            $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
         };
      };

      # we actually have to make sure that the connection arrives after the expect message, and
      # the easiest way to do this is to use an rpc call.
      AnyEvent::MP::Kernel::snd_on $node, port { $state->{wait}() };
   }
}
234    
235     =back
236    
237     =head1 SEE ALSO
238    
239     L<AnyEvent::MP>.
240    
241     =head1 AUTHOR
242    
243     Marc Lehmann <schmorp@schmorp.de>
244     http://home.schmorp.de/
245    
246     =cut
247    
248     1
249