ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.11
Committed: Mon Aug 3 15:40:53 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.10: +10 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Transport - actual transport protocol
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Transport;
8    
9     =head1 DESCRIPTION
10    
11     This is the superclass for MP transports, most of which is considered an
12     implementation detail.
13    
14 root 1.7 See the "PROTOCOL" section below if you want to write another client for
15     this protocol.
16 root 1.1
17     =head1 FUNCTIONS/METHODS
18    
19     =over 4
20    
21     =cut
22    
23     package AnyEvent::MP::Transport;
24    
25     use common::sense;
26    
27     use Scalar::Util;
28     use MIME::Base64 ();
29     use Storable ();
30 root 1.2 use JSON::XS ();
31 root 1.1
32     use AE ();
33     use AnyEvent::Socket ();
34     use AnyEvent::Handle ();
35 root 1.2
36 root 1.1 use base Exporter::;
37    
38     our $VERSION = '0.0';
39 root 1.2 our $PROTOCOL_VERSION = 0;
40 root 1.1
41     =item $listener = mp_server $host, $port, <constructor-args>, $cb->($transport)
42    
43     Creates a listener on the given host/port using
44     C<AnyEvent::Socket::tcp_server>.
45    
46     See C<new>, below, for constructor arguments.
47    
48 root 1.10 Defaults for peerhost, peerport and fh are provided.
49 root 1.1
50     =cut
51    
# mp_server $host, $port, <constructor-args>, $cb->($transport)
#
# Listens on $host/$port via AnyEvent::Socket::tcp_server. For every
# accepted connection a new AnyEvent::MP::Transport is constructed around
# the socket and handed to $cb. fh, peerhost and peerport are supplied
# before @args, so explicitly passed constructor arguments take precedence.
# The result of tcp_server is the return value of this function.
sub mp_server($$@) {
   # the callback is always the last argument, so take it off first
   my $cb = pop;
   my ($host, $port, @args) = @_;

   AnyEvent::Socket::tcp_server ($host, $port, sub {
      my ($fh, $peerhost, $peerport) = @_;

      my $transport = AnyEvent::MP::Transport->new (
         fh       => $fh,
         peerhost => $peerhost,
         peerport => $peerport,
         @args,
      );

      $cb->($transport);
   })
}
67    
68 root 1.2 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
69    
70     =cut
71    
# mp_connect $host, $port, <constructor-args>, $cb->($transport)
#
# Connects to $host/$port via AnyEvent::Socket::tcp_connect. On success a
# new AnyEvent::MP::Transport is built around the socket (peername is the
# host as requested, peerhost/peerport are the values reported by
# tcp_connect) and passed to $cb. When no file handle is delivered, $cb is
# invoked without arguments. The result of tcp_connect is the return value
# of this function.
sub mp_connect {
   # the callback is always the last argument, so take it off first
   my $cb = pop;
   my ($host, $port, @args) = @_;

   AnyEvent::Socket::tcp_connect ($host, $port, sub {
      my ($fh, $nhost, $nport) = @_;

      # no file handle: report the failure by calling back with nothing
      $fh
         or return $cb->();

      my $transport = AnyEvent::MP::Transport->new (
         fh       => $fh,
         peername => $host,
         peerhost => $nhost,
         peerport => $nport,
         @args,
      );

      $cb->($transport);
   })
}
90    
91 root 1.1 =item new AnyEvent::MP::Transport
92    
93     # immediately starts negotiation
94     my $transport = new AnyEvent::MP::Transport
95 root 1.2 # mandatory
96 root 1.1 fh => $filehandle,
97 root 1.2 local_id => $identifier,
98 root 1.1 on_recv => sub { receive-callback },
99     on_error => sub { error-callback },
100    
101     # optional
102     secret => "shared secret",
103     on_eof => sub { clean-close-callback },
104     on_connect => sub { successful-connect-callback },
105 root 1.2 greeting => { key => value },
106 root 1.1
107     # tls support
108     tls_ctx => AnyEvent::TLS,
109     peername => $peername, # for verification
110     ;
111    
112     =cut
113    
# the framing types we accept and send, in order of preference
our @FRAMINGS = ("json", "storable");

# the auth types we send
our @AUTH_SND = ("hmac_md6_64_256");

# the auth types we accept: everything we send ourselves, plus hex_secret
our @AUTH_RCV = (@AUTH_SND, "hex_secret");
117    
118     #AnyEvent::Handle::register_write_type mp_record => sub {
119     #};
120 root 1.4
# new AnyEvent::MP::Transport key => value...
#
# Blesses the argument hash into the transport object, wraps the "fh"
# argument in an AnyEvent::Handle and immediately starts the negotiation:
#
#   1. send our two greeting lines (capabilities, then a nonce)
#   2. read and check the peer's two greeting lines
#   3. optionally switch to TLS (only if "tls_ctx" was passed and the
#      peer announced tls=1.x)
#   4. send our auth reply, then read and verify the peer's auth reply
#   5. call ->connected, flush messages queued in {queue} and start
#      injecting received messages via AnyEvent::MP::Base::_inject
#
# Arguments read here: fh, peername (both moved into the handle), secret
# (defaults to AnyEvent::MP::Base::default_secret), greeting (extra
# key/value pairs for the greeting line), tls_ctx, peerhost, peerport.
#
# Any protocol violation is reported through ->error. The new object is
# returned right away; negotiation continues from the handle callbacks.
sub new {
   my ($class, %arg) = @_;

   my $self = bless \%arg, $class;

   $self->{queue} = [];

   {
      # every callback below closes over this weak copy, so the handle
      # (which is owned by the object) does not keep the object alive
      Scalar::Util::weaken (my $self = $self);

      # NOTE(review): stored under "tls_ctx_disabled", a key nothing in this
      # sub reads, so TLS is only attempted when the caller passes tls_ctx.
      $arg{tls_ctx_disabled} ||= {
         sslv2 => 0,
         sslv3 => 0,
         tlsv1 => 1,
         verify => 1,
         cert_file => "secret.pem",
         ca_file => "secret.pem",
         verify_require_client_cert => 1,
      };

      $arg{secret} = AnyEvent::MP::Base::default_secret ()
         unless exists $arg{secret};

      $self->{hdl} = AnyEvent::Handle->new (
         fh       => delete $arg{fh},
         rbuf_max => 64 * 1024,
         autocork => 1,
         no_delay => 1,
         on_error => sub {
            $self->error ($_[2]);
         },
         peername => delete $arg{peername},
      );

      my $secret = $arg{secret};
      my $greeting_kv = $self->{greeting} ||= {};
      $greeting_kv->{"tls"} = "1.0"
         if $arg{tls_ctx};
      $greeting_kv->{provider} = "AE-$VERSION";
      $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport ($self->{peerhost}, $self->{peerport});

      # send greeting
      my $lgreeting1 = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
                     . ";$AnyEvent::MP::Base::UNIQ"
                     . ";$AnyEvent::MP::Base::NODE"
                     . ";" . (join ",", @AUTH_RCV)
                     . ";" . (join ",", @FRAMINGS)
                     . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
      my $lgreeting2 = MIME::Base64::encode_base64 (AnyEvent::MP::Base::nonce (33), "");

      $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");

      # expect greeting
      $self->{hdl}->push_read (line => sub {
         my $rgreeting1 = $_[1];

         my ($aemp, $version, $version_min, $uniq, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;

         if ($aemp ne "aemp") {
            return $self->error ("unparsable greeting");
         } elsif ($version_min > $PROTOCOL_VERSION) {
            return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
         }

         # pick the first auth method offered by the peer that we can send
         my $s_auth;
         for my $auth_ (split /,/, $auths) {
            if (grep $auth_ eq $_, @AUTH_SND) {
               $s_auth = $auth_;
               last;
            }
         }

         defined $s_auth
            or return $self->error ("$auths: no common auth type supported");

         die unless $s_auth eq "hmac_md6_64_256"; # hardcoded atm.

         # pick the first framing offered by the peer that we can send
         my $s_framing;
         for my $framing_ (split /,/, $framings) {
            if (grep $framing_ eq $_, @FRAMINGS) {
               $s_framing = $framing_;
               last;
            }
         }

         defined $s_framing
            or return $self->error ("$framings: no common framing method supported");

         $self->{remote_uniq} = $uniq;
         $self->{remote_node} = $rnode;

         $self->{remote_greeting} = {
            map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
               @kv
         };

         # read nonce
         $self->{hdl}->push_read (line => sub {
            my $rgreeting2 = $_[1];

            if ($self->{tls_ctx} and 1 == int $self->{remote_greeting}{tls}) {
               # the side that sent the lower nonce acts as the TLS client
               $self->{tls} = $lgreeting2 lt $rgreeting2 ? "connect" : "accept";
               $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
            }

            # auth
            require Digest::MD6;
            require Digest::HMAC_MD6;

            my $key = Digest::MD6::md6 ($secret);

            # what we send: HMAC over our greeting followed by theirs
            my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012", 64, 256);

            # what an HMAC-using peer must send: HMAC over their greeting followed by ours
            my $rauth_hmac = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012", 64, 256);

            $lauth ne $rauth_hmac # echo attack?
               or return $self->error ("authentication error");

            $self->{hdl}->push_write ("$s_auth;$lauth;$s_framing\012");

            # The reply is method;data;framing plus optional key=value pairs. The
            # shortest valid one (hmac_md6_64_256, 43 base64 chars, json) is already
            # 65 octets including the line end, so a limit of 64 is too small.
            $self->{hdl}->rbuf_max (512);
            $self->{hdl}->push_read (line => sub {
               my ($hdl, $rline) = @_;

               my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;

               # the peer may only choose from what we offered in our greeting
               unless (defined $auth_method && grep $auth_method eq $_, @AUTH_RCV) {
                  return $self->error ("unsupported authentication method chosen by peer");
               }

               unless (defined $r_framing && grep $r_framing eq $_, @FRAMINGS) {
                  return $self->error ("unsupported framing method chosen by peer");
               }

               # the expected auth data depends on the method the peer chose,
               # not on the method we use ourselves
               my $rauth =
                  $auth_method eq "hmac_md6_64_256" ? $rauth_hmac
                : $auth_method eq "hex_secret"      ? unpack ("H*", $secret)
                :                                     undef;

               unless (defined $rauth && defined $rauth2 && $rauth2 eq $rauth) {
                  return $self->error ("authentication failure/shared secret mismatch");
               }

               $self->{s_framing} = $s_framing;

               $hdl->rbuf_max (undef);
               my $queue = delete $self->{queue}; # we are connected

               $self->connected;

               $hdl->push_write ($self->{s_framing} => $_)
                  for @$queue;

               # re-arm the read before injecting, so one message is handled per call
               my $rmsg; $rmsg = sub {
                  $_[0]->push_read ($r_framing => $rmsg);

                  AnyEvent::MP::Base::_inject ($_[1]);
               };
               $hdl->push_read ($r_framing => $rmsg);
            });
         });
      });
   }

   $self
}
276    
# $transport->error ($msg)
#
# Detaches this transport from its node (only if the node still refers to
# this very transport), reports "$peerhost:$peerport: $msg" through
# $AnyEvent::MP::Base::WARN and finally destroys the transport.
sub error {
   my ($self, $msg) = @_;

   my $node = $self->{node};

   $node->clr_transport
      if $node && $node->{transport} == $self;

   $AnyEvent::MP::Base::WARN->("$self->{peerhost}:$self->{peerport}: $msg");

   $self->destroy;
}
286    
# $transport->connected
#
# Called once negotiation has succeeded: looks up (or creates) the node
# object for the peer via AnyEvent::MP::Base::add_node, remembers it as a
# weak reference in {node} and registers this transport with the node.
sub connected {
   my $self = shift;

   # keep a strong reference in $node while {node} is being weakened
   my $node = AnyEvent::MP::Base::add_node ($self->{remote_node});

   $self->{node} = $node;
   Scalar::Util::weaken ($self->{node});

   $node->set_transport ($self);
}
294    
# $transport->send ($msg)
#
# Writes $msg to the handle, using the framing stored in {s_framing} as
# the AnyEvent::Handle write type.
sub send {
   my ($self, $msg) = @_;

   $self->{hdl}->push_write ($self->{s_framing} => $msg);
}
298    
# $transport->destroy
#
# Destroys the underlying handle, if the object has one. Safe to call on
# an object without a handle.
sub destroy {
   my $self = shift;

   $self->{hdl}
      and $self->{hdl}->destroy
}
305    
# Destructor: tears the transport down via ->destroy when the object is
# garbage-collected.
sub DESTROY {
   $_[0]->destroy;
}
311    
312     =back
313    
314 root 1.7 =head1 PROTOCOL
315    
316     The protocol is relatively simple, and consists of three phases which are
317     symmetrical for both sides: greeting (followed by optionally switching to
318     TLS mode), authentication and packet exchange.
319    
320     The protocol is designed to allow both full-text and binary streams.
321    
322     The greeting consists of two text lines that are ended by either an ASCII
323     CR LF pair, or a single ASCII LF (recommended).
324    
325     =head2 GREETING
326    
327 root 1.8 The first line contains strings separated (not ended) by C<;>
328 root 1.7 characters. The first seven strings are fixed by the protocol, the
329     remaining strings are C<KEY=VALUE> pairs. None of them may contain C<;>
330     characters themselves.
331    
332     The seven fixed strings are:
333    
334     =over 4
335    
336     =item C<aemp>
337    
338     The constant C<aemp> to identify the protocol.
339    
340     =item protocol version
341    
342     The (maximum) protocol version supported by this end, currently C<0>.
343    
344     =item minimum protocol version
345    
346     The minimum protocol version supported by this end, currently C<0>.
347    
348     =item a token uniquely identifying the current node instance
349    
350     This is a string that must change between restarts. It usually contains
351     things like the current time, the (OS) process id or similar values, but
352     no meaning is attached to its contents.
353    
354     =item the node endpoint descriptors
355    
356     For public nodes, this is a comma-separated list of protocol endpoints,
357     i.e., the noderef. For slave nodes, this is a unique identifier.
358    
359     =item the acceptable authentication methods
360    
361     A comma-separated list of authentication methods supported by the
362     node. Note that AnyEvent::MP supports a C<hex_secret> authentication
363     method that accepts a cleartext password (hex-encoded), but will not use
364     this auth method itself.
365    
366     The receiving side should choose the first auth method it supports.
367    
368     =item the acceptable framing formats
369    
370     A comma-separated list of packet encoding/framing formats understood. The
371     receiving side should choose the first framing format it supports for
372     sending packets (which might be different from the format it has to accept).
373    
374 root 1.10 =back
375 root 1.8
376     The remaining arguments are C<KEY=VALUE> pairs. The following key-value
377     pairs are known at this time:
378    
379     =over 4
380    
381     =item provider=<module-version>
382    
383     The software provider for this implementation. For AnyEvent::MP, this is
384     C<AE-0.0> or whatever version it currently is at.
385    
386     =item peeraddr=<host>:<port>
387    
388     The peer address (socket address of the other side) as seen locally, in the same format
389     as noderef endpoints.
390    
391     =item tls=<major>.<minor>
392    
393     Indicates that the other side supports TLS (version should be 1.0) and
394     wishes to do a TLS handshake.
395    
396     =back
397    
398     After this greeting line there will be a second line containing a
399     cryptographic nonce, i.e. random data of high quality. To keep the
400     protocol text-only, these are usually 33 base64-encoded octets, but
401     it could be anything that doesn't contain any ASCII CR or ASCII LF
402     characters.
403    
404     Example of the two lines of greeting:
405    
406     aemp;0;0;e7d.4a76f48f;10.0.0.1:4040;hmac_md6_64_256,hex_secret;json,storable;provider=AE-0.0;peeraddr=127.0.0.1:1235
407     XntegV2Guvss0qNn7phCPnoU87xqxV+4Mqm/5y4iQm6a
408    
409     =head2 TLS handshake
410    
411     If, after the greeting, both sides indicate interest in TLS, then the
412     connection I<must> use TLS, or fail.
413    
414     Both sides compare their nonces, and the side who sent the lower nonce
415     value ("string" comparison on the raw octet values) becomes the client,
416     and the one with the higher nonce the server.
417    
418     =head2 AUTHENTICATION PHASE
419    
420     After the greeting is received (and the optional TLS handshake),
421     the authentication phase begins, which consists of sending a single
422     C<;>-separated line with three fixed strings and any number of
423     C<KEY=VALUE> pairs.
424    
425     The three fixed strings are:
426    
427     =over 4
428    
429     =item the authentication method chosen
430    
431     This must be one of the methods offered by the other side in the greeting.
432    
433     =item the authentication data
434    
435     The authentication data itself, usually base64 or hex-encoded data.
436    
437     =item the framing protocol chosen
438    
439     This must be one of the framing protocols offered by the other side in the
440     greeting. Each side must accept the choice of the other side.
441    
442     =back
443    
444 root 1.9 Example (the actual reply matching the previous example):
445    
446     hmac_md6_64_256;wIlLedBY956UCGSISG9mBZRDTG8xUi73/sVse2DSQp0;json
447    
448 root 1.8 =head2 DATA PHASE
449    
450     After this, packets get exchanged using the chosen framing protocol. It is
451     quite possible that both sides use a different framing protocol.
452    
453 root 1.1 =head1 SEE ALSO
454    
455     L<AnyEvent>.
456    
457     =head1 AUTHOR
458    
459     Marc Lehmann <schmorp@schmorp.de>
460     http://home.schmorp.de/
461    
462     =cut
463    
464     1
465