ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.36
Committed: Sat Aug 15 04:34:34 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.35: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Transport - actual transport protocol handler
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Transport;
8
9 =head1 DESCRIPTION
10
11 This implements the actual transport protocol for MP (it represents a
12 single link), most of which is considered an implementation detail.
13
14 See the "PROTOCOL" section below if you want to write another client for
15 this protocol.
16
17 =head1 FUNCTIONS/METHODS
18
19 =over 4
20
21 =cut
22
23 package AnyEvent::MP::Transport;
24
25 use common::sense;
26
27 use Scalar::Util ();
28 use List::Util ();
29 use MIME::Base64 ();
30 use Storable ();
31 use JSON::XS ();
32
33 use Digest::MD6 ();
34 use Digest::HMAC_MD6 ();
35
36 use AE ();
37 use AnyEvent::Socket ();
38 use AnyEvent::Handle 4.92 ();
39
40 use AnyEvent::MP::Config ();
41
# Version number of the wire protocol. It is sent as the second field of
# the first greeting line and compared (numerically) against the version
# announced by the peer; a mismatch aborts the connection.
42 our $PROTOCOL_VERSION = 0;
43
44 =item $listener = mp_server $host, $port, <constructor-args>, $cb->($transport)
45
46 Creates a listener on the given host/port using
47 C<AnyEvent::Socket::tcp_server>.
48
49 See C<new>, below, for constructor arguments.
50
51 Defaults for peerhost, peerport and fh are provided.
52
53 =cut
54
# Listen on $host/$port using AnyEvent::Socket::tcp_server and wrap every
# accepted connection in a new AnyEvent::MP::Transport object.
#
# Arguments: $host, $port, any number of constructor arguments and, as the
# very last argument, a callback that is invoked with each new transport.
# fh, peerhost and peerport are filled in from the accepted socket; since
# the caller's constructor arguments are appended after them, values given
# by the caller for the same keys win.
#
# Returns whatever tcp_server returns.
sub mp_server($$@) {
   my $on_transport = pop;
   my ($host, $port, @ctor_args) = @_;

   AnyEvent::Socket::tcp_server ($host, $port, sub {
      my ($sock, $peerhost, $peerport) = @_;

      $on_transport->(
         AnyEvent::MP::Transport->new (
            fh       => $sock,
            peerhost => $peerhost,
            peerport => $peerport,
            @ctor_args,
         )
      );
   })
}
70
71 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
72
73 =cut
74
# Connect to $host/$port using AnyEvent::Socket::tcp_connect and, once the
# connection is established, start a new AnyEvent::MP::Transport on it.
#
# Arguments: $host, $port, any number of constructor arguments and, as the
# very last argument, a "release" callback. The callback is invoked without
# arguments when the connection attempt fails; otherwise it is handed to the
# transport as its "release" argument.
#
# Returns a reference to a scalar that first holds the value returned by
# tcp_connect and later, after a successful connect, the transport object.
sub mp_connect {
   my $release = pop;
   my ($host, $port, @ctor_args) = @_;

   # first the connect guard, later replaced by the transport object
   my $holder;

   my $on_connect = sub {
      my ($fh, $nhost, $nport) = @_;

      # no file handle means the connection attempt failed
      unless ($fh) {
         return $release->();
      }

      $holder = AnyEvent::MP::Transport->new (
         fh       => $fh,
         peername => $host,
         peerhost => $nhost,
         peerport => $nport,
         release  => $release,
         @ctor_args,
      );
   };

   $holder = AnyEvent::Socket::tcp_connect ($host, $port, $on_connect);

   \$holder
}
98
99 =item new AnyEvent::MP::Transport
100
101 # immediately starts negotiation
102 my $transport = new AnyEvent::MP::Transport
103 # mandatory
104 fh => $filehandle,
105 local_id => $identifier,
106 on_recv => sub { receive-callback },
107 on_error => sub { error-callback },
108
109 # optional
110 on_eof => sub { clean-close-callback },
111 on_connect => sub { successful-connect-callback },
112 greeting => { key => value },
113
114 # tls support
115 tls_ctx => AnyEvent::TLS,
116 peername => $peername, # for verification
117 ;
118
119 =cut
120
# Assumed maximum network latency, in seconds. Used as the fallback when the
# configuration has no network_latency value, and subtracted from the
# timeout when the write timeout of an authenticated connection is set.
121 sub LATENCY() { 3 } # assumed max. network latency
122
# @FRAMINGS and @AUTH_RCV are announced to the peer in the first greeting
# line; @AUTH_SND lists the auth types this side is willing to pick from the
# peer's offer. "cleartext" is accepted from the peer but never chosen by us.
123 our @FRAMINGS = qw(json storable); # the framing types we accept and send, in order of preference
124 our @AUTH_SND = qw(hmac_md6_64_256); # auth types we send
125 our @AUTH_RCV = (@AUTH_SND, qw(cleartext)); # auth types we accept
126
127 #AnyEvent::Handle::register_write_type mp_record => sub {
128 #};
129
# Constructor. Blesses the argument hash into the class, wraps the file
# handle in an AnyEvent::Handle, immediately writes the two local greeting
# lines and installs a chain of nested line readers that handle, in order:
# the peer's greeting, the peer's nonce (plus optional TLS start and our
# auth reply) and the peer's auth reply. After successful authentication
# the connection switches to framed message exchange.
#
# Arguments are key/value pairs (fh, peername, peerhost, peerport, secret,
# timeout, greeting, tls_ctx, release, ...). Returns the transport object.
130 sub new {
131 my ($class, %arg) = @_;
132
# %arg itself becomes the object, so $arg{...} and $self->{...} refer to
# the same storage throughout this sub.
133 my $self = bless \%arg, $class;
134
# deleted again once authentication succeeded; its contents are then
# passed to send
135 $self->{queue} = [];
136
137 {
# Everything in this block, including all callbacks, only sees this
# weakened copy of $self - presumably so that the callbacks stored in the
# handle do not keep the transport object alive.
138 Scalar::Util::weaken (my $self = $self);
139
140 my $config = AnyEvent::MP::Config::config;
141
142 my $latency = $config->{network_latency} || LATENCY;
143
# explicit constructor arguments take precedence over the configuration
144 $arg{secret} = $config->{secret}
145 unless exists $arg{secret};
146
147 $arg{timeout} = $config->{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT
148 unless exists $arg{timeout};
149
# reduce the timeout by the latency, but never let it fall below
# 1 + latency
150 $arg{timeout} -= $latency;
151
152 $arg{timeout} = 1 + $latency
153 if $arg{timeout} < 1 + $latency;
154
155 my $secret = $arg{secret};
156
# A configured certificate enables TLS and replaces any tls_ctx argument.
# The same certificate is used as our certificate and as CA certificate,
# and the peer is required to present a certificate as well.
157 if (exists $config->{cert}) {
158 $arg{tls_ctx} = {
159 sslv2 => 0,
160 sslv3 => 0,
161 tlsv1 => 1,
162 verify => 1,
163 cert => $config->{cert},
164 ca_cert => $config->{cert},
165 verify_require_client_cert => 1,
166 };
167 }
168
# fh and peername are removed from the object and handed to the handle;
# during the handshake the read timeout is the network latency
169 $self->{hdl} = new AnyEvent::Handle
170 fh => delete $arg{fh},
171 autocork => 1,
172 no_delay => 1,
173 on_error => sub {
174 $self->error ($_[2]);
175 },
176 rtimeout => $latency,
177 peername => delete $arg{peername},
178 ;
179
# caller-supplied greeting key/value pairs are kept, the keys set below
# are added to (or overwrite entries in) that hash
180 my $greeting_kv = $self->{greeting} ||= {};
181
182 $self->{local_node} = $AnyEvent::MP::Kernel::NODE;
183
184 $greeting_kv->{"tls"} = "1.0" if $arg{tls_ctx};
185 $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION";
186 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
187 $greeting_kv->{timeout} = $arg{timeout};
188
189 # send greeting
# first line: "aemp", version, local node, accepted auth types, framings,
# followed by the ;KEY=VALUE pairs
190 my $lgreeting1 = "aemp;$PROTOCOL_VERSION"
191 . ";$self->{local_node}"
192 . ";" . (join ",", @AUTH_RCV)
193 . ";" . (join ",", @FRAMINGS)
194 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
195
# second line: the local nonce, base64-encoded without line breaks
196 my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Kernel::nonce (66), "";
197
198 $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");
199
200 # expect greeting
# limit the read buffer to 4kb until authentication is complete
201 $self->{hdl}->rbuf_max (4 * 1024);
202 $self->{hdl}->push_read (line => sub {
203 my $rgreeting1 = $_[1];
204
205 my ($aemp, $version, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;
206
207 if ($aemp ne "aemp") {
208 return $self->error ("unparsable greeting");
209 } elsif ($version != $PROTOCOL_VERSION) {
210 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)");
211 }
212
# choose the first auth type offered by the peer that is in @AUTH_SND
213 my $s_auth;
214 for my $auth_ (split /,/, $auths) {
215 if (grep $auth_ eq $_, @AUTH_SND) {
216 $s_auth = $auth_;
217 last;
218 }
219 }
220
221 defined $s_auth
222 or return $self->error ("$auths: no common auth type supported");
223
224 die unless $s_auth eq "hmac_md6_64_256"; # hardcoded atm.
225
# choose the first framing offered by the peer that is in @FRAMINGS; this
# is the framing we use for sending
226 my $s_framing;
227 for my $framing_ (split /,/, $framings) {
228 if (grep $framing_ eq $_, @FRAMINGS) {
229 $s_framing = $framing_;
230 last;
231 }
232 }
233
234 defined $s_framing
235 or return $self->error ("$framings: no common framing method supported");
236
237 $self->{remote_node} = $rnode;
238
# remaining fields are KEY or KEY=VALUE; a key without "=" gets an
# undefined value, fields that do not match are dropped
239 $self->{remote_greeting} = {
240 map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
241 @kv
242 };
243
244 # read nonce
245 $self->{hdl}->push_read (line => sub {
246 my $rgreeting2 = $_[1];
247
# a peer that sends back exactly our own two greeting lines is rejected
248 "$lgreeting1\012$lgreeting2" ne "$rgreeting1\012$rgreeting2" # echo attack?
249 or return $self->error ("authentication error, echo attack?");
250
251 my $key;
252 my $lauth;
253
# TLS is used when we have a TLS context and the peer announced tls with
# major version 1; the side with the smaller nonce line acts as the
# connecting side, the other one as the accepting side
254 if ($self->{tls_ctx} and 1 == int $self->{remote_greeting}{tls}) {
255 $self->{tls} = $lgreeting2 lt $rgreeting2 ? "connect" : "accept";
256 $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
257 $s_auth = "tls";
258 $lauth = "";
259 } elsif (length $secret) {
# the HMAC key is the MD6 hash of the shared secret; our auth reply covers
# the local greeting lines followed by the remote ones
260 $key = Digest::MD6::md6 $secret;
261 # we currently only support hmac_md6_64_256
262 $lauth = Digest::HMAC_MD6::hmac_md6_hex $key, "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012", 64, 256;
263 } else {
264 return $self->error ("unable to handshake TLS and no shared secret configured");
265 }
266
267 $self->{hdl}->push_write ("$s_auth;$lauth;$s_framing\012");
268
269 # read the authentication response
270 $self->{hdl}->push_read (line => sub {
271 my ($hdl, $rline) = @_;
272
273 my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;
274
# compute the auth data we expect for the method the peer selected; for
# the HMAC the greeting lines are used in the opposite order (remote
# first). NOTE(review): $key is only set in the shared-secret branch above,
# so it is undefined here when TLS was started - confirm this is intended.
275 my $rauth =
276 $auth_method eq "hmac_md6_64_256" ? Digest::HMAC_MD6::hmac_md6_hex $key, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012", 64, 256
277 : $auth_method eq "cleartext" ? unpack "H*", $secret
278 : $auth_method eq "tls" ? ($self->{tls} ? "" : "\012\012") # \012\012 never matches
279 : return $self->error ("$auth_method: fatal, selected unsupported auth method");
280
281 if ($rauth2 ne $rauth) {
282 return $self->error ("authentication failure/shared secret mismatch");
283 }
284
285 $self->{s_framing} = $s_framing;
286
# authenticated: lift the 4kb read buffer limit
287 $hdl->rbuf_max (undef);
288 my $queue = delete $self->{queue}; # we are connected
289
# from now on the read timeout is the timeout announced by the peer; when
# nothing was written for timeout - LATENCY seconds an empty message is
# sent (presumably as keepalive so the peer's read timeout does not fire)
290 $self->{hdl}->rtimeout ($self->{remote_greeting}{timeout});
291 $self->{hdl}->wtimeout ($arg{timeout} - LATENCY);
292 $self->{hdl}->on_wtimeout (sub { $self->send ([]) });
293
294 $self->connected;
295
296 # send queued messages
297 $self->send ($_)
298 for @$queue;
299
300 # receive handling
301 my $src_node = $self->{node};
302
# the reader re-queues itself before the message is injected, so the
# next read request is already registered while this message is handled;
# the framing used for reading is the one the peer selected
303 my $rmsg; $rmsg = sub {
304 $_[0]->push_read ($r_framing => $rmsg);
305
306 local $AnyEvent::MP::Kernel::SRCNODE = $src_node;
307 AnyEvent::MP::Kernel::_inject (@{ $_[1] });
308 };
309 $hdl->push_read ($r_framing => $rmsg);
310 });
311 });
312 });
313 }
314
# return the (strong) object reference from the outer scope
315 $self
316 }
317
# Report a fatal error on this transport and shut it down.
#
# If a node is attached and this transport is that node's current
# transport, the node's transport_error method is called first. A pending
# "release" callback is then removed and invoked, the message is passed to
# $AnyEvent::MP::Kernel::WARN (prefixed with peer host and port) and finally
# the transport is destroyed. Returns the result of the destroy call.
sub error {
   my ($self, $msg) = @_;

   if ($self->{node} && $self->{node}{transport} == $self) {
      $self->{node}->transport_error (transport_error => $self->{node}{noderef}, $msg);
   }

   if (exists $self->{release}) {
      my $release = delete $self->{release};
      $release->();
   }

   my $peer = "$self->{peerhost}:$self->{peerport}";
   $AnyEvent::MP::Kernel::WARN->("$peer: $msg");

   $self->destroy;
}
330
# Called once authentication has succeeded.
#
# Removes and invokes a pending "release" callback, looks up/creates the
# node object for the remote node via AnyEvent::MP::Kernel::add_node,
# stores a weak reference to it in $self->{node} and tells the node about
# this transport. Returns the result of the node's transport_connect call.
sub connected {
   my ($self) = @_;

   if (exists $self->{release}) {
      my $release = delete $self->{release};
      $release->();
   }

   my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});

   # the transport must not keep the node alive, hence the weak reference
   $self->{node} = $node;
   Scalar::Util::weaken ($self->{node});

   $node->transport_connect ($self);
}
341
# Queue one message for writing on the underlying handle, encoded with the
# framing format stored in $self->{s_framing}. Returns whatever the handle's
# push_write returns.
sub send {
   my ($self, $msg) = @_;

   my $framing = $self->{s_framing};

   $self->{hdl}->push_write ($framing, $msg);
}
345
# Tear down the transport by destroying the underlying handle, if there is
# one. Safe to call on an object without a handle.
sub destroy {
   my ($self) = @_;

   my $hdl = $self->{hdl};

   $hdl and $hdl->destroy
}
352
# Perl destructor: makes sure the underlying handle is destroyed when the
# transport object itself is freed.
sub DESTROY {
   $_[0]->destroy;
}
358
359 =back
360
361 =head1 PROTOCOL
362
363 The protocol is relatively simple, and consists of three phases which are
364 symmetrical for both sides: greeting (followed by optionally switching to
365 TLS mode), authentication and packet exchange.
366
367 The protocol is designed to allow both full-text and binary streams.
368
369 The greeting consists of two text lines that are ended by either an ASCII
370 CR LF pair, or a single ASCII LF (recommended).
371
372 =head2 GREETING
373
374 All the lines until after authentication must not exceed 4kb in length,
375 including delimiter. Afterwards there is no limit on the packet size that
376 can be received.
377
378 =head3 First Greeting Line
379
380 Example:
381
382 aemp;0;fec.4a7720fc;127.0.0.1:1235,[::1]:1235;hmac_md6_64_256;json,storable;provider=AE-0.0
383
384 The first line contains strings separated (not ended) by C<;>
385 characters. The first five strings are fixed by the protocol, the
386 remaining strings are C<KEY=VALUE> pairs. None of them may contain C<;>
387 characters themselves.
388
389 The fixed strings are:
390
391 =over 4
392
393 =item protocol identification
394
395 The constant C<aemp> to identify the protocol.
396
397 =item protocol version
398
399 The protocol version supported by this end, currently C<0>. If the
400 versions don't match then no communication is possible. Minor extensions
401 are supposed to be handled through additional key-value pairs.
402
403 =item the node endpoint descriptors
404
405 For public nodes, this is a comma-separated list of protocol endpoints,
406 i.e., the noderef. For slave nodes, this is a unique identifier of the
407 form C<slave/nonce>.
408
409 =item the acceptable authentication methods
410
411 A comma-separated list of authentication methods supported by the
412 node. Note that AnyEvent::MP supports a C<cleartext> authentication
413 method that accepts a cleartext password (hex-encoded), but will not use
414 this auth method itself.
415
416 The receiving side should choose the first auth method it supports.
417
418 =item the acceptable framing formats
419
420 A comma-separated list of packet encoding/framing formats understood. The
421 receiving side should choose the first framing format it supports for
422 sending packets (which might be different from the format it has to accept).
423
424 =back
425
426 The remaining arguments are C<KEY=VALUE> pairs. The following key-value
427 pairs are known at this time:
428
429 =over 4
430
431 =item provider=<module-version>
432
433 The software provider for this implementation. For AnyEvent::MP, this is
434 C<AE-0.0> or whatever version it currently is at.
435
436 =item peeraddr=<host>:<port>
437
438 The peer address (socket address of the other side) as seen locally, in the same format
439 as noderef endpoints.
440
441 =item tls=<major>.<minor>
442
443 Indicates that the other side supports TLS (version should be 1.0) and
444 wishes to do a TLS handshake.
445
446 =item timeout=<seconds>
447
448 The amount of time after which this node should be detected as dead unless
449 some data has been received. The node is responsible to send traffic
450 reasonably more often than this interval (such as every timeout minus five
451 seconds).
452
453 =back
454
455 =head3 Second Greeting Line
456
457 After this greeting line there will be a second line containing a
458 cryptographic nonce, i.e. random data of high quality. To keep the
459 protocol text-only, these are usually 32 base64-encoded octets, but
460 it could be anything that doesn't contain any ASCII CR or ASCII LF
461 characters.
462
463 I<< The two nonces B<must> be different, and an aemp implementation
464 B<must> check and fail when they are identical >>.
465
466 Example of a nonce line:
467
468 p/I122ql7kJR8lumW3lXlXCeBnyDAvz8NQo3x5IFowE4
469
470 =head2 TLS handshake
471
472 I<< If, after the handshake, both sides indicate interest in TLS, then the
473 connection B<must> use TLS, or fail. >>
474
475 Both sides compare their nonces, and the side who sent the lower nonce
476 value ("string" comparison on the raw octet values) becomes the client,
477 and the one with the higher nonce the server.
478
479 =head2 AUTHENTICATION PHASE
480
481 After the greeting is received (and the optional TLS handshake),
482 the authentication phase begins, which consists of sending a single
483 C<;>-separated line with three fixed strings and any number of
484 C<KEY=VALUE> pairs.
485
486 The three fixed strings are:
487
488 =over 4
489
490 =item the authentication method chosen
491
492 This must be one of the methods offered by the other side in the greeting.
493
494 The currently supported authentication methods are:
495
496 =over 4
497
498 =item cleartext
499
500 This is simply the shared secret, lowercase-hex-encoded. This method is of
501 course very insecure, unless TLS is used, which is why this module will
502 accept, but not generate, cleartext auth replies.
503
504 =item hmac_md6_64_256
505
506 This method uses an MD6 HMAC with 64 bit blocksize and 256 bit hash. First, the shared secret
507 is hashed with MD6:
508
509 key = MD6 (secret)
510
511 This secret is then used to generate the "local auth reply", by taking
512 the two local greeting lines and the two remote greeting lines (without
513 line endings), appending \012 to all of them, concatenating them and
514 calculating the MD6 HMAC with the key.
515
516 lauth = HMAC_MD6 key, "lgreeting1\012lgreeting2\012rgreeting1\012rgreeting2\012"
517
518 This authentication token is then lowercase-hex-encoded and sent to the
519 other side.
520
521 Then the remote auth reply is generated using the same method, but local
522 and remote greeting lines swapped:
523
524 rauth = HMAC_MD6 key, "rgreeting1\012rgreeting2\012lgreeting1\012lgreeting2\012"
525
526 This is the token that is expected from the other side.
527
528 =item tls
529
530 This type is only valid iff TLS was enabled and the TLS handshake
531 was successful. It has no authentication data, as the server/client
532 certificate was successfully verified.
533
534 Implementations supporting TLS I<must> accept this authentication type.
535
536 =back
537
538 =item the authentication data
539
540 The authentication data itself, usually base64 or hex-encoded data, see
541 above.
542
543 =item the framing protocol chosen
544
545 This must be one of the framing protocols offered by the other side in the
546 greeting. Each side must accept the choice of the other side.
547
548 =back
549
550 Example of an authentication reply:
551
552 hmac_md6_64_256;363d5175df38bd9eaddd3f6ca18aa1c0c4aa22f0da245ac638d048398c26b8d3;json
553
554 =head2 DATA PHASE
555
556 After this, packets get exchanged using the chosen framing protocol. It is
557 quite possible that both sides use a different framing protocol.
558
559 =head2 FULL EXAMPLE
560
561 This is an actual protocol dump of a handshake, followed by a single data
562 packet. The greater than/less than lines indicate the direction of the
563 transfer only.
564
565 > aemp;0;nndKd+gn;10.0.0.1:4040;hmac_md6_64_256,cleartext;json,storable;provider=AE-0.0;peeraddr=127.0.0.1:1235
566 > sRG8bbc4TDbkpvH8FTP4HBs87OhepH6VuApoZqXXskuG
567 < aemp;0;nmpKd+gh;127.0.0.1:1235,[::1]:1235;hmac_md6_64_256,cleartext;json,storable;provider=AE-0.0;peeraddr=127.0.0.1:58760
568 < dCEUcL/LJVSTJcx8byEsOzrwhzJYOq+L3YcopA5T6EAo
569 > hmac_md6_64_256;9513d4b258975accfcb2ab7532b83690e9c119a502c612203332a591c7237788;json
570 < hmac_md6_64_256;0298d6ba2240faabb2b2e881cf86b97d70a113ca74a87dc006f9f1e9d3010f90;json
571 > ["","lookup","pinger","10.0.0.1:4040#nndKd+gn.a","resolved"]
572
573 =head1 SEE ALSO
574
575 L<AnyEvent::MP>.
576
577 =head1 AUTHOR
578
579 Marc Lehmann <schmorp@schmorp.de>
580 http://home.schmorp.de/
581
582 =cut
583
584 1
585