ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.6
Committed: Sun Aug 2 14:52:41 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
CVS Tags: rel-0_02
Changes since 1.5: +0 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::MP::Transport - actual transport protocol
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::MP::Transport;
8
9 =head1 DESCRIPTION
10
11 This is the superclass for MP transports, most of which is considered an
12 implementation detail.
13
14 Future versions might document the actual protocol.
15
16 =head1 FUNCTIONS/METHODS
17
18 =over 4
19
20 =cut
21
22 package AnyEvent::MP::Transport;
23
24 use common::sense;
25
26 use Scalar::Util;
27 use MIME::Base64 ();
28 use Storable ();
29 use JSON::XS ();
30
31 use AE ();
32 use AnyEvent::Socket ();
33 use AnyEvent::Handle ();
34
35 use base Exporter::;
36
37 our $VERSION = '0.0';
38 our $PROTOCOL_VERSION = 0;
39
40 =item $listener = mp_server $host, $port, <constructor-args>, $cb->($transport)
41
42 Creates a listener on the given host/port using
43 C<AnyEvent::Socket::tcp_server>.
44
45 See C<new>, below, for constructor arguments.
46
47 Defaults for peerhost, peerport, fh and tls are provided.
48
49 =cut
50
# Listen on $host:$port via AnyEvent::Socket::tcp_server and, for every
# accepted connection, construct an AnyEvent::MP::Transport and hand it to
# the callback.
#
# Arguments: $host, $port, optional constructor key/value pairs, and - as
# the very last argument - the callback receiving the new transport.
#
# Returns whatever tcp_server returns.
sub mp_server($$@) {
   my $on_transport = pop @_;
   my ($host, $port, @ctor_args) = @_;

   AnyEvent::Socket::tcp_server $host, $port, sub {
      my ($fh, $peerhost, $peerport) = @_;

      # defaults come first, so caller-supplied arguments override them
      my @ctor = (
         fh       => $fh,
         peerhost => $peerhost,
         peerport => $peerport,
         tls      => "accept",
         @ctor_args,
      );

      $on_transport->(AnyEvent::MP::Transport->new (@ctor));
   }
}
67
68 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
69
70 =cut
71
# Connect to $host:$port via AnyEvent::Socket::tcp_connect and, once the
# connection is established, construct an AnyEvent::MP::Transport on it and
# hand it to the callback.
#
# Arguments: $host, $port, optional constructor key/value pairs, and - as
# the very last argument - the callback. When no file handle is delivered,
# the callback is invoked without arguments.
#
# Returns whatever tcp_connect returns.
sub mp_connect {
   my $on_transport = pop @_;
   my ($host, $port, @ctor_args) = @_;

   AnyEvent::Socket::tcp_connect $host, $port, sub {
      my ($fh, $peer_addr, $peer_port) = @_;

      # no file handle: signal this by calling back with no transport
      unless ($fh) {
         return $on_transport->();
      }

      # defaults come first, so caller-supplied arguments override them
      # NOTE(review): tls defaults to "accept" here, same as in mp_server -
      # confirm whether the connecting side was meant to use "connect".
      my @ctor = (
         fh       => $fh,
         peername => $host,
         peerhost => $peer_addr,
         peerport => $peer_port,
         tls      => "accept",
         @ctor_args,
      );

      $on_transport->(AnyEvent::MP::Transport->new (@ctor));
   }
}
91
92 =item new AnyEvent::MP::Transport
93
94 # immediately starts negotiation
95 my $transport = new AnyEvent::MP::Transport
96 # mandatory
97 fh => $filehandle,
98 local_id => $identifier,
99 on_recv => sub { receive-callback },
100 on_error => sub { error-callback },
101
102 # optional
103 secret => "shared secret",
104 on_eof => sub { clean-close-callback },
105 on_connect => sub { successful-connect-callback },
106 greeting => { key => value },
107
108 # tls support
109 tls => "accept|connect",
110 tls_ctx => AnyEvent::TLS,
111 peername => $peername, # for verification
112 ;
113
114 =cut
115
116 our @FRAMING_WANT = qw(json storable);#d##TODO#
117
# Constructor. Blesses the argument hash into the object, wraps the C<fh>
# argument in an AnyEvent::Handle and immediately starts the handshake:
#
#   1. send our greeting line and read the peer's greeting line
#   2. validate protocol name, version, auth method and framing
#   3. optionally start TLS
#   4. exchange HMAC-MD6 digests computed over both greetings with the
#      shared secret as key
#   5. on success call C<connected>, flush C<< $self->{queue} >> and start
#      reading messages, each of which is passed to
#      AnyEvent::MP::Base::_inject
#
# Any failure ends up in C<< $self->error >>.
#
# Returns the new transport object.
sub new {
   my ($class, %arg) = @_;

   my $self = bless \%arg, $class;

   $self->{queue} = [];

   {
      # the callbacks below must not keep the object alive, so they only
      # see a weakened alias of $self
      Scalar::Util::weaken (my $self = $self);

      if (exists $arg{connect}) {
         $arg{tls}     ||= "connect";
         $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };
      }

      $arg{secret} = AnyEvent::MP::Base::default_secret ()
         unless exists $arg{secret};

      $self->{hdl} = AnyEvent::Handle->new (
         fh       => delete $arg{fh},
         rbuf_max => 64 * 1024,
         autocork => 1,
         no_delay => 1,
         on_error => sub {
            $self->error ($_[2]);
         },
         peername => delete $arg{peername},
      );

      my $secret      = $arg{secret};
      my $greeting_kv = $self->{greeting} ||= {};
      $greeting_kv->{"tls1.0"} ||= $arg{tls}
         if exists $arg{tls} && $arg{tls_ctx};
      $greeting_kv->{provider} = "AE-$VERSION";

      # send greeting
      my $lgreeting = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
         . ";$AnyEvent::MP::Base::UNIQ"
         . ";$AnyEvent::MP::Base::NODE"
         . ";" . (MIME::Base64::encode_base64 (AnyEvent::MP::Base::nonce (33), ""))
         . ";hmac_md6_64_256" # hardcoded atm.
         . ";json" # hardcoded atm.
         . ";$self->{peerhost};$self->{peerport}"
         . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);

      $self->{hdl}->push_write ("$lgreeting\012");

      # expect greeting
      $self->{hdl}->push_read (line => sub {
         my $rgreeting = $_[1];

         # field order mirrors $lgreeting above; the nonce and the
         # peerhost/peerport fields are not needed on the receiving side
         my ($aemp, $version, $version_min, $uniq, $rnode, undef, $auth, $framing, undef, undef, @kv)
            = split /;/, $rgreeting;

         if ($aemp ne "aemp") {
            return $self->error ("unparsable greeting");
         } elsif ($version_min > $PROTOCOL_VERSION) {
            return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
         } elsif ($auth ne "hmac_md6_64_256") {
            return $self->error ("unsupported auth method ($auth)");
         } elsif ($framing ne "json") {
            return $self->error ("unsupported framing method ($framing)");
         }

         $self->{remote_uniq} = $uniq;
         $self->{remote_node} = $rnode;

         # trailing "key=value" (or bare "key") fields
         $self->{remote_greeting} = {
            map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
               @kv
         };

         # NOTE(review): this requires both sides to announce the *same*
         # tls1.0 value - confirm that this is the intended check.
         if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{remote_greeting}{"tls1.0"}) {
            if ($self->{tls} ne $self->{remote_greeting}{"tls1.0"}) {
               return $self->error ("TLS server/client mismatch");
            }
            $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
         }

         # auth
         require Digest::MD6;
         require Digest::HMAC_MD6;

         my $key   = Digest::MD6::md6_hex ($secret);
         my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting\012$rgreeting", 64, 256);
         my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting\012$lgreeting", 64, 256);

         $lauth ne $rauth # echo attack?
            or return $self->error ("authentication error");

         $self->{hdl}->push_write ("$auth;$lauth;$framing\012");

         $self->{hdl}->rbuf_max (64); # enough for 44 reply bytes or so
         $self->{hdl}->push_read (line => sub {
            my ($hdl, $rline) = @_;

            # reply format: auth method; digest; framing
            my (undef, $rauth2, $r_framing) = split /;/, $rline;

            if ($rauth2 ne $rauth) {
               return $self->error ("authentication failure/shared secret mismatch");
            }

            # $r_framing is chosen by the peer and is used as the
            # AnyEvent::Handle read type below, so only accept framings
            # we know about
            unless (defined $r_framing and grep $r_framing eq $_, @FRAMING_WANT) {
               return $self->error ("unsupported framing method");
            }

            $self->{s_framing} = "json";#d#

            $hdl->rbuf_max (undef);
            my $queue = delete $self->{queue}; # we are connected

            $self->connected;

            $hdl->push_write ($self->{s_framing} => $_)
               for @$queue;

            # re-arm the read for the next message before handing the
            # current one on
            my $rmsg; $rmsg = sub {
               $_[0]->push_read ($r_framing => $rmsg);

               AnyEvent::MP::Base::_inject ($_[1]);
            };
            $hdl->push_read ($r_framing => $rmsg);

            # the closure refers to itself through $rmsg; now that the
            # handle's read queue holds it, weaken our reference so the
            # cycle does not leak once the handle is gone
            Scalar::Util::weaken $rmsg;
         });
      });
   }

   $self
}
241
# Tear the transport down after a failure: if our node still considers us
# its transport, detach from it, then destroy the handle.
#
# $msg is accepted but currently unused, because the on_error callback
# invocation is disabled.
#
# Returns whatever destroy returns.
sub error {
   my ($self, $msg) = @_;

   $self->{node}->clr_transport
      if $self->{node} && $self->{node}{transport} == $self;

   # disabled: $self->{on_error}($self, $msg);
   $self->destroy;
}
251
# Called once the handshake has succeeded: registers the remote node via
# AnyEvent::MP::Base::add_node, keeps a weak reference to it in
# $self->{node} and installs this transport on the node.
sub connected {
   my $self = shift;

   my $remote = AnyEvent::MP::Base::add_node ($self->{remote_node});

   $self->{node} = $remote;
   Scalar::Util::weaken ($self->{node}); # the transport must not keep the node alive

   $remote->set_transport ($self);
}
259
# Write one message to the peer through the handle, using the framing
# stored in $self->{s_framing}.
#
# Returns whatever the handle's push_write returns.
sub send {
   my ($self, $msg) = @_;

   $self->{hdl}->push_write ($self->{s_framing}, $msg);
}
263
# Destroy the underlying handle, if there is one. Does nothing when
# $self->{hdl} is not set.
sub destroy {
   my $hdl = $_[0]{hdl};

   $hdl && $hdl->destroy
}
270
# Perl destructor: makes sure the handle is destroyed when the transport
# object goes away.
sub DESTROY {
   $_[0]->destroy;
}
276
277 =back
278
279 =head1 SEE ALSO
280
281 L<AnyEvent>.
282
283 =head1 AUTHOR
284
285 Marc Lehmann <schmorp@schmorp.de>
286 http://home.schmorp.de/
287
288 =cut
289
290 1
291