ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.32
Committed: Thu Aug 13 01:46:10 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.31: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Transport - actual transport protocol handler
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Transport;
8
9 =head1 DESCRIPTION
10
11 This implements the actual transport protocol for MP (it represents a
12 single link), most of which is considered an implementation detail.
13
14 See the "PROTOCOL" section below if you want to write another client for
15 this protocol.
16
17 =head1 FUNCTIONS/METHODS
18
19 =over 4
20
21 =cut
22
23 package AnyEvent::MP::Transport;
24
25 use common::sense;
26
27 use Scalar::Util ();
28 use List::Util ();
29 use MIME::Base64 ();
30 use Storable ();
31 use JSON::XS ();
32
33 use Digest::MD6 ();
34 use Digest::HMAC_MD6 ();
35
36 use AE ();
37 use AnyEvent::Socket ();
38 use AnyEvent::Handle 4.92 ();
39
40 use AnyEvent::MP::Config ();
41
42 our $PROTOCOL_VERSION = 0; # version sent in our first greeting line; the peer's version must be numerically equal
43
44 =item $listener = mp_server $host, $port, <constructor-args>, $cb->($transport)
45
46 Creates a listener on the given host/port using
47 C<AnyEvent::Socket::tcp_server>.
48
49 See C<new>, below, for constructor arguments.
50
51 Defaults for peerhost, peerport and fh are provided.
52
53 =cut
54
# Listen on $host/$port and, for every accepted connection, build a
# transport object and pass it to the trailing callback argument.
#
# The fh/peerhost/peerport constructor arguments are filled in from the
# accepted socket first, so the caller-supplied constructor arguments
# (everything between $port and the callback) may override them.
#
# Returns whatever AnyEvent::Socket::tcp_server returns.
sub mp_server($$@) {
   my $on_transport = pop @_;
   my ($host, $port, @ctor_args) = @_;

   AnyEvent::Socket::tcp_server ($host, $port, sub {
      my ($socket, $peer_host, $peer_port) = @_;

      my $transport = AnyEvent::MP::Transport->new (
         fh       => $socket,
         peerhost => $peer_host,
         peerport => $peer_port,
         @ctor_args,
      );

      $on_transport->($transport);
   })
}
70
71 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
72
73 =cut
74
# Connect to $host/$port and start a transport on the resulting socket.
#
# The trailing argument is stored as the transport's "release" callback;
# if the connection attempt yields no file handle it is invoked right
# away (without arguments) instead. All arguments between $port and the
# callback are passed through to the transport constructor.
#
# Returns a reference to a scalar that first holds the value returned by
# AnyEvent::Socket::tcp_connect and, once connected, the transport object.
sub mp_connect {
   my $on_release = pop @_;
   my ($host, $port, @ctor_args) = @_;

   my $slot;

   my $on_socket = sub {
      my ($socket, $peer_host, $peer_port) = @_;

      # no file handle means the connect failed
      unless ($socket) {
         return $on_release->();
      }

      $slot = AnyEvent::MP::Transport->new (
         fh       => $socket,
         peername => $host,
         peerhost => $peer_host,
         peerport => $peer_port,
         release  => $on_release,
         @ctor_args,
      );
   };

   $slot = AnyEvent::Socket::tcp_connect ($host, $port, $on_socket);

   return \$slot;
}
98
99 =item new AnyEvent::MP::Transport
100
101 # immediately starts negotiation
102 my $transport = new AnyEvent::MP::Transport
103 # mandatory
104 fh => $filehandle,
105 local_id => $identifier,
106 on_recv => sub { receive-callback },
107 on_error => sub { error-callback },
108
109 # optional
110 on_eof => sub { clean-close-callback },
111 on_connect => sub { successful-connect-callback },
112 greeting => { key => value },
113
114 # tls support
115 tls_ctx => AnyEvent::TLS,
116 peername => $peername, # for verification
117 ;
118
119 =cut
120
# assumed max. network latency; used when the node configuration does
# not provide a network_latency value
sub LATENCY() { 3 }

# the framing types we accept and send, in order of preference
our @FRAMINGS = ("storable");

# auth types we send
our @AUTH_SND = ("hmac_md6_64_256");

# auth types we accept
our @AUTH_RCV = (@AUTH_SND, "cleartext");
126
127 #AnyEvent::Handle::register_write_type mp_record => sub {
128 #};
129
# Create a transport on an already connected file handle and immediately
# start the greeting/authentication handshake on it.
#
# Arguments are key => value pairs; the argument hash itself is blessed
# and becomes the object. Keys used here:
#
#    fh        file handle to talk over (moved into the AnyEvent::Handle)
#    peername  passed on to AnyEvent::Handle
#    peerhost, peerport  used for the peeraddr greeting key and log messages
#    secret    shared secret, defaults to the node configuration's secret
#    timeout   defaults to the configured monitor_timeout (or the kernel's
#              $MONITOR_TIMEOUT), then reduced by the network latency
#    greeting  optional hash of extra key/value pairs for the greeting
#    tls_ctx   TLS context; overridden when the node config has a cert
#
# Returns the new object right away. The outcome of the handshake is
# reported later through the connected and error methods.
sub new {
   my ($class, %arg) = @_;

   # \%arg is the object, so %arg and %$self are the same hash below
   my $self = bless \%arg, $class;

   $self->{queue} = [];

   {
      # the callbacks created below only see a weak reference, so they
      # do not keep the transport alive
      Scalar::Util::weaken (my $self = $self);

      my $config = AnyEvent::MP::Config::node_config ();

      my $latency = $config->{network_latency} || LATENCY ();

      $arg{secret} = $config->{secret}
         unless exists $arg{secret};

      $arg{timeout} = $config->{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT
         unless exists $arg{timeout};

      $arg{timeout} -= $latency;

      # never announce less than 1 + latency, which keeps the write
      # timeout computed after authentication at one or more
      $arg{timeout} = 1 + $latency
         if $arg{timeout} < 1 + $latency;

      my $secret = $arg{secret};

      if (exists $config->{cert}) {
         $arg{tls_ctx} = {
            sslv2   => 0,
            sslv3   => 0,
            tlsv1   => 1,
            verify  => 1,
            cert    => $config->{cert},
            ca_cert => $config->{cert},
            verify_require_client_cert => 1,
         };
      }

      $self->{hdl} = AnyEvent::Handle->new (
         fh       => delete $arg{fh},
         autocork => 1,
         no_delay => 1,
         on_error => sub {
            $self->error ($_[2]);
         },
         rtimeout => $latency,
         peername => delete $arg{peername},
      );

      my $greeting_kv = $self->{greeting} ||= {};

      $self->{local_node} = $AnyEvent::MP::Kernel::NODE;

      $greeting_kv->{"tls"}    = "1.0" if $arg{tls_ctx};
      $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION";
      $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport ($self->{peerhost}, $self->{peerport});
      $greeting_kv->{timeout}  = $arg{timeout};

      # send greeting
      my $lgreeting1 = "aemp;$PROTOCOL_VERSION"
                     . ";$self->{local_node}"
                     . ";" . (join ",", @AUTH_RCV)
                     . ";" . (join ",", @FRAMINGS)
                     . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);

      # second greeting line: our nonce
      my $lgreeting2 = MIME::Base64::encode_base64 (AnyEvent::MP::Kernel::nonce (66), "");

      $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");

      # expect greeting; the read buffer stays limited until the peer
      # has authenticated
      $self->{hdl}->rbuf_max (4 * 1024);
      $self->{hdl}->push_read (line => sub {
         my $rgreeting1 = $_[1];

         my ($aemp, $version, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;

         if ($aemp ne "aemp") {
            return $self->error ("unparsable greeting");
         } elsif ($version != $PROTOCOL_VERSION) {
            return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)");
         }

         # pick the first auth type offered by the peer that we can send
         my $s_auth;
         for my $auth_ (split /,/, $auths) {
            if (grep $auth_ eq $_, @AUTH_SND) {
               $s_auth = $auth_;
               last;
            }
         }

         defined $s_auth
            or return $self->error ("$auths: no common auth type supported");

         die unless $s_auth eq "hmac_md6_64_256"; # hardcoded atm.

         # pick the first framing offered by the peer that we support
         my $s_framing;
         for my $framing_ (split /,/, $framings) {
            if (grep $framing_ eq $_, @FRAMINGS) {
               $s_framing = $framing_;
               last;
            }
         }

         defined $s_framing
            or return $self->error ("$framings: no common framing method supported");

         $self->{remote_node} = $rnode;

         $self->{remote_greeting} = {
            map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
               @kv
         };

         # read nonce
         $self->{hdl}->push_read (line => sub {
            my $rgreeting2 = $_[1];

            "$lgreeting1\012$lgreeting2" ne "$rgreeting1\012$rgreeting2" # echo attack?
               or return $self->error ("authentication error, echo attack?");

            my $key;
            my $lauth;

            if ($self->{tls_ctx} and 1 == int $self->{remote_greeting}{tls}) {
               # both sides want TLS: the side with the lower nonce connects
               $self->{tls} = $lgreeting2 lt $rgreeting2 ? "connect" : "accept";
               $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
               $s_auth = "tls";
               $lauth  = "";
            } elsif (length $secret) {
               $key = Digest::MD6::md6 ($secret);
               # we currently only support hmac_md6_64_256
               $lauth = Digest::HMAC_MD6::hmac_md6_hex ($key, "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012", 64, 256);
            } else {
               return $self->error ("unable to handshake TLS and no shared secret configured");
            }

            $self->{hdl}->push_write ("$s_auth;$lauth;$s_framing\012");

            # read the authentication response
            $self->{hdl}->push_read (line => sub {
               my ($hdl, $rline) = @_;

               my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;

               # the reply we expect for the auth method the peer chose
               my $rauth =
                  $auth_method eq "hmac_md6_64_256" ? Digest::HMAC_MD6::hmac_md6_hex ($key, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012", 64, 256)
                  : $auth_method eq "cleartext"     ? unpack "H*", $secret
                  : $auth_method eq "tls"           ? ($self->{tls} ? "" : "\012\012") # \012\012 never matches
                  : return $self->error ("$auth_method: fatal, selected unsupported auth method");

               if ($rauth2 ne $rauth) {
                  return $self->error ("authentication failure/shared secret mismatch");
               }

               # $r_framing comes from the peer and is used as an
               # AnyEvent::Handle read type below, so only accept
               # framings we actually offered
               unless (defined $r_framing && grep { $r_framing eq $_ } @FRAMINGS) {
                  my $shown = defined $r_framing ? $r_framing : "";
                  return $self->error ("$shown: fatal, selected unsupported framing method");
               }

               $self->{s_framing} = $s_framing;

               $hdl->rbuf_max (undef);
               my $queue = delete $self->{queue}; # we are connected

               $self->{hdl}->rtimeout ($self->{remote_greeting}{timeout});
               # use the configured latency here as well: $arg{timeout} is
               # at least 1 + $latency, so this is never zero or negative
               $self->{hdl}->wtimeout ($arg{timeout} - $latency);
               $self->{hdl}->on_wtimeout (sub { $self->send (["", "devnull"]) });

               $self->connected;

               # send queued messages
               $self->send ($_)
                  for @$queue;

               # receive handling
               my $src_node = $self->{node};

               # the reader re-queues itself; the strong reference is kept
               # in the object and the lexical is weakened below, so the
               # closure does not keep itself alive forever
               my $rmsg; $rmsg = $self->{rmsg} = sub {
                  $_[0]->push_read ($r_framing => $rmsg);

                  local $AnyEvent::MP::Kernel::SRCNODE = $src_node;
                  AnyEvent::MP::Kernel::_inject (@{ $_[1] });
               };
               $hdl->push_read ($r_framing => $rmsg);

               Scalar::Util::weaken ($rmsg);
            });
         });
      });
   }

   $self
}
317
# Handle a fatal problem on this transport.
#
# In order: tells the associated node about the error (only if this
# object is still that node's transport), runs and removes the pending
# release callback if there is one, reports "$peerhost:$peerport: $msg"
# through $AnyEvent::MP::Kernel::WARN and finally destroys the transport.
sub error {
   my ($self, $msg) = @_;

   my $node = $self->{node};

   if ($node && $node->{transport} == $self) {
      $node->transport_error (transport_error => $node->{noderef}, $msg);
   }

   if (exists $self->{release}) {
      my $release = delete $self->{release};
      $release->();
   }

   $AnyEvent::MP::Kernel::WARN->("$self->{peerhost}:$self->{peerport}: $msg");

   $self->destroy;
}
330
# Called once the handshake has succeeded.
#
# Runs and removes the pending release callback if there is one, calls
# the $AnyEvent::MP::Kernel::SLAVE hook with the remote node when that
# hook is a reference, then registers the remote node with the kernel
# and attaches this transport to it. Only a weak reference to the node
# is stored in the object.
sub connected {
   my ($self) = @_;

   if (exists $self->{release}) {
      my $release = delete $self->{release};
      $release->();
   }

   # first connect with a master node
   if (ref $AnyEvent::MP::Kernel::SLAVE) {
      $AnyEvent::MP::Kernel::SLAVE->($self->{remote_node});
   }

   # keep the node in a lexical so it stays alive while we weaken our copy
   my $node = AnyEvent::MP::Kernel::add_node ($self->{remote_node});

   $self->{node} = $node;
   Scalar::Util::weaken ($self->{node});

   return $node->transport_connect ($self);
}
345
# Write one message to the peer through the handle, using the framing
# type stored in the object's s_framing field.
sub send {
   my ($self, $msg) = @_;

   $self->{hdl}->push_write ($self->{s_framing} => $msg);
}
349
# Tear down the underlying handle, if the object has one.
sub destroy {
   my $handle = $_[0]{hdl};

   $handle and $handle->destroy;
}
356
# Perl destructor: releases the transport's resources via destroy.
sub DESTROY {
   $_[0]->destroy;
}
362
363 =back
364
365 =head1 PROTOCOL
366
367 The protocol is relatively simple, and consists of three phases which are
368 symmetrical for both sides: greeting (followed by optionally switching to
369 TLS mode), authentication and packet exchange.
370
371 The protocol is designed to allow both full-text and binary streams.
372
373 The greeting consists of two text lines that are ended by either an ASCII
374 CR LF pair, or a single ASCII LF (recommended).
375
376 =head2 GREETING
377
378 All the lines until after authentication must not exceed 4kb in length,
379 including delimiter. Afterwards there is no limit on the packet size that
380 can be received.
381
382 =head3 First Greeting Line
383
384 Example:
385
386 aemp;0;fec.4a7720fc;127.0.0.1:1235,[::1]:1235;hmac_md6_64_256;json,storable;provider=AE-0.0
387
388 The first line contains strings separated (not ended) by C<;>
389 characters. The first five strings are fixed by the protocol, the
390 remaining strings are C<KEY=VALUE> pairs. None of them may contain C<;>
391 characters themselves.
392
393 The fixed strings are:
394
395 =over 4
396
397 =item protocol identification
398
399 The constant C<aemp> to identify the protocol.
400
401 =item protocol version
402
403 The protocol version supported by this end, currently C<0>. If the
404 versions don't match then no communication is possible. Minor extensions
405 are supposed to be handled through additional key-value pairs.
406
407 =item the node endpoint descriptors
408
409 For public nodes, this is a comma-separated list of protocol endpoints,
410 i.e., the noderef. For slave nodes, this is a unique identifier of the
411 form C<slave/nonce>.
412
413 =item the acceptable authentication methods
414
415 A comma-separated list of authentication methods supported by the
416 node. Note that AnyEvent::MP supports a C<cleartext> authentication
417 method that accepts a cleartext password (hex-encoded), but will not use
418 this auth method itself.
419
420 The receiving side should choose the first auth method it supports.
421
422 =item the acceptable framing formats
423
424 A comma-separated list of packet encoding/framing formats understood. The
425 receiving side should choose the first framing format it supports for
426 sending packets (which might be different from the format it has to accept).
427
428 =back
429
430 The remaining arguments are C<KEY=VALUE> pairs. The following key-value
431 pairs are known at this time:
432
433 =over 4
434
435 =item provider=<module-version>
436
437 The software provider for this implementation. For AnyEvent::MP, this is
438 C<AE-0.0> or whatever version it currently is at.
439
440 =item peeraddr=<host>:<port>
441
442 The peer address (socket address of the other side) as seen locally, in the same format
443 as noderef endpoints.
444
445 =item tls=<major>.<minor>
446
447 Indicates that the other side supports TLS (version should be 1.0) and
448 wishes to do a TLS handshake.
449
450 =item timeout=<seconds>
451
452 The amount of time after which this node should be detected as dead unless
453 some data has been received. The node is responsible to send traffic
454 reasonably more often than this interval (such as every timeout minus five
455 seconds).
456
457 =back
458
459 =head3 Second Greeting Line
460
461 After this greeting line there will be a second line containing a
462 cryptographic nonce, i.e. random data of high quality. To keep the
463 protocol text-only, these are usually 32 base64-encoded octets, but
464 it could be anything that doesn't contain any ASCII CR or ASCII LF
465 characters.
466
467 I<< The two nonces B<must> be different, and an aemp implementation
468 B<must> check and fail when they are identical >>.
469
470 Example of a nonce line:
471
472 p/I122ql7kJR8lumW3lXlXCeBnyDAvz8NQo3x5IFowE4
473
474 =head2 TLS handshake
475
476 I<< If, after the handshake, both sides indicate interest in TLS, then the
477 connection B<must> use TLS, or fail. >>
478
479 Both sides compare their nonces, and the side who sent the lower nonce
480 value ("string" comparison on the raw octet values) becomes the client,
481 and the one with the higher nonce the server.
482
483 =head2 AUTHENTICATION PHASE
484
485 After the greeting is received (and the optional TLS handshake),
486 the authentication phase begins, which consists of sending a single
487 C<;>-separated line with three fixed strings and any number of
488 C<KEY=VALUE> pairs.
489
490 The three fixed strings are:
491
492 =over 4
493
494 =item the authentication method chosen
495
496 This must be one of the methods offered by the other side in the greeting.
497
498 The currently supported authentication methods are:
499
500 =over 4
501
502 =item cleartext
503
504 This is simply the shared secret, lowercase-hex-encoded. This method is of
505 course very insecure, unless TLS is used, which is why this module will
506 accept, but not generate, cleartext auth replies.
507
508 =item hmac_md6_64_256
509
510 This method uses an MD6 HMAC with 64 bit blocksize and 256 bit hash. First, the shared secret
511 is hashed with MD6:
512
513 key = MD6 (secret)
514
515 This key is then used to generate the "local auth reply", by taking
516 the two local greeting lines and the two remote greeting lines (without
517 line endings), appending \012 to all of them, concatenating them and
518 calculating the MD6 HMAC with the key.
519
520 lauth = HMAC_MD6 key, "lgreeting1\012lgreeting2\012rgreeting1\012rgreeting2\012"
521
522 This authentication token is then lowercase-hex-encoded and sent to the
523 other side.
524
525 Then the remote auth reply is generated using the same method, but local
526 and remote greeting lines swapped:
527
528 rauth = HMAC_MD6 key, "rgreeting1\012rgreeting2\012lgreeting1\012lgreeting2\012"
529
530 This is the token that is expected from the other side.
531
532 =item tls
533
534 This type is only valid iff TLS was enabled and the TLS handshake
535 was successful. It has no authentication data, as the server/client
536 certificate was successfully verified.
537
538 Implementations supporting TLS I<must> accept this authentication type.
539
540 =back
541
542 =item the authentication data
543
544 The authentication data itself, usually base64 or hex-encoded data, see
545 above.
546
547 =item the framing protocol chosen
548
549 This must be one of the framing protocols offered by the other side in the
550 greeting. Each side must accept the choice of the other side.
551
552 =back
553
554 Example of an authentication reply:
555
556 hmac_md6_64_256;363d5175df38bd9eaddd3f6ca18aa1c0c4aa22f0da245ac638d048398c26b8d3;json
557
558 =head2 DATA PHASE
559
560 After this, packets get exchanged using the chosen framing protocol. It is
561 quite possible that both sides use a different framing protocol.
562
563 =head2 FULL EXAMPLE
564
565 This is an actual protocol dump of a handshake, followed by a single data
566 packet. The greater than/less than lines indicate the direction of the
567 transfer only.
568
569 > aemp;0;nndKd+gn;10.0.0.1:4040;hmac_md6_64_256,cleartext;json,storable;provider=AE-0.0;peeraddr=127.0.0.1:1235
570 > sRG8bbc4TDbkpvH8FTP4HBs87OhepH6VuApoZqXXskuG
571 < aemp;0;nmpKd+gh;127.0.0.1:1235,[::1]:1235;hmac_md6_64_256,cleartext;json,storable;provider=AE-0.0;peeraddr=127.0.0.1:58760
572 < dCEUcL/LJVSTJcx8byEsOzrwhzJYOq+L3YcopA5T6EAo
573 > hmac_md6_64_256;9513d4b258975accfcb2ab7532b83690e9c119a502c612203332a591c7237788;json
574 < hmac_md6_64_256;0298d6ba2240faabb2b2e881cf86b97d70a113ca74a87dc006f9f1e9d3010f90;json
575 > ["","lookup","pinger","10.0.0.1:4040#nndKd+gn.a","resolved"]
576
577 =head1 SEE ALSO
578
579 L<AnyEvent::MP>.
580
581 =head1 AUTHOR
582
583 Marc Lehmann <schmorp@schmorp.de>
584 http://home.schmorp.de/
585
586 =cut
587
588 1
589