ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.32
Committed: Thu Aug 13 01:46:10 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.31: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3 root 1.13 AnyEvent::MP::Transport - actual transport protocol handler
4 root 1.1
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Transport;
8    
9     =head1 DESCRIPTION
10    
11 root 1.13 This implements the actual transport protocol for MP (it represents a
12     single link), most of which is considered an implementation detail.
13 root 1.1
14 root 1.7 See the "PROTOCOL" section below if you want to write another client for
15     this protocol.
16 root 1.1
17     =head1 FUNCTIONS/METHODS
18    
19     =over 4
20    
21     =cut
22    
23     package AnyEvent::MP::Transport;
24    
25     use common::sense;
26    
27 root 1.27 use Scalar::Util ();
28     use List::Util ();
29 root 1.1 use MIME::Base64 ();
30     use Storable ();
31 root 1.2 use JSON::XS ();
32 root 1.1
33 root 1.19 use Digest::MD6 ();
34     use Digest::HMAC_MD6 ();
35    
36 root 1.1 use AE ();
37     use AnyEvent::Socket ();
38 root 1.27 use AnyEvent::Handle 4.92 ();
39 root 1.2
40 root 1.30 use AnyEvent::MP::Config ();
41    
42 root 1.2 our $PROTOCOL_VERSION = 0;
43 root 1.1
44     =item $listener = mp_server $host, $port, <constructor-args>, $cb->($transport)
45    
46     Creates a listener on the given host/port using
47     C<AnyEvent::Socket::tcp_server>.
48    
49     See C<new>, below, for constructor arguments.
50    
51 root 1.10 Defaults for peerhost, peerport and fh are provided.
52 root 1.1
53     =cut
54    
# Listens on $host/$port via AnyEvent::Socket::tcp_server and wraps every
# accepted connection in a new AnyEvent::MP::Transport, which is passed to
# the callback given as the last argument.
#
# Arguments: $host, $port, <constructor-args>, $cb.
# fh, peerhost and peerport are supplied first, so constructor arguments
# given by the caller override them.
# Returns whatever tcp_server returns.
sub mp_server($$@) {
   my $on_transport = pop @_;
   my ($listen_host, $listen_port, @ctor_args) = @_;

   AnyEvent::Socket::tcp_server ($listen_host, $listen_port, sub {
      my ($fh, $peerhost, $peerport) = @_;

      $on_transport->(
         AnyEvent::MP::Transport->new (
            fh       => $fh,
            peerhost => $peerhost,
            peerport => $peerport,
            @ctor_args,
         )
      );
   })
}
70    
71 root 1.2 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
72    
73     =cut
74    
# Connects to $host/$port via AnyEvent::Socket::tcp_connect and, on success,
# creates an AnyEvent::MP::Transport on the resulting file handle.
#
# Arguments: $host, $port, <constructor-args>, $release.
# $release is called directly when the connect fails; otherwise it is passed
# to the transport as its "release" argument.  Constructor arguments given by
# the caller come last and therefore override the defaults set here.
#
# Returns a reference to the state variable, which first holds the value
# returned by tcp_connect and later the transport object.
sub mp_connect {
   my $release = pop @_;
   my ($host, $port, @ctor_args) = @_;

   my $state;

   $state = AnyEvent::Socket::tcp_connect ($host, $port, sub {
      my ($fh, $nhost, $nport) = @_;

      # connect failed: nothing to set up, just tell the caller
      unless ($fh) {
         return $release->();
      }

      $state = AnyEvent::MP::Transport->new (
         fh       => $fh,
         peername => $host,
         peerhost => $nhost,
         peerport => $nport,
         release  => $release,
         @ctor_args,
      );
   });

   \$state
}
98    
99 root 1.1 =item new AnyEvent::MP::Transport
100    
101     # immediately starts negotiation
102     my $transport = new AnyEvent::MP::Transport
103 root 1.2 # mandatory
104 root 1.1 fh => $filehandle,
105 root 1.2 local_id => $identifier,
106 root 1.1 on_recv => sub { receive-callback },
107     on_error => sub { error-callback },
108    
109     # optional
110     on_eof => sub { clean-close-callback },
111     on_connect => sub { successful-connect-callback },
112 root 1.2 greeting => { key => value },
113 root 1.1
114     # tls support
115     tls_ctx => AnyEvent::TLS,
116     peername => $peername, # for verification
117     ;
118    
119     =cut
120    
# assumed max. network latency, used when the node configuration does not
# provide a network_latency value
sub LATENCY() { 3 }

# the framing types we accept and send, in order of preference
our @FRAMINGS = ("storable");

# auth types we send
our @AUTH_SND = ("hmac_md6_64_256");

# auth types we accept: everything we send, plus cleartext
our @AUTH_RCV = (@AUTH_SND, "cleartext");
126 root 1.7
127     #AnyEvent::Handle::register_write_type mp_record => sub {
128     #};
129 root 1.4
# Creates a transport for a single link and immediately starts negotiation.
#
# %arg is blessed directly into the object, so every constructor argument
# becomes an object field.  Keys used here: fh and peername (both are handed
# to the AnyEvent::Handle and removed from the object), peerhost, peerport,
# secret, timeout, tls_ctx and greeting.  secret and timeout default to the
# node configuration; tls_ctx is replaced whenever the node configuration
# contains a cert.
#
# Handshake as implemented below:
#   1. send our two greeting lines (parameters and nonce)
#   2. read and validate the two greeting lines of the peer
#   3. start TLS or compute the HMAC, then send our auth line
#   4. verify the auth line of the peer and switch to packet exchange
#
# Failures are reported through $self->error.  Returns the new object.
sub new {
   my ($class, %arg) = @_;

   my $self = bless \%arg, $class;

   $self->{queue} = [];

   {
      # every callback below closes over this weak copy, so the callbacks
      # stored inside the handle do not keep the transport alive
      Scalar::Util::weaken (my $self = $self);

      my $config = AnyEvent::MP::Config::node_config ();

      my $latency = $config->{network_latency} || LATENCY;

      $arg{secret} = $config->{secret}
         unless exists $arg{secret};

      $arg{timeout} = $config->{monitor_timeout} || $AnyEvent::MP::Kernel::MONITOR_TIMEOUT
         unless exists $arg{timeout};

      # the value announced to the peer is reduced by the latency, but
      # never drops below 1 + latency
      $arg{timeout} -= $latency;

      $arg{timeout} = 1 + $latency
         if $arg{timeout} < 1 + $latency;

      my $secret = $arg{secret};

      if (exists $config->{cert}) {
         $arg{tls_ctx} = {
            sslv2   => 0,
            sslv3   => 0,
            tlsv1   => 1,
            verify  => 1,
            cert    => $config->{cert},
            ca_cert => $config->{cert},
            verify_require_client_cert => 1,
         };
      }

      $self->{hdl} = AnyEvent::Handle->new (
         fh       => delete $arg{fh},
         autocork => 1,
         no_delay => 1,
         on_error => sub {
            $self->error ($_[2]);
         },
         rtimeout => $latency,
         peername => delete $arg{peername},
      );

      my $greeting_kv = $self->{greeting} ||= {};

      $self->{local_node} = $AnyEvent::MP::Kernel::NODE;

      $greeting_kv->{"tls"}    = "1.0" if $arg{tls_ctx};
      $greeting_kv->{provider} = "AE-$AnyEvent::MP::Kernel::VERSION";
      $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport ($self->{peerhost}, $self->{peerport});
      $greeting_kv->{timeout}  = $arg{timeout};

      # send greeting
      my $lgreeting1 = "aemp;$PROTOCOL_VERSION"
                     . ";$self->{local_node}"
                     . ";" . (join ",", @AUTH_RCV)
                     . ";" . (join ",", @FRAMINGS)
                     . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);

      my $lgreeting2 = MIME::Base64::encode_base64 (AnyEvent::MP::Kernel::nonce (66), "");

      $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");

      # expect greeting
      $self->{hdl}->rbuf_max (4 * 1024);
      $self->{hdl}->push_read (line => sub {
         my $rgreeting1 = $_[1];

         my ($aemp, $version, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;

         if ($aemp ne "aemp") {
            return $self->error ("unparsable greeting");
         } elsif ($version != $PROTOCOL_VERSION) {
            return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)");
         }

         # pick the first auth type offered by the peer that we can send
         my $s_auth;
         for my $auth_ (split /,/, $auths) {
            if (grep $auth_ eq $_, @AUTH_SND) {
               $s_auth = $auth_;
               last;
            }
         }

         defined $s_auth
            or return $self->error ("$auths: no common auth type supported");

         die unless $s_auth eq "hmac_md6_64_256"; # hardcoded atm.

         # pick the first framing offered by the peer that we support
         my $s_framing;
         for my $framing_ (split /,/, $framings) {
            if (grep $framing_ eq $_, @FRAMINGS) {
               $s_framing = $framing_;
               last;
            }
         }

         defined $s_framing
            or return $self->error ("$framings: no common framing method supported");

         $self->{remote_node} = $rnode;

         $self->{remote_greeting} = {
            map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
               @kv
         };

         # read nonce
         $self->{hdl}->push_read (line => sub {
            my $rgreeting2 = $_[1];

            "$lgreeting1\012$lgreeting2" ne "$rgreeting1\012$rgreeting2" # echo attack?
               or return $self->error ("authentication error, echo attack?");

            my $key;
            my $lauth;

            if ($self->{tls_ctx} and 1 == int $self->{remote_greeting}{tls}) {
               # the side that sent the lower nonce becomes the TLS client
               $self->{tls} = $lgreeting2 lt $rgreeting2 ? "connect" : "accept";
               $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
               $s_auth = "tls";
               $lauth  = "";
            } elsif (length $secret) {
               $key = Digest::MD6::md6 ($secret);
               # we currently only support hmac_md6_64_256
               $lauth = Digest::HMAC_MD6::hmac_md6_hex ($key, "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012", 64, 256);
            } else {
               return $self->error ("unable to handshake TLS and no shared secret configured");
            }

            $self->{hdl}->push_write ("$s_auth;$lauth;$s_framing\012");

            # read the authentication response
            $self->{hdl}->push_read (line => sub {
               my ($hdl, $rline) = @_;

               my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;

               # the reply we expect from the peer for the method it chose
               my $rauth =
                  $auth_method eq "hmac_md6_64_256" ? Digest::HMAC_MD6::hmac_md6_hex ($key, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012", 64, 256)
                  : $auth_method eq "cleartext"     ? unpack "H*", $secret
                  : $auth_method eq "tls"           ? ($self->{tls} ? "" : "\012\012") # \012\012 never matches
                  :                                   return $self->error ("$auth_method: fatal, selected unsupported auth method");

               if ($rauth2 ne $rauth) {
                  return $self->error ("authentication failure/shared secret mismatch");
               }

               $self->{s_framing} = $s_framing;

               $hdl->rbuf_max (undef);
               my $queue = delete $self->{queue}; # we are connected

               $self->{hdl}->rtimeout ($self->{remote_greeting}{timeout});
               # $arg{timeout} was clamped to at least 1 + $latency above, so
               # subtracting the same $latency (and not the LATENCY default)
               # keeps the write timeout positive for any configured latency
               $self->{hdl}->wtimeout ($arg{timeout} - $latency);
               $self->{hdl}->on_wtimeout (sub { $self->send (["", "devnull"]) });

               $self->connected;

               # send queued messages
               $self->send ($_)
                  for @$queue;

               # receive handling
               my $src_node = $self->{node};

               # the reader re-queues itself for every packet; the strong
               # reference lives in the object and the lexical is weakened
               # below, so the closure does not keep itself alive forever
               my $rmsg; $rmsg = $self->{rmsg} = sub {
                  $_[0]->push_read ($r_framing => $rmsg);

                  local $AnyEvent::MP::Kernel::SRCNODE = $src_node;
                  AnyEvent::MP::Kernel::_inject (@{ $_[1] });
               };
               $hdl->push_read ($r_framing => $rmsg);
               Scalar::Util::weaken ($rmsg);
            });
         });
      });
   }

   $self
}
317    
# Handles a fatal transport error.
#
# Notifies the node (only if this transport is the node's current transport),
# runs and removes the release callback if there is one, logs the message
# through $AnyEvent::MP::Kernel::WARN and finally destroys the transport.
sub error {
   my ($self, $msg) = @_;

   my $node = $self->{node};

   if ($node && $node->{transport} == $self) {
      $node->transport_error (transport_error => $node->{noderef}, $msg);
   }

   if (exists $self->{release}) {
      my $release = delete $self->{release};
      $release->();
   }

   $AnyEvent::MP::Kernel::WARN->("$self->{peerhost}:$self->{peerport}: $msg");
   $self->destroy;
}
330    
# Called once the handshake has succeeded.
#
# Runs and removes the release callback if there is one, invokes
# $AnyEvent::MP::Kernel::SLAVE with the remote node when it is a reference,
# then registers the remote node with the kernel, stores a weak reference to
# the node object and hands this transport to it.
sub connected {
   my ($self) = @_;

   if (exists $self->{release}) {
      my $release = delete $self->{release};
      $release->();
   }

   # first connect with a master node
   if (ref $AnyEvent::MP::Kernel::SLAVE) {
      $AnyEvent::MP::Kernel::SLAVE->($self->{remote_node});
   }

   my $remote = AnyEvent::MP::Kernel::add_node ($self->{remote_node});

   $self->{node} = $remote;
   Scalar::Util::weaken ($self->{node});

   $remote->transport_connect ($self);
}
345    
# Writes one message to the handle, encoded with the framing format stored
# in the s_framing field.
sub send {
   my ($self, $msg) = @_;

   my $hdl = $self->{hdl};

   $hdl->push_write ($self->{s_framing}, $msg);
}
349    
# Destroys the underlying handle, if the transport has one.
sub destroy {
   my $transport = shift;

   my $hdl = $transport->{hdl};

   $hdl && $hdl->destroy
}
356    
# Object destructor: simply delegates to the destroy method.
sub DESTROY {
   my $transport = shift;

   $transport->destroy;
}
362    
363     =back
364    
365 root 1.7 =head1 PROTOCOL
366    
367     The protocol is relatively simple, and consists of three phases which are
368     symmetrical for both sides: greeting (followed by optionally switching to
369     TLS mode), authentication and packet exchange.
370    
371     The protocol is designed to allow both full-text and binary streams.
372    
373     The greeting consists of two text lines that are ended by either an ASCII
374     CR LF pair, or a single ASCII LF (recommended).
375    
376     =head2 GREETING
377    
378 root 1.15 All the lines until after authentication must not exceed 4kb in length,
379     including delimiter. Afterwards there is no limit on the packet size that
380     can be received.
381    
382     =head3 First Greeting Line
383 root 1.12
384 root 1.16 Example:
385    
386     aemp;0;fec.4a7720fc;127.0.0.1:1235,[::1]:1235;hmac_md6_64_256;json,storable;provider=AE-0.0
387    
388     The first line contains strings separated (not ended) by C<;>
389     characters. The first five strings are fixed by the protocol, the
390     remaining strings are C<KEY=VALUE> pairs. None of them may contain C<;>
391     characters themselves.
392    
393 root 1.12 The fixed strings are:
394 root 1.7
395     =over 4
396    
397 root 1.18 =item protocol identification
398 root 1.7
399     The constant C<aemp> to identify the protocol.
400    
401     =item protocol version
402    
403 root 1.12 The protocol version supported by this end, currently C<0>. If the
404     versions don't match then no communication is possible. Minor extensions
405 root 1.18 are supposed to be handled through additional key-value pairs.
406 root 1.7
407     =item the node endpoint descriptors
408    
409     for public nodes, this is a comma-separated list of protocol endpoints,
410 root 1.26 i.e., the noderef. For slave nodes, this is a unique identifier of the
411     form C<slave/nonce>.
412 root 1.7
413     =item the acceptable authentication methods
414    
415     A comma-separated list of authentication methods supported by the
416     node. Note that AnyEvent::MP supports a C<cleartext> authentication
417     method that accepts a cleartext password (hex-encoded), but will not use
418     this auth method itself.
419    
420     The receiving side should choose the first auth method it supports.
421    
422     =item the acceptable framing formats
423    
424     A comma-separated list of packet encoding/framing formats understood. The
425     receiving side should choose the first framing format it supports for
426     sending packets (which might be different from the format it has to accept).
427    
428 root 1.10 =back
429 root 1.8
430     The remaining arguments are C<KEY=VALUE> pairs. The following key-value
431     pairs are known at this time:
432    
433     =over 4
434    
435     =item provider=<module-version>
436    
437     The software provider for this implementation. For AnyEvent::MP, this is
438     C<AE-0.0> or whatever version it currently is at.
439    
440     =item peeraddr=<host>:<port>
441    
442     The peer address (socket address of the other side) as seen locally, in the same format
443     as noderef endpoints.
444    
445     =item tls=<major>.<minor>
446    
447     Indicates that the other side supports TLS (version should be 1.0) and
448     wishes to do a TLS handshake.
449    
450 root 1.26 =item timeout=<seconds>
451 root 1.24
452 root 1.26 The amount of time after which this node should be detected as dead unless
453     some data has been received. The node is responsible to send traffic
454     reasonably more often than this interval (such as every timeout minus five
455     seconds).
456 root 1.24
457 root 1.8 =back
458    
459 root 1.15 =head3 Second Greeting Line
460    
461 root 1.8 After this greeting line there will be a second line containing a
462     cryptographic nonce, i.e. random data of high quality. To keep the
463     protocol text-only, these are usually 32 base64-encoded octets, but
464     it could be anything that doesn't contain any ASCII CR or ASCII LF
465     characters.
466    
467 root 1.14 I<< The two nonces B<must> be different, and an aemp implementation
468     B<must> check and fail when they are identical >>.
469    
470 root 1.16 Example of a nonce line:
471 root 1.8
472 root 1.12 p/I122ql7kJR8lumW3lXlXCeBnyDAvz8NQo3x5IFowE4
473 root 1.8
474     =head2 TLS handshake
475    
476 root 1.14 I<< If, after the handshake, both sides indicate interest in TLS, then the
477 root 1.20 connection B<must> use TLS, or fail. >>
478 root 1.8
479     Both sides compare their nonces, and the side who sent the lower nonce
480     value ("string" comparison on the raw octet values) becomes the client,
481     and the one with the higher nonce the server.
482    
483     =head2 AUTHENTICATION PHASE
484    
485     After the greeting is received (and the optional TLS handshake),
486     the authentication phase begins, which consists of sending a single
487     C<;>-separated line with three fixed strings and any number of
488     C<KEY=VALUE> pairs.
489    
490     The three fixed strings are:
491    
492     =over 4
493    
494     =item the authentication method chosen
495    
496     This must be one of the methods offered by the other side in the greeting.
497    
498 root 1.13 The currently supported authentication methods are:
499    
500     =over 4
501    
502     =item cleartext
503    
504     This is simply the shared secret, lowercase-hex-encoded. This method is of
505     course very insecure, unless TLS is used, which is why this module will
506     accept, but not generate, cleartext auth replies.
507    
508     =item hmac_md6_64_256
509    
510     This method uses an MD6 HMAC with 64 bit blocksize and 256 bit hash. First, the shared secret
511     is hashed with MD6:
512    
513     key = MD6 (secret)
514    
515     This key is then used to generate the "local auth reply", by taking
516     the two local greeting lines and the two remote greeting lines (without
517     line endings), appending \012 to all of them, concatenating them and
518     calculating the MD6 HMAC with the key.
519    
520     lauth = HMAC_MD6 key, "lgreeting1\012lgreeting2\012rgreeting1\012rgreeting2\012"
521    
522     This authentication token is then lowercase-hex-encoded and sent to the
523     other side.
524    
525     Then the remote auth reply is generated using the same method, but local
526     and remote greeting lines swapped:
527    
528     rauth = HMAC_MD6 key, "rgreeting1\012rgreeting2\012lgreeting1\012lgreeting2\012"
529    
530     This is the token that is expected from the other side.
531    
532 root 1.19 =item tls
533    
534     This type is only valid iff TLS was enabled and the TLS handshake
535     was successful. It has no authentication data, as the server/client
536     certificate was successfully verified.
537    
538     Implementations supporting TLS I<must> accept this authentication type.
539    
540 root 1.13 =back
541    
542 root 1.8 =item the authentication data
543    
544 root 1.13 The authentication data itself, usually base64 or hex-encoded data, see
545     above.
546 root 1.8
547     =item the framing protocol chosen
548    
549     This must be one of the framing protocols offered by the other side in the
550     greeting. Each side must accept the choice of the other side.
551    
552     =back
553    
554 root 1.16 Example of an authentication reply:
555 root 1.9
556 root 1.13 hmac_md6_64_256;363d5175df38bd9eaddd3f6ca18aa1c0c4aa22f0da245ac638d048398c26b8d3;json
557 root 1.9
558 root 1.8 =head2 DATA PHASE
559    
560     After this, packets get exchanged using the chosen framing protocol. It is
561     quite possible that both sides use a different framing protocol.
562    
563 root 1.16 =head2 FULL EXAMPLE
564    
565 root 1.17 This is an actual protocol dump of a handshake, followed by a single data
566 root 1.16 packet. The greater than/less than lines indicate the direction of the
567     transfer only.
568    
569     > aemp;0;nndKd+gn;10.0.0.1:4040;hmac_md6_64_256,cleartext;json,storable;provider=AE-0.0;peeraddr=127.0.0.1:1235
570     > sRG8bbc4TDbkpvH8FTP4HBs87OhepH6VuApoZqXXskuG
571     < aemp;0;nmpKd+gh;127.0.0.1:1235,[::1]:1235;hmac_md6_64_256,cleartext;json,storable;provider=AE-0.0;peeraddr=127.0.0.1:58760
572     < dCEUcL/LJVSTJcx8byEsOzrwhzJYOq+L3YcopA5T6EAo
573     > hmac_md6_64_256;9513d4b258975accfcb2ab7532b83690e9c119a502c612203332a591c7237788;json
574     < hmac_md6_64_256;0298d6ba2240faabb2b2e881cf86b97d70a113ca74a87dc006f9f1e9d3010f90;json
575 root 1.18 > ["","lookup","pinger","10.0.0.1:4040#nndKd+gn.a","resolved"]
576 root 1.16
577 root 1.1 =head1 SEE ALSO
578    
579 root 1.29 L<AnyEvent::MP>.
580 root 1.1
581     =head1 AUTHOR
582    
583     Marc Lehmann <schmorp@schmorp.de>
584     http://home.schmorp.de/
585    
586     =cut
587    
588     1
589