ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.5
Committed: Sun Aug 2 14:44:37 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.4: +6 -6 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Transport - actual transport protocol
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Transport;
8
9 =head1 DESCRIPTION
10
11 This is the superclass for MP transports, most of which is considered an
12 implementation detail.
13
14 Future versions might document the actual protocol.
15
16 =head1 FUNCTIONS/METHODS
17
18 =over 4
19
20 =cut
21
22 package AnyEvent::MP::Transport;
23
24 use common::sense;
25
26 use Scalar::Util;
27 use MIME::Base64 ();
28 use Storable ();
29 use JSON::XS ();
30
31 use AE ();
32 use AnyEvent::Socket ();
33 use AnyEvent::Handle ();
34
35 use AnyEvent::MP::Util ();
36
37 use base Exporter::;
38
39 our $VERSION = '0.0';
40 our $PROTOCOL_VERSION = 0;
41
42 =item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport)
43
44 Creates a listener on the given host/port using
45 C<AnyEvent::Socket::tcp_server>.
46
47 See C<new>, below, for constructor arguments.
48
49 Defaults for peerhost, peerport, fh and tls are provided.
50
51 =cut
52
53 sub mp_server($$@) {
54 my $cb = pop;
55 my ($host, $port, @args) = @_;
56
57 AnyEvent::Socket::tcp_server $host, $port, sub {
58 my ($fh, $host, $port) = @_;
59
60 $cb->(new AnyEvent::MP::Transport
61 fh => $fh,
62 peerhost => $host,
63 peerport => $port,
64 tls => "accept",
65 @args,
66 );
67 }
68 }
69
70 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
71
72 =cut
73
74 sub mp_connect {
75 my $cb = pop;
76 my ($host, $port, @args) = @_;
77
78 AnyEvent::Socket::tcp_connect $host, $port, sub {
79 my ($fh, $nhost, $nport) = @_;
80
81 return $cb->() unless $fh;
82
83 $cb->(new AnyEvent::MP::Transport
84 fh => $fh,
85 peername => $host,
86 peerhost => $nhost,
87 peerport => $nport,
88 tls => "accept",
89 @args,
90 );
91 }
92 }
93
94 =item new AnyEvent::MP::Transport
95
96 # immediately starts negotiation
97 my $transport = new AnyEvent::MP::Transport
98 # mandatory
99 fh => $filehandle,
100 local_id => $identifier,
101 on_recv => sub { receive-callback },
102 on_error => sub { error-callback },
103
104 # optional
105 secret => "shared secret",
106 on_eof => sub { clean-close-callback },
107 on_connect => sub { successful-connect-callback },
108 greeting => { key => value },
109
110 # tls support
111 tls => "accept|connect",
112 tls_ctx => AnyEvent::TLS,
113 peername => $peername, # for verification
114 ;
115
116 =cut
117
118 our @FRAMING_WANT = qw(json storable);#d##TODO#
119
120 sub new {
121 my ($class, %arg) = @_;
122
123 my $self = bless \%arg, $class;
124
125 $self->{queue} = [];
126
127 {
128 Scalar::Util::weaken (my $self = $self);
129
130 if (exists $arg{connect}) {
131 $arg{tls} ||= "connect";
132 $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };
133 }
134
135 $arg{secret} = AnyEvent::MP::Base::default_secret ()
136 unless exists $arg{secret};
137
138 $self->{hdl} = new AnyEvent::Handle
139 fh => delete $arg{fh},
140 rbuf_max => 64 * 1024,
141 autocork => 1,
142 no_delay => 1,
143 on_error => sub {
144 $self->error ($_[2]);
145 },
146 peername => delete $arg{peername},
147 ;
148
149 my $secret = $arg{secret};
150 my $greeting_kv = $self->{greeting} ||= {};
151 $greeting_kv->{"tls1.0"} ||= $arg{tls}
152 if exists $arg{tls} && $arg{tls_ctx};
153 $greeting_kv->{provider} = "AE-$VERSION";
154
155 # send greeting
156 my $lgreeting = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
157 . ";$AnyEvent::MP::Base::UNIQ"
158 . ";$AnyEvent::MP::Base::NODE"
159 . ";" . (MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (33), "")
160 . ";hmac_md6_64_256" # hardcoded atm.
161 . ";json" # hardcoded atm.
162 . ";$self->{peerhost};$self->{peerport}"
163 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
164
165 $self->{hdl}->push_write ("$lgreeting\012");
166
167 # expect greeting
168 $self->{hdl}->push_read (line => sub {
169 my $rgreeting = $_[1];
170
171 my ($aemp, $version, $version_min, $uniq, $rnode, undef, $auth, $framing, $peerport, $peerhost, @kv) = split /;/, $rgreeting;
172
173 if ($aemp ne "aemp") {
174 return $self->error ("unparsable greeting");
175 } elsif ($version_min > $PROTOCOL_VERSION) {
176 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
177 } elsif ($auth ne "hmac_md6_64_256") {
178 return $self->error ("unsupported auth method ($auth)");
179 } elsif ($framing ne "json") {
180 return $self->error ("unsupported framing method ($auth)");
181 }
182
183 $self->{remote_uniq} = $uniq;
184 $self->{remote_node} = $rnode;
185
186 $self->{remote_greeting} = {
187 map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
188 @kv
189 };
190
191 if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{remote_greeting}{"tls1.0"}) {
192 if ($self->{tls} ne $self->{remote_greeting}{"tls1.0"}) {
193 return $self->error ("TLS server/client mismatch");
194 }
195 $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
196 }
197
198 # auth
199 require Digest::MD6;
200 require Digest::HMAC_MD6;
201
202 my $key = Digest::MD6::md6_hex ($secret);
203 my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting\012$rgreeting", 64, 256);
204 my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting\012$lgreeting", 64, 256);
205
206 $lauth ne $rauth # echo attack?
207 or return $self->error ("authentication error");
208
209 $self->{hdl}->push_write ("$auth;$lauth;$framing\012");
210
211 $self->{hdl}->rbuf_max (64); # enough for 44 reply bytes or so
212 $self->{hdl}->push_read (line => sub {
213 my ($hdl, $rline) = @_;
214
215 my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;
216
217 if ($rauth2 ne $rauth) {
218 return $self->error ("authentication failure/shared secret mismatch");
219 }
220
221 $self->{s_framing} = "json";#d#
222
223 $hdl->rbuf_max (undef);
224 my $queue = delete $self->{queue}; # we are connected
225
226 $self->connected;
227
228 $hdl->push_write ($self->{s_framing} => $_)
229 for @$queue;
230
231 my $rmsg; $rmsg = sub {
232 $_[0]->push_read ($r_framing => $rmsg);
233
234 AnyEvent::MP::Base::_inject ($_[1]);
235 };
236 $hdl->push_read ($r_framing => $rmsg);
237 });
238 });
239 }
240
241 $self
242 }
243
244 sub error {
245 my ($self, $msg) = @_;
246
247 if ($self->{node} && $self->{node}{transport} == $self) {
248 $self->{node}->clr_transport;
249 }
250 # $self->{on_error}($self, $msg);
251 $self->destroy;
252 }
253
254 sub connected {
255 my ($self) = @_;
256
257 my $node = AnyEvent::MP::Base::add_node ($self->{remote_node});
258 Scalar::Util::weaken ($self->{node} = $node);
259 $node->set_transport ($self);
260 }
261
262 sub send {
263 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]);
264 }
265
266 sub destroy {
267 my ($self) = @_;
268
269 $self->{hdl}->destroy
270 if $self->{hdl};
271 }
272
273 sub DESTROY {
274 my ($self) = @_;
275
276 $self->destroy;
277 }
278
279 =back
280
281 =head1 SEE ALSO
282
283 L<AnyEvent>.
284
285 =head1 AUTHOR
286
287 Marc Lehmann <schmorp@schmorp.de>
288 http://home.schmorp.de/
289
290 =cut
291
292 1
293