1 |
=head1 NAME |
2 |
|
3 |
AnyEvent::MP::Transport - actual transport protocol |
4 |
|
5 |
=head1 SYNOPSIS |
6 |
|
7 |
use AnyEvent::MP::Transport; |
8 |
|
9 |
=head1 DESCRIPTION |
10 |
|
11 |
This is the superclass for MP transports, most of which is considered an |
12 |
implementation detail. |
13 |
|
14 |
Future versions might document the actual protocol. |
15 |
|
16 |
=head1 FUNCTIONS/METHODS |
17 |
|
18 |
=over 4 |
19 |
|
20 |
=cut |
21 |
|
22 |
package AnyEvent::MP::Transport; |
23 |
|
24 |
use common::sense; |
25 |
|
26 |
use Scalar::Util; |
27 |
use MIME::Base64 (); |
28 |
use Storable (); |
29 |
use JSON::XS (); |
30 |
|
31 |
use AE (); |
32 |
use AnyEvent::Socket (); |
33 |
use AnyEvent::Handle (); |
34 |
|
35 |
use AnyEvent::MP::Util (); |
36 |
|
37 |
use base Exporter::; |
38 |
|
39 |
# module version; announced to the peer as the "provider" greeting key ("AE-$VERSION")
our $VERSION = '0.0';
# wire protocol version; sent in the greeting as both the current and the
# minimum version, and compared against the minimum version the peer announces
our $PROTOCOL_VERSION = 0;
41 |
|
42 |
=item $listener = mp_server $host, $port, <constructor-args>, $cb->($transport)
43 |
|
44 |
Creates a listener on the given host/port using |
45 |
C<AnyEvent::Socket::tcp_server>. |
46 |
|
47 |
See C<new>, below, for constructor arguments. |
48 |
|
49 |
Defaults for peerhost, peerport, fh and tls are provided. |
50 |
|
51 |
=cut |
52 |
|
53 |
# Starts a TCP listener on $host/$port (via AnyEvent::Socket::tcp_server) and
# wraps every accepted connection in a new AnyEvent::MP::Transport.
#
#   $host, $port - passed through to tcp_server
#   @args        - extra constructor arguments; they come last and therefore
#                  override the fh/peerhost/peerport/tls defaults set here
#   $cb          - last argument; called with the new transport object
#
# Returns whatever tcp_server returns.
sub mp_server($$@) {
   my $cb = pop @_;
   my ($host, $port, @args) = @_;

   return AnyEvent::Socket::tcp_server ($host, $port, sub {
      my ($fh, $peerhost, $peerport) = @_;

      # the listening side takes the "accept" role for TLS
      my $transport = AnyEvent::MP::Transport->new (
         fh       => $fh,
         peerhost => $peerhost,
         peerport => $peerport,
         tls      => "accept",
         @args,
      );

      $cb->($transport);
   });
}
69 |
|
70 |
=item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport) |
71 |
|
72 |
=cut |
73 |
|
74 |
# Connects to $host/$port (via AnyEvent::Socket::tcp_connect) and, once the
# connection is established, wraps it in a new AnyEvent::MP::Transport.
#
#   $host, $port - passed through to tcp_connect; $host is also used as the
#                  peername handed to the transport
#   @args        - extra constructor arguments; they come last and therefore
#                  override the defaults set here
#   $cb          - last argument; called with the new transport object, or
#                  without any arguments when no file handle was obtained
#
# Returns whatever tcp_connect returns.
sub mp_connect {
   my $cb = pop;
   my ($host, $port, @args) = @_;

   AnyEvent::Socket::tcp_connect ($host, $port, sub {
      my ($fh, $nhost, $nport) = @_;

      # no file handle: signal the failure by calling back without arguments
      return $cb->() unless $fh;

      $cb->(AnyEvent::MP::Transport->new (
         fh       => $fh,
         peername => $host,
         peerhost => $nhost,
         peerport => $nport,
         # the connecting side is the TLS client; mp_server uses "accept",
         # and two sides in the same role can never complete a handshake
         tls      => "connect",
         @args,
      ));
   });
}
93 |
|
94 |
=item new AnyEvent::MP::Transport |
95 |
|
96 |
# immediately starts negotiation |
97 |
my $transport = new AnyEvent::MP::Transport |
98 |
# mandatory |
99 |
fh => $filehandle, |
100 |
local_id => $identifier, |
101 |
on_recv => sub { receive-callback }, |
102 |
on_error => sub { error-callback }, |
103 |
|
104 |
# optional |
105 |
secret => "shared secret", |
106 |
on_eof => sub { clean-close-callback }, |
107 |
on_connect => sub { successful-connect-callback }, |
108 |
greeting => { key => value }, |
109 |
|
110 |
# tls support |
111 |
tls => "accept|connect", |
112 |
tls_ctx => AnyEvent::TLS, |
113 |
peername => $peername, # for verification |
114 |
; |
115 |
|
116 |
=cut |
117 |
|
118 |
# framing methods accepted from the peer for the message stream
our @FRAMING_WANT = qw(json storable);#d##TODO#

# Constructor - blesses the argument hash itself into $class, wraps the file
# handle in an AnyEvent::Handle and immediately starts the handshake:
#
#   1. write our greeting line and read the peer's greeting line
#   2. switch to TLS when we have tls + tls_ctx and the peer announced "tls1.0"
#   3. exchange HMAC digests over both greetings, keyed with the shared secret
#   4. flush the queue, call ->connected and start reading framed messages
#
# Every handshake failure is reported through ->error. The object is returned
# right away; the steps above run from AnyEvent::Handle read callbacks.
#
# Arguments are key/value pairs (see the POD above). "fh" and "peername" are
# removed from the object and handed to the handle, "secret" defaults to
# AnyEvent::MP::Base::default_secret ().
sub new {
   my ($class, %arg) = @_;

   # note: $self and %arg are the same hash from here on
   my $self = bless \%arg, $class;

   $self->{queue} = [];

   {
      # the callbacks below only capture this weak copy, so they do not keep
      # the transport alive on their own
      Scalar::Util::weaken (my $self = $self);

      if (exists $arg{connect}) {
         $arg{tls}     ||= "connect";
         $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };
      }

      $arg{secret} = AnyEvent::MP::Base::default_secret ()
         unless exists $arg{secret};

      $self->{hdl} = AnyEvent::Handle->new (
         fh       => (delete $arg{fh}),
         rbuf_max => 64 * 1024,
         autocork => 1,
         no_delay => 1,
         on_error => sub {
            $self->error ($_[2]);
         },
         peername => (delete $arg{peername}),
      );

      my $secret      = $arg{secret};
      my $greeting_kv = $self->{greeting} ||= {};

      # only announce our TLS role when we could actually start TLS
      $greeting_kv->{"tls1.0"} ||= $arg{tls}
         if exists $arg{tls} && $arg{tls_ctx};
      $greeting_kv->{provider} = "AE-$VERSION";

      # send greeting
      my $lgreeting = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
            . ";$AnyEvent::MP::Base::UNIQ"
            . ";$AnyEvent::MP::Base::NODE"
            . ";" . (MIME::Base64::encode_base64 (AnyEvent::MP::Base::nonce (33), ""))
            . ";hmac_md6_64_256" # hardcoded atm.
            . ";json" # hardcoded atm.
            . ";$self->{peerhost};$self->{peerport}"
            . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);

      $self->{hdl}->push_write ("$lgreeting\012");

      # expect greeting
      $self->{hdl}->push_read (line => sub {
         my $rgreeting = $_[1];

         # same field order as $lgreeting above; the nonce and the
         # peerhost/peerport pair are not needed individually (the whole
         # line goes into the HMAC below)
         my ($aemp, $version, $version_min, $uniq, $rnode, undef, $auth, $framing, undef, undef, @kv)
            = split /;/, $rgreeting;

         if ($aemp ne "aemp") {
            return $self->error ("unparsable greeting");
         } elsif ($version_min > $PROTOCOL_VERSION) {
            return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
         } elsif ($auth ne "hmac_md6_64_256") {
            return $self->error ("unsupported auth method ($auth)");
         } elsif ($framing ne "json") {
            # fixed: this used to report $auth instead of the offending framing
            return $self->error ("unsupported framing method ($framing)");
         }

         $self->{remote_uniq} = $uniq;
         $self->{remote_node} = $rnode;

         # trailing "key=value" (or bare "key") fields of the greeting
         $self->{remote_greeting} = {
            map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
               @kv
         };

         if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{remote_greeting}{"tls1.0"}) {
            # fixed: the roles have to be complementary - one side accepts,
            # the other connects. The old test rejected exactly that pairing
            # and let two identical roles through to starttls.
            my $rrole = $self->{remote_greeting}{"tls1.0"};
            my $want  = { accept => "connect", connect => "accept" }->{ $self->{tls} };

            defined $want && defined $rrole && $want eq $rrole
               or return $self->error ("TLS server/client mismatch");

            $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
         }

         # auth
         require Digest::MD6;
         require Digest::HMAC_MD6;

         my $key = Digest::MD6::md6_hex ($secret);

         # both digests cover both greetings, in opposite order: $lauth is
         # what we send, $rauth is what we expect the peer to send
         my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting\012$rgreeting", 64, 256);
         my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting\012$lgreeting", 64, 256);

         $lauth ne $rauth # echo attack?
            or return $self->error ("authentication error");

         $self->{hdl}->push_write ("$auth;$lauth;$framing\012");

         $self->{hdl}->rbuf_max (64); # enough for 44 reply bytes or so
         $self->{hdl}->push_read (line => sub {
            my ($hdl, $rline) = @_;

            # auth method (unused), auth digest, framing of the peer's messages
            my (undef, $rauth2, $r_framing) = split /;/, $rline;

            if ($rauth2 ne $rauth) {
               return $self->error ("authentication failure/shared secret mismatch");
            }

            # $r_framing is chosen by the peer and ends up as the
            # AnyEvent::Handle read type, so only accept known framings.
            # NOTE(review): "storable" deserialises peer data with Storable;
            # the peer has passed authentication at this point.
            $r_framing = "" unless defined $r_framing;
            grep $_ eq $r_framing, @FRAMING_WANT
               or return $self->error ("unsupported framing method ($r_framing)");

            $self->{s_framing} = "json";#d#

            $hdl->rbuf_max (undef);
            my $queue = delete $self->{queue}; # we are connected

            $self->connected;

            # flush everything that was queued while the handshake ran
            $hdl->push_write ($self->{s_framing} => $_)
               for @$queue;

            # re-arm the read request first, then hand the message over
            my $rmsg; $rmsg = sub {
               $_[0]->push_read ($r_framing => $rmsg);

               AnyEvent::MP::Base::_inject ($_[1]);
            };
            $hdl->push_read ($r_framing => $rmsg);
         });
      });
   }

   $self
}
243 |
|
244 |
# Tears the transport down after a failure.
#
# If this transport is still the one registered with its node, the node is
# told to drop it (clr_transport); afterwards the transport is destroyed.
# $msg is accepted but currently unused, because the on_error invocation
# below is disabled.
#
# Returns the result of ->destroy.
sub error {
   my ($self, $msg) = @_;

   $self->{node}->clr_transport
      if $self->{node} && $self->{node}{transport} == $self;

#   $self->{on_error}($self, $msg);
   return $self->destroy;
}
253 |
|
254 |
# Called once the handshake has succeeded: looks up (or creates, via
# AnyEvent::MP::Base::add_node) the node named in the peer's greeting, keeps
# a weak reference to it in $self->{node} and registers this transport
# with it.
#
# Returns the result of the node's set_transport method.
sub connected {
   my $self = shift;

   # hold the node in a lexical so it stays alive while the stored
   # reference is being weakened
   my $peer = AnyEvent::MP::Base::add_node ($self->{remote_node});

   $self->{node} = $peer;
   Scalar::Util::weaken ($self->{node});

   return $peer->set_transport ($self);
}
261 |
|
262 |
# Writes one message to the peer, encoded with the send framing stored in
# $self->{s_framing}.
#
#   $msg - the message, handed unchanged to the handle's push_write
#
# Returns the result of push_write.
sub send {
   my ($self, $msg) = @_;

   return $self->{hdl}->push_write ($self->{s_framing} => $msg);
}
265 |
|
266 |
# Destroys the underlying handle, if there is one. Safe to call on an object
# that has no handle.
sub destroy {
   my $self = shift;

   if ($self->{hdl}) {
      $self->{hdl}->destroy;
   }
}
272 |
|
273 |
# Perl destructor: releases the handle through ->destroy when the object
# goes away.
sub DESTROY {
   $_[0]->destroy;
}
278 |
|
279 |
=back |
280 |
|
281 |
=head1 SEE ALSO |
282 |
|
283 |
L<AnyEvent>. |
284 |
|
285 |
=head1 AUTHOR |
286 |
|
287 |
Marc Lehmann <schmorp@schmorp.de> |
288 |
http://home.schmorp.de/ |
289 |
|
290 |
=cut |
291 |
|
292 |
1 |
293 |
|