1 |
=head1 NAME |
2 |
|
3 |
AnyEvent::MP::Transport - actual transport protocol |
4 |
|
5 |
=head1 SYNOPSIS |
6 |
|
7 |
use AnyEvent::MP::Transport; |
8 |
|
9 |
=head1 DESCRIPTION |
10 |
|
11 |
This is the superclass for MP transports, most of which is considered an |
12 |
implementation detail. |
13 |
|
14 |
Future versions might document the actual protocol. |
15 |
|
16 |
=head1 FUNCTIONS/METHODS |
17 |
|
18 |
=over 4 |
19 |
|
20 |
=cut |
21 |
|
22 |
package AnyEvent::MP::Transport; |
23 |
|
24 |
use common::sense; |
25 |
|
26 |
use Scalar::Util; |
27 |
use MIME::Base64 (); |
28 |
use Storable (); |
29 |
use JSON::XS (); |
30 |
|
31 |
use AE (); |
32 |
use AnyEvent::Socket (); |
33 |
use AnyEvent::Handle (); |
34 |
|
35 |
use base Exporter::; |
36 |
|
37 |
our $VERSION = '0.0'; |
38 |
our $PROTOCOL_VERSION = 0; |
39 |
|
40 |
=item $listener = mp_server $host, $port, <constructor-args>, $cb->($transport)
41 |
|
42 |
Creates a listener on the given host/port using |
43 |
C<AnyEvent::Socket::tcp_server>. |
44 |
|
45 |
See C<new>, below, for constructor arguments. |
46 |
|
47 |
Defaults for peerhost, peerport, fh and tls are provided. |
48 |
|
49 |
=cut |
50 |
|
51 |
# mp_server $host, $port, <constructor-args>, $cb->($transport)
#
# Listens on $host/$port through AnyEvent::Socket::tcp_server. For every
# accepted connection an AnyEvent::MP::Transport is constructed and passed
# to $cb, which is the last argument of the call.
#
# fh, peerhost, peerport and tls ("accept") are supplied as defaults; the
# caller's constructor arguments are appended after them, so a key given by
# the caller wins over the default.
#
# Returns whatever tcp_server returns.
sub mp_server($$@) {
    my $on_transport = pop @_;
    my ($bind_host, $bind_port, @ctor_args) = @_;

    return AnyEvent::Socket::tcp_server ($bind_host, $bind_port, sub {
        my ($sock, $peer_host, $peer_port) = @_;

        $on_transport->(
            AnyEvent::MP::Transport->new (
                fh       => $sock,
                peerhost => $peer_host,
                peerport => $peer_port,
                tls      => "accept",
                @ctor_args,
            )
        );
    });
}
67 |
|
68 |
=item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport) |
69 |
|
70 |
=cut |
71 |
|
72 |
# mp_connect $host, $port, <constructor-args>, $cb->($transport)
#
# Opens a TCP connection to $host/$port through
# AnyEvent::Socket::tcp_connect. On success an AnyEvent::MP::Transport is
# constructed around the new socket and passed to $cb (the last argument of
# the call). When the connection attempt fails, $cb is invoked without any
# arguments.
#
# fh, peername (the host name as given by the caller), peerhost, peerport
# and tls are supplied as defaults; the caller's constructor arguments are
# appended after them, so a key given by the caller wins over the default.
#
# Returns whatever tcp_connect returns.
sub mp_connect {
    my $cb = pop @_;
    my ($host, $port, @args) = @_;

    return AnyEvent::Socket::tcp_connect ($host, $port, sub {
        my ($fh, $nhost, $nport) = @_;

        # no filehandle means the connect failed
        return $cb->() unless $fh;

        $cb->(
            AnyEvent::MP::Transport->new (
                fh       => $fh,
                peername => $host,
                peerhost => $nhost,
                peerport => $nport,
                # we initiated the TCP connection, so we are the TLS client;
                # the listening side (mp_server) uses "accept"
                tls      => "connect",
                @args,
            )
        );
    });
}
91 |
|
92 |
=item new AnyEvent::MP::Transport |
93 |
|
94 |
# immediately starts negotiation |
95 |
my $transport = new AnyEvent::MP::Transport |
96 |
# mandatory |
97 |
fh => $filehandle, |
98 |
local_id => $identifier, |
99 |
on_recv => sub { receive-callback }, |
100 |
on_error => sub { error-callback }, |
101 |
|
102 |
# optional |
103 |
secret => "shared secret", |
104 |
on_eof => sub { clean-close-callback }, |
105 |
on_connect => sub { successful-connect-callback }, |
106 |
greeting => { key => value }, |
107 |
|
108 |
# tls support |
109 |
tls => "accept|connect", |
110 |
tls_ctx => AnyEvent::TLS, |
111 |
peername => $peername, # for verification |
112 |
; |
113 |
|
114 |
=cut |
115 |
|
116 |
# Framing formats we accept for incoming messages once a peer has
# authenticated. TODO: our own greeting and outgoing framing are still
# hardcoded to json.
our @FRAMING_WANT = qw(json storable);

# new AnyEvent::MP::Transport key => value...
#
# Wraps the filehandle given as "fh" in an AnyEvent::Handle and immediately
# starts the handshake:
#
#   1. our greeting line is written and the peer's greeting line is read
#      and checked (magic, protocol version, auth method, framing),
#   2. TLS is started when both sides advertised it and tls_ctx is set,
#   3. both sides exchange an HMAC over the two greetings, keyed with a
#      digest of the shared secret ("secret", defaulting to
#      AnyEvent::MP::Base::default_secret),
#   4. on success ->connected is called, messages collected in
#      $self->{queue} are written out, and every subsequently received
#      message is handed to AnyEvent::MP::Base::_inject.
#
# Any failure along the way goes through ->error.
#
# The constructor arguments become the fields of the object itself, except
# for "fh" and "peername", which are moved into the handle.
#
# Returns the new transport object.
sub new {
    my ($class, %arg) = @_;

    # note: %arg *is* the object, so $arg{x} and $self->{x} are the same slot
    my $self = bless \%arg, $class;

    $self->{queue} = [];

    {
        # the callbacks registered below must not keep the transport alive,
        # so inside this block $self is a weakened copy
        Scalar::Util::weaken (my $self = $self);

        if (exists $arg{connect}) {
            $arg{tls}     ||= "connect";
            $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };
        }

        $arg{secret} = AnyEvent::MP::Base::default_secret ()
            unless exists $arg{secret};

        $self->{hdl} = AnyEvent::Handle->new (
            fh       => delete $arg{fh},
            rbuf_max => 64 * 1024,
            autocork => 1,
            no_delay => 1,
            on_error => sub {
                $self->error ($_[2]);
            },
            peername => delete $arg{peername},
        );

        my $secret      = $arg{secret};
        my $greeting_kv = $self->{greeting} ||= {};

        # advertise our own TLS role, but only if we are able to do TLS
        $greeting_kv->{"tls1.0"} ||= $arg{tls}
            if exists $arg{tls} && $arg{tls_ctx};
        $greeting_kv->{provider} = "AE-$VERSION";

        # send greeting
        my $lgreeting = join ";",
            "aemp",
            $PROTOCOL_VERSION,                  # version
            $PROTOCOL_VERSION,                  # minimum version
            $AnyEvent::MP::Base::UNIQ,
            $AnyEvent::MP::Base::NODE,
            MIME::Base64::encode_base64 (AnyEvent::MP::Base::nonce (33), ""),
            "hmac_md6_64_256",                  # hardcoded atm.
            "json",                             # hardcoded atm.
            $self->{peerhost},
            $self->{peerport},
            (map "$_=$greeting_kv->{$_}", keys %$greeting_kv);

        $self->{hdl}->push_write ("$lgreeting\012");

        # expect greeting
        $self->{hdl}->push_read (line => sub {
            my $rgreeting = $_[1];

            # field 6 is the peer's nonce; fields 9 and 10 mirror the
            # peerhost/peerport fields of our own greeting. None of them is
            # needed individually - the whole line enters the HMAC below.
            my ($aemp, $version, $version_min, $uniq, $rnode, undef, $auth, $framing, undef, undef, @kv)
                = split /;/, $rgreeting;

            if ($aemp ne "aemp") {
                return $self->error ("unparsable greeting");
            } elsif ($version_min > $PROTOCOL_VERSION) {
                return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
            } elsif ($auth ne "hmac_md6_64_256") {
                return $self->error ("unsupported auth method ($auth)");
            } elsif ($framing ne "json") {
                return $self->error ("unsupported framing method ($framing)");
            }

            $self->{remote_uniq} = $uniq;
            $self->{remote_node} = $rnode;

            $self->{remote_greeting} = {
                map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
                    @kv
            };

            if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{remote_greeting}{"tls1.0"}) {
                # each side advertises its own role, and a TLS handshake
                # needs exactly one "accept" and one "connect" side, so
                # identical roles can never work
                if ($self->{tls} eq $self->{remote_greeting}{"tls1.0"}) {
                    return $self->error ("TLS server/client mismatch");
                }
                $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
            }

            # auth
            require Digest::MD6;
            require Digest::HMAC_MD6;

            my $key   = Digest::MD6::md6_hex ($secret);
            my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting\012$rgreeting", 64, 256);
            my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting\012$lgreeting", 64, 256);

            # identical greetings in both directions would make the two
            # digests equal and let a peer simply echo ours back
            $lauth ne $rauth
                or return $self->error ("authentication error");

            $self->{hdl}->push_write ("$auth;$lauth;$framing\012");

            # Keep the buffer small until the peer has authenticated. The
            # reply has the same shape as the line written above (method,
            # digest, framing, newline), which is already longer than 64
            # bytes, so leave some headroom.
            $self->{hdl}->rbuf_max (512);
            $self->{hdl}->push_read (line => sub {
                my ($hdl, $rline) = @_;

                my (undef, $rauth2, $r_framing) = split /;/, $rline;

                if ($rauth2 ne $rauth) {
                    return $self->error ("authentication failure/shared secret mismatch");
                }

                # $r_framing comes from the peer and is used as a read type
                # name below, so only accept formats we know about
                unless (defined $r_framing and grep { $_ eq $r_framing } @FRAMING_WANT) {
                    return $self->error ("unsupported framing method ($r_framing)");
                }

                $self->{s_framing} = "json";#d#

                $hdl->rbuf_max (undef);
                my $queue = delete $self->{queue}; # we are connected

                $self->connected;

                $hdl->push_write ($self->{s_framing} => $_)
                    for @$queue;

                # re-arms itself before handing each message on, so exactly
                # one read request is always pending.
                # NOTE(review): $rmsg refers to itself, which forms a
                # reference cycle - confirm whether that needs breaking.
                my $rmsg; $rmsg = sub {
                    $_[0]->push_read ($r_framing => $rmsg);

                    AnyEvent::MP::Base::_inject ($_[1]);
                };
                $hdl->push_read ($r_framing => $rmsg);
            });
        });
    }

    return $self;
}
241 |
|
242 |
# $transport->error ($msg)
#
# Shuts the transport down after a failure. If the node recorded in
# $self->{node} currently has this very object as its transport, the node is
# told to drop it (clr_transport) first. The transport is then destroyed.
#
# $msg is accepted but currently unused, as the on_error callback invocation
# below is disabled.
sub error {
    my ($self, $msg) = @_;

    my $is_active_transport = $self->{node} && $self->{node}{transport} == $self;

    $self->{node}->clr_transport
        if $is_active_transport;

    # $self->{on_error}($self, $msg);
    return $self->destroy;
}
251 |
|
252 |
# $transport->connected
#
# Called by the constructor's handshake code once the peer has
# authenticated. Looks up the node object for the peer's node id via
# AnyEvent::MP::Base::add_node, remembers it in $self->{node} as a weak
# reference and registers this transport with the node.
sub connected {
    my ($self) = @_;

    # keep a strong reference in a lexical while the slot is being weakened
    my $remote = AnyEvent::MP::Base::add_node ($self->{remote_node});

    $self->{node} = $remote;
    Scalar::Util::weaken ($self->{node});

    return $remote->set_transport ($self);
}
259 |
|
260 |
# $transport->send ($msg)
#
# Sends one message to the peer.
#
# While the handshake is still running, $self->{queue} exists and
# $self->{s_framing} has not been set, so the message cannot be written
# yet. It is appended to the queue instead; the constructor writes the
# queue out, in order, right after authentication succeeds.
#
# Once connected, the message is written to the handle using the
# negotiated framing.
sub send {
    my ($self, $msg) = @_;

    if (my $queue = $self->{queue}) {
        push @$queue, $msg;
        return;
    }

    $self->{hdl}->push_write ($self->{s_framing} => $msg);
}
263 |
|
264 |
# $transport->destroy
#
# Destroys the underlying handle, if the transport has one. Safe to call
# on a transport without a handle.
sub destroy {
    my ($self) = @_;

    my $hdl = $self->{hdl};

    return $hdl && $hdl->destroy;
}
270 |
|
271 |
# Destructor: makes sure the transport is shut down via ->destroy when the
# last reference to it goes away.
sub DESTROY {
    my $self = shift;

    $self->destroy;
}
276 |
|
277 |
=back |
278 |
|
279 |
=head1 SEE ALSO |
280 |
|
281 |
L<AnyEvent>. |
282 |
|
283 |
=head1 AUTHOR |
284 |
|
285 |
Marc Lehmann <schmorp@schmorp.de> |
286 |
http://home.schmorp.de/ |
287 |
|
288 |
=cut |
289 |
|
290 |
1 |
291 |
|