ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.1 by root, Thu Jul 30 08:38:50 2009 UTC vs.
Revision 1.8 by root, Mon Aug 3 14:58:13 2009 UTC

9=head1 DESCRIPTION 9=head1 DESCRIPTION
10 10
11This is the superclass for MP transports, most of which is considered an 11This is the superclass for MP transports, most of which is considered an
12implementation detail. 12implementation detail.
13 13
14Future versions might document the actual protocol. 14See the "PROTOCOL" section below if you want to write another client for
15this protocol.
15 16
16=head1 FUNCTIONS/METHODS 17=head1 FUNCTIONS/METHODS
17 18
18=over 4 19=over 4
19 20
24use common::sense; 25use common::sense;
25 26
26use Scalar::Util; 27use Scalar::Util;
27use MIME::Base64 (); 28use MIME::Base64 ();
28use Storable (); 29use Storable ();
30use JSON::XS ();
29 31
30use AE (); 32use AE ();
31use AnyEvent::Socket (); 33use AnyEvent::Socket ();
32use AnyEvent::Handle (); 34use AnyEvent::Handle ();
33use AnyEvent::MP ();
34 35
35use base Exporter::; 36use base Exporter::;
36 37
37our $VERSION = '0.0'; 38our $VERSION = '0.0';
38our $PROTOCOL_VERSION_MAJOR = 0; 39our $PROTOCOL_VERSION = 0;
39our $PROTOCOL_VERSION_MINOR = 0;
40 40
41=item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport) 41=item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport)
42 42
43Creates a listener on the given host/port using 43Creates a listener on the given host/port using
44C<AnyEvent::Socket::tcp_server>. 44C<AnyEvent::Socket::tcp_server>.
64 @args, 64 @args,
65 ); 65 );
66 } 66 }
67} 67}
68 68
69=item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
70
71=cut
72
73sub mp_connect {
74 my $cb = pop;
75 my ($host, $port, @args) = @_;
76
77 AnyEvent::Socket::tcp_connect $host, $port, sub {
78 my ($fh, $nhost, $nport) = @_;
79
80 return $cb->() unless $fh;
81
82 $cb->(new AnyEvent::MP::Transport
83 fh => $fh,
84 peername => $host,
85 peerhost => $nhost,
86 peerport => $nport,
87 tls => "accept",
88 @args,
89 );
90 }
91}
92
69=item new AnyEvent::MP::Transport 93=item new AnyEvent::MP::Transport
70 94
71 # immediately starts negotiation 95 # immediately starts negotiation
72 my $transport = new AnyEvent::MP::Transport 96 my $transport = new AnyEvent::MP::Transport
73 # fh OR connect is mandatory 97 # mandatory
74 fh => $filehandle, 98 fh => $filehandle,
75 connect => [$host, $port], 99 local_id => $identifier,
76
77 # mandatory
78 on_recv => sub { receive-callback }, 100 on_recv => sub { receive-callback },
79 on_error => sub { error-callback }, 101 on_error => sub { error-callback },
80 102
81 # optional 103 # optional
82 local_id => $identifier,
83 secret => "shared secret", 104 secret => "shared secret",
84 on_eof => sub { clean-close-callback }, 105 on_eof => sub { clean-close-callback },
85 on_connect => sub { successful-connect-callback }, 106 on_connect => sub { successful-connect-callback },
107 greeting => { key => value },
86 108
87 # tls support 109 # tls support
88 tls => "accept|connect", 110 tls => "accept|connect",
89 tls_ctx => AnyEvent::TLS, 111 tls_ctx => AnyEvent::TLS,
90 peername => $peername, # for verification 112 peername => $peername, # for verification
91 ; 113 ;
92 114
93=cut 115=cut
94 116
117our @FRAMINGS = qw(json storable); # the framing types we accept and send, in order of preference
118our @AUTH_SND = qw(hmac_md6_64_256); # auth types we send
119our @AUTH_RCV = (@AUTH_SND, qw(hex_secret)); # auth types we accept
120
121#AnyEvent::Handle::register_write_type mp_record => sub {
122#};
123
95sub new { 124sub new {
96 my ($class, %arg) = @_; 125 my ($class, %arg) = @_;
97 126
98 my $self = bless \%arg, $class; 127 my $self = bless \%arg, $class;
99 128
101 130
102 { 131 {
103 Scalar::Util::weaken (my $self = $self); 132 Scalar::Util::weaken (my $self = $self);
104 133
105 if (exists $arg{connect}) { 134 if (exists $arg{connect}) {
106 $arg{tls} ||= "connect";
107 $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1, verify_peername => "https" }; 135 $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };
108 } 136 }
109 137
138 $arg{secret} = AnyEvent::MP::Base::default_secret ()
139 unless exists $arg{secret};
140
110 $self->{hdl} = new AnyEvent::Handle 141 $self->{hdl} = new AnyEvent::Handle
111 (exists $arg{fh} ? (fh => delete $arg{fh}) : (connect => delete $arg{connect})), 142 fh => delete $arg{fh},
143 rbuf_max => 64 * 1024,
144 autocork => 1,
145 no_delay => 1,
112 on_error => sub { 146 on_error => sub {
113 $self->error ($_[2]); 147 $self->error ($_[2]);
114 }, 148 },
115 peername => delete $arg{peername}, 149 peername => delete $arg{peername},
116 ; 150 ;
117 151
118 my $secret = delete $arg{secret} ? delete $arg{secret} : AnyEvent::MP::default_secret; 152 my $secret = $arg{secret};
153 my $greeting_kv = $self->{greeting} ||= {};
154 $greeting_kv->{"tls"} = "1.0"
155 if $arg{tls_ctx};
156 $greeting_kv->{provider} = "AE-$VERSION";
157 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
119 158
120 # send greeting 159 # send greeting
121 my $lgreeting = "aemp;$PROTOCOL_VERSION_MAJOR;$PROTOCOL_VERSION_MINOR;AnyEvent::MP;$VERSION;" 160 my $lgreeting1 = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
122 . (MIME::Base64::encode_base64 AnyEvent::MP::nonce 33, "") . ";" 161 . ";$AnyEvent::MP::Base::UNIQ"
123 . "hmac_md6_64_256;" # hardcoded atm. 162 . ";$AnyEvent::MP::Base::NODE"
124 . "storable;" # hardcoded atm. 163 . ";" . (join ",", @AUTH_RCV)
125 . "$self->{local_id};" 164 . ";" . (join ",", @FRAMINGS)
126 . (exists $arg{tls} && $arg{tls_ctx} ? "tls1.0=$arg{tls};" : ""); 165 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
166 my $lgreeting2 = MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (33), "";
127 167
128 $self->{hdl}->push_write ("$lgreeting\012"); 168 $self->{hdl}->push_write ("$lgreeting1\012$lgreeting2\012");
129 169
130 # expect greeting 170 # expect greeting
131 $self->{hdl}->push_read (line => sub { 171 $self->{hdl}->push_read (line => sub {
132 my $rgreeting = $_[1]; 172 my $rgreeting1 = $_[1];
133 173
134 my ($aemp, $major, $minor, $provider, $provider_version, $nonce2, $auth, $framing, $rid, @kv) = split /;/, $rgreeting; 174 my ($aemp, $version, $version_min, $uniq, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;
135 175
136 if ($aemp ne "aemp") { 176 if ($aemp ne "aemp") {
137 return $self->error ("unparsable greeting"); 177 return $self->error ("unparsable greeting");
138 } elsif ($major != $PROTOCOL_VERSION_MAJOR) { 178 } elsif ($version_min > $PROTOCOL_VERSION) {
139 return $self->error ("major version mismatch ($PROTOCOL_VERSION_MAJOR vs. $major)"); 179 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
140 } elsif ($auth ne "hmac_md6_64_256") {
141 return $self->error ("unsupported auth method ($auth)");
142 } elsif ($framing ne "storable") {
143 return $self->error ("unsupported auth method ($auth)");
144 } 180 }
145 181
182 my $s_auth;
183 for my $auth_ (split /,/, $auths) {
184 if (grep $auth_ eq $_, @AUTH_SND) {
185 $s_auth = $auth_;
186 last;
187 }
188 }
189
190 defined $s_auth
191 or return $self->error ("$auths: no common auth type supported");
192
193 die unless $s_auth eq "hmac_md6_64_256"; # hardcoded atm.
194
195 my $s_framing;
196 for my $framing_ (split /,/, $framings) {
197 if (grep $framing_ eq $_, @FRAMINGS) {
198 $s_framing = $framing_;
199 last;
200 }
201 }
202
203 defined $s_framing
204 or return $self->error ("$framings: no common framing method supported");
205
206 $self->{remote_uniq} = $uniq;
146 $self->{remote_id} = $rid; 207 $self->{remote_node} = $rnode;
147 208
148 $self->{greeting} = { 209 $self->{remote_greeting} = {
149 provider => $provider, 210 map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
150 provider_version => $provider_version, 211 @kv
151 }; 212 };
152 213
153 /^([^=]+)(?:=(.*))?/ and $self->{greeting}{$1} = $2 214 # read nonce
154 for @kv; 215 $self->{hdl}->push_read (line => sub {
216 my $rgreeting2 = $_[1];
155 217
156 if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{greeting}{"tls1.0"}) {
157 if ($self->{tls} ne $self->{greeting}{"tls1.0"}) { 218 if ($self->{tls_ctx} and 1 == int $self->{remote_greeting}{"tls"}) {
158 return $self->error ("TLS server/client mismatch"); 219 $self->{tls} = $lgreeting2 lt $rgreeting2 ? "connect" : "accept";
220 $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
159 } 221 }
160 $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
161 } 222
162
163 # auth 223 # auth
164 require Digest::MD6; 224 require Digest::MD6;
165 require Digest::HMAC_MD6; 225 require Digest::HMAC_MD6;
166 226
167 my $key = Digest::MD6::md6_hex ($secret); 227 my $key = Digest::MD6::md6_hex ($secret);
168 my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting$rgreeting", 64, 256); 228 my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting1\012$lgreeting2\012$rgreeting1\012$rgreeting2\012", 64, 256);
169 my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting$lgreeting", 64, 256); 229
230 my $rauth =
231 $s_auth eq "hmac_md6_64_256" ? Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting1\012$rgreeting2\012$lgreeting1\012$lgreeting2\012", 64, 256)
232 : $s_auth eq "hex_secret" ? unpack "H*", $secret
233 : die;
234
235 $lauth ne $rauth # echo attack?
236 or return $self->error ("authentication error");
237
170 $self->{hdl}->push_write ("$lauth\012"); 238 $self->{hdl}->push_write ("$s_auth;$lauth;$s_framing\012");
171 239
240 $self->{hdl}->rbuf_max (64); # enough for 44 reply bytes or so
172 $self->{hdl}->push_read (line => sub { 241 $self->{hdl}->push_read (line => sub {
173 my ($hdl, $rauth2) = @_; 242 my ($hdl, $rline) = @_;
174 243
244 my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;
245
175 if ($rauth2 ne $rauth) { 246 if ($rauth2 ne $rauth) {
176 return $self->error ("authentication failure/shared secret mismatch"); 247 return $self->error ("authentication failure/shared secret mismatch");
177 } 248 }
178 249
250 $self->{s_framing} = $s_framing;
251
252 $hdl->rbuf_max (undef);
179 my $queue = delete $self->{queue}; # we are connected 253 my $queue = delete $self->{queue}; # we are connected
180 254
181 $self->{on_connect}($self)
182 if $self->{on_connect}; 255 $self->connected;
183 256
184 $hdl->push_write (storable => $_) 257 $hdl->push_write ($self->{s_framing} => $_)
185 for @$queue; 258 for @$queue;
186 259
187 my $rmsg; $rmsg = sub { 260 my $rmsg; $rmsg = sub {
188 $_[0]->push_read (storable => $rmsg); 261 $_[0]->push_read ($r_framing => $rmsg);
189 262
190 $self->{on_recv}($self, $_[1]); 263 AnyEvent::MP::Base::_inject ($_[1]);
264 };
265 $hdl->push_read ($r_framing => $rmsg);
191 }; 266 });
192 $hdl->push_read (storable => $rmsg);
193 }); 267 });
194 }); 268 });
195 } 269 }
196 270
197 $self 271 $self
198} 272}
199 273
200sub error { 274sub error {
201 my ($self, $msg) = @_; 275 my ($self, $msg) = @_;
202 276
203 $self->{on_error}($self, $msg); 277 if ($self->{node} && $self->{node}{transport} == $self) {
278 $self->{node}->clr_transport;
279 }
280 $AnyEvent::MP::Base::WARN->("$self->{peerhost}:$self->{peerport}: $msg");
204 $self->{hdl}->destroy; 281 $self->destroy;
282}
283
284sub connected {
285 my ($self) = @_;
286
287 my $node = AnyEvent::MP::Base::add_node ($self->{remote_node});
288 Scalar::Util::weaken ($self->{node} = $node);
289 $node->set_transport ($self);
205} 290}
206 291
207sub send { 292sub send {
208 my ($self, $msg) = @_; 293 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]);
209
210 exists $self->{queue}
211 ? push @{ $self->{queue} }, $msg
212 : $self->{hdl}->push_write (storable => $msg);
213} 294}
214 295
215sub destroy { 296sub destroy {
216 my ($self) = @_; 297 my ($self) = @_;
217 298
218 $self->{hdl}->destroy; 299 $self->{hdl}->destroy
300 if $self->{hdl};
219} 301}
220 302
221sub DESTROY { 303sub DESTROY {
222 my ($self) = @_; 304 my ($self) = @_;
223 305
224 $self->destroy; 306 $self->destroy;
225} 307}
226 308
227=back 309=back
228 310
311=head1 PROTOCOL
312
313The protocol is relatively simple, and consists of three phases which are
314symmetrical for both sides: greeting (followed by optionally switching to
315TLS mode), authentication and packet exchange.
316
317the protocol is designed to allow both full-text and binary streams.
318
319The greeting consists of two text lines that are ended by either an ASCII
320CR LF pair, or a single ASCII LF (recommended).
321
322=head2 GREETING
323
324The first line contains strings separated (not ended) by C<;>
325characters. The first seven strings are fixed by the protocol, the
326remaining strings are C<KEY=VALUE> pairs. None of them may contain C<;>
327characters themselves.
328
329The seven fixed strings are:
330
331=over 4
332
333=item C<aemp>
334
335The constant C<aemp> to identify the protocol.
336
337=item protocol version
338
339The (maximum) protocol version supported by this end, currently C<0>.
340
341=item minimum protocol version
342
343The minimum protocol version supported by this end, currently C<0>.
344
345=item a token uniquely identifying the current node instance
346
347This is a string that must change between restarts. It usually contains
348things like the current time, the (OS) process id or similar values, but
349no meaning of the contents are assumed.
350
351=item the node endpoint descriptors
352
353for public nodes, this is a comma-separated list of protocol endpoints,
354i.e., the noderef. For slave nodes, this is a unique identifier.
355
356=item the acceptable authentication methods
357
358A comma-separated list of authentication methods supported by the
359node. Note that AnyEvent::MP supports a C<hex_secret> authentication
360method that accepts a cleartext password (hex-encoded), but will not use
361this auth method itself.
362
363The receiving side should choose the first auth method it supports.
364
365=item the acceptable framing formats
366
367A comma-separated list of packet encoding/framign formats understood. The
368receiving side should choose the first framing format it supports for
369sending packets (which might be different from the format it has to accept).
370
371=cut
372
373The remaining arguments are C<KEY=VALUE> pairs. The following key-value
374pairs are known at this time:
375
376=over 4
377
378=item provider=<module-version>
379
380The software provider for this implementation. For AnyEvent::MP, this is
381C<AE-0.0> or whatever version it currently is at.
382
383=item peeraddr=<host>:<port>
384
385The peer address (socket address of the other side) as seen locally, in the same format
386as noderef endpoints.
387
388=item tls=<major>.<minor>
389
390Indicates that the other side supports TLS (version should be 1.0) and
391wishes to do a TLS handshake.
392
393=back
394
395After this greeting line there will be a second line containing a
396cryptographic nonce, i.e. random data of high quality. To keep the
397protocol text-only, these are usually 32 base64-encoded octets, but
398it could be anything that doesn't contain any ASCII CR or ASCII LF
399characters.
400
401Example of the two lines of greeting:
402
403 aemp;0;0;e7d.4a76f48f;10.0.0.1:4040;hmac_md6_64_256,hex_secret;json,storable;provider=AE-0.0;peeraddr=127.0.0.1:1235
404 XntegV2Guvss0qNn7phCPnoU87xqxV+4Mqm/5y4iQm6a
405
406=head2 TLS handshake
407
408If, after the handshake, both sides indicate interest in TLS, then the
409connection I<must> use TLS, or fail.
410
411Both sides compare their nonces, and the side who sent the lower nonce
412value ("string" comparison on the raw octet values) becomes the client,
413and the one with the higher nonce the server.
414
415=head2 AUTHENTICATION PHASE
416
417After the greeting is received (and the optional TLS handshake),
418the authentication phase begins, which consists of sending a single
419C<;>-separated line with three fixed strings and any number of
420C<KEY=VALUE> pairs.
421
422The three fixed strings are:
423
424=over 4
425
426=item the authentication method chosen
427
428This must be one of the methods offered by the other side in the greeting.
429
430=item the authentication data
431
432The authentication data itself, usually base64 or hex-encoded data.
433
434=item the framing protocol chosen
435
436This must be one of the framing protocols offered by the other side in the
437greeting. Each side must accept the choice of the other side.
438
439=back
440
441=head2 DATA PHASE
442
443After this, packets get exchanged using the chosen framing protocol. It is
444quite possible that both sides use a different framing protocol.
445
229=head1 SEE ALSO 446=head1 SEE ALSO
230 447
231L<AnyEvent>. 448L<AnyEvent>.
232 449
233=head1 AUTHOR 450=head1 AUTHOR

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines