ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.7
Committed: Mon Aug 3 14:47:25 2009 UTC (14 years, 9 months ago) by root
Branch: MAIN
Changes since 1.6: +143 -45 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Transport - actual transport protocol
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Transport;
8
9 =head1 DESCRIPTION
10
11 This is the superclass for MP transports, most of which is considered an
12 implementation detail.
13
14 See the "PROTOCOL" section below if you want to write another client for
15 this protocol.
16
17 =head1 FUNCTIONS/METHODS
18
19 =over 4
20
21 =cut
22
23 package AnyEvent::MP::Transport;
24
25 use common::sense;
26
27 use Scalar::Util;
28 use MIME::Base64 ();
29 use Storable ();
30 use JSON::XS ();
31
32 use AE ();
33 use AnyEvent::Socket ();
34 use AnyEvent::Handle ();
35
36 use base Exporter::;
37
38 our $VERSION = '0.0';
39 our $PROTOCOL_VERSION = 0;
40
41 =item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport)
42
43 Creates a listener on the given host/port using
44 C<AnyEvent::Socket::tcp_server>.
45
46 See C<new>, below, for constructor arguments.
47
48 Defaults for peerhost, peerport, fh and tls are provided.
49
50 =cut
51
52 sub mp_server($$@) {
53 my $cb = pop;
54 my ($host, $port, @args) = @_;
55
56 AnyEvent::Socket::tcp_server $host, $port, sub {
57 my ($fh, $host, $port) = @_;
58
59 $cb->(new AnyEvent::MP::Transport
60 fh => $fh,
61 peerhost => $host,
62 peerport => $port,
63 tls => "accept",
64 @args,
65 );
66 }
67 }
68
69 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
70
71 =cut
72
73 sub mp_connect {
74 my $cb = pop;
75 my ($host, $port, @args) = @_;
76
77 AnyEvent::Socket::tcp_connect $host, $port, sub {
78 my ($fh, $nhost, $nport) = @_;
79
80 return $cb->() unless $fh;
81
82 $cb->(new AnyEvent::MP::Transport
83 fh => $fh,
84 peername => $host,
85 peerhost => $nhost,
86 peerport => $nport,
87 tls => "accept",
88 @args,
89 );
90 }
91 }
92
93 =item new AnyEvent::MP::Transport
94
95 # immediately starts negotiation
96 my $transport = new AnyEvent::MP::Transport
97 # mandatory
98 fh => $filehandle,
99 local_id => $identifier,
100 on_recv => sub { receive-callback },
101 on_error => sub { error-callback },
102
103 # optional
104 secret => "shared secret",
105 on_eof => sub { clean-close-callback },
106 on_connect => sub { successful-connect-callback },
107 greeting => { key => value },
108
109 # tls support
110 tls => "accept|connect",
111 tls_ctx => AnyEvent::TLS,
112 peername => $peername, # for verification
113 ;
114
115 =cut
116
117 our @FRAMINGS = qw(json storable); # the framing types we accept and send, in order of preference
118 our @AUTH_SND = qw(hmac_md6_64_256); # auth types we send
119 our @AUTH_RCV = (@AUTH_SND, qw(hex_secret)); # auth types we accept
120
121 #AnyEvent::Handle::register_write_type mp_record => sub {
122 #};
123
124 sub new {
125 my ($class, %arg) = @_;
126
127 my $self = bless \%arg, $class;
128
129 $self->{queue} = [];
130
131 {
132 Scalar::Util::weaken (my $self = $self);
133
134 if (exists $arg{connect}) {
135 $arg{tls} ||= "connect";
136 $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };
137 }
138
139 $arg{secret} = AnyEvent::MP::Base::default_secret ()
140 unless exists $arg{secret};
141
142 $self->{hdl} = new AnyEvent::Handle
143 fh => delete $arg{fh},
144 rbuf_max => 64 * 1024,
145 autocork => 1,
146 no_delay => 1,
147 on_error => sub {
148 $self->error ($_[2]);
149 },
150 peername => delete $arg{peername},
151 ;
152
153 my $secret = $arg{secret};
154 my $greeting_kv = $self->{greeting} ||= {};
155 $greeting_kv->{"tls1.0"} ||= $arg{tls}
156 if exists $arg{tls} && $arg{tls_ctx};
157 $greeting_kv->{provider} = "AE-$VERSION";
158 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
159
160 # send greeting
161 my $lgreeting1 = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
162 . ";$AnyEvent::MP::Base::UNIQ"
163 . ";$AnyEvent::MP::Base::NODE"
164 . ";" . (join ",", @AUTH_RCV)
165 . ";" . (join ",", @FRAMINGS)
166 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
167 my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (33), "";
168
169 $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");
170
171 # expect greeting
172 $self->{hdl}->push_read (line => sub {
173 my $rgreeting1 = $_[1];
174
175 my ($aemp, $version, $version_min, $uniq, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;
176
177 if ($aemp ne "aemp") {
178 return $self->error ("unparsable greeting");
179 } elsif ($version_min > $PROTOCOL_VERSION) {
180 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
181 }
182
183 my $s_auth;
184 for my $auth_ (split /,/, $auths) {
185 if (grep $auth_ eq $_, @AUTH_SND) {
186 $s_auth = $auth_;
187 last;
188 }
189 }
190
191 defined $s_auth
192 or return $self->error ("$auths: no common auth type supported");
193
194 die unless $s_auth eq "hmac_md6_64_256"; # hardcoded atm.
195
196 my $s_framing;
197 for my $framing_ (split /,/, $framings) {
198 if (grep $framing_ eq $_, @FRAMINGS) {
199 $s_framing = $framing_;
200 last;
201 }
202 }
203
204 defined $s_framing
205 or return $self->error ("$framings: no common framing method supported");
206
207 $self->{remote_uniq} = $uniq;
208 $self->{remote_node} = $rnode;
209
210 $self->{remote_greeting} = {
211 map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
212 @kv
213 };
214
215 if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{remote_greeting}{"tls1.0"}) {
216 if ($self->{tls} ne $self->{remote_greeting}{"tls1.0"}) {
217 return $self->error ("TLS server/client mismatch");
218 }
219 $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
220 }
221
222 # read nonce
223 $self->{hdl}->push_read (line => sub {
224 my $rgreeting2 = $_[1];
225
226 # auth
227 require Digest::MD6;
228 require Digest::HMAC_MD6;
229
230 my $key = Digest::MD6::md6_hex ($secret);
231 my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012", 64, 256);
232
233 my $rauth =
234 $s_auth eq "hmac_md6_64_256" ? Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012", 64, 256)
235 : $s_auth eq "hex_secret" ? unpack "H*", $secret
236 : die;
237
238 $lauth ne $rauth # echo attack?
239 or return $self->error ("authentication error");
240
241 $self->{hdl}->push_write ("$s_auth;$lauth;$s_framing\012");
242
243 $self->{hdl}->rbuf_max (64); # enough for 44 reply bytes or so
244 $self->{hdl}->push_read (line => sub {
245 my ($hdl, $rline) = @_;
246
247 my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;
248
249 if ($rauth2 ne $rauth) {
250 return $self->error ("authentication failure/shared secret mismatch");
251 }
252
253 $self->{s_framing} = $s_framing;
254
255 $hdl->rbuf_max (undef);
256 my $queue = delete $self->{queue}; # we are connected
257
258 $self->connected;
259
260 $hdl->push_write ($self->{s_framing} => $_)
261 for @$queue;
262
263 my $rmsg; $rmsg = sub {
264 $_[0]->push_read ($r_framing => $rmsg);
265
266 AnyEvent::MP::Base::_inject ($_[1]);
267 };
268 $hdl->push_read ($r_framing => $rmsg);
269 });
270 });
271 });
272 }
273
274 $self
275 }
276
277 sub error {
278 my ($self, $msg) = @_;
279
280 if ($self->{node} && $self->{node}{transport} == $self) {
281 $self->{node}->clr_transport;
282 }
283 $AnyEvent::MP::Base::WARN->("$self->{peerhost}:$self->{peerport}: $msg");
284 $self->destroy;
285 }
286
287 sub connected {
288 my ($self) = @_;
289
290 my $node = AnyEvent::MP::Base::add_node ($self->{remote_node});
291 Scalar::Util::weaken ($self->{node} = $node);
292 $node->set_transport ($self);
293 }
294
295 sub send {
296 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]);
297 }
298
299 sub destroy {
300 my ($self) = @_;
301
302 $self->{hdl}->destroy
303 if $self->{hdl};
304 }
305
306 sub DESTROY {
307 my ($self) = @_;
308
309 $self->destroy;
310 }
311
312 =back
313
314 =head1 PROTOCOL
315
316 The protocol is relatively simple, and consists of three phases which are
317 symmetrical for both sides: greeting (followed by optionally switching to
318 TLS mode), authentication and packet exchange.
319
320 the protocol is designed to allow both full-text and binary streams.
321
322 The greeting consists of two text lines that are ended by either an ASCII
323 CR LF pair, or a single ASCII LF (recommended).
324
325 =head2 GREETING
326
327 The first line contains strings seperated (not ended) by C<;>
328 characters. The first seven strings are fixed by the protocol, the
329 remaining strings are C<KEY=VALUE> pairs. None of them may contain C<;>
330 characters themselves.
331
332 The seven fixed strings are:
333
334 =over 4
335
336 =item C<aemp>
337
338 The constant C<aemp> to identify the protocol.
339
340 =item protocol version
341
342 The (maximum) protocol version supported by this end, currently C<0>.
343
344 =item minimum protocol version
345
346 The minimum protocol version supported by this end, currently C<0>.
347
348 =item a token uniquely identifying the current node instance
349
350 This is a string that must change between restarts. It usually contains
351 things like the current time, the (OS) process id or similar values, but
352 no meaning of the contents are assumed.
353
354 =item the node endpoint descriptors
355
356 for public nodes, this is a comma-separated list of protocol endpoints,
357 i.e., the noderef. For slave nodes, this is a unique identifier.
358
359 =item the acceptable authentication methods
360
361 A comma-separated list of authentication methods supported by the
362 node. Note that AnyEvent::MP supports a C<hex_secret> authentication
363 method that accepts a cleartext password (hex-encoded), but will not use
364 this auth method itself.
365
366 The receiving side should choose the first auth method it supports.
367
368 =item the acceptable framing formats
369
370 A comma-separated list of packet encoding/framign formats understood. The
371 receiving side should choose the first framing format it supports for
372 sending packets (which might be different from the format it has to accept).
373
374 . ";$self->{peerhost};$self->{peerport}"
375 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
376 my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (33), "";
377 =head1 SEE ALSO
378
379 L<AnyEvent>.
380
381 =head1 AUTHOR
382
383 Marc Lehmann <schmorp@schmorp.de>
384 http://home.schmorp.de/
385
386 =cut
387
388 1
389