ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.59
Committed: Thu Nov 5 22:44:56 2009 UTC (14 years, 7 months ago) by root
Branch: MAIN
CVS Tags: rel-1_23
Changes since 1.58: +18 -19 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Transport - actual transport protocol handler
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Transport;
8
9 =head1 DESCRIPTION
10
11 This module implements (and documents) the actual transport protocol for
12 AEMP.
13
14 See the "PROTOCOL" section below if you want to write another client for
15 this protocol.
16
17 =head1 FUNCTIONS/METHODS
18
19 =over 4
20
21 =cut
22
23 package AnyEvent::MP::Transport;
24
25 use common::sense;
26
27 use Scalar::Util ();
28 use List::Util ();
29 use MIME::Base64 ();
30 use Storable ();
31 use JSON::XS ();
32
33 use Digest::MD6 ();
34 use Digest::HMAC_MD6 ();
35
36 use AE ();
37 use AnyEvent::Socket ();
38 use AnyEvent::Handle 4.92 ();
39
40 use AnyEvent::MP::Config ();
41
42 our $PROTOCOL_VERSION = 1;
43
44 our @HOOK_CONNECT; # called at connect/accept time
45 our @HOOK_GREETING; # called at greeting1 time
46 our @HOOK_CONNECTED; # called at data phase
47 our @HOOK_DESTROY; # called at destroy time
48 our %HOOK_PROTOCOL = (
49 "aemp-dataconn" => sub {
50 require AnyEvent::MP::DataConn;
51 &AnyEvent::MP::DataConn::_inject;
52 },
53 );
54
55 =item $listener = mp_listener $host, $port, <constructor-args>
56
57 Creates a listener on the given host/port using
58 C<AnyEvent::Socket::tcp_server>.
59
60 See C<new>, below, for constructor arguments.
61
62 Defaults for peerhost, peerport and fh are provided.
63
64 =cut
65
66 sub mp_server($$;%) {
67 my ($host, $port, %arg) = @_;
68
69 AnyEvent::Socket::tcp_server $host, $port, sub {
70 my ($fh, $host, $port) = @_;
71
72 my $tp = new AnyEvent::MP::Transport
73 fh => $fh,
74 peerhost => $host,
75 peerport => $port,
76 %arg,
77 ;
78 $tp->{keepalive} = $tp;
79 }, delete $arg{prepare}
80 }
81
82 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
83
84 =cut
85
86 sub mp_connect {
87 my $release = pop;
88 my ($host, $port, @args) = @_;
89
90 new AnyEvent::MP::Transport
91 connect => [$host, $port],
92 peerhost => $host,
93 peerport => $port,
94 release => $release,
95 @args,
96 ;
97 }
98
99 =item new AnyEvent::MP::Transport
100
101 # immediately starts negotiation
102 my $transport = new AnyEvent::MP::Transport
103 # mandatory
104 fh => $filehandle,
105 local_id => $identifier,
106 on_recv => sub { receive-callback },
107 on_error => sub { error-callback },
108
109 # optional
110 on_eof => sub { clean-close-callback },
111 on_connect => sub { successful-connect-callback },
112 greeting => { key => value },
113
114 # tls support
115 tls_ctx => AnyEvent::TLS,
116 peername => $peername, # for verification
117 ;
118
119 =cut
120
121 sub new {
122 my ($class, %arg) = @_;
123
124 my $self = bless \%arg, $class;
125
126 {
127 Scalar::Util::weaken (my $self = $self);
128
129 my $config = $AnyEvent::MP::Kernel::CONFIG;
130
131 my $timeout = $config->{monitor_timeout};
132 my $lframing = $config->{data_format};
133 my $auth_snd = $config->{auth_offer};
134 my $auth_rcv = $config->{auth_accept};
135
136 $self->{secret} = $config->{secret}
137 unless exists $self->{secret};
138
139 my $secret = $self->{secret};
140
141 if (exists $config->{cert}) {
142 $self->{tls_ctx} = {
143 sslv2 => 0,
144 sslv3 => 0,
145 tlsv1 => 1,
146 verify => 1,
147 cert => $config->{cert},
148 ca_cert => $config->{cert},
149 verify_require_client_cert => 1,
150 };
151 }
152
153 $self->{hdl} = new AnyEvent::Handle
154 +($self->{fh} ? (fh => $self->{fh}) : (connect => $self->{connect})),
155 autocork => 1,
156 no_delay => 1,
157 keepalive => 1,
158 on_error => sub {
159 $self->error ($_[2]);
160 },
161 rtimeout => $timeout,
162 ;
163
164 my $greeting_kv = $self->{local_greeting} ||= {};
165
166 $greeting_kv->{tls} = "1.0" if $self->{tls_ctx};
167 $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION";
168 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
169 $greeting_kv->{timeout} = $self->{timeout};
170
171 my $protocol = $self->{protocol} || "aemp";
172
173 # can modify greeting_kv
174 $_->($self) for $protocol eq "aemp" ? @HOOK_CONNECT : ();
175
176 # send greeting
177 my $lgreeting1 = "$protocol;$PROTOCOL_VERSION"
178 . ";$AnyEvent::MP::Kernel::NODE"
179 . ";" . (join ",", @$auth_rcv)
180 . ";" . (join ",", @$lframing)
181 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
182
183 my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Kernel::nonce (66), "";
184
185 $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");
186
187 # expect greeting
188 $self->{hdl}->rbuf_max (4 * 1024);
189 $self->{hdl}->push_read (line => sub {
190 my $rgreeting1 = $_[1];
191
192 my ($aemp, $version, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;
193
194 $self->{remote_node} = $rnode;
195
196 $self->{remote_greeting} = {
197 map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
198 @kv
199 };
200
201 $_->($self) for $protocol eq "aemp" ? @HOOK_GREETING : ();
202
203 if ($aemp ne $protocol) {
204 return $self->error ("unparsable greeting, expected '$protocol', got '$aemp'");
205 } elsif ($version != $PROTOCOL_VERSION) {
206 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)");
207 } elsif ($rnode eq $AnyEvent::MP::Kernel::NODE) {
208 return $self->error ("I refuse to talk to myself");
209 } elsif ($AnyEvent::MP::Kernel::NODE{$rnode} && $AnyEvent::MP::Kernel::NODE{$rnode}{transport}) {
210 return $self->error ("$rnode already connected, not connecting again.");
211 }
212
213 # read nonce
214 $self->{hdl}->push_read (line => sub {
215 my $rgreeting2 = $_[1];
216
217 "$lgreeting1\012$lgreeting2" ne "$rgreeting1\012$rgreeting2" # echo attack?
218 or return $self->error ("authentication error, echo attack?");
219
220 my $tls = $self->{tls_ctx} && 1 == int $self->{remote_greeting}{tls};
221
222 my $s_auth;
223 for my $auth_ (split /,/, $auths) {
224 if (grep $auth_ eq $_, @$auth_snd and ($auth_ !~ /^tls_/ or $tls)) {
225 $s_auth = $auth_;
226 last;
227 }
228 }
229
230 defined $s_auth
231 or return $self->error ("$auths: no common auth type supported");
232
233 my $s_framing;
234 for my $framing_ (split /,/, $framings) {
235 if (grep $framing_ eq $_, @$lframing) {
236 $s_framing = $framing_;
237 last;
238 }
239 }
240
241 defined $s_framing
242 or return $self->error ("$framings: no common framing method supported");
243
244 my $key;
245 my $lauth;
246
247 if ($tls) {
248 $self->{tls} = $lgreeting2 lt $rgreeting2 ? "connect" : "accept";
249 $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
250
251 $lauth =
252 $s_auth eq "tls_anon" ? ""
253 : $s_auth eq "tls_md6_64_256" ? Digest::MD6::md6_hex "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012"
254 : return $self->error ("$s_auth: fatal, selected unsupported snd auth method");
255
256 } elsif (length $secret) {
257 return $self->error ("$s_auth: fatal, selected unsupported snd auth method")
258 unless $s_auth eq "hmac_md6_64_256"; # hardcoded atm.
259
260 $key = Digest::MD6::md6 $secret;
261 # we currently only support hmac_md6_64_256
262 $lauth = Digest::HMAC_MD6::hmac_md6_hex $key, "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012", 64, 256;
263
264 } else {
265 return $self->error ("unable to handshake TLS and no shared secret configured");
266 }
267
268 $self->{hdl}->push_write ("$s_auth;$lauth;$s_framing\012");
269
270 # read the authentication response
271 $self->{hdl}->push_read (line => sub {
272 my ($hdl, $rline) = @_;
273
274 my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;
275
276 my $rauth =
277 $auth_method eq "hmac_md6_64_256" ? Digest::HMAC_MD6::hmac_md6_hex $key, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012", 64, 256
278 : $auth_method eq "cleartext" ? unpack "H*", $secret
279 : $auth_method eq "tls_anon" ? ($tls ? "" : "\012\012") # \012\012 never matches
280 : $auth_method eq "tls_md6_64_256" ? ($tls ? Digest::MD6::md6_hex "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012" : "\012\012")
281 : return $self->error ("$auth_method: fatal, selected unsupported rcv auth method");
282
283 if ($rauth2 ne $rauth) {
284 return $self->error ("authentication failure/shared secret mismatch");
285 }
286
287 $self->{s_framing} = $s_framing;
288
289 $hdl->rbuf_max (undef);
290
291 # we rely on TCP retransmit timeouts and keepalives
292 $self->{hdl}->rtimeout (undef);
293
294 $self->{remote_greeting}{untrusted} = 1
295 if $auth_method eq "tls_anon";
296
297 $self->connected;
298
299 if ($protocol eq "aemp") {
300 # listener-less node need to continuously probe
301 unless (@$AnyEvent::MP::Kernel::LISTENER) {
302 $self->{hdl}->wtimeout ($timeout);
303 $self->{hdl}->on_wtimeout (sub { $self->send ([]) });
304 }
305
306 # receive handling
307 my $src_node = $self->{node};
308 my $rmsg; $rmsg = $self->{rmsg} = sub {
309 $_[0]->push_read ($r_framing => $rmsg);
310
311 local $AnyEvent::MP::Kernel::SRCNODE = $src_node;
312 AnyEvent::MP::Kernel::_inject (@{ $_[1] });
313 };
314 $hdl->push_read ($r_framing => $rmsg);
315
316 Scalar::Util::weaken $rmsg;
317 Scalar::Util::weaken $src_node;
318 }
319 });
320 });
321 });
322 }
323
324 $self
325 }
326
327 sub error {
328 my ($self, $msg) = @_;
329
330 delete $self->{keepalive};
331
332 if ($self->{protocol}) {
333 $HOOK_PROTOCOL{$self->{protocol}}->($self, $msg);
334 } else {
335 $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} $msg");#d#
336
337 $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg)
338 if $self->{node} && $self->{node}{transport} == $self;
339 }
340
341 (delete $self->{release})->()
342 if exists $self->{release};
343
344 # $AnyEvent::MP::Kernel::WARN->(7, "$self->{peerhost}:$self->{peerport}: $msg");
345 $self->destroy;
346 }
347
348 sub connected {
349 my ($self) = @_;
350
351 delete $self->{keepalive};
352
353 (delete $self->{release})->()
354 if exists $self->{release};
355
356 if ($self->{protocol}) {
357 $self->{hdl}->on_error (undef);
358 $HOOK_PROTOCOL{$self->{protocol}}->($self, undef);
359 } else {
360 $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}");
361
362 my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
363 Scalar::Util::weaken ($self->{node} = $node);
364 $node->transport_connect ($self);
365
366 $_->($self) for @HOOK_CONNECTED;
367 }
368 }
369
370 sub send {
371 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]);
372 }
373
374 sub destroy {
375 my ($self) = @_;
376
377 (delete $self->{release})->()
378 if exists $self->{release};
379
380 $self->{hdl}->destroy
381 if $self->{hdl};
382
383 $_->($self) for $self->{protocol} ? () : @HOOK_DESTROY;
384 }
385
386 sub DESTROY {
387 my ($self) = @_;
388
389 $self->destroy;
390 }
391
392 =back
393
394 =head1 PROTOCOL
395
396 The AEMP protocol is comparatively simple, and consists of three phases
397 which are symmetrical for both sides: greeting (followed by optionally
398 switching to TLS mode), authentication and packet exchange.
399
400 The protocol is designed to allow both full-text and binary streams.
401
402 The greeting consists of two text lines that are ended by either an ASCII
403 CR LF pair, or a single ASCII LF (recommended).
404
405 =head2 GREETING
406
407 All the lines until after authentication must not exceed 4kb in length,
408 including line delimiter. Afterwards there is no limit on the packet size
409 that can be received.
410
411 =head3 First Greeting Line
412
413 Example:
414
415 aemp;0;rain;tls_md6_64_256,hmac_md6_64_256,tls_anon,cleartext;json,storable;timeout=12;peeraddr=10.0.0.1:48082
416
417 The first line contains strings separated (not ended) by C<;>
418 characters. The first five strings are fixed by the protocol, the
419 remaining strings are C<KEY=VALUE> pairs. None of them may contain C<;>
420 characters themselves (when escaping is needed, use C<%3b> to represent
421 C<;> and C<%25> to represent C<%>)-
422
423 The fixed strings are:
424
425 =over 4
426
427 =item protocol identification
428
429 The constant C<aemp> to identify this protocol.
430
431 =item protocol version
432
433 The protocol version supported by this end, currently C<1>. If the
434 versions don't match then no communication is possible. Minor extensions
435 are supposed to be handled through additional key-value pairs.
436
437 =item the node ID
438
439 This is the node ID of the connecting node.
440
441 =item the acceptable authentication methods
442
443 A comma-separated list of authentication methods supported by the
444 node. Note that AnyEvent::MP supports a C<hex_secret> authentication
445 method that accepts a clear-text password (hex-encoded), but will not use
446 this authentication method itself.
447
448 The receiving side should choose the first authentication method it
449 supports.
450
451 =item the acceptable framing formats
452
453 A comma-separated list of packet encoding/framing formats understood. The
454 receiving side should choose the first framing format it supports for
455 sending packets (which might be different from the format it has to accept).
456
457 =back
458
459 The remaining arguments are C<KEY=VALUE> pairs. The following key-value
460 pairs are known at this time:
461
462 =over 4
463
464 =item provider=<module-version>
465
466 The software provider for this implementation. For AnyEvent::MP, this is
467 C<AE-0.0> or whatever version it currently is at.
468
469 =item peeraddr=<host>:<port>
470
471 The peer address (socket address of the other side) as seen locally.
472
473 =item tls=<major>.<minor>
474
475 Indicates that the other side supports TLS (version should be 1.0) and
476 wishes to do a TLS handshake.
477
478 =back
479
480 =head3 Second Greeting Line
481
482 After this greeting line there will be a second line containing a
483 cryptographic nonce, i.e. random data of high quality. To keep the
484 protocol text-only, these are usually 32 base64-encoded octets, but
485 it could be anything that doesn't contain any ASCII CR or ASCII LF
486 characters.
487
488 I<< The two nonces B<must> be different, and an aemp implementation
489 B<must> check and fail when they are identical >>.
490
491 Example of a nonce line (yes, it's random-looking because it is random
492 data):
493
494 2XYhdG7/O6epFa4wuP0ujAEx1rXYWRcOypjUYK7eF6yWAQr7gwIN9m/2+mVvBrTPXz5GJDgfGm9d8QRABAbmAP/s
495
496 =head2 TLS handshake
497
498 I<< If, after the handshake, both sides indicate interest in TLS, then the
499 connection B<must> use TLS, or fail to continue. >>
500
501 Both sides compare their nonces, and the side who sent the lower nonce
502 value ("string" comparison on the raw octet values) becomes the client,
503 and the one with the higher nonce the server.
504
505 =head2 AUTHENTICATION PHASE
506
507 After the greeting is received (and the optional TLS handshake),
508 the authentication phase begins, which consists of sending a single
509 C<;>-separated line with three fixed strings and any number of
510 C<KEY=VALUE> pairs.
511
512 The three fixed strings are:
513
514 =over 4
515
516 =item the authentication method chosen
517
518 This must be one of the methods offered by the other side in the greeting.
519
520 Note that all methods starting with C<tls_> are only valid I<iff> TLS was
521 successfully handshaked (and to be secure the implementation must enforce
522 this).
523
524 The currently supported authentication methods are:
525
526 =over 4
527
528 =item cleartext
529
530 This is simply the shared secret, lowercase-hex-encoded. This method is of
531 course very insecure if TLS is not used (and not completely secure even
532 if TLS is used), which is why this module will accept, but not generate,
533 cleartext auth replies.
534
535 =item hmac_md6_64_256
536
537 This method uses an MD6 HMAC with 64 bit blocksize and 256 bit hash, and
538 requires a shared secret. It is the preferred auth method when a shared
539 secret is available.
540
541 First, the shared secret is hashed with MD6:
542
543 key = MD6 (secret)
544
545 This secret is then used to generate the "local auth reply", by taking
546 the two local greeting lines and the two remote greeting lines (without
547 line endings), appending \012 to all of them, concatenating them and
548 calculating the MD6 HMAC with the key:
549
550 lauth = HMAC_MD6 key, "lgreeting1\012lgreeting2\012rgreeting1\012rgreeting2\012"
551
552 This authentication token is then lowercase-hex-encoded and sent to the
553 other side.
554
555 Then the remote auth reply is generated using the same method, but local
556 and remote greeting lines swapped:
557
558 rauth = HMAC_MD6 key, "rgreeting1\012rgreeting2\012lgreeting1\012lgreeting2\012"
559
560 This is the token that is expected from the other side.
561
562 =item tls_anon
563
564 This type is only valid I<iff> TLS was enabled and the TLS handshake
565 was successful. It has no authentication data, as the server/client
566 certificate was successfully verified.
567
568 This authentication type is somewhat insecure, as it allows a
569 man-in-the-middle attacker to change some of the connection parameters
570 (such as the framing format), although there is no known attack that
571 exploits this in a way that is worse than just denying the service.
572
573 By default, this implementation accepts but never generates this auth
574 reply.
575
576 =item tls_md6_64_256
577
578 This type is only valid I<iff> TLS was enabled and the TLS handshake was
579 successful.
580
581 This authentication type simply calculates:
582
583 lauth = MD6 "rgreeting1\012rgreeting2\012lgreeting1\012lgreeting2\012"
584
585 and lowercase-hex encodes the result and sends it as authentication
586 data. No shared secret is required (authentication is done by TLS). The
587 checksum exists only to make tinkering with the greeting hard.
588
589 =back
590
591 =item the authentication data
592
593 The authentication data itself, usually base64 or hex-encoded data, see
594 above.
595
596 =item the framing protocol chosen
597
598 This must be one of the framing protocols offered by the other side in the
599 greeting. Each side must accept the choice of the other side, and generate
600 packets in the format it chose itself.
601
602 =back
603
604 Example of an authentication reply:
605
606 hmac_md6_64_256;363d5175df38bd9eaddd3f6ca18aa1c0c4aa22f0da245ac638d048398c26b8d3;json
607
608 =head2 DATA PHASE
609
610 After this, packets get exchanged using the chosen framing protocol. It is
611 quite possible that both sides use a different framing protocol.
612
613 =head2 FULL EXAMPLE
614
615 This is an actual protocol dump of a handshake, followed by a single data
616 packet. The greater than/less than lines indicate the direction of the
617 transfer only.
618
619 > aemp;0;anon/57Cs1CggVJjzYaQp13XXg4;tls_md6_64_256,hmac_md6_64_256,tls_anon,cleartext;json,storable;provider=AE-0.8;timeout=12;peeraddr=10.0.0.17:4040
620 > yLgdG1ov/02shVkVQer3wzeuywZK+oraTdEQBmIqWHaegxSGDG4g+HqogLQbvdypFOsoDWJ1Sh4ImV4DMhvUBwTK
621
622 < aemp;0;ruth;tls_md6_64_256,hmac_md6_64_256,tls_anon,cleartext;json,storable;provider=AE-0.8;timeout=12;peeraddr=10.0.0.1:37108
623 < +xMQXP8ElfNmuvEhsmcp+s2wCJOuQAsPxSg3d2Ewhs6gBnJz+ypVdWJ/wAVrXqlIJfLeVS/CBy4gEGkyWHSuVb1L
624
625 > hmac_md6_64_256;5ad913855742ae5a03a5aeb7eafa4c78629de136bed6acd73eea36c9e98df44a;json
626
627 < hmac_md6_64_256;84cd590976f794914c2ca26dac3a207a57a6798b9171289c114de07cf0c20401;json
628 < ["","AnyEvent::MP::_spawn","57Cs1CggVJjzYaQp13XXg4.c","AnyEvent::MP::Global::connect",0,"anon/57Cs1CggVJjzYaQp13XXg4"]
629 ...
630
631 The shared secret in use was C<8ugxrtw6H5tKnfPWfaSr4HGhE8MoJXmzTT1BWq7sLutNcD0IbXprQlZjIbl7MBKoeklG3IEfY9GlJthC0pENzk>.
632
633 =head2 MONITORING
634
635 Monitoring the connection itself is transport-specific. For TCP, all
636 connection monitoring is currently left to TCP retransmit time-outs
637 on a busy link, and TCP keepalive (which should be enabled) for idle
638 connections.
639
640 This is not sufficient for listener-less nodes, however: they need
641 to regularly send data (30 seconds, or the monitoring interval, is
642 recommended), so TCP actively probes.
643
644 Future implementations of AnyEvent::Transport might query the kernel TCP
645 buffer after a write timeout occurs, and if it is non-empty, shut down the
646 connections, but this is an area of future research :)
647
648 =head2 NODE PROTOCOL
649
650 The transport simply transfers messages, but to implement a full node, a
651 special node port must exist that understands a number of requests.
652
653 If you are interested in implementing this, drop us a note so we finish
654 the documentation.
655
656 =head1 SEE ALSO
657
658 L<AnyEvent::MP>.
659
660 =head1 AUTHOR
661
662 Marc Lehmann <schmorp@schmorp.de>
663 http://home.schmorp.de/
664
665 =cut
666
667 1
668