ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.7
Committed: Mon Aug 3 14:47:25 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.6: +143 -45 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Transport - actual transport protocol
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Transport;
8    
9     =head1 DESCRIPTION
10    
11     This is the superclass for MP transports, most of which is considered an
12     implementation detail.
13    
14 root 1.7 See the "PROTOCOL" section below if you want to write another client for
15     this protocol.
16 root 1.1
17     =head1 FUNCTIONS/METHODS
18    
19     =over 4
20    
21     =cut
22    
23     package AnyEvent::MP::Transport;
24    
25     use common::sense;
26    
27     use Scalar::Util;
28     use MIME::Base64 ();
29     use Storable ();
30 root 1.2 use JSON::XS ();
31 root 1.1
32     use AE ();
33     use AnyEvent::Socket ();
34     use AnyEvent::Handle ();
35 root 1.2
36 root 1.1 use base Exporter::;
37    
38     our $VERSION = '0.0';
39 root 1.2 our $PROTOCOL_VERSION = 0;
40 root 1.1
41     =item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport)
42    
43     Creates a listener on the given host/port using
44     C<AnyEvent::Socket::tcp_server>.
45    
46     See C<new>, below, for constructor arguments.
47    
48     Defaults for peerhost, peerport, fh and tls are provided.
49    
50     =cut
51    
52     sub mp_server($$@) {
53     my $cb = pop;
54     my ($host, $port, @args) = @_;
55    
56     AnyEvent::Socket::tcp_server $host, $port, sub {
57     my ($fh, $host, $port) = @_;
58    
59     $cb->(new AnyEvent::MP::Transport
60     fh => $fh,
61     peerhost => $host,
62     peerport => $port,
63     tls => "accept",
64     @args,
65     );
66     }
67     }
68    
69 root 1.2 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
70    
71     =cut
72    
73     sub mp_connect {
74     my $cb = pop;
75     my ($host, $port, @args) = @_;
76    
77     AnyEvent::Socket::tcp_connect $host, $port, sub {
78     my ($fh, $nhost, $nport) = @_;
79    
80     return $cb->() unless $fh;
81    
82     $cb->(new AnyEvent::MP::Transport
83     fh => $fh,
84     peername => $host,
85     peerhost => $nhost,
86     peerport => $nport,
87     tls => "accept",
88     @args,
89     );
90     }
91     }
92    
93 root 1.1 =item new AnyEvent::MP::Transport
94    
95     # immediately starts negotiation
96     my $transport = new AnyEvent::MP::Transport
97 root 1.2 # mandatory
98 root 1.1 fh => $filehandle,
99 root 1.2 local_id => $identifier,
100 root 1.1 on_recv => sub { receive-callback },
101     on_error => sub { error-callback },
102    
103     # optional
104     secret => "shared secret",
105     on_eof => sub { clean-close-callback },
106     on_connect => sub { successful-connect-callback },
107 root 1.2 greeting => { key => value },
108 root 1.1
109     # tls support
110     tls => "accept|connect",
111     tls_ctx => AnyEvent::TLS,
112     peername => $peername, # for verification
113     ;
114    
115     =cut
116    
117 root 1.7 our @FRAMINGS = qw(json storable); # the framing types we accept and send, in order of preference
118     our @AUTH_SND = qw(hmac_md6_64_256); # auth types we send
119     our @AUTH_RCV = (@AUTH_SND, qw(hex_secret)); # auth types we accept
120    
121     #AnyEvent::Handle::register_write_type mp_record => sub {
122     #};
123 root 1.4
124 root 1.1 sub new {
125     my ($class, %arg) = @_;
126    
127     my $self = bless \%arg, $class;
128    
129     $self->{queue} = [];
130    
131     {
132     Scalar::Util::weaken (my $self = $self);
133    
134     if (exists $arg{connect}) {
135     $arg{tls} ||= "connect";
136 root 1.2 $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };
137 root 1.1 }
138    
139 root 1.5 $arg{secret} = AnyEvent::MP::Base::default_secret ()
140 root 1.2 unless exists $arg{secret};
141    
142 root 1.1 $self->{hdl} = new AnyEvent::Handle
143 root 1.2 fh => delete $arg{fh},
144     rbuf_max => 64 * 1024,
145 root 1.4 autocork => 1,
146     no_delay => 1,
147 root 1.1 on_error => sub {
148     $self->error ($_[2]);
149     },
150     peername => delete $arg{peername},
151     ;
152    
153 root 1.2 my $secret = $arg{secret};
154     my $greeting_kv = $self->{greeting} ||= {};
155     $greeting_kv->{"tls1.0"} ||= $arg{tls}
156     if exists $arg{tls} && $arg{tls_ctx};
157     $greeting_kv->{provider} = "AE-$VERSION";
158 root 1.7 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
159 root 1.1
160     # send greeting
161 root 1.7 my $lgreeting1 = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
162     . ";$AnyEvent::MP::Base::UNIQ"
163     . ";$AnyEvent::MP::Base::NODE"
164     . ";" . (join ",", @AUTH_RCV)
165     . ";" . (join ",", @FRAMINGS)
166     . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
167     my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (33), "";
168 root 1.1
169 root 1.7 $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");
170 root 1.1
171     # expect greeting
172     $self->{hdl}->push_read (line => sub {
173 root 1.7 my $rgreeting1 = $_[1];
174 root 1.1
175 root 1.7 my ($aemp, $version, $version_min, $uniq, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;
176 root 1.1
177     if ($aemp ne "aemp") {
178     return $self->error ("unparsable greeting");
179 root 1.2 } elsif ($version_min > $PROTOCOL_VERSION) {
180     return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
181 root 1.1 }
182    
183 root 1.7 my $s_auth;
184     for my $auth_ (split /,/, $auths) {
185     if (grep $auth_ eq $_, @AUTH_SND) {
186     $s_auth = $auth_;
187     last;
188     }
189     }
190    
191     defined $s_auth
192     or return $self->error ("$auths: no common auth type supported");
193    
194     die unless $s_auth eq "hmac_md6_64_256"; # hardcoded atm.
195    
196     my $s_framing;
197     for my $framing_ (split /,/, $framings) {
198     if (grep $framing_ eq $_, @FRAMINGS) {
199     $s_framing = $framing_;
200     last;
201     }
202     }
203    
204     defined $s_framing
205     or return $self->error ("$framings: no common framing method supported");
206    
207 root 1.2 $self->{remote_uniq} = $uniq;
208     $self->{remote_node} = $rnode;
209 root 1.1
210 root 1.2 $self->{remote_greeting} = {
211     map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
212     @kv
213 root 1.1 };
214    
215 root 1.2 if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{remote_greeting}{"tls1.0"}) {
216     if ($self->{tls} ne $self->{remote_greeting}{"tls1.0"}) {
217 root 1.1 return $self->error ("TLS server/client mismatch");
218     }
219     $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
220     }
221 root 1.7
222     # read nonce
223     $self->{hdl}->push_read (line => sub {
224     my $rgreeting2 = $_[1];
225    
226     # auth
227     require Digest::MD6;
228     require Digest::HMAC_MD6;
229 root 1.1
230 root 1.7 my $key = Digest::MD6::md6_hex ($secret);
231     my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012", 64, 256);
232 root 1.1
233 root 1.7 my $rauth =
234     $s_auth eq "hmac_md6_64_256" ? Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012", 64, 256)
235     : $s_auth eq "hex_secret" ? unpack "H*", $secret
236     : die;
237 root 1.1
238 root 1.7 $lauth ne $rauth # echo attack?
239     or return $self->error ("authentication error");
240 root 1.2
241 root 1.7 $self->{hdl}->push_write ("$s_auth;$lauth;$s_framing\012");
242 root 1.2
243 root 1.7 $self->{hdl}->rbuf_max (64); # enough for 44 reply bytes or so
244     $self->{hdl}->push_read (line => sub {
245     my ($hdl, $rline) = @_;
246 root 1.2
247 root 1.7 my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;
248 root 1.1
249 root 1.7 if ($rauth2 ne $rauth) {
250     return $self->error ("authentication failure/shared secret mismatch");
251     }
252 root 1.1
253 root 1.7 $self->{s_framing} = $s_framing;
254 root 1.2
255 root 1.7 $hdl->rbuf_max (undef);
256     my $queue = delete $self->{queue}; # we are connected
257 root 1.1
258 root 1.7 $self->connected;
259 root 1.1
260 root 1.7 $hdl->push_write ($self->{s_framing} => $_)
261     for @$queue;
262 root 1.1
263 root 1.7 my $rmsg; $rmsg = sub {
264     $_[0]->push_read ($r_framing => $rmsg);
265 root 1.1
266 root 1.7 AnyEvent::MP::Base::_inject ($_[1]);
267     };
268     $hdl->push_read ($r_framing => $rmsg);
269     });
270 root 1.1 });
271     });
272     }
273    
274     $self
275     }
276    
277     sub error {
278     my ($self, $msg) = @_;
279    
280 root 1.4 if ($self->{node} && $self->{node}{transport} == $self) {
281     $self->{node}->clr_transport;
282     }
283 root 1.7 $AnyEvent::MP::Base::WARN->("$self->{peerhost}:$self->{peerport}: $msg");
284 root 1.4 $self->destroy;
285 root 1.1 }
286    
287 root 1.2 sub connected {
288     my ($self) = @_;
289    
290 root 1.5 my $node = AnyEvent::MP::Base::add_node ($self->{remote_node});
291 root 1.4 Scalar::Util::weaken ($self->{node} = $node);
292     $node->set_transport ($self);
293 root 1.2 }
294    
295 root 1.1 sub send {
296 root 1.2 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]);
297 root 1.1 }
298    
299     sub destroy {
300     my ($self) = @_;
301    
302 root 1.2 $self->{hdl}->destroy
303     if $self->{hdl};
304 root 1.1 }
305    
306     sub DESTROY {
307     my ($self) = @_;
308    
309     $self->destroy;
310     }
311    
312     =back
313    
314 root 1.7 =head1 PROTOCOL
315    
316     The protocol is relatively simple, and consists of three phases which are
317     symmetrical for both sides: greeting (followed by optionally switching to
318     TLS mode), authentication and packet exchange.
319    
320     the protocol is designed to allow both full-text and binary streams.
321    
322     The greeting consists of two text lines that are ended by either an ASCII
323     CR LF pair, or a single ASCII LF (recommended).
324    
325     =head2 GREETING
326    
327     The first line contains strings seperated (not ended) by C<;>
328     characters. The first seven strings are fixed by the protocol, the
329     remaining strings are C<KEY=VALUE> pairs. None of them may contain C<;>
330     characters themselves.
331    
332     The seven fixed strings are:
333    
334     =over 4
335    
336     =item C<aemp>
337    
338     The constant C<aemp> to identify the protocol.
339    
340     =item protocol version
341    
342     The (maximum) protocol version supported by this end, currently C<0>.
343    
344     =item minimum protocol version
345    
346     The minimum protocol version supported by this end, currently C<0>.
347    
348     =item a token uniquely identifying the current node instance
349    
350     This is a string that must change between restarts. It usually contains
351     things like the current time, the (OS) process id or similar values, but
352     no meaning of the contents are assumed.
353    
354     =item the node endpoint descriptors
355    
356     for public nodes, this is a comma-separated list of protocol endpoints,
357     i.e., the noderef. For slave nodes, this is a unique identifier.
358    
359     =item the acceptable authentication methods
360    
361     A comma-separated list of authentication methods supported by the
362     node. Note that AnyEvent::MP supports a C<hex_secret> authentication
363     method that accepts a cleartext password (hex-encoded), but will not use
364     this auth method itself.
365    
366     The receiving side should choose the first auth method it supports.
367    
368     =item the acceptable framing formats
369    
370     A comma-separated list of packet encoding/framign formats understood. The
371     receiving side should choose the first framing format it supports for
372     sending packets (which might be different from the format it has to accept).
373    
374     . ";$self->{peerhost};$self->{peerport}"
375     . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
376     my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (33), "";
377 root 1.1 =head1 SEE ALSO
378    
379     L<AnyEvent>.
380    
381     =head1 AUTHOR
382    
383     Marc Lehmann <schmorp@schmorp.de>
384     http://home.schmorp.de/
385    
386     =cut
387    
388     1
389