ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/DataConn.pm
Revision: 1.3
Committed: Sun Nov 8 01:15:25 2009 UTC (14 years, 6 months ago) by root
Branch: MAIN
Changes since 1.2: +49 -46 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::DataConn - create socket connections between nodes
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::DataConn;
8    
9     =head1 DESCRIPTION
10    
11     This module can be used to create socket connections between the local and
12     a remote node in the aemp network. The socket can be used freely for any
13     purpose, and in most cases, this mechanism is a good way to transport big
14     chunks of binary data.
15    
16     The connections created by this module use the same security mechanisms
17     as normal AEMP connections (secure authentication, optional use of TLS),
18     and in fact, use the same listening port as AEMP connections, so when two
19     nodes can reach each other, they can, with high probability, create a data
20     connection between them.
21    
22     The protocol is, however, not the AEMP transport protocol, so this will
23     only work between nodes implementing the "aemp-dataconn" protocol.
24    
25     =head1 FUNCTIONS
26    
27     =over 4
28    
29     =cut
30    
31     package AnyEvent::MP::DataConn;
32    
33     use common::sense;
34     use Carp ();
35     use POSIX ();
36    
37     use AnyEvent ();
38     use AnyEvent::Util ();
39    
40     use AnyEvent::MP;
41 root 1.2 use AnyEvent::MP::Kernel ();
42     use AnyEvent::MP::Global ();
43    
# Counter used to derive connection ids. It is a string, so the pre-increment in
# connect_to uses Perl's magic string increment ("a" -> "b" -> ... -> "z" -> "aa").
44     our $ID = "a";
# Per-connection state, keyed by connection id. Entries are created by _expect and
# connect_to (keys: id, to, done, plus next/wait on the connecting side) and are
# removed and emptied again by their own "done" callback.
45     our %STATE;
46    
# Another node tells us to await a data connection.
#
# Registers a state entry for $id in %STATE consisting of:
#   id   - the connection id
#   to   - a one-shot timer firing after $timeout seconds, which completes
#          the entry without a handle
#   done - completion callback, invoked as ($hdl, $error)
#
# $initspec is an array reference [$func, @args]; once a handle arrives, the
# function named $func is resolved via AnyEvent::MP::Kernel::load_func and
# called with (@args, $hdl).
sub _expect {
   my ($id, $timeout, $initspec) = @_;

   # completion handler: always tears down the state entry first (emptying
   # the hash also drops the timeout timer), then hands over the handle, if any
   my $on_done = sub {
      my ($hdl, $error) = @_;

      %{ delete $STATE{$id} } = ();

      if (defined $hdl) {
         my ($func, @args) = @$initspec;
         AnyEvent::MP::Kernel::load_func($func)->(@args, $hdl);
      }
   };

   # gives up when nothing was injected in time
   my $watchdog = AE::timer($timeout, 0, sub {
      $STATE{$id}{done}->(undef);
   });

   $STATE{$id} = {
      id   => $id,
      to   => $watchdog,
      done => $on_done,
   };
}
68 root 1.1
# AEMP::Transport call for dataconn-connections.
#
# $conn  - the transport object; its "hdl" member is taken over on success
# $error - defined when the connection attempt failed
#
# The connection id is read from the local greeting or, failing that, from
# the remote greeting. Without an id nothing happens. Otherwise the transport
# is destroyed and the "done" callback of the matching %STATE entry (if one
# still exists) is invoked with ($hdl, $error).
sub _inject {
   my ($conn, $error) = @_;

   # only steal the handle from the transport when there was no error
   my $hdl;
   $hdl = delete $conn->{hdl}
      unless defined $error;

   my $id = $conn->{local_greeting}{dataconn_id}
         || $conn->{remote_greeting}{dataconn_id};

   return
      unless $id;

   $conn->destroy;

   # the entry may already be gone
   my $state = $STATE{$id}
      or return;

   $state->{done}->($hdl, $error);
}
81    
# Actively connect to some other node.
#
# $id   - connection id; must have an entry in %STATE, otherwise this is a no-op
# $node - node id whose listening endpoints are taken from
#         %AnyEvent::MP::Global::addr
#
# When the node has no known listeners, the state's "done" callback is invoked
# with an error. Otherwise one endpoint is tried every two seconds until the
# list is exhausted; running out of endpoints is not reported here, the
# timeout timer stored in the state entry takes care of that.
sub _connect {
   my ($id, $node) = @_;

   my $state = $STATE{$id}
      or return;

   # work on a private copy, so that consuming the endpoints below does not
   # empty the shared listener list of this node
   my @endpoints = @{ $AnyEvent::MP::Global::addr{$node} || [] };

   # state hashes only have a "done" callback, which takes ($hdl, $error)
   @endpoints
      or return $state->{done}->(undef, "$node: no listeners found");

   # I love hardcoded constants !
   $state->{next} = AE::timer(0, 2, sub {
      # no endpoints left: stop the retry timer
      my $endpoint = shift @endpoints
         or return delete $state->{next};

      # unparsable endpoint: skip it and wait for the next tick
      my ($host, $port) = AnyEvent::Socket::parse_hostport($endpoint)
         or return;

      my $transport; $transport = AnyEvent::MP::Transport::mp_connect(
         $host, $port,
         protocol       => "aemp-dataconn",
         local_greeting => { dataconn_id => $id },
         sub { $transport->destroy }, #TODO: destroys handshaked connections too early
      );
   });
}
110    
111     =item AnyEvent::MP::DataConn::connect_to $node, $timeout, [$initfunc, @initdata], $cb->($handle)
112 root 1.1
113     Creates a socket connection between the local node and the node C<$node>
114     (which can also be specified as a port).
115    
116     When the connection could be successfully created, the C<$initfunc>
117     will be called with the given C<@initdata> on the remote node (similar
118     to C<snd_to_func> or C<spawn>), and the C<AnyEvent::Handle> object
119 root 1.2 representing the remote connection end as additional argument.
120 root 1.1
121 root 1.2 Also, the callback given as last argument will be called with the
122     AnyEvent::Handle object for the local side.
123 root 1.1
124 root 1.2 The AnyEvent::Handle objects will be in a "quiescent" state - you could rip
125 root 1.1 out the file handle and forget about it, but it is recommended to use it,
126 root 1.2 as the security settings might have called for a TLS connection. If you
127 root 1.1 opt to use it, you at least have to set an C<on_error> callback.
128    
129 root 1.2 In case of any error (timeout etc.), nothing will be called on
130     the remote side, and the local port will be C<kil>'ed with an C<<
131 root 1.1 AnyEvent::MP::DataConn => "error message" >> kill reason.
132    
133     =cut
134    
# Implementation notes for connect_to:
#
# $node      - node (or port, reduced via node_of) to connect to
# $timeout   - seconds after which the attempt is abandoned
# $initspec  - [$initfunc, @initdata], forwarded to _expect on the remote node
# @localmsg  - on success, "snd @localmsg, $hdl" is executed, i.e. the local
#              handle is appended to this message (NOTE(review): the POD above
#              describes this as a callback - confirm which is intended)
#
# On any failure the calling port ($SELF at call time) is kil'ed with
# "AnyEvent::MP::DataConn" => $error.
sub connect_to($$$;@) {
   my ($node, $timeout, $initspec, @localmsg) = @_;
   my $port = $SELF;
   $node = node_of($node);

   # refuse before any state or timer is registered, so nothing is left
   # behind in %STATE that would kil the port a second time on timeout
   if (AnyEvent::MP::Kernel::port_is_local($node)) {
      return kil($port, "AnyEvent::MP::DataConn" =>
         "connect_to does not yet support local/local connections, please bug me about it");
   }

   my $id = (++$ID) . "\@$NODE";

   # damn, why do my simple state hashes resemble objects so quickly
   my $state = $STATE{$id} = {
      id   => $id, # same value as the %STATE key
      to   => AE::timer($timeout, 0, sub {
         $STATE{$id}{done}->(undef, "$node: unable to establish connection within $timeout seconds");
      }),
      done => sub {
         my ($hdl, $error) = @_;

         # tear down everything belonging to this attempt; emptying the hash
         # also drops the timers and breaks the wait-closure reference cycle
         delete $AnyEvent::MP::Global::ON_SETUP{$id};
         %{ delete $STATE{$id} } = ();

         if (defined $hdl) {
            snd(@localmsg, $hdl);
         } else {
            kil($port, "AnyEvent::MP::DataConn" => $error);
         }
      },
   };

   # tell the remote node to await the connection
   AnyEvent::MP::Kernel::snd_to_func($node, "AnyEvent::MP::DataConn::_expect" => $id, $timeout, $initspec);

   $state->{wait} = sub {
      if (my $addr = $AnyEvent::MP::Global::addr{$node}) {
         delete $AnyEvent::MP::Global::ON_SETUP{$id};

         # continue connect
         if (@$addr) {
            # node has listeners, so connect
            _connect($id, $node);
         } else {
            # no listeners, ask it to connect to us - so it has to be given
            # our node id, not its own
            AnyEvent::MP::Kernel::snd_to_func($node, "AnyEvent::MP::DataConn::_connect" => $id, $NODE);
         }
      } else {
         # wait for the next global setup handshake
         $AnyEvent::MP::Global::ON_SETUP{$id} = $state->{wait};
      }
   };

   $state->{wait}->();
}
192    
193     =back
194    
195     =head1 SEE ALSO
196    
197     L<AnyEvent::MP>.
198    
199     =head1 AUTHOR
200    
201     Marc Lehmann <schmorp@schmorp.de>
202     http://home.schmorp.de/
203    
204     =cut
205    
206     1
207