1 |
root |
1.1 |
=head1 NAME |
2 |
|
|
|
3 |
|
|
AnyEvent::MP::Transport - actual transport protocol |
4 |
|
|
|
5 |
|
|
=head1 SYNOPSIS |
6 |
|
|
|
7 |
|
|
use AnyEvent::MP::Transport; |
8 |
|
|
|
9 |
|
|
=head1 DESCRIPTION |
10 |
|
|
|
11 |
|
|
This is the superclass for MP transports, most of which is considered an |
12 |
|
|
implementation detail. |
13 |
|
|
|
14 |
|
|
Future versions might document the actual protocol. |
15 |
|
|
|
16 |
|
|
=head1 FUNCTIONS/METHODS |
17 |
|
|
|
18 |
|
|
=over 4 |
19 |
|
|
|
20 |
|
|
=cut |
21 |
|
|
|
22 |
|
|
package AnyEvent::MP::Transport; |
23 |
|
|
|
24 |
|
|
use common::sense; |
25 |
|
|
|
26 |
|
|
use Scalar::Util; |
27 |
|
|
use MIME::Base64 (); |
28 |
|
|
use Storable (); |
29 |
root |
1.2 |
use JSON::XS (); |
30 |
root |
1.1 |
|
31 |
|
|
use AE (); |
32 |
|
|
use AnyEvent::Socket (); |
33 |
|
|
use AnyEvent::Handle (); |
34 |
root |
1.2 |
|
35 |
|
|
use AnyEvent::MP::Util (); |
36 |
root |
1.1 |
|
37 |
|
|
use base Exporter::; |
38 |
|
|
|
39 |
|
|
our $VERSION = '0.0'; |
40 |
root |
1.2 |
our $PROTOCOL_VERSION = 0; |
41 |
root |
1.1 |
|
42 |
|
|
=item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport) |
43 |
|
|
|
44 |
|
|
Creates a listener on the given host/port using |
45 |
|
|
C<AnyEvent::Socket::tcp_server>. |
46 |
|
|
|
47 |
|
|
See C<new>, below, for constructor arguments. |
48 |
|
|
|
49 |
|
|
Defaults for peerhost, peerport, fh and tls are provided. |
50 |
|
|
|
51 |
|
|
=cut |
52 |
|
|
|
53 |
|
|
sub mp_server($$@) { |
54 |
|
|
my $cb = pop; |
55 |
|
|
my ($host, $port, @args) = @_; |
56 |
|
|
|
57 |
|
|
AnyEvent::Socket::tcp_server $host, $port, sub { |
58 |
|
|
my ($fh, $host, $port) = @_; |
59 |
|
|
|
60 |
|
|
$cb->(new AnyEvent::MP::Transport |
61 |
|
|
fh => $fh, |
62 |
|
|
peerhost => $host, |
63 |
|
|
peerport => $port, |
64 |
|
|
tls => "accept", |
65 |
|
|
@args, |
66 |
|
|
); |
67 |
|
|
} |
68 |
|
|
} |
69 |
|
|
|
70 |
root |
1.2 |
=item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport) |
71 |
|
|
|
72 |
|
|
=cut |
73 |
|
|
|
74 |
|
|
sub mp_connect { |
75 |
|
|
my $cb = pop; |
76 |
|
|
my ($host, $port, @args) = @_; |
77 |
|
|
|
78 |
|
|
AnyEvent::Socket::tcp_connect $host, $port, sub { |
79 |
|
|
my ($fh, $nhost, $nport) = @_; |
80 |
|
|
|
81 |
|
|
return $cb->() unless $fh; |
82 |
|
|
|
83 |
|
|
$cb->(new AnyEvent::MP::Transport |
84 |
|
|
fh => $fh, |
85 |
|
|
peername => $host, |
86 |
|
|
peerhost => $nhost, |
87 |
|
|
peerport => $nport, |
88 |
|
|
tls => "accept", |
89 |
|
|
@args, |
90 |
|
|
); |
91 |
|
|
} |
92 |
|
|
} |
93 |
|
|
|
94 |
root |
1.1 |
=item new AnyEvent::MP::Transport |
95 |
|
|
|
96 |
|
|
# immediately starts negotiation |
97 |
|
|
my $transport = new AnyEvent::MP::Transport |
98 |
root |
1.2 |
# mandatory |
99 |
root |
1.1 |
fh => $filehandle, |
100 |
root |
1.2 |
local_id => $identifier, |
101 |
root |
1.1 |
on_recv => sub { receive-callback }, |
102 |
|
|
on_error => sub { error-callback }, |
103 |
|
|
|
104 |
|
|
# optional |
105 |
|
|
secret => "shared secret", |
106 |
|
|
on_eof => sub { clean-close-callback }, |
107 |
|
|
on_connect => sub { successful-connect-callback }, |
108 |
root |
1.2 |
greeting => { key => value }, |
109 |
root |
1.1 |
|
110 |
|
|
# tls support |
111 |
|
|
tls => "accept|connect", |
112 |
|
|
tls_ctx => AnyEvent::TLS, |
113 |
|
|
peername => $peername, # for verification |
114 |
|
|
; |
115 |
|
|
|
116 |
|
|
=cut |
117 |
|
|
|
118 |
root |
1.4 |
our @FRAMING_WANT = qw(json storable);#d##TODO# |
119 |
|
|
|
120 |
root |
1.1 |
sub new { |
121 |
|
|
my ($class, %arg) = @_; |
122 |
|
|
|
123 |
|
|
my $self = bless \%arg, $class; |
124 |
|
|
|
125 |
|
|
$self->{queue} = []; |
126 |
|
|
|
127 |
|
|
{ |
128 |
|
|
Scalar::Util::weaken (my $self = $self); |
129 |
|
|
|
130 |
|
|
if (exists $arg{connect}) { |
131 |
|
|
$arg{tls} ||= "connect"; |
132 |
root |
1.2 |
$arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 }; |
133 |
root |
1.1 |
} |
134 |
|
|
|
135 |
root |
1.2 |
$arg{secret} = AnyEvent::MP::default_secret () |
136 |
|
|
unless exists $arg{secret}; |
137 |
|
|
|
138 |
root |
1.1 |
$self->{hdl} = new AnyEvent::Handle |
139 |
root |
1.2 |
fh => delete $arg{fh}, |
140 |
|
|
rbuf_max => 64 * 1024, |
141 |
root |
1.4 |
autocork => 1, |
142 |
|
|
no_delay => 1, |
143 |
root |
1.1 |
on_error => sub { |
144 |
|
|
$self->error ($_[2]); |
145 |
|
|
}, |
146 |
|
|
peername => delete $arg{peername}, |
147 |
|
|
; |
148 |
|
|
|
149 |
root |
1.2 |
my $secret = $arg{secret}; |
150 |
|
|
my $greeting_kv = $self->{greeting} ||= {}; |
151 |
|
|
$greeting_kv->{"tls1.0"} ||= $arg{tls} |
152 |
|
|
if exists $arg{tls} && $arg{tls_ctx}; |
153 |
|
|
$greeting_kv->{provider} = "AE-$VERSION"; |
154 |
root |
1.1 |
|
155 |
|
|
# send greeting |
156 |
root |
1.2 |
my $lgreeting = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min |
157 |
|
|
. ";$AnyEvent::MP::UNIQ" |
158 |
|
|
. ";$AnyEvent::MP::NODE" |
159 |
|
|
. ";" . (MIME::Base64::encode_base64 AnyEvent::MP::Util::nonce 33, "") |
160 |
|
|
. ";hmac_md6_64_256" # hardcoded atm. |
161 |
|
|
. ";json" # hardcoded atm. |
162 |
|
|
. ";$self->{peerhost};$self->{peerport}" |
163 |
|
|
. (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv); |
164 |
root |
1.1 |
|
165 |
|
|
$self->{hdl}->push_write ("$lgreeting\012"); |
166 |
|
|
|
167 |
|
|
# expect greeting |
168 |
|
|
$self->{hdl}->push_read (line => sub { |
169 |
|
|
my $rgreeting = $_[1]; |
170 |
|
|
|
171 |
root |
1.2 |
my ($aemp, $version, $version_min, $uniq, $rnode, undef, $auth, $framing, $peerport, $peerhost, @kv) = split /;/, $rgreeting; |
172 |
root |
1.1 |
|
173 |
|
|
if ($aemp ne "aemp") { |
174 |
|
|
return $self->error ("unparsable greeting"); |
175 |
root |
1.2 |
} elsif ($version_min > $PROTOCOL_VERSION) { |
176 |
|
|
return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)"); |
177 |
root |
1.1 |
} elsif ($auth ne "hmac_md6_64_256") { |
178 |
|
|
return $self->error ("unsupported auth method ($auth)"); |
179 |
root |
1.2 |
} elsif ($framing ne "json") { |
180 |
|
|
return $self->error ("unsupported framing method ($auth)"); |
181 |
root |
1.1 |
} |
182 |
|
|
|
183 |
root |
1.2 |
$self->{remote_uniq} = $uniq; |
184 |
|
|
$self->{remote_node} = $rnode; |
185 |
root |
1.1 |
|
186 |
root |
1.2 |
$self->{remote_greeting} = { |
187 |
|
|
map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (), |
188 |
|
|
@kv |
189 |
root |
1.1 |
}; |
190 |
|
|
|
191 |
root |
1.2 |
if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{remote_greeting}{"tls1.0"}) { |
192 |
|
|
if ($self->{tls} ne $self->{remote_greeting}{"tls1.0"}) { |
193 |
root |
1.1 |
return $self->error ("TLS server/client mismatch"); |
194 |
|
|
} |
195 |
|
|
$self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx}); |
196 |
|
|
} |
197 |
|
|
|
198 |
|
|
# auth |
199 |
|
|
require Digest::MD6; |
200 |
|
|
require Digest::HMAC_MD6; |
201 |
|
|
|
202 |
|
|
my $key = Digest::MD6::md6_hex ($secret); |
203 |
root |
1.3 |
my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting\012$rgreeting", 64, 256); |
204 |
|
|
my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting\012$lgreeting", 64, 256); |
205 |
root |
1.1 |
|
206 |
root |
1.2 |
$lauth ne $rauth # echo attack? |
207 |
|
|
or return $self->error ("authentication error"); |
208 |
|
|
|
209 |
|
|
$self->{hdl}->push_write ("$auth;$lauth;$framing\012"); |
210 |
|
|
|
211 |
|
|
$self->{hdl}->rbuf_max (64); # enough for 44 reply bytes or so |
212 |
root |
1.1 |
$self->{hdl}->push_read (line => sub { |
213 |
root |
1.2 |
my ($hdl, $rline) = @_; |
214 |
|
|
|
215 |
|
|
my ($auth_method, $rauth2, $r_framing) = split /;/, $rline; |
216 |
root |
1.1 |
|
217 |
|
|
if ($rauth2 ne $rauth) { |
218 |
|
|
return $self->error ("authentication failure/shared secret mismatch"); |
219 |
|
|
} |
220 |
|
|
|
221 |
root |
1.2 |
$self->{s_framing} = "json";#d# |
222 |
|
|
|
223 |
|
|
$hdl->rbuf_max (undef); |
224 |
root |
1.1 |
my $queue = delete $self->{queue}; # we are connected |
225 |
|
|
|
226 |
root |
1.2 |
$self->connected; |
227 |
root |
1.1 |
|
228 |
root |
1.2 |
$hdl->push_write ($self->{s_framing} => $_) |
229 |
root |
1.1 |
for @$queue; |
230 |
|
|
|
231 |
|
|
my $rmsg; $rmsg = sub { |
232 |
root |
1.2 |
$_[0]->push_read ($r_framing => $rmsg); |
233 |
root |
1.1 |
|
234 |
root |
1.2 |
AnyEvent::MP::_inject ($_[1]); |
235 |
root |
1.1 |
}; |
236 |
root |
1.2 |
$hdl->push_read ($r_framing => $rmsg); |
237 |
root |
1.1 |
}); |
238 |
|
|
}); |
239 |
|
|
} |
240 |
|
|
|
241 |
|
|
$self |
242 |
|
|
} |
243 |
|
|
|
244 |
|
|
sub error { |
245 |
|
|
my ($self, $msg) = @_; |
246 |
|
|
|
247 |
root |
1.4 |
if ($self->{node} && $self->{node}{transport} == $self) { |
248 |
|
|
$self->{node}->clr_transport; |
249 |
|
|
} |
250 |
|
|
# $self->{on_error}($self, $msg); |
251 |
|
|
$self->destroy; |
252 |
root |
1.1 |
} |
253 |
|
|
|
254 |
root |
1.2 |
sub connected { |
255 |
|
|
my ($self) = @_; |
256 |
|
|
|
257 |
root |
1.4 |
my $node = AnyEvent::MP::add_node ($self->{remote_node}); |
258 |
|
|
Scalar::Util::weaken ($self->{node} = $node); |
259 |
|
|
$node->set_transport ($self); |
260 |
root |
1.2 |
} |
261 |
|
|
|
262 |
root |
1.1 |
sub send { |
263 |
root |
1.2 |
$_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]); |
264 |
root |
1.1 |
} |
265 |
|
|
|
266 |
|
|
sub destroy { |
267 |
|
|
my ($self) = @_; |
268 |
|
|
|
269 |
root |
1.2 |
$self->{hdl}->destroy |
270 |
|
|
if $self->{hdl}; |
271 |
root |
1.1 |
} |
272 |
|
|
|
273 |
|
|
sub DESTROY { |
274 |
|
|
my ($self) = @_; |
275 |
|
|
|
276 |
|
|
$self->destroy; |
277 |
|
|
} |
278 |
|
|
|
279 |
|
|
=back |
280 |
|
|
|
281 |
|
|
=head1 SEE ALSO |
282 |
|
|
|
283 |
|
|
L<AnyEvent>. |
284 |
|
|
|
285 |
|
|
=head1 AUTHOR |
286 |
|
|
|
287 |
|
|
Marc Lehmann <schmorp@schmorp.de> |
288 |
|
|
http://home.schmorp.de/ |
289 |
|
|
|
290 |
|
|
=cut |
291 |
|
|
|
292 |
|
|
1 |
293 |
|
|
|