ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.38
Committed: Thu Aug 27 07:12:48 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.37: +2 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Transport - actual transport protocol handler
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Transport;
8
9 =head1 DESCRIPTION
10
11 This implements the actual transport protocol for MP (it represents a
12 single link), most of which is considered an implementation detail.
13
14 See the "PROTOCOL" section below if you want to write another client for
15 this protocol.
16
17 =head1 FUNCTIONS/METHODS
18
19 =over 4
20
21 =cut
22
23 package AnyEvent::MP::Transport;
24
25 use common::sense;
26
27 use Scalar::Util ();
28 use List::Util ();
29 use MIME::Base64 ();
30 use Storable ();
31 use JSON::XS ();
32
33 use Digest::MD6 ();
34 use Digest::HMAC_MD6 ();
35
36 use AE ();
37 use AnyEvent::Socket ();
38 use AnyEvent::Handle 4.92 ();
39
40 use AnyEvent::MP::Config ();
41
42 our $PROTOCOL_VERSION = 0;
43
44 =item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport)
45
46 Creates a listener on the given host/port using
47 C<AnyEvent::Socket::tcp_server>.
48
49 See C<new>, below, for constructor arguments.
50
51 Defaults for peerhost, peerport and fh are provided.
52
53 =cut
54
55 sub mp_server($$@) {
56 my $cb = pop;
57 my ($host, $port, @args) = @_;
58
59 AnyEvent::Socket::tcp_server $host, $port, sub {
60 my ($fh, $host, $port) = @_;
61
62 $cb->(new AnyEvent::MP::Transport
63 fh => $fh,
64 peerhost => $host,
65 peerport => $port,
66 @args,
67 );
68 }
69 }
70
71 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
72
73 =cut
74
75 sub mp_connect {
76 my $release = pop;
77 my ($host, $port, @args) = @_;
78
79 my $state;
80
81 $state = AnyEvent::Socket::tcp_connect $host, $port, sub {
82 my ($fh, $nhost, $nport) = @_;
83
84 return $release->() unless $fh;
85
86 $state = new AnyEvent::MP::Transport
87 fh => $fh,
88 peername => $host,
89 peerhost => $nhost,
90 peerport => $nport,
91 release => $release,
92 @args,
93 ;
94 };
95
96 \$state
97 }
98
99 =item new AnyEvent::MP::Transport
100
101 # immediately starts negotiation
102 my $transport = new AnyEvent::MP::Transport
103 # mandatory
104 fh => $filehandle,
105 local_id => $identifier,
106 on_recv => sub { receive-callback },
107 on_error => sub { error-callback },
108
109 # optional
110 on_eof => sub { clean-close-callback },
111 on_connect => sub { successful-connect-callback },
112 greeting => { key => value },
113
114 # tls support
115 tls_ctx => AnyEvent::TLS,
116 peername => $peername, # for verification
117 ;
118
119 =cut
120
121 sub LATENCY() { 3 } # assumed max. network latency
122
123 our @FRAMINGS = qw(json storable); # the framing types we accept and send, in order of preference
124 our @AUTH_SND = qw(hmac_md6_64_256); # auth types we send
125 our @AUTH_RCV = (@AUTH_SND, qw(cleartext)); # auth types we accept
126
127 #AnyEvent::Handle::register_write_type mp_record => sub {
128 #};
129
130 sub new {
131 my ($class, %arg) = @_;
132
133 my $self = bless \%arg, $class;
134
135 $self->{queue} = [];
136
137 {
138 Scalar::Util::weaken (my $self = $self);
139
140 my $config = AnyEvent::MP::Config::config;
141
142 my $latency = $config->{network_latency} || LATENCY;
143
144 $arg{secret} = $config->{secret}
145 unless exists $arg{secret};
146
147 $arg{timeout} = $config->{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT
148 unless exists $arg{timeout};
149
150 $arg{timeout} -= $latency;
151
152 $arg{timeout} = 1 + $latency
153 if $arg{timeout} < 1 + $latency;
154
155 my $secret = $arg{secret};
156
157 if (exists $config->{cert}) {
158 $arg{tls_ctx} = {
159 sslv2 => 0,
160 sslv3 => 0,
161 tlsv1 => 1,
162 verify => 1,
163 cert => $config->{cert},
164 ca_cert => $config->{cert},
165 verify_require_client_cert => 1,
166 };
167 }
168
169 $self->{hdl} = new AnyEvent::Handle
170 fh => delete $arg{fh},
171 autocork => 1,
172 no_delay => 1,
173 on_error => sub {
174 $self->error ($_[2]);
175 },
176 rtimeout => $latency,
177 peername => delete $arg{peername},
178 ;
179
180 my $greeting_kv = $self->{greeting} ||= {};
181
182 $self->{local_node} ||= $AnyEvent::MP::Kernel::NODE;
183
184 $greeting_kv->{"tls"} = "1.0" if $arg{tls_ctx};
185 $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION";
186 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
187 $greeting_kv->{timeout} = $arg{timeout};
188
189 # send greeting
190 my $lgreeting1 = "aemp;$PROTOCOL_VERSION"
191 . ";$self->{local_node}"
192 . ";" . (join ",", @AUTH_RCV)
193 . ";" . (join ",", @FRAMINGS)
194 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
195
196 my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Kernel::nonce (66), "";
197
198 $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");
199
200 # expect greeting
201 $self->{hdl}->rbuf_max (4 * 1024);
202 $self->{hdl}->push_read (line => sub {
203 my $rgreeting1 = $_[1];
204
205 my ($aemp, $version, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;
206
207 if ($aemp ne "aemp") {
208 return $self->error ("unparsable greeting");
209 } elsif ($version != $PROTOCOL_VERSION) {
210 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)");
211 }
212
213 my $s_auth;
214 for my $auth_ (split /,/, $auths) {
215 if (grep $auth_ eq $_, @AUTH_SND) {
216 $s_auth = $auth_;
217 last;
218 }
219 }
220
221 defined $s_auth
222 or return $self->error ("$auths: no common auth type supported");
223
224 die unless $s_auth eq "hmac_md6_64_256"; # hardcoded atm.
225
226 my $s_framing;
227 for my $framing_ (split /,/, $framings) {
228 if (grep $framing_ eq $_, @FRAMINGS) {
229 $s_framing = $framing_;
230 last;
231 }
232 }
233
234 defined $s_framing
235 or return $self->error ("$framings: no common framing method supported");
236
237 $self->{remote_node} = $rnode;
238
239 $self->{remote_greeting} = {
240 map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
241 @kv
242 };
243
244 # read nonce
245 $self->{hdl}->push_read (line => sub {
246 my $rgreeting2 = $_[1];
247
248 "$lgreeting1\012$lgreeting2" ne "$rgreeting1\012$rgreeting2" # echo attack?
249 or return $self->error ("authentication error, echo attack?");
250
251 my $key;
252 my $lauth;
253
254 if ($self->{tls_ctx} and 1 == int $self->{remote_greeting}{tls}) {
255 $self->{tls} = $lgreeting2 lt $rgreeting2 ? "connect" : "accept";
256 $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
257 $s_auth = "tls";
258 $lauth = "";
259 } elsif (length $secret) {
260 $key = Digest::MD6::md6 $secret;
261 # we currently only support hmac_md6_64_256
262 $lauth = Digest::HMAC_MD6::hmac_md6_hex $key, "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012", 64, 256;
263 } else {
264 return $self->error ("unable to handshake TLS and no shared secret configured");
265 }
266
267 $self->{hdl}->push_write ("$s_auth;$lauth;$s_framing\012");
268
269 # read the authentication response
270 $self->{hdl}->push_read (line => sub {
271 my ($hdl, $rline) = @_;
272
273 my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;
274
275 my $rauth =
276 $auth_method eq "hmac_md6_64_256" ? Digest::HMAC_MD6::hmac_md6_hex $key, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012", 64, 256
277 : $auth_method eq "cleartext" ? unpack "H*", $secret
278 : $auth_method eq "tls" ? ($self->{tls} ? "" : "\012\012") # \012\012 never matches
279 : return $self->error ("$auth_method: fatal, selected unsupported auth method");
280
281 if ($rauth2 ne $rauth) {
282 return $self->error ("authentication failure/shared secret mismatch");
283 }
284
285 $self->{s_framing} = $s_framing;
286
287 $hdl->rbuf_max (undef);
288 my $queue = delete $self->{queue}; # we are connected
289
290 $self->{hdl}->rtimeout ($self->{remote_greeting}{timeout});
291 $self->{hdl}->wtimeout ($arg{timeout} - LATENCY);
292 $self->{hdl}->on_wtimeout (sub { $self->send ([]) });
293
294 $self->connected;
295
296 # send queued messages
297 $self->send ($_)
298 for @$queue;
299
300 # receive handling
301 my $src_node = $self->{node};
302
303 my $rmsg; $rmsg = sub {
304 $_[0]->push_read ($r_framing => $rmsg);
305
306 local $AnyEvent::MP::Kernel::SRCNODE = $src_node;
307 AnyEvent::MP::Kernel::_inject (@{ $_[1] });
308 };
309 $hdl->push_read ($r_framing => $rmsg);
310 });
311 });
312 });
313 }
314
315 $self
316 }
317
318 sub error {
319 my ($self, $msg) = @_;
320
321 $self->{node}->transport_error (transport_error => $self->{node}{noderef}, $msg)
322 if $self->{node} && $self->{node}{transport} == $self;
323
324 (delete $self->{release})->()
325 if exists $self->{release};
326
327 # $AnyEvent::MP::Kernel::WARN->(7, "$self->{peerhost}:$self->{peerport}: $msg");
328 $self->destroy;
329 }
330
331 sub connected {
332 my ($self) = @_;
333
334 (delete $self->{release})->()
335 if exists $self->{release};
336
337 my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});
338
339 Scalar::Util::weaken ($self->{node} = $node);
340 $node->transport_connect ($self);
341 }
342
343 sub send {
344 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]);
345 }
346
347 sub destroy {
348 my ($self) = @_;
349
350 $self->{hdl}->destroy
351 if $self->{hdl};
352 }
353
354 sub DESTROY {
355 my ($self) = @_;
356
357 $self->destroy;
358 }
359
360 =back
361
362 =head1 PROTOCOL
363
364 The protocol is relatively simple, and consists of three phases which are
365 symmetrical for both sides: greeting (followed by optionally switching to
366 TLS mode), authentication and packet exchange.
367
368 the protocol is designed to allow both full-text and binary streams.
369
370 The greeting consists of two text lines that are ended by either an ASCII
371 CR LF pair, or a single ASCII LF (recommended).
372
373 =head2 GREETING
374
375 All the lines until after authentication must not exceed 4kb in length,
376 including delimiter. Afterwards there is no limit on the packet size that
377 can be received.
378
379 =head3 First Greeting Line
380
381 Example:
382
383 aemp;0;fec.4a7720fc;127.0.0.1:1235,[::1]:1235;hmac_md6_64_256;json,storable;provider=AE-0.0
384
385 The first line contains strings separated (not ended) by C<;>
386 characters. The first even ixtrings are fixed by the protocol, the
387 remaining strings are C<KEY=VALUE> pairs. None of them may contain C<;>
388 characters themselves.
389
390 The fixed strings are:
391
392 =over 4
393
394 =item protocol identification
395
396 The constant C<aemp> to identify the protocol.
397
398 =item protocol version
399
400 The protocol version supported by this end, currently C<0>. If the
401 versions don't match then no communication is possible. Minor extensions
402 are supposed to be handled through additional key-value pairs.
403
404 =item the node endpoint descriptors
405
406 for public nodes, this is a comma-separated list of protocol endpoints,
407 i.e., the noderef. For slave nodes, this is a unique identifier of the
408 form C<slave/nonce>.
409
410 =item the acceptable authentication methods
411
412 A comma-separated list of authentication methods supported by the
413 node. Note that AnyEvent::MP supports a C<hex_secret> authentication
414 method that accepts a cleartext password (hex-encoded), but will not use
415 this auth method itself.
416
417 The receiving side should choose the first auth method it supports.
418
419 =item the acceptable framing formats
420
421 A comma-separated list of packet encoding/framign formats understood. The
422 receiving side should choose the first framing format it supports for
423 sending packets (which might be different from the format it has to accept).
424
425 =back
426
427 The remaining arguments are C<KEY=VALUE> pairs. The following key-value
428 pairs are known at this time:
429
430 =over 4
431
432 =item provider=<module-version>
433
434 The software provider for this implementation. For AnyEvent::MP, this is
435 C<AE-0.0> or whatever version it currently is at.
436
437 =item peeraddr=<host>:<port>
438
439 The peer address (socket address of the other side) as seen locally, in the same format
440 as noderef endpoints.
441
442 =item tls=<major>.<minor>
443
444 Indicates that the other side supports TLS (version should be 1.0) and
445 wishes to do a TLS handshake.
446
447 =item timeout=<seconds>
448
449 The amount of time after which this node should be detected as dead unless
450 some data has been received. The node is responsible to send traffic
451 reasonably more often than this interval (such as every timeout minus five
452 seconds).
453
454 =back
455
456 =head3 Second Greeting Line
457
458 After this greeting line there will be a second line containing a
459 cryptographic nonce, i.e. random data of high quality. To keep the
460 protocol text-only, these are usually 32 base64-encoded octets, but
461 it could be anything that doesn't contain any ASCII CR or ASCII LF
462 characters.
463
464 I<< The two nonces B<must> be different, and an aemp implementation
465 B<must> check and fail when they are identical >>.
466
467 Example of a nonce line:
468
469 p/I122ql7kJR8lumW3lXlXCeBnyDAvz8NQo3x5IFowE4
470
471 =head2 TLS handshake
472
473 I<< If, after the handshake, both sides indicate interest in TLS, then the
474 connection B<must> use TLS, or fail. >>
475
476 Both sides compare their nonces, and the side who sent the lower nonce
477 value ("string" comparison on the raw octet values) becomes the client,
478 and the one with the higher nonce the server.
479
480 =head2 AUTHENTICATION PHASE
481
482 After the greeting is received (and the optional TLS handshake),
483 the authentication phase begins, which consists of sending a single
484 C<;>-separated line with three fixed strings and any number of
485 C<KEY=VALUE> pairs.
486
487 The three fixed strings are:
488
489 =over 4
490
491 =item the authentication method chosen
492
493 This must be one of the methods offered by the other side in the greeting.
494
495 The currently supported authentication methods are:
496
497 =over 4
498
499 =item cleartext
500
501 This is simply the shared secret, lowercase-hex-encoded. This method is of
502 course very insecure, unless TLS is used, which is why this module will
503 accept, but not generate, cleartext auth replies.
504
505 =item hmac_md6_64_256
506
507 This method uses an MD6 HMAC with 64 bit blocksize and 256 bit hash. First, the shared secret
508 is hashed with MD6:
509
510 key = MD6 (secret)
511
512 This secret is then used to generate the "local auth reply", by taking
513 the two local greeting lines and the two remote greeting lines (without
514 line endings), appending \012 to all of them, concatenating them and
515 calculating the MD6 HMAC with the key.
516
517 lauth = HMAC_MD6 key, "lgreeting1\012lgreeting2\012rgreeting1\012rgreeting2\012"
518
519 This authentication token is then lowercase-hex-encoded and sent to the
520 other side.
521
522 Then the remote auth reply is generated using the same method, but local
523 and remote greeting lines swapped:
524
525 rauth = HMAC_MD6 key, "rgreeting1\012rgreeting2\012lgreeting1\012lgreeting2\012"
526
527 This is the token that is expected from the other side.
528
529 =item tls
530
531 This type is only valid iff TLS was enabled and the TLS handshake
532 was successful. It has no authentication data, as the server/client
533 certificate was successfully verified.
534
535 Implementations supporting TLS I<must> accept this authentication type.
536
537 =back
538
539 =item the authentication data
540
541 The authentication data itself, usually base64 or hex-encoded data, see
542 above.
543
544 =item the framing protocol chosen
545
546 This must be one of the framing protocols offered by the other side in the
547 greeting. Each side must accept the choice of the other side.
548
549 =back
550
551 Example of an authentication reply:
552
553 hmac_md6_64_256;363d5175df38bd9eaddd3f6ca18aa1c0c4aa22f0da245ac638d048398c26b8d3;json
554
555 =head2 DATA PHASE
556
557 After this, packets get exchanged using the chosen framing protocol. It is
558 quite possible that both sides use a different framing protocol.
559
560 =head2 FULL EXAMPLE
561
562 This is an actual protocol dump of a handshake, followed by a single data
563 packet. The greater than/less than lines indicate the direction of the
564 transfer only.
565
566 > aemp;0;nndKd+gn;10.0.0.1:4040;hmac_md6_64_256,cleartext;json,storable;provider=AE-0.0;peeraddr=127.0.0.1:1235
567 > sRG8bbc4TDbkpvH8FTP4HBs87OhepH6VuApoZqXXskuG
568 < aemp;0;nmpKd+gh;127.0.0.1:1235,[::1]:1235;hmac_md6_64_256,cleartext;json,storable;provider=AE-0.0;peeraddr=127.0.0.1:58760
569 < dCEUcL/LJVSTJcx8byEsOzrwhzJYOq+L3YcopA5T6EAo
570 > hmac_md6_64_256;9513d4b258975accfcb2ab7532b83690e9c119a502c612203332a591c7237788;json
571 < hmac_md6_64_256;0298d6ba2240faabb2b2e881cf86b97d70a113ca74a87dc006f9f1e9d3010f90;json
572 > ["","lookup","pinger","10.0.0.1:4040#nndKd+gn.a","resolved"]
573
574 =head1 SEE ALSO
575
576 L<AnyEvent::MP>.
577
578 =head1 AUTHOR
579
580 Marc Lehmann <schmorp@schmorp.de>
581 http://home.schmorp.de/
582
583 =cut
584
585 1
586