ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.58
Committed: Wed Nov 4 21:47:14 2009 UTC (14 years, 6 months ago) by root
Branch: MAIN
Changes since 1.57: +41 -29 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Transport - actual transport protocol handler
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Transport;
8
9 =head1 DESCRIPTION
10
11 This module implements (and documents) the actual transport protocol for
12 AEMP.
13
14 See the "PROTOCOL" section below if you want to write another client for
15 this protocol.
16
17 =head1 FUNCTIONS/METHODS
18
19 =over 4
20
21 =cut
22
23 package AnyEvent::MP::Transport;
24
25 use common::sense;
26
27 use Scalar::Util ();
28 use List::Util ();
29 use MIME::Base64 ();
30 use Storable ();
31 use JSON::XS ();
32
33 use Digest::MD6 ();
34 use Digest::HMAC_MD6 ();
35
36 use AE ();
37 use AnyEvent::Socket ();
38 use AnyEvent::Handle 4.92 ();
39
40 use AnyEvent::MP::Config ();
41
# Protocol version put into the first greeting line; the peer's greeting
# must carry the same number or the connection is refused.
our $PROTOCOL_VERSION = 1;

# Hook lists. Every entry is a code reference that is called with the
# transport object as its only argument.
our (
   @HOOK_CONNECT,   # called at connect/accept time, before the greeting is sent
   @HOOK_GREETING,  # called at greeting1 time, after the peer's line is parsed
   @HOOK_CONNECTED, # called at data phase, from connected()
   @HOOK_DESTROY,   # called at destroy time, from destroy()
);
48
49 =item $listener = mp_server $host, $port, <constructor-args>
50
51 Creates a listener on the given host/port using
52 C<AnyEvent::Socket::tcp_server>.
53
54 See C<new>, below, for constructor arguments.
55
56 Defaults for peerhost, peerport and fh are provided.
57
58 =cut
59
# Start a TCP listener on $host:$port and wrap every accepted connection
# in a new AnyEvent::MP::Transport.
#
# Arguments:
#    $host, $port - listen address, handed unchanged to tcp_server
#    %arg         - constructor arguments for every transport created; the
#                   "prepare" entry is removed from them and handed to
#                   tcp_server as its prepare callback instead
#
# Returns whatever AnyEvent::Socket::tcp_server returns.
sub mp_server($$;%) {
   my ($host, $port, %arg) = @_;

   # taken out up front so it never reaches the transport constructor
   my $prepare = delete $arg{prepare};

   my $on_accept = sub {
      my ($fh, $peerhost, $peerport) = @_;

      my $transport = AnyEvent::MP::Transport->new (
         fh       => $fh,
         peerhost => $peerhost,
         peerport => $peerport,
         %arg,
      );

      # self-reference that keeps the transport alive; error () and
      # connected () delete it again
      $transport->{keepalive} = $transport;
   };

   AnyEvent::Socket::tcp_server ($host, $port, $on_accept, $prepare)
}
75
76 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
77
78 =cut
79
# Create a transport that connects to $host:$port.
#
# Arguments: $host, $port, any number of extra constructor arguments, and
# - always last - a callback that is stored as the transport's "release"
# entry. Constructor arguments given here come after the defaults and
# therefore override them.
#
# Returns the new AnyEvent::MP::Transport object.
sub mp_connect {
   # the callback is whatever was passed last
   my $release = pop @_;

   my ($host, $port, @args) = @_;

   return AnyEvent::MP::Transport->new (
      connect  => [$host, $port],
      peerhost => $host,
      peerport => $port,
      release  => $release,
      @args,
   );
}
92
93 =item new AnyEvent::MP::Transport
94
95 # immediately starts negotiation
96 my $transport = new AnyEvent::MP::Transport
97 # mandatory
98 fh => $filehandle,
99 local_id => $identifier,
100 on_recv => sub { receive-callback },
101 on_error => sub { error-callback },
102
103 # optional
104 on_eof => sub { clean-close-callback },
105 on_connect => sub { successful-connect-callback },
106 greeting => { key => value },
107
108 # tls support
109 tls_ctx => AnyEvent::TLS,
110 peername => $peername, # for verification
111 ;
112
113 =cut
114
# Create a transport and immediately start the greeting/authentication
# handshake on it.
#
# %arg is blessed directly and becomes the object. Keys read here:
#    fh / connect       - existing filehandle, or [$host, $port] to connect to
#    peerhost, peerport - peer address, advertised as "peeraddr"
#    secret             - shared secret, defaults to the configured secret
#    tls_ctx            - TLS context; overridden when the config has a "cert"
#    local_greeting     - hash of extra KEY=VALUE pairs for the greeting
#    protocol           - protocol name, defaults to "aemp"
#    timeout            - value advertised as "timeout", defaults to the
#                         configured monitor_timeout
#
# Returns the new object. Handshake failures are reported asynchronously
# through ->error, success through ->connected.
sub new {
   my ($class, %arg) = @_;

   my $self = bless \%arg, $class;

   # messages are collected here until the handshake has finished
   $self->{queue} = [];

   {
      # the callbacks below must not keep the object alive, so they only
      # see a weakened copy of $self
      Scalar::Util::weaken (my $self = $self);

      my $config = $AnyEvent::MP::Kernel::CONFIG;

      my $timeout  = $config->{monitor_timeout};
      my $lframing = $config->{data_format};
      my $auth_snd = $config->{auth_offer};
      my $auth_rcv = $config->{auth_accept};

      $self->{secret} = $config->{secret}
         unless exists $self->{secret};

      my $secret = $self->{secret};

      if (exists $config->{cert}) {
         # the configured certificate doubles as the CA certificate
         $self->{tls_ctx} = {
            sslv2   => 0,
            sslv3   => 0,
            tlsv1   => 1,
            verify  => 1,
            cert    => $config->{cert},
            ca_cert => $config->{cert},
            verify_require_client_cert => 1,
         };
      }

      $self->{hdl} = AnyEvent::Handle->new (
         ($self->{fh} ? (fh => $self->{fh}) : (connect => $self->{connect})),
         autocork  => 1,
         no_delay  => 1,
         keepalive => 1,
         on_error  => sub {
            $self->error ($_[2]);
         },
         rtimeout  => $timeout,
      );

      my $greeting_kv = $self->{local_greeting} ||= {};

      $greeting_kv->{tls}      = "1.0" if $self->{tls_ctx};
      $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION";
      $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
      # advertise the caller-supplied timeout, or else the monitor timeout
      # that is actually applied to the handle
      $greeting_kv->{timeout}  = defined $self->{timeout} ? $self->{timeout} : $timeout;

      my $protocol = $self->{protocol} || "aemp";

      # can modify greeting_kv
      $_->($self) for $protocol eq "aemp" ? @HOOK_CONNECT : ();

      # send greeting
      my $lgreeting1 = "$protocol;$PROTOCOL_VERSION"
                     . ";$AnyEvent::MP::Kernel::NODE"
                     . ";" . (join ",", @$auth_rcv)
                     . ";" . (join ",", @$lframing)
                     . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);

      my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Kernel::nonce (66), "";

      $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");

      # expect greeting
      # (no line may exceed 4kb until authentication has succeeded)
      $self->{hdl}->rbuf_max (4 * 1024);
      $self->{hdl}->push_read (line => sub {
         my $rgreeting1 = $_[1];

         my ($aemp, $version, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;

         $self->{remote_node} = $rnode;

         $self->{remote_greeting} = {
            map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
               @kv
         };

         $_->($self) for $protocol eq "aemp" ? @HOOK_GREETING : ();

         if ($aemp ne $protocol) {
            return $self->error ("unparsable greeting, expected '$protocol', got '$aemp'");
         } elsif ($version != $PROTOCOL_VERSION) {
            return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)");
         } elsif ($rnode eq $AnyEvent::MP::Kernel::NODE) {
            return $self->error ("I refuse to talk to myself");
         } elsif ($AnyEvent::MP::Kernel::NODE{$rnode} && $AnyEvent::MP::Kernel::NODE{$rnode}{transport}) {
            return $self->error ("$rnode already connected, not connecting again.");
         }

         # read nonce
         $self->{hdl}->push_read (line => sub {
            my $rgreeting2 = $_[1];

            "$lgreeting1\012$lgreeting2" ne "$rgreeting1\012$rgreeting2" # echo attack?
               or return $self->error ("authentication error, echo attack?");

            # TLS is only used when we have a context and the peer asked for it
            my $tls = $self->{tls_ctx} && 1 == int $self->{remote_greeting}{tls};

            # pick the first auth method the peer accepts that we offer;
            # tls_* methods are only eligible when TLS will be used
            my $s_auth;
            for my $auth_ (split /,/, $auths) {
               if (grep $auth_ eq $_, @$auth_snd and ($auth_ !~ /^tls_/ or $tls)) {
                  $s_auth = $auth_;
                  last;
               }
            }

            defined $s_auth
               or return $self->error ("$auths: no common auth type supported");

            # pick the first framing format the peer accepts that we know;
            # this is the format we will send in
            my $s_framing;
            for my $framing_ (split /,/, $framings) {
               if (grep $framing_ eq $_, @$lframing) {
                  $s_framing = $framing_;
                  last;
               }
            }

            defined $s_framing
               or return $self->error ("$framings: no common framing method supported");

            my $key;
            my $lauth;

            if ($tls) {
               # the side that sent the lower nonce becomes the TLS client
               $self->{tls} = $lgreeting2 lt $rgreeting2 ? "connect" : "accept";
               $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});

               $lauth =
                  $s_auth eq "tls_anon"       ? ""
                : $s_auth eq "tls_md6_64_256" ? Digest::MD6::md6_hex "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012"
                : return $self->error ("$s_auth: fatal, selected unsupported snd auth method");

            } elsif (length $secret) {
               return $self->error ("$s_auth: fatal, selected unsupported snd auth method")
                  unless $s_auth eq "hmac_md6_64_256"; # hardcoded atm.

               $key = Digest::MD6::md6 $secret;
               # we currently only support hmac_md6_64_256
               $lauth = Digest::HMAC_MD6::hmac_md6_hex $key, "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012", 64, 256;

            } else {
               return $self->error ("unable to handshake TLS and no shared secret configured");
            }

            $self->{hdl}->push_write ("$s_auth;$lauth;$s_framing\012");

            # read the authentication response
            $self->{hdl}->push_read (line => sub {
               my ($hdl, $rline) = @_;

               my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;

               # the peer must choose from what we advertised in our
               # greeting, both for the auth method and the framing format
               unless (grep $auth_method eq $_, @$auth_rcv) {
                  return $self->error ("$auth_method: peer selected an auth method we did not offer");
               }

               # $r_framing is used as a read type below, so it must not
               # be an arbitrary peer-supplied string
               unless (defined $r_framing and grep $r_framing eq $_, @$lframing) {
                  return $self->error ("$r_framing: peer selected a framing format we did not offer");
               }

               # compute what the peer should have sent: same as our own
               # reply, but with local and remote greeting lines swapped
               my $rauth =
                  $auth_method eq "hmac_md6_64_256" ? Digest::HMAC_MD6::hmac_md6_hex $key, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012", 64, 256
                : $auth_method eq "cleartext"       ? unpack "H*", $secret
                : $auth_method eq "tls_anon"        ? ($tls ? "" : "\012\012") # \012\012 never matches
                : $auth_method eq "tls_md6_64_256"  ? ($tls ? Digest::MD6::md6_hex "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012" : "\012\012")
                : return $self->error ("$auth_method: fatal, selected unsupported rcv auth method");

               if ($rauth2 ne $rauth) {
                  return $self->error ("authentication failure/shared secret mismatch");
               }

               $self->{s_framing} = $s_framing;

               # authenticated: lift the 4kb limit on incoming data
               $hdl->rbuf_max (undef);

               # we rely on TCP retransmit timeouts and keepalives
               $self->{hdl}->rtimeout (undef);

               # except listener-less nodes, they need to continuously probe
               unless (@$AnyEvent::MP::Kernel::LISTENER) {
                  $self->{hdl}->wtimeout ($timeout);
                  $self->{hdl}->on_wtimeout (sub { $self->send ([]) });
               }

               $self->{remote_greeting}{untrusted} = 1
                  if $auth_method eq "tls_anon";

               my $queue = delete $self->{queue}; # we are connected

               $self->connected;

               if ($protocol eq "aemp") {
                  # send queued messages
                  $self->send ($_)
                     for @$queue;

                  # receive handling
                  # (the reader re-queues itself before injecting, so
                  # there is always a read pending)
                  my $src_node = $self->{node};
                  my $rmsg; $rmsg = $self->{rmsg} = sub {
                     $_[0]->push_read ($r_framing => $rmsg);

                     local $AnyEvent::MP::Kernel::SRCNODE = $src_node;
                     AnyEvent::MP::Kernel::_inject (@{ $_[1] });
                  };
                  $hdl->push_read ($r_framing => $rmsg);

                  # the closure refers to itself and to the node; weaken
                  # both, $self->{rmsg} holds the strong reference
                  Scalar::Util::weaken $rmsg;
                  Scalar::Util::weaken $src_node;
               }
            });
         });
      });
   }

   $self
}
328
# Abort the transport because of $msg.
#
# Drops the keepalive self-reference; for the default protocol (no
# "protocol" entry) logs the message at level 9 and, when this transport
# is the one currently attached to its node, reports the error to that
# node. Then runs a still-pending release callback and destroys the
# transport.
#
# Returns whatever ->destroy returns.
sub error {
   my ($self, $msg) = @_;

   delete $self->{keepalive};

   # TODO: nothing is reported when a "protocol" entry is set
   unless ($self->{protocol}) {
      $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} $msg");

      if ($self->{node} && $self->{node}{transport} == $self) {
         $self->{node}->transport_error (transport_error => $self->{node}{id}, $msg);
      }
   }

   if (exists $self->{release}) {
      my $release = delete $self->{release};
      $release->();
   }

   $self->destroy;
}
349
# Called once the handshake has succeeded.
#
# Drops the keepalive self-reference and runs a still-pending release
# callback. For the default protocol (no "protocol" entry) it then logs
# the connection at level 9, looks up the node object for the remote node
# via AnyEvent::MP::Kernel::add_node, stores a weak reference to it in
# $self->{node}, attaches this transport to the node and runs
# @HOOK_CONNECTED.
sub connected {
   my ($self) = @_;

   delete $self->{keepalive};

   if (exists $self->{release}) {
      my $release = delete $self->{release};
      $release->();
   }

   if ($self->{protocol}) {
      # TODO
   } else {
      $AnyEvent::MP::Kernel::WARN->(9, "$self->{peerhost}:$self->{peerport} connected as $self->{remote_node}");

      # $node stays strong for the rest of this scope; only the copy kept
      # in the transport is weakened
      my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
      $self->{node} = $node;
      Scalar::Util::weaken ($self->{node});

      $node->transport_connect ($self);

      $_->($self) for @HOOK_CONNECTED;
   }
}
370
# Write one message to the peer, encoded with the framing format that was
# selected for sending during the handshake ($self->{s_framing}).
#
# Returns whatever the handle's push_write returns.
sub send {
   my ($self, $msg) = @_;

   $self->{hdl}->push_write ($self->{s_framing} => $msg);
}
374
# Tear the transport down: run a still-pending release callback, destroy
# the underlying handle (when there is one) and call every @HOOK_DESTROY
# entry with the transport.
#
# This is called from both error () and DESTROY, so it can run more than
# once for the same object.
sub destroy {
   my ($self) = @_;

   if (exists $self->{release}) {
      my $release = delete $self->{release};
      $release->();
   }

   if ($self->{hdl}) {
      $self->{hdl}->destroy;
   }

   for my $hook (@HOOK_DESTROY) {
      $hook->($self);
   }
}
386
# Perl destructor: makes sure ->destroy has run when the last reference
# to the transport goes away.
sub DESTROY {
   $_[0]->destroy;
}
392
393 =back
394
395 =head1 PROTOCOL
396
397 The AEMP protocol is relatively simple, and consists of three phases which
398 are symmetrical for both sides: greeting (followed by optionally switching
399 to TLS mode), authentication and packet exchange.
400
401 The protocol is designed to allow both full-text and binary streams.
402
403 The greeting consists of two text lines that are ended by either an ASCII
404 CR LF pair, or a single ASCII LF (recommended).
405
406 =head2 GREETING
407
408 All the lines until after authentication must not exceed 4kb in length,
409 including line delimiter. Afterwards there is no limit on the packet size
410 that can be received.
411
412 =head3 First Greeting Line
413
414 Example:
415
416 aemp;1;rain;tls_md6_64_256,hmac_md6_64_256,tls_anon,cleartext;json,storable;timeout=12;peeraddr=10.0.0.1:48082
417
418 The first line contains strings separated (not ended) by C<;>
419 characters. The first five strings are fixed by the protocol, the
420 remaining strings are C<KEY=VALUE> pairs. None of them may contain C<;>
421 characters themselves (when escaping is needed, use C<%3b> to represent
422 C<;> and C<%25> to represent C<%>).
423
424 The fixed strings are:
425
426 =over 4
427
428 =item protocol identification
429
430 The constant C<aemp> to identify this protocol.
431
432 =item protocol version
433
434 The protocol version supported by this end, currently C<1>. If the
435 versions don't match then no communication is possible. Minor extensions
436 are supposed to be handled through additional key-value pairs.
437
438 =item the node ID
439
440 This is the node ID of the connecting node.
441
442 =item the acceptable authentication methods
443
444 A comma-separated list of authentication methods supported by the
445 node. Note that AnyEvent::MP supports a C<cleartext> authentication
446 method that accepts a clear-text password (hex-encoded), but will not use
447 this authentication method itself.
448
449 The receiving side should choose the first authentication method it
450 supports.
451
452 =item the acceptable framing formats
453
454 A comma-separated list of packet encoding/framing formats understood. The
455 receiving side should choose the first framing format it supports for
456 sending packets (which might be different from the format it has to accept).
457
458 =back
459
460 The remaining arguments are C<KEY=VALUE> pairs. The following key-value
461 pairs are known at this time:
462
463 =over 4
464
465 =item provider=<module-version>
466
467 The software provider for this implementation. For AnyEvent::MP, this is
468 C<AE-0.0> or whatever version it currently is at.
469
470 =item peeraddr=<host>:<port>
471
472 The peer address (socket address of the other side) as seen locally.
473
474 =item tls=<major>.<minor>
475
476 Indicates that the other side supports TLS (version should be 1.0) and
477 wishes to do a TLS handshake.
478
479 =back
480
481 =head3 Second Greeting Line
482
483 After this greeting line there will be a second line containing a
484 cryptographic nonce, i.e. random data of high quality. To keep the
485 protocol text-only, these are usually 66 base64-encoded octets, but
486 it could be anything that doesn't contain any ASCII CR or ASCII LF
487 characters.
488
489 I<< The two nonces B<must> be different, and an aemp implementation
490 B<must> check and fail when they are identical >>.
491
492 Example of a nonce line (yes, it's random-looking because it is random
493 data):
494
495 2XYhdG7/O6epFa4wuP0ujAEx1rXYWRcOypjUYK7eF6yWAQr7gwIN9m/2+mVvBrTPXz5GJDgfGm9d8QRABAbmAP/s
496
497 =head2 TLS handshake
498
499 I<< If, after the handshake, both sides indicate interest in TLS, then the
500 connection B<must> use TLS, or fail to continue. >>
501
502 Both sides compare their nonces, and the side who sent the lower nonce
503 value ("string" comparison on the raw octet values) becomes the client,
504 and the one with the higher nonce the server.
505
506 =head2 AUTHENTICATION PHASE
507
508 After the greeting is received (and the optional TLS handshake),
509 the authentication phase begins, which consists of sending a single
510 C<;>-separated line with three fixed strings and any number of
511 C<KEY=VALUE> pairs.
512
513 The three fixed strings are:
514
515 =over 4
516
517 =item the authentication method chosen
518
519 This must be one of the methods offered by the other side in the greeting.
520
521 Note that all methods starting with C<tls_> are only valid I<iff> TLS was
522 successfully handshaked (and to be secure the implementation must enforce
523 this).
524
525 The currently supported authentication methods are:
526
527 =over 4
528
529 =item cleartext
530
531 This is simply the shared secret, lowercase-hex-encoded. This method is of
532 course very insecure if TLS is not used (and not completely secure even
533 if TLS is used), which is why this module will accept, but not generate,
534 cleartext auth replies.
535
536 =item hmac_md6_64_256
537
538 This method uses an MD6 HMAC with 64 bit blocksize and 256 bit hash, and
539 requires a shared secret. It is the preferred auth method when a shared
540 secret is available.
541
542 First, the shared secret is hashed with MD6:
543
544 key = MD6 (secret)
545
546 This secret is then used to generate the "local auth reply", by taking
547 the two local greeting lines and the two remote greeting lines (without
548 line endings), appending \012 to all of them, concatenating them and
549 calculating the MD6 HMAC with the key:
550
551 lauth = HMAC_MD6 key, "lgreeting1\012lgreeting2\012rgreeting1\012rgreeting2\012"
552
553 This authentication token is then lowercase-hex-encoded and sent to the
554 other side.
555
556 Then the remote auth reply is generated using the same method, but local
557 and remote greeting lines swapped:
558
559 rauth = HMAC_MD6 key, "rgreeting1\012rgreeting2\012lgreeting1\012lgreeting2\012"
560
561 This is the token that is expected from the other side.
562
563 =item tls_anon
564
565 This type is only valid I<iff> TLS was enabled and the TLS handshake
566 was successful. It has no authentication data, as the server/client
567 certificate was successfully verified.
568
569 This authentication type is somewhat insecure, as it allows a
570 man-in-the-middle attacker to change some of the connection parameters
571 (such as the framing format), although there is no known attack that
572 exploits this in a way that is worse than just denying the service.
573
574 By default, this implementation accepts but never generates this auth
575 reply.
576
577 =item tls_md6_64_256
578
579 This type is only valid I<iff> TLS was enabled and the TLS handshake was
580 successful.
581
582 This authentication type simply calculates:
583
584 lauth = MD6 "lgreeting1\012lgreeting2\012rgreeting1\012rgreeting2\012"
585
586 and lowercase-hex encodes the result and sends it as authentication
587 data. No shared secret is required (authentication is done by TLS). The
588 checksum exists only to make tinkering with the greeting hard.
589
590 =back
591
592 =item the authentication data
593
594 The authentication data itself, usually base64 or hex-encoded data, see
595 above.
596
597 =item the framing protocol chosen
598
599 This must be one of the framing protocols offered by the other side in the
600 greeting. Each side must accept the choice of the other side, and generate
601 packets in the format it chose itself.
602
603 =back
604
605 Example of an authentication reply:
606
607 hmac_md6_64_256;363d5175df38bd9eaddd3f6ca18aa1c0c4aa22f0da245ac638d048398c26b8d3;json
608
609 =head2 DATA PHASE
610
611 After this, packets get exchanged using the chosen framing protocol. It is
612 quite possible that both sides use a different framing protocol.
613
614 =head2 FULL EXAMPLE
615
616 This is an actual protocol dump of a handshake, followed by a single data
617 packet. The greater than/less than lines indicate the direction of the
618 transfer only.
619
620 > aemp;0;anon/57Cs1CggVJjzYaQp13XXg4;tls_md6_64_256,hmac_md6_64_256,tls_anon,cleartext;json,storable;provider=AE-0.8;timeout=12;peeraddr=10.0.0.17:4040
621 > yLgdG1ov/02shVkVQer3wzeuywZK+oraTdEQBmIqWHaegxSGDG4g+HqogLQbvdypFOsoDWJ1Sh4ImV4DMhvUBwTK
622
623 < aemp;0;ruth;tls_md6_64_256,hmac_md6_64_256,tls_anon,cleartext;json,storable;provider=AE-0.8;timeout=12;peeraddr=10.0.0.1:37108
624 < +xMQXP8ElfNmuvEhsmcp+s2wCJOuQAsPxSg3d2Ewhs6gBnJz+ypVdWJ/wAVrXqlIJfLeVS/CBy4gEGkyWHSuVb1L
625
626 > hmac_md6_64_256;5ad913855742ae5a03a5aeb7eafa4c78629de136bed6acd73eea36c9e98df44a;json
627
628 < hmac_md6_64_256;84cd590976f794914c2ca26dac3a207a57a6798b9171289c114de07cf0c20401;json
629 < ["","AnyEvent::MP::_spawn","57Cs1CggVJjzYaQp13XXg4.c","AnyEvent::MP::Global::connect",0,"anon/57Cs1CggVJjzYaQp13XXg4"]
630 ...
631
632 The shared secret in use was C<8ugxrtw6H5tKnfPWfaSr4HGhE8MoJXmzTT1BWq7sLutNcD0IbXprQlZjIbl7MBKoeklG3IEfY9GlJthC0pENzk>.
633
634 =head2 MONITORING
635
636 Monitoring the connection itself is transport-specific. For TCP, all
637 connection monitoring is currently left to TCP retransmit time-outs
638 on a busy link, and TCP keepalive (which should be enabled) for idle
639 connections.
640
641 This is not sufficient for listener-less nodes, however: they need
642 to regularly send data (30 seconds, or the monitoring interval, is
643 recommended), so TCP actively probes.
644
645 Future implementations of AnyEvent::MP::Transport might query the kernel TCP
646 buffer after a write timeout occurs, and if it is non-empty, shut down the
647 connections, but this is an area of future research :)
648
649 =head2 NODE PROTOCOL
650
651 The transport simply transfers messages, but to implement a full node, a
652 special node port must exist that understands a number of requests.
653
654 If you are interested in implementing this, drop us a note so we finish
655 the documentation.
656
657 =head1 SEE ALSO
658
659 L<AnyEvent::MP>.
660
661 =head1 AUTHOR
662
663 Marc Lehmann <schmorp@schmorp.de>
664 http://home.schmorp.de/
665
666 =cut
667
668 1
669