ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.2
Committed: Fri Jul 31 20:55:46 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.1: +89 -45 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Transport - actual transport protocol
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Transport;
8
9 =head1 DESCRIPTION
10
11 This is the superclass for MP transports, most of which is considered an
12 implementation detail.
13
14 Future versions might document the actual protocol.
15
16 =head1 FUNCTIONS/METHODS
17
18 =over 4
19
20 =cut
21
22 package AnyEvent::MP::Transport;
23
24 use common::sense;
25
26 use Scalar::Util;
27 use MIME::Base64 ();
28 use Storable ();
29 use JSON::XS ();
30
31 use AE ();
32 use AnyEvent::Socket ();
33 use AnyEvent::Handle ();
34
35 use AnyEvent::MP::Util ();
36
37 use base Exporter::;
38
39 our $VERSION = '0.0';
40 our $PROTOCOL_VERSION = 0;
41
42 =item $listener = mp_server $host, $port, <constructor-args>, $cb->($transport)
43
44 Creates a listener on the given host/port using
45 C<AnyEvent::Socket::tcp_server>.
46
47 See C<new>, below, for constructor arguments.
48
49 Defaults for peerhost, peerport, fh and tls are provided.
50
51 =cut
52
# Starts a TCP listener via AnyEvent::Socket::tcp_server and wraps every
# accepted connection in a new AnyEvent::MP::Transport.
#
#   $host, $port - address to listen on (passed through to tcp_server)
#   @args        - extra constructor arguments for "new"; they come after
#                  the defaults below and therefore take precedence
#   $cb          - last argument; invoked with each new transport
#
# Returns whatever tcp_server returns.
sub mp_server($$@) {
   # the callback is always the last argument, so take it off first
   my $on_accept = pop @_;
   my $host      = shift;
   my $port      = shift;
   my @ctor_args = @_;

   return AnyEvent::Socket::tcp_server ($host, $port, sub {
      my ($fh, $peer_host, $peer_port) = @_;

      $on_accept->(AnyEvent::MP::Transport->new (
         fh       => $fh,
         peerhost => $peer_host,
         peerport => $peer_port,
         tls      => "accept",
         @ctor_args,
      ));
   });
}
69
70 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
71
72 =cut
73
# Connects to $host:$port via AnyEvent::Socket::tcp_connect and hands a new
# AnyEvent::MP::Transport to the callback.
#
#   $host, $port - address to connect to
#   @args        - extra constructor arguments for "new"; they come after
#                  the defaults below and therefore take precedence
#   $cb          - last argument; invoked with the new transport on success,
#                  or without any arguments when no connection could be made
#
# Returns whatever tcp_connect returns.
sub mp_connect {
   my $cb = pop;
   my ($host, $port, @args) = @_;

   AnyEvent::Socket::tcp_connect ($host, $port, sub {
      my ($fh, $nhost, $nport) = @_;

      # no file handle means the connection attempt failed
      return $cb->() unless $fh;

      $cb->(AnyEvent::MP::Transport->new (
         fh       => $fh,
         peername => $host,  # the name we asked for, used for TLS verification
         peerhost => $nhost,
         peerport => $nport,
         # we opened the connection, so we act as the TLS client; the
         # listening side (mp_server) uses "accept"
         tls      => "connect",
         @args,
      ));
   })
}
93
94 =item new AnyEvent::MP::Transport
95
96 # immediately starts negotiation
97 my $transport = new AnyEvent::MP::Transport
98 # mandatory
99 fh => $filehandle,
100 local_id => $identifier,
101 on_recv => sub { receive-callback },
102 on_error => sub { error-callback },
103
104 # optional
105 secret => "shared secret",
106 on_eof => sub { clean-close-callback },
107 on_connect => sub { successful-connect-callback },
108 greeting => { key => value },
109
110 # tls support
111 tls => "accept|connect",
112 tls_ctx => AnyEvent::TLS,
113 peername => $peername, # for verification
114 ;
115
116 =cut
117
# Creates a transport on top of an already connected file handle and
# immediately starts the handshake:
#
#   1. both sides send a one-line greeting (protocol version, node ids, a
#      nonce, auth and framing method, key=value extensions),
#   2. TLS is started when both sides advertise it with complementary roles,
#   3. both sides exchange an HMAC over the two greetings, keyed by the
#      shared secret,
#   4. on success "connected" is called, queued messages are flushed and
#      incoming messages are passed to AnyEvent::MP::_inject.
#
# Arguments are key => value pairs as documented in the POD above (fh,
# secret, greeting, tls, tls_ctx, peername, on_error, ...).  Any handshake
# failure is reported through the "error" method.
#
# Returns the new transport object.
sub new {
   my ($class, %arg) = @_;

   # the object *is* the argument hash, so $arg{...} and $self->{...} below
   # address the same storage
   my $self = bless \%arg, $class;

   # flushed to the peer once the handshake has completed
   $self->{queue} = [];

   {
      # the handle is stored inside $self and its callbacks refer back to
      # $self; use a weak copy so this does not form a reference cycle
      Scalar::Util::weaken (my $self = $self);

      if (exists $arg{connect}) {
         $arg{tls}     ||= "connect";
         $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };
      }

      $arg{secret} = AnyEvent::MP::default_secret ()
         unless exists $arg{secret};

      $self->{hdl} = AnyEvent::Handle->new (
         fh       => delete $arg{fh},
         rbuf_max => 64 * 1024,
         on_error => sub {
            $self->error ($_[2]);
         },
         peername => delete $arg{peername},
      );

      my $secret      = $arg{secret};
      my $greeting_kv = $self->{greeting} ||= {};

      # only advertise TLS when we have both a role and a context for it
      $greeting_kv->{"tls1.0"} ||= $arg{tls}
         if exists $arg{tls} && $arg{tls_ctx};
      $greeting_kv->{provider} = "AE-$VERSION";

      # send greeting
      my $lgreeting = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
                    . ";$AnyEvent::MP::UNIQ"
                    . ";$AnyEvent::MP::NODE"
                    # empty eol argument: the greeting must stay on one line
                    . ";" . MIME::Base64::encode_base64 (AnyEvent::MP::Util::nonce (33), "")
                    . ";hmac_md6_64_256" # hardcoded atm.
                    . ";json" # hardcoded atm.
                    . ";$self->{peerhost};$self->{peerport}"
                    . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);

      $self->{hdl}->push_write ("$lgreeting\012");

      # expect greeting
      $self->{hdl}->push_read (line => sub {
         my $rgreeting = $_[1];

         my (
            $aemp, $version, $version_min, $uniq, $rnode,
            undef,        # nonce, only relevant as part of the hashed greeting
            $auth, $framing,
            undef, undef, # host and port as seen by the peer (currently unused)
            @kv
         ) = split /;/, $rgreeting;

         if ($aemp ne "aemp") {
            return $self->error ("unparsable greeting");
         } elsif ($version_min > $PROTOCOL_VERSION) {
            return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
         } elsif ($auth ne "hmac_md6_64_256") {
            return $self->error ("unsupported auth method ($auth)");
         } elsif ($framing ne "json") {
            return $self->error ("unsupported framing method ($framing)");
         }

         $self->{remote_uniq} = $uniq;
         $self->{remote_node} = $rnode;

         $self->{remote_greeting} = {
            map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
               @kv
         };

         if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{remote_greeting}{"tls1.0"}) {
            # a TLS session needs exactly one server ("accept") and one
            # client ("connect"), so the peer must advertise the role
            # complementary to ours
            my %peer_role_for = (accept => "connect", connect => "accept");

            if ($peer_role_for{$self->{tls}} ne $self->{remote_greeting}{"tls1.0"}) {
               return $self->error ("TLS server/client mismatch");
            }

            $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
         }

         # auth
         require Digest::MD6;
         require Digest::HMAC_MD6;

         # both sides hash both greetings, in opposite order, so the value we
         # send ($lauth) differs from the value we expect back ($rauth)
         my $key   = Digest::MD6::md6_hex ($secret);
         my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting$rgreeting", 64, 256);
         my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting$lgreeting", 64, 256);

         $lauth ne $rauth # echo attack?
            or return $self->error ("authentication error");

         $self->{hdl}->push_write ("$auth;$lauth;$framing\012");

         $self->{hdl}->rbuf_max (64); # enough for 44 reply bytes or so
         $self->{hdl}->push_read (line => sub {
            my ($hdl, $rline) = @_;

            my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;

            if ($rauth2 ne $rauth) {
               return $self->error ("authentication failure/shared secret mismatch");
            }

            $self->{s_framing} = "json";#d#

            $hdl->rbuf_max (undef);
            my $queue = delete $self->{queue}; # we are connected

            $self->connected;

            $hdl->push_write ($self->{s_framing} => $_)
               for @$queue;

            # NOTE(review): $r_framing is chosen by the (authenticated) peer
            # and used as the read type without further checks - confirm
            # that this is intended.
            my $rmsg; $rmsg = sub {
               # re-arm first, so the next message is read even if the
               # injection below dies
               $_[0]->push_read ($r_framing => $rmsg);

               AnyEvent::MP::_inject ($_[1]);
            };
            $hdl->push_read ($r_framing => $rmsg);

            # the closure refers to itself through $rmsg; the handle's read
            # queue now keeps it alive, so weaken our reference to avoid
            # leaking one closure per connection
            Scalar::Util::weaken ($rmsg);
         });
      });
   }

   $self
}
239
# Reports a fatal transport error: first invokes the on_error callback with
# the transport and the message, then destroys the underlying handle.
# Returns the result of the handle's destroy call.
sub error {
   my $self   = shift;
   my $reason = shift;

   my $notify = $self->{on_error};
   $notify->($self, $reason);

   # looked up only after the callback has run, as the callback is handed
   # the transport itself
   my $handle = $self->{hdl};
   $handle->destroy;
}
246
# Called once the handshake has succeeded: looks up the node object for the
# remote node id via AnyEvent::MP::add_node and registers this transport
# with it.  Returns the result of set_transport.
sub connected {
   my $self = shift;

   my $node = AnyEvent::MP::add_node ($self->{remote_node});
   $node->set_transport ($self);
}
253
# Sends one message to the peer.
#
#   $_[0] - the transport
#   $_[1] - the message, written with the negotiated framing type
#
# While the handshake is still running the framing type is not known yet
# and the constructor's "queue" array still exists; the message is then
# appended to that queue, which is flushed right after authentication.
# Once connected the queue is gone and the message is written directly.
sub send {
   my ($self, $msg) = @_;

   # queue present => not connected yet, keep the message for later
   return push @{ $self->{queue} }, $msg
      if $self->{queue};

   $self->{hdl}->push_write ($self->{s_framing} => $msg);
}
257
# Tears the transport down by destroying the underlying handle, if there
# is one.  Does nothing when no handle is set.
sub destroy {
   my $self = shift;

   $self->{hdl}
      and $self->{hdl}->destroy;
}
264
# Perl destructor hook: delegates to the "destroy" method of the object
# being freed.
sub DESTROY {
   $_[0]->destroy;
}
270
271 =back
272
273 =head1 SEE ALSO
274
275 L<AnyEvent>.
276
277 =head1 AUTHOR
278
279 Marc Lehmann <schmorp@schmorp.de>
280 http://home.schmorp.de/
281
282 =cut
283
284 1
285