ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/DataConn.pm
Revision: 1.8
Committed: Tue Dec 1 11:59:48 2009 UTC (14 years, 6 months ago) by root
Branch: MAIN
CVS Tags: rel-1_28, rel-1_29, rel-1_24, rel-1_26, rel-1_27, rel-1_30
Changes since 1.7: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::DataConn - create socket connections between nodes
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::DataConn;
8    
9     =head1 DESCRIPTION
10    
11     This module can be used to create socket connections between the local and
12     a remote node in the aemp network. The socket can be used freely for any
13     purpose, and in most cases, this mechanism is a good way to transport big
14     chunks of binary data.
15    
16     The connections created by this module use the same security mechanisms
17     as normal AEMP connections (secure authentication, optional use of TLS),
18     and in fact, use the same listening port as AEMP connections, so when two
19 root 1.7 nodes can reach each other via the normal aemp protocol, they can create
20     data connections as well, no extra ports or firewall rules are required.
21 root 1.1
22 root 1.7 The protocol used is, however, not the AEMP transport protocol, so this
23 root 1.8 will only work between nodes implementing the "aemp-dataconn" protocol
24     extension.
25 root 1.1
26     =head1 FUNCTIONS
27    
28     =over 4
29    
30     =cut
31    
32     package AnyEvent::MP::DataConn;
33    
34     use common::sense;
35     use Carp ();
36     use POSIX ();
37    
38     use AnyEvent ();
39     use AnyEvent::Util ();
40    
41     use AnyEvent::MP;
42 root 1.2 use AnyEvent::MP::Kernel ();
43     use AnyEvent::MP::Global ();
44    
# Connection id counter. connect_to pre-increments it (string increment,
# starting from "a") and appends "@$NODE" to form a connection id.
45     our $ID = "a";
# Pending data connections, keyed by connection id. Each value is a hash
# holding the id, a timeout timer ("to"), a completion callback ("done")
# and, on the connecting side, optional "next"/"wait" entries.
46     our %STATE;
47    
# _expect $id, $port, $timeout, $initfunc, @initdata
#
# Another node tells us to await a data connection: connect_to sends this
# call via snd_to_func before the connection itself is attempted.
#
# Registers a pending entry for $id in %STATE. The entry is completed by
# calling its "done" callback, either from _inject (with the handle or an
# error) or from the timeout timer:
#
#   - with a handle: $initfunc is resolved via load_func and called with
#     (@initdata, $hdl)
#   - without a handle: $port is kil'ed with
#     AnyEvent::MP::DataConn => $error
sub _expect {
   my ($id, $port, $timeout, $initfunc, @initdata) = @_;

   $STATE{$id} = {
      id   => $id,
      # the watcher lives inside the state hash, so emptying the hash in
      # "done" also releases the timer
      to   => AE::timer ($timeout, 0, sub {
         # always pass a reason, so the port is never kil'ed with an
         # undefined error message
         $STATE{$id}{done}(undef, "no data connection received within $timeout seconds");
      }),
      done => sub {
         my ($hdl, $error) = @_;

         # unregister the entry and empty it
         %{delete $STATE{$id}} = ();

         if (defined $hdl) {
            (AnyEvent::MP::Kernel::load_func ($initfunc))->(@initdata, $hdl);
         } else {
            kil ($port, AnyEvent::MP::DataConn:: => $error);
         }
      },
   };
}
70 root 1.1
# _inject $conn, $error
#
# Called by the AEMP transport for dataconn connections. Takes the handle
# out of the transport object (unless an error was reported), looks up the
# dataconn id in the local or remote greeting, destroys the transport and
# hands (handle, error) to the "done" callback of the matching %STATE entry.
# Returns silently when no id or no pending entry is found.
sub _inject {
   my ($conn, $error) = @_;

   # only steal the handle when the handshake did not fail
   my $hdl;
   $hdl = delete $conn->{hdl}
      unless defined $error;

   # the id was put into the greeting by whichever side connected
   my $id = $conn->{local_greeting}{dataconn_id};
   $id ||= $conn->{remote_greeting}{dataconn_id};
   return unless $id;

   $conn->destroy;

   my $pending = $STATE{$id}
      or return;

   $pending->{done}($hdl, $error);
}
83    
# _connect $id, $node
#
# Actively connect to some other node. Does nothing unless a pending
# entry for $id exists in %STATE.
#
# If no addresses are known for $node, the entry's "done" callback is
# invoked with the error "$node: no listeners found". Otherwise a repeating
# timer (stored as $state->{next}) tries one endpoint per tick using the
# "aemp-dataconn" protocol, passing $id in the local greeting. When the
# endpoints run out the timer is removed; unparsable endpoints are skipped.
sub _connect {
   my ($id, $node) = @_;

   my $state = $STATE{$id}
      or return;

   # Iterate over a private copy: the array in %AnyEvent::MP::Global::addr
   # is also inspected by connect_to to decide whether a node has
   # listeners, so consuming it in place would corrupt later attempts.
   my @addr = @{ $AnyEvent::MP::Global::addr{$node} || [] };

   @addr
      or return $state->{done}(undef, "$node: no listeners found");

   # I love hardcoded constants ! (first attempt immediately, then every 2 seconds)
   $state->{next} = AE::timer (0, 2, sub {
      my $endpoint = shift @addr
         or return delete $state->{next};

      my ($host, $port) = AnyEvent::Socket::parse_hostport ($endpoint)
         or return;

      my $transport; $transport = AnyEvent::MP::Transport::mp_connect (
         $host, $port,
         protocol       => "aemp-dataconn",
         local_greeting => { dataconn_id => $id },
         sub { $transport->destroy }, # TODO: destroys handshaked connections too early
      );
   });
}
112    
113 root 1.5 =item AnyEvent::MP::DataConn::connect_to $node, $timeout, $initfunc, @initdata, $cb->($handle)
114 root 1.1
115     Creates a socket connection between the local node and the node C<$node>
116 root 1.4 (which can also be specified as a port). One of the nodes must have
117     listening ports ("binds").
118 root 1.1
119     When the connection could be successfully created, the C<$initfunc>
120     will be called with the given C<@initdata> on the remote node (similar
121     to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object
122 root 1.2 representing the remote connection end as additional argument.
123 root 1.1
124 root 1.2 Also, the callback given as last argument will be called with the
125     AnyEvent::Handle object for the local side.
126 root 1.1
127 root 1.2 The AnyEvent::Handle objects will be in a "quiescent" state - you could rip
128 root 1.1 out the file handle and forget about it, but it is recommended to use it,
129 root 1.2 as the security settings might have called for a TLS connection. If you
130 root 1.1 opt to use it, you at least have to set an C<on_error> callback.
131    
132 root 1.2 In case of any error (timeout etc.), nothing will be called on
133     the remote side, and the local port will be C<kil>'ed with an C<<
134 root 1.1 AnyEvent::MP::DataConn => "error message" >> kill reason.
135    
136 root 1.4 The timeout should be large enough to cover at least four network
137     round-trips and one message round-trip.
138    
139     Example: on node1, establish a connection to node2 and send a line of text,
140     on node2, provide a receiver function.
141    
142     # node1, code executes in some port context
143 root 1.5 AnyEvent::MP::DataConn::connect_to "node2", 5, "pkg::receiver", 1, sub {
144 root 1.4 my ($hdl) = @_;
145     warn "connection established, sending line.\n";
146     $hdl->push_write ("blabla\n")
147     };
148    
149     # node2
150     sub pkg::receiver {
151     my ($one, $hdl) = @_;
152     warn "connection established, wait for a line...\n";
153    
154     $hdl->push_read (line => sub {
155     warn "received a line: $_[1]\n";
156     undef $hdl;
157     });
158     }
159    
160 root 1.1 =cut
161    
# connect_to $node, $timeout, $initfunc, @initdata, $cb->($handle)
#
# Public entry point; the POD item for this function describes the
# contract. Must be called in port context ($SELF set), otherwise it
# croaks.
#
# Implementation outline:
#   - a pending entry is registered in %STATE under a fresh id; its "done"
#     callback unregisters it and either calls $cb with the handle or
#     kil's the calling port with AnyEvent::MP::DataConn => $error
#   - for a local node, a socketpair is created and both ends are handed
#     out directly
#   - for a remote node, the peer is told to _expect the connection, and
#     after an rpc round-trip either we _connect to it or, if it has no
#     listeners, it is asked to _connect to us
sub connect_to($$$$@) {
   my $cb = pop;
   my ($node, $timeout, $initfunc, @initdata) = @_;

   my $port = $SELF
      or Carp::croak ("AnyEvent::MP::DataConn::connect_to must be called in port context");

   $node = node_of ($node);

   my $id = (++$ID) . "\@$NODE";

   # damn, why do my simple state hashes resemble objects so quickly
   my $state = $STATE{$id} = {
      # must be the very id used as %STATE key (no second increment)
      id   => $id,
      to   => AE::timer ($timeout, 0, sub {
         $STATE{$id}{done}(undef, "$node: unable to establish connection within $timeout seconds");
      }),
      done => sub {
         my ($hdl, $error) = @_;

         # unregister everything; emptying the hash also releases the timers
         delete $AnyEvent::MP::Global::ON_SETUP{$id};
         %{delete $STATE{$id}} = ();

         if (defined $hdl) {
            $cb->($hdl);
         } else {
            kil ($port, AnyEvent::MP::DataConn:: => $error);
         }
      },
   };

   if (AnyEvent::MP::Kernel::port_is_local ($node)) {
      # teh sucks

      # Both outcomes go through "done" so the pending entry and its
      # timeout timer are removed; otherwise the timer would later kil
      # the port although the connection succeeded.
      my ($fh1, $fh2) = AnyEvent::Util::portable_socketpair ()
         or return $state->{done}(undef, "cannot create local socketpair: $!");

      require AnyEvent::Handle;
      my $hdl1 = AnyEvent::Handle->new (fh => $fh1);
      my $hdl2 = AnyEvent::Handle->new (fh => $fh2);

      (AnyEvent::MP::Kernel::load_func ($initfunc))->(@initdata, $hdl2);
      $state->{done}($hdl1);

   } else {
      AnyEvent::MP::Kernel::snd_to_func ($node,
         AnyEvent::MP::DataConn::_expect:: => $id, $port, $timeout, $initfunc, @initdata);

      $state->{wait} = sub {
         if (my $addr = $AnyEvent::MP::Global::addr{$node}) {
            delete $AnyEvent::MP::Global::ON_SETUP{$id};

            # continue connect
            if (@$addr) {
               # node has listeners, so connect
               _connect ($id, $node);
            } else {
               # no listeners, ask it to connect to us
               AnyEvent::MP::Kernel::snd_to_func ($node, AnyEvent::MP::DataConn::_connect:: => $id, $NODE);
            }
         } else {
            # wait for the next global setup handshake
            # due to the round-trip at the beginning, this should never be necessary
            $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
         }
      };

      # we actually have to make sure that the connection arrives after the expect message, and
      # the easiest way to do this is to use an rpc call.
      # NOTE(review): the reply port created here is never kil'ed - confirm whether that is intended.
      AnyEvent::MP::Kernel::snd_on ($node, port (sub {
         # the state is emptied once the attempt completed or timed out,
         # so a late reply must not try to call a missing callback
         my $wait = $state->{wait}
            or return;

         $wait->();
      }));
   }
}
235    
236     =back
237    
238     =head1 SEE ALSO
239    
240     L<AnyEvent::MP>.
241    
242     =head1 AUTHOR
243    
244     Marc Lehmann <schmorp@schmorp.de>
245     http://home.schmorp.de/
246    
247     =cut
248    
249     1
250