ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.8
Committed: Mon Aug 3 14:58:13 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.7: +83 -14 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Transport - actual transport protocol
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Transport;
8    
9     =head1 DESCRIPTION
10    
11     This is the superclass for MP transports, most of which is considered an
12     implementation detail.
13    
14 root 1.7 See the "PROTOCOL" section below if you want to write another client for
15     this protocol.
16 root 1.1
17     =head1 FUNCTIONS/METHODS
18    
19     =over 4
20    
21     =cut
22    
23     package AnyEvent::MP::Transport;
24    
25     use common::sense;
26    
27     use Scalar::Util;
28     use MIME::Base64 ();
29     use Storable ();
30 root 1.2 use JSON::XS ();
31 root 1.1
32     use AE ();
33     use AnyEvent::Socket ();
34     use AnyEvent::Handle ();
35 root 1.2
36 root 1.1 use base Exporter::;
37    
# Release version of this module; sent to peers as "AE-$VERSION" in the
# "provider" greeting key.
our $VERSION          = '0.0';

# Wire protocol version; sent in the greeting as both the maximum and the
# minimum version this end supports.
our $PROTOCOL_VERSION = 0;
40 root 1.1
41     =item $listener = mp_server $host, $port, <constructor-args>, $cb->($transport)
42    
43     Creates a listener on the given host/port using
44     C<AnyEvent::Socket::tcp_server>.
45    
46     See C<new>, below, for constructor arguments.
47    
48     Defaults for peerhost, peerport, fh and tls are provided.
49    
50     =cut
51    
# Start a TCP listener on $host/$port and wrap every accepted connection in
# a new AnyEvent::MP::Transport.
#
# Arguments: $host, $port, any number of constructor key/value pairs and,
# last, a callback that is invoked with each freshly created transport.
# The constructor pairs are placed after the defaults (fh, peerhost,
# peerport, tls) in the argument list handed to the constructor.
#
# Returns whatever AnyEvent::Socket::tcp_server returns.
sub mp_server($$@) {
   # the callback is always the last argument, so take it off first
   my $on_transport = pop @_;
   my ($host, $port, @ctor_args) = @_;

   my $on_accept = sub {
      my ($fh, $peer_host, $peer_port) = @_;

      $on_transport->(
         AnyEvent::MP::Transport->new (
            fh       => $fh,
            peerhost => $peer_host,
            peerport => $peer_port,
            tls      => "accept",
            @ctor_args,
         )
      )
   };

   AnyEvent::Socket::tcp_server ($host, $port, $on_accept)
}
68    
69 root 1.2 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
70    
71     =cut
72    
# Open a TCP connection to $host/$port and wrap it in a new
# AnyEvent::MP::Transport.
#
# Arguments: $host, $port, any number of constructor key/value pairs and,
# last, a callback. On success the callback receives the new transport; when
# no file handle is delivered it is invoked without arguments.
# The constructor pairs are placed after the defaults (fh, peername,
# peerhost, peerport, tls) in the argument list handed to the constructor.
#
# Returns whatever AnyEvent::Socket::tcp_connect returns.
sub mp_connect {
   # the callback is always the last argument, so take it off first
   my $on_transport = pop @_;
   my ($host, $port, @ctor_args) = @_;

   my $on_connect = sub {
      my ($fh, $resolved_host, $resolved_port) = @_;

      # without a file handle there is nothing to wrap
      $fh or return $on_transport->();

      $on_transport->(
         AnyEvent::MP::Transport->new (
            fh       => $fh,
            peername => $host,           # the name we asked for, not the resolved address
            peerhost => $resolved_host,
            peerport => $resolved_port,
            tls      => "accept",
            @ctor_args,
         )
      )
   };

   AnyEvent::Socket::tcp_connect ($host, $port, $on_connect)
}
92    
93 root 1.1 =item new AnyEvent::MP::Transport
94    
95     # immediately starts negotiation
96     my $transport = new AnyEvent::MP::Transport
97 root 1.2 # mandatory
98 root 1.1 fh => $filehandle,
99 root 1.2 local_id => $identifier,
100 root 1.1 on_recv => sub { receive-callback },
101     on_error => sub { error-callback },
102    
103     # optional
104     secret => "shared secret",
105     on_eof => sub { clean-close-callback },
106     on_connect => sub { successful-connect-callback },
107 root 1.2 greeting => { key => value },
108 root 1.1
109     # tls support
110     tls => "accept|connect",
111     tls_ctx => AnyEvent::TLS,
112     peername => $peername, # for verification
113     ;
114    
115     =cut
116    
# Framing types we accept and send, in order of preference.
our @FRAMINGS = ("json", "storable");

# Authentication types we use ourselves when authenticating to a peer.
our @AUTH_SND = ("hmac_md6_64_256");

# Authentication types we advertise as acceptable from a peer: everything we
# send ourselves, plus the hex-encoded secret.
our @AUTH_RCV = (@AUTH_SND, "hex_secret");

# Disabled stub for a custom AnyEvent::Handle write type:
#AnyEvent::Handle::register_write_type mp_record => sub {
#};
123 root 1.4
# Constructor: wraps an already-open file handle in an AnyEvent::Handle and
# immediately starts the handshake. The key/value arguments become the
# object itself (the argument hash is blessed).
#
# The handshake is driven by three chained line reads:
#   1. the peer's greeting line - version check, choose auth and framing
#   2. the peer's nonce line    - optional TLS start, send our auth line
#   3. the peer's auth line     - verify the credential, start reading packets
#
# Returns the transport right away; a failure at any stage is reported
# through ->error.
sub new {
   my ($class, %arg) = @_;

   my $self = bless \%arg, $class;

   # written to the peer (with the negotiated framing) once step 3 succeeds
   $self->{queue} = [];

   {
      # the callbacks below close over a weakened copy, so they do not keep
      # the transport alive on their own
      Scalar::Util::weaken (my $self = $self);

      if (exists $arg{connect}) {
         $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };
      }

      $arg{secret} = AnyEvent::MP::Base::default_secret ()
         unless exists $arg{secret};

      $self->{hdl} = AnyEvent::Handle->new (
         fh       => delete $arg{fh},
         rbuf_max => 64 * 1024,
         autocork => 1,
         no_delay => 1,
         on_error => sub {
            $self->error ($_[2]);
         },
         peername => delete $arg{peername},
      );

      my $secret      = $arg{secret};
      my $greeting_kv = $self->{greeting} ||= {};
      $greeting_kv->{"tls"} = "1.0"
         if $arg{tls_ctx};
      $greeting_kv->{provider} = "AE-$VERSION";
      $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport ($self->{peerhost}, $self->{peerport});

      # send greeting
      my $lgreeting1 = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
                     . ";$AnyEvent::MP::Base::UNIQ"
                     . ";$AnyEvent::MP::Base::NODE"
                     . ";" . (join ",", @AUTH_RCV)
                     . ";" . (join ",", @FRAMINGS)
                     . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
      my $lgreeting2 = MIME::Base64::encode_base64 (AnyEvent::MP::Base::nonce (33), "");

      $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");

      # expect greeting
      $self->{hdl}->push_read (line => sub {
         my $rgreeting1 = $_[1];

         my ($aemp, $version, $version_min, $uniq, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;

         if ($aemp ne "aemp") {
            return $self->error ("unparsable greeting");
         } elsif ($version_min > $PROTOCOL_VERSION) {
            return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
         }

         # pick the first auth type offered by the peer that we can send
         my $s_auth;
         for my $auth_ (split /,/, $auths) {
            if (grep $auth_ eq $_, @AUTH_SND) {
               $s_auth = $auth_;
               last;
            }
         }

         defined $s_auth
            or return $self->error ("$auths: no common auth type supported");

         die unless $s_auth eq "hmac_md6_64_256"; # hardcoded atm.

         # pick the first framing offered by the peer that we can send
         my $s_framing;
         for my $framing_ (split /,/, $framings) {
            if (grep $framing_ eq $_, @FRAMINGS) {
               $s_framing = $framing_;
               last;
            }
         }

         defined $s_framing
            or return $self->error ("$framings: no common framing method supported");

         $self->{remote_uniq} = $uniq;
         $self->{remote_node} = $rnode;

         $self->{remote_greeting} = {
            map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
               @kv
         };

         # read nonce
         $self->{hdl}->push_read (line => sub {
            my $rgreeting2 = $_[1];

            # both sides want TLS: the side with the lower nonce is the client
            if ($self->{tls_ctx} and 1 == int $self->{remote_greeting}{"tls"}) {
               $self->{tls} = $lgreeting2 lt $rgreeting2 ? "connect" : "accept";
               $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
            }

            # auth
            require Digest::MD6;
            require Digest::HMAC_MD6;

            my $key   = Digest::MD6::md6_hex ($secret);
            my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012", 64, 256);

            # what the peer has to send if it authenticates with hmac_md6_64_256
            my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012", 64, 256);

            $lauth ne $rauth # echo attack?
               or return $self->error ("authentication error");

            $self->{hdl}->push_write ("$s_auth;$lauth;$s_framing\012");

            $self->{hdl}->rbuf_max (64); # enough for 44 reply bytes or so
            $self->{hdl}->push_read (line => sub {
               my ($hdl, $rline) = @_;

               my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;

               # split drops missing/empty trailing fields, so an undefined
               # third field means the line did not carry all three strings
               defined $r_framing
                  or return $self->error ("unparsable authentication line");

               # the peer chooses its auth method from the @AUTH_RCV list we
               # advertised, so the expected credential depends on *its*
               # choice, not on the method we used for our own auth line
               my $rauth_expected =
                    $auth_method eq "hmac_md6_64_256" ? $rauth
                  : $auth_method eq "hex_secret"      ? unpack ("H*", $secret)
                  :                                     undef;

               unless (defined $rauth_expected && grep { $auth_method eq $_ } @AUTH_RCV) {
                  return $self->error ("$auth_method: peer selected unsupported auth method");
               }

               if ($rauth2 ne $rauth_expected) {
                  return $self->error ("authentication failure/shared secret mismatch");
               }

               # $r_framing is peer-controlled and is used as an
               # AnyEvent::Handle read type below, so only accept values we
               # actually offered
               grep $r_framing eq $_, @FRAMINGS
                  or return $self->error ("$r_framing: peer selected unsupported framing method");

               $self->{s_framing} = $s_framing;

               $hdl->rbuf_max (undef);
               my $queue = delete $self->{queue}; # we are connected

               $self->connected;

               $hdl->push_write ($self->{s_framing} => $_)
                  for @$queue;

               # re-arm the read before injecting, so there is always a
               # reader queued for the next packet
               my $rmsg; $rmsg = sub {
                  $_[0]->push_read ($r_framing => $rmsg);

                  AnyEvent::MP::Base::_inject ($_[1]);
               };
               $hdl->push_read ($r_framing => $rmsg);
            });
         });
      });
   }

   $self
}
273    
# Report a fatal transport error: detach from the node (if this transport is
# the one the node currently uses), log "$peerhost:$peerport: $msg" through
# $AnyEvent::MP::Base::WARN and tear the transport down.
#
# Returns whatever ->destroy returns.
sub error {
   my ($self, $msg) = @_;

   my $node = $self->{node};

   # only clear the node's transport when it still points at us
   $node->clr_transport
      if $node && $node->{transport} == $self;

   $AnyEvent::MP::Base::WARN->("$self->{peerhost}:$self->{peerport}: $msg");

   $self->destroy
}
283    
# Called once the handshake has succeeded: look up (or create) the node for
# the peer via AnyEvent::MP::Base::add_node, remember it weakly in
# $self->{node} and register this transport with it.
#
# Returns whatever ->set_transport returns.
sub connected {
   my $transport = shift;

   # hold a strong reference in a lexical for the duration of the call; the
   # copy stored in the object is weakened
   my $peer = AnyEvent::MP::Base::add_node ($transport->{remote_node});

   $transport->{node} = $peer;
   Scalar::Util::weaken ($transport->{node});

   $peer->set_transport ($transport)
}
291    
# Write one message to the peer using the framing stored in
# $self->{s_framing}.
#
# Returns whatever the handle's push_write returns.
sub send {
   my ($self, $msg) = @_;

   my $hdl = $self->{hdl};

   $hdl->push_write ($self->{s_framing} => $msg)
}
295    
# Tear the transport down by destroying the underlying handle, if one is
# stored in $self->{hdl}.
sub destroy {
   my $transport = shift;

   my $hdl = $transport->{hdl};

   # nothing to do when no handle is stored
   $hdl && $hdl->destroy
}
302    
# Perl destructor: delegates to ->destroy.
sub DESTROY {
   my $transport = shift;

   $transport->destroy
}
308    
309     =back
310    
311 root 1.7 =head1 PROTOCOL
312    
313     The protocol is relatively simple, and consists of three phases which are
314     symmetrical for both sides: greeting (followed by optionally switching to
315     TLS mode), authentication and packet exchange.
316    
317     The protocol is designed to allow both full-text and binary streams.
318    
319     The greeting consists of two text lines that are ended by either an ASCII
320     CR LF pair, or a single ASCII LF (recommended).
321    
322     =head2 GREETING
323    
324 root 1.8 The first line contains strings separated (not ended) by C<;>
325 root 1.7 characters. The first seven strings are fixed by the protocol, the
326     remaining strings are C<KEY=VALUE> pairs. None of them may contain C<;>
327     characters themselves.
328    
329     The seven fixed strings are:
330    
331     =over 4
332    
333     =item C<aemp>
334    
335     The constant C<aemp> to identify the protocol.
336    
337     =item protocol version
338    
339     The (maximum) protocol version supported by this end, currently C<0>.
340    
341     =item minimum protocol version
342    
343     The minimum protocol version supported by this end, currently C<0>.
344    
345     =item a token uniquely identifying the current node instance
346    
347     This is a string that must change between restarts. It usually contains
348     things like the current time, the (OS) process id or similar values, but
349     no meaning of the contents is assumed.
350    
351     =item the node endpoint descriptors
352    
353     for public nodes, this is a comma-separated list of protocol endpoints,
354     i.e., the noderef. For slave nodes, this is a unique identifier.
355    
356     =item the acceptable authentication methods
357    
358     A comma-separated list of authentication methods supported by the
359     node. Note that AnyEvent::MP supports a C<hex_secret> authentication
360     method that accepts a cleartext password (hex-encoded), but will not use
361     this auth method itself.
362    
363     The receiving side should choose the first auth method it supports.
364    
365     =item the acceptable framing formats
366    
367     A comma-separated list of packet encoding/framing formats understood. The
368     receiving side should choose the first framing format it supports for
369     sending packets (which might be different from the format it has to accept).
370    
371 root 1.8 =back
372    
373     The remaining arguments are C<KEY=VALUE> pairs. The following key-value
374     pairs are known at this time:
375    
376     =over 4
377    
378     =item provider=<module-version>
379    
380     The software provider for this implementation. For AnyEvent::MP, this is
381     C<AE-0.0> or whatever version it currently is at.
382    
383     =item peeraddr=<host>:<port>
384    
385     The peer address (socket address of the other side) as seen locally, in the same format
386     as noderef endpoints.
387    
388     =item tls=<major>.<minor>
389    
390     Indicates that the other side supports TLS (version should be 1.0) and
391     wishes to do a TLS handshake.
392    
393     =back
394    
395     After this greeting line there will be a second line containing a
396     cryptographic nonce, i.e. random data of high quality. To keep the
397     protocol text-only, these are usually 33 base64-encoded octets, but
398     it could be anything that doesn't contain any ASCII CR or ASCII LF
399     characters.
400    
401     Example of the two lines of greeting:
402    
403     aemp;0;0;e7d.4a76f48f;10.0.0.1:4040;hmac_md6_64_256,hex_secret;json,storable;provider=AE-0.0;peeraddr=127.0.0.1:1235
404     XntegV2Guvss0qNn7phCPnoU87xqxV+4Mqm/5y4iQm6a
405    
406     =head2 TLS handshake
407    
408     If, after the handshake, both sides indicate interest in TLS, then the
409     connection I<must> use TLS, or fail.
410    
411     Both sides compare their nonces, and the side who sent the lower nonce
412     value ("string" comparison on the raw octet values) becomes the client,
413     and the one with the higher nonce the server.
414    
415     =head2 AUTHENTICATION PHASE
416    
417     After the greeting is received (and the optional TLS handshake),
418     the authentication phase begins, which consists of sending a single
419     C<;>-separated line with three fixed strings and any number of
420     C<KEY=VALUE> pairs.
421    
422     The three fixed strings are:
423    
424     =over 4
425    
426     =item the authentication method chosen
427    
428     This must be one of the methods offered by the other side in the greeting.
429    
430     =item the authentication data
431    
432     The authentication data itself, usually base64 or hex-encoded data.
433    
434     =item the framing protocol chosen
435    
436     This must be one of the framing protocols offered by the other side in the
437     greeting. Each side must accept the choice of the other side.
438    
439     =back
440    
441     =head2 DATA PHASE
442    
443     After this, packets get exchanged using the chosen framing protocol. It is
444     quite possible that both sides use a different framing protocol.
445    
446 root 1.1 =head1 SEE ALSO
447    
448     L<AnyEvent>.
449    
450     =head1 AUTHOR
451    
452     Marc Lehmann <schmorp@schmorp.de>
453     http://home.schmorp.de/
454    
455     =cut
456    
457     1
458