ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.10
Committed: Mon Aug 3 15:02:42 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.9: +4 -10 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Transport - actual transport protocol
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Transport;
8    
9     =head1 DESCRIPTION
10    
11     This is the superclass for MP transports, most of which is considered an
12     implementation detail.
13    
14 root 1.7 See the "PROTOCOL" section below if you want to write another client for
15     this protocol.
16 root 1.1
17     =head1 FUNCTIONS/METHODS
18    
19     =over 4
20    
21     =cut
22    
23     package AnyEvent::MP::Transport;
24    
25     use common::sense;
26    
27     use Scalar::Util;
28     use MIME::Base64 ();
29     use Storable ();
30 root 1.2 use JSON::XS ();
31 root 1.1
32     use AE ();
33     use AnyEvent::Socket ();
34     use AnyEvent::Handle ();
35 root 1.2
36 root 1.1 use base Exporter::;
37    
38     our $VERSION = '0.0';
39 root 1.2 our $PROTOCOL_VERSION = 0;
40 root 1.1
41     =item $listener = mp_server $host, $port, <constructor-args>, $cb->($transport)
42    
43     Creates a listener on the given host/port using
44     C<AnyEvent::Socket::tcp_server>.
45    
46     See C<new>, below, for constructor arguments.
47    
48 root 1.10 Defaults for peerhost, peerport and fh are provided.
49 root 1.1
50     =cut
51    
# mp_server $host, $port, <constructor-args>, $cb->($transport)
#
# Starts a TCP listener through AnyEvent::Socket::tcp_server. For every
# accepted connection a new transport is constructed - fh, peerhost and
# peerport are supplied first, so <constructor-args> can override them -
# and passed to $cb. The result of tcp_server is returned to the caller.
sub mp_server($$@) {
   # the callback is always the last argument, so take it off first
   my $cb = pop @_;
   my ($host, $port, @args) = @_;

   AnyEvent::Socket::tcp_server ($host, $port, sub {
      my ($fh, $peerhost, $peerport) = @_;

      $cb->(
         AnyEvent::MP::Transport->new (
            fh       => $fh,
            peerhost => $peerhost,
            peerport => $peerport,
            @args,
         )
      );
   })
}
67    
68 root 1.2 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
69    
70     =cut
71    
# mp_connect $host, $port, <constructor-args>, $cb->($transport)
#
# Connects to $host/$port through AnyEvent::Socket::tcp_connect. On success a
# new transport is constructed around the socket (with peername set to the
# host that was asked for, and peerhost/peerport to what tcp_connect reports)
# and passed to $cb; when no file handle is delivered, $cb is invoked without
# arguments instead. The result of tcp_connect is returned to the caller.
sub mp_connect {
   # the callback is always the last argument, so take it off first
   my $cb = pop @_;
   my ($host, $port, @args) = @_;

   AnyEvent::Socket::tcp_connect ($host, $port, sub {
      my ($fh, $nhost, $nport) = @_;

      # no handle means the connection attempt failed
      $fh or return $cb->();

      $cb->(
         AnyEvent::MP::Transport->new (
            fh       => $fh,
            peername => $host,
            peerhost => $nhost,
            peerport => $nport,
            @args,
         )
      );
   })
}
90    
91 root 1.1 =item new AnyEvent::MP::Transport
92    
93     # immediately starts negotiation
94     my $transport = new AnyEvent::MP::Transport
95 root 1.2 # mandatory
96 root 1.1 fh => $filehandle,
97 root 1.2 local_id => $identifier,
98 root 1.1 on_recv => sub { receive-callback },
99     on_error => sub { error-callback },
100    
101     # optional
102     secret => "shared secret",
103     on_eof => sub { clean-close-callback },
104     on_connect => sub { successful-connect-callback },
105 root 1.2 greeting => { key => value },
106 root 1.1
107     # tls support
108     tls_ctx => AnyEvent::TLS,
109     peername => $peername, # for verification
110     ;
111    
112     =cut
113    
# framing types we accept and send, most preferred first
our @FRAMINGS = ("json", "storable");

# auth types we are willing to send ourselves
our @AUTH_SND = ("hmac_md6_64_256");

# auth types we advertise as acceptable: everything we send, plus hex_secret
our @AUTH_RCV = (@AUTH_SND, "hex_secret");
117    
118     #AnyEvent::Handle::register_write_type mp_record => sub {
119     #};
120 root 1.4
# new AnyEvent::MP::Transport key => value...
#
# Wraps the already-connected socket given as "fh" in an AnyEvent::Handle and
# immediately starts negotiating: the two local greeting lines are written,
# and a chain of line reads then (asynchronously) parses the peer greeting,
# picks an auth type and a framing, optionally starts TLS, exchanges and
# verifies the authentication line and finally enters the data phase, where
# every received message is passed to AnyEvent::MP::Base::_inject.
#
# Arguments used here: fh, peername (both handed to the handle), secret
# (defaults to AnyEvent::MP::Base::default_secret), tls_ctx, greeting (extra
# KEY=VALUE pairs for the greeting line), peerhost and peerport (announced
# as peeraddr). All remaining arguments simply stay in the object.
#
# Returns the transport object right away. Negotiation failures are reported
# through ->error.
sub new {
   my ($class, %arg) = @_;

   # the argument hash itself becomes the object, so $arg{...} and
   # $self->{...} access the same storage below
   my $self = bless \%arg, $class;

   $self->{queue} = [];

   {
      # the callbacks below are stored inside the handle, which in turn is
      # stored inside the transport - they only see this weak copy, so they
      # do not keep the transport alive
      Scalar::Util::weaken (my $self = $self);

      #$arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };

      $arg{secret} = AnyEvent::MP::Base::default_secret ()
         unless exists $arg{secret};

      $self->{hdl} = AnyEvent::Handle->new (
         fh       => delete $arg{fh},
         rbuf_max => 64 * 1024,
         autocork => 1,
         no_delay => 1,
         on_error => sub {
            $self->error ($_[2]);
         },
         peername => delete $arg{peername},
      );

      my $secret = $arg{secret};
      my $greeting_kv = $self->{greeting} ||= {};
      $greeting_kv->{"tls"} = "1.0"
         if $arg{tls_ctx};
      $greeting_kv->{provider} = "AE-$VERSION";
      $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport ($self->{peerhost}, $self->{peerport});

      # send greeting
      my $lgreeting1 = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
                     . ";$AnyEvent::MP::Base::UNIQ"
                     . ";$AnyEvent::MP::Base::NODE"
                     . ";" . (join ",", @AUTH_RCV)
                     . ";" . (join ",", @FRAMINGS)
                     . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
      my $lgreeting2 = MIME::Base64::encode_base64 (AnyEvent::MP::Base::nonce (33), "");

      $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");

      # expect greeting
      $self->{hdl}->push_read (line => sub {
         my $rgreeting1 = $_[1];

         my ($aemp, $version, $version_min, $uniq, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;

         if ($aemp ne "aemp") {
            return $self->error ("unparsable greeting");
         } elsif ($version_min > $PROTOCOL_VERSION) {
            return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
         }

         # pick the first auth type offered by the peer that we can send
         my $s_auth;
         for my $auth_ (split /,/, $auths) {
            if (grep $auth_ eq $_, @AUTH_SND) {
               $s_auth = $auth_;
               last;
            }
         }

         defined $s_auth
            or return $self->error ("$auths: no common auth type supported");

         die unless $s_auth eq "hmac_md6_64_256"; # hardcoded atm.

         # pick the first framing offered by the peer that we can send
         my $s_framing;
         for my $framing_ (split /,/, $framings) {
            if (grep $framing_ eq $_, @FRAMINGS) {
               $s_framing = $framing_;
               last;
            }
         }

         defined $s_framing
            or return $self->error ("$framings: no common framing method supported");

         $self->{remote_uniq} = $uniq;
         $self->{remote_node} = $rnode;

         # remaining greeting fields are KEY or KEY=VALUE
         $self->{remote_greeting} = {
            map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
               @kv
         };

         # read nonce
         $self->{hdl}->push_read (line => sub {
            my $rgreeting2 = $_[1];

            # both sides want tls: the side with the lower nonce connects
            if ($self->{tls_ctx} and 1 == int $self->{remote_greeting}{tls}) {
               $self->{tls} = $lgreeting2 lt $rgreeting2 ? "connect" : "accept";
               $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
            }

            # auth
            require Digest::MD6;
            require Digest::HMAC_MD6;

            my $key = Digest::MD6::md6_hex ($secret);
            my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012", 64, 256);

            my $rauth =
               $s_auth eq "hmac_md6_64_256" ? Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012", 64, 256)
             : $s_auth eq "hex_secret"      ? unpack "H*", $secret
             :                                die;

            $lauth ne $rauth # echo attack?
               or return $self->error ("authentication error");

            $self->{hdl}->push_write ("$s_auth;$lauth;$s_framing\012");

            # the reply line is "<auth type>;<auth data>;<framing>" - the
            # documented example reply is already 64 octets before the line
            # end, so a 64 octet limit cannot hold a complete line
            $self->{hdl}->rbuf_max (512);
            $self->{hdl}->push_read (line => sub {
               my ($hdl, $rline) = @_;

               my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;

               if ($rauth2 ne $rauth) {
                  return $self->error ("authentication failure/shared secret mismatch");
               }

               # the peer must choose one of the framings we offered in our
               # greeting - never pass an arbitrary peer-supplied string on
               # to the handle as a read type
               unless (grep { defined $r_framing && $r_framing eq $_ } @FRAMINGS) {
                  return $self->error ("peer selected a framing method we do not support");
               }

               $self->{s_framing} = $s_framing;

               $hdl->rbuf_max (undef);
               my $queue = delete $self->{queue}; # we are connected

               $self->connected;

               $hdl->push_write ($self->{s_framing} => $_)
                  for @$queue;

               # the reader re-queues itself for every message. it refers to
               # itself through a weakened variable only, as a strong one
               # would form a reference cycle that is never freed - the read
               # queue of the handle holds the strong reference.
               my $rmsg;
               my $rmsg_strong = $rmsg = sub {
                  $_[0]->push_read ($r_framing => $rmsg);

                  AnyEvent::MP::Base::_inject ($_[1]);
               };
               Scalar::Util::weaken ($rmsg);

               $hdl->push_read ($r_framing => $rmsg_strong);
            });
         });
      });
   }

   $self
}
268    
# $transport->error ($msg)
#
# Handles a fatal transport error: if a node is attached and this transport
# is the one registered with it, the node's transport is cleared. The message
# is then passed, prefixed with the peer address, to
# $AnyEvent::MP::Base::WARN, and the transport is destroyed.
sub error {
   my $self = shift;
   my $msg  = shift;

   my $node = $self->{node};

   # only detach from the node when it still refers to this very transport
   $node->clr_transport
      if $node && $node->{transport} == $self;

   $AnyEvent::MP::Base::WARN->("$self->{peerhost}:$self->{peerport}: $msg");

   $self->destroy
}
278    
# $transport->connected
#
# Called once negotiation has succeeded: looks up the node object for the
# remote node id via AnyEvent::MP::Base::add_node, remembers it in
# $self->{node} (weakly) and registers this transport with it.
sub connected {
   my $self = shift;

   # $node keeps a strong reference until set_transport has run - the copy
   # stored in the object is weak
   my $node = AnyEvent::MP::Base::add_node ($self->{remote_node});

   $self->{node} = $node;
   Scalar::Util::weaken ($self->{node});

   $node->set_transport ($self)
}
286    
# $transport->send ($msg)
#
# Queues $msg for writing on the handle, encoded with the framing that was
# selected for sending ($self->{s_framing}).
sub send {
   my ($self, $msg) = @_;

   $self->{hdl}->push_write ($self->{s_framing} => $msg)
}
290    
# $transport->destroy
#
# Destroys the underlying handle, if there is one.
sub destroy {
   my $self = shift;

   $self->{hdl} && $self->{hdl}->destroy
}
297    
# Destructor: tears the transport down via ->destroy when the object goes away.
sub DESTROY {
   $_[0]->destroy
}
303    
304     =back
305    
306 root 1.7 =head1 PROTOCOL
307    
308     The protocol is relatively simple, and consists of three phases which are
309     symmetrical for both sides: greeting (followed by optionally switching to
310     TLS mode), authentication and packet exchange.
311    
312     The protocol is designed to allow both full-text and binary streams.
313    
314     The greeting consists of two text lines that are ended by either an ASCII
315     CR LF pair, or a single ASCII LF (recommended).
316    
317     =head2 GREETING
318    
319 root 1.8 The first line contains strings separated (not ended) by C<;>
320 root 1.7 characters. The first seven strings are fixed by the protocol, the
321     remaining strings are C<KEY=VALUE> pairs. None of them may contain C<;>
322     characters themselves.
323    
324     The seven fixed strings are:
325    
326     =over 4
327    
328     =item C<aemp>
329    
330     The constant C<aemp> to identify the protocol.
331    
332     =item protocol version
333    
334     The (maximum) protocol version supported by this end, currently C<0>.
335    
336     =item minimum protocol version
337    
338     The minimum protocol version supported by this end, currently C<0>.
339    
340     =item a token uniquely identifying the current node instance
341    
342     This is a string that must change between restarts. It usually contains
343     things like the current time, the (OS) process id or similar values, but
344     no meaning is assumed for the contents.
345    
346     =item the node endpoint descriptors
347    
348     For public nodes, this is a comma-separated list of protocol endpoints,
349     i.e., the noderef. For slave nodes, this is a unique identifier.
350    
351     =item the acceptable authentication methods
352    
353     A comma-separated list of authentication methods supported by the
354     node. Note that AnyEvent::MP supports a C<hex_secret> authentication
355     method that accepts a cleartext password (hex-encoded), but will not use
356     this auth method itself.
357    
358     The receiving side should choose the first auth method it supports.
359    
360     =item the acceptable framing formats
361    
362     A comma-separated list of packet encoding/framing formats understood. The
363     receiving side should choose the first framing format it supports for
364     sending packets (which might be different from the format it has to accept).
365    
366 root 1.10 =back
367 root 1.8
368     The remaining arguments are C<KEY=VALUE> pairs. The following key-value
369     pairs are known at this time:
370    
371     =over 4
372    
373     =item provider=<module-version>
374    
375     The software provider for this implementation. For AnyEvent::MP, this is
376     C<AE-0.0> or whatever version it currently is at.
377    
378     =item peeraddr=<host>:<port>
379    
380     The peer address (socket address of the other side) as seen locally, in the same format
381     as noderef endpoints.
382    
383     =item tls=<major>.<minor>
384    
385     Indicates that the other side supports TLS (version should be 1.0) and
386     wishes to do a TLS handshake.
387    
388     =back
389    
390     After this greeting line there will be a second line containing a
391     cryptographic nonce, i.e. random data of high quality. To keep the
392     protocol text-only, these are usually 33 base64-encoded octets, but
393     it could be anything that doesn't contain any ASCII CR or ASCII LF
394     characters.
395    
396     Example of the two lines of greeting:
397    
398     aemp;0;0;e7d.4a76f48f;10.0.0.1:4040;hmac_md6_64_256,hex_secret;json,storable;provider=AE-0.0;peeraddr=127.0.0.1:1235
399     XntegV2Guvss0qNn7phCPnoU87xqxV+4Mqm/5y4iQm6a
400    
401     =head2 TLS handshake
402    
403     If, after the greeting, both sides indicate interest in TLS, then the
404     connection I<must> use TLS, or fail.
405    
406     Both sides compare their nonces, and the side who sent the lower nonce
407     value ("string" comparison on the raw octet values) becomes the client,
408     and the one with the higher nonce the server.
409    
410     =head2 AUTHENTICATION PHASE
411    
412     After the greeting is received (and the optional TLS handshake),
413     the authentication phase begins, which consists of sending a single
414     C<;>-separated line with three fixed strings and any number of
415     C<KEY=VALUE> pairs.
416    
417     The three fixed strings are:
418    
419     =over 4
420    
421     =item the authentication method chosen
422    
423     This must be one of the methods offered by the other side in the greeting.
424    
425     =item the authentication data
426    
427     The authentication data itself, usually base64 or hex-encoded data.
428    
429     =item the framing protocol chosen
430    
431     This must be one of the framing protocols offered by the other side in the
432     greeting. Each side must accept the choice of the other side.
433    
434     =back
435    
436 root 1.9 Example (the actual reply matching the previous example):
437    
438     hmac_md6_64_256;wIlLedBY956UCGSISG9mBZRDTG8xUi73/sVse2DSQp0;json
439    
440 root 1.8 =head2 DATA PHASE
441    
442     After this, packets get exchanged using the chosen framing protocol. It is
443     quite possible that both sides use a different framing protocol.
444    
445 root 1.1 =head1 SEE ALSO
446    
447     L<AnyEvent>.
448    
449     =head1 AUTHOR
450    
451     Marc Lehmann <schmorp@schmorp.de>
452     http://home.schmorp.de/
453    
454     =cut
455    
456     1
457