ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.2
Committed: Fri Jul 31 20:55:46 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.1: +89 -45 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Transport - actual transport protocol
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Transport;
8    
9     =head1 DESCRIPTION
10    
11     This is the superclass for MP transports, most of which is considered an
12     implementation detail.
13    
14     Future versions might document the actual protocol.
15    
16     =head1 FUNCTIONS/METHODS
17    
18     =over 4
19    
20     =cut
21    
22     package AnyEvent::MP::Transport;
23    
24     use common::sense;
25    
26     use Scalar::Util;
27     use MIME::Base64 ();
28     use Storable ();
29 root 1.2 use JSON::XS ();
30 root 1.1
31     use AE ();
32     use AnyEvent::Socket ();
33     use AnyEvent::Handle ();
34 root 1.2
35     use AnyEvent::MP::Util ();
36 root 1.1
37     use base Exporter::;
38    
39     our $VERSION = '0.0';
40 root 1.2 our $PROTOCOL_VERSION = 0;
41 root 1.1
=item $listener = mp_server $host, $port, <constructor-args>, $cb->($transport)

Creates a listener on the given host/port using
C<AnyEvent::Socket::tcp_server>. For every accepted connection a new
C<AnyEvent::MP::Transport> object is constructed (which immediately starts
the handshake) and passed to C<$cb>.

The return value is whatever C<AnyEvent::Socket::tcp_server> returns.

See C<new>, below, for constructor arguments.

Defaults for peerhost, peerport, fh and tls (C<"accept">) are provided.
They are placed in front of the caller-supplied constructor arguments, so
a key given by the caller replaces the corresponding default.

=cut

sub mp_server($$@) {
   # the callback is always the last argument, everything between the
   # port and the callback is handed to the constructor unchanged
   my $cb = pop;
   my ($host, $port, @args) = @_;

   AnyEvent::Socket::tcp_server ($host, $port, sub {
      my ($fh, $peerhost, $peerport) = @_;

      $cb->(AnyEvent::MP::Transport->new (
         fh       => $fh,
         peerhost => $peerhost,
         peerport => $peerport,
         tls      => "accept",
         @args,
      ));
   })
}
69    
=item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)

Connects to the given host/port using C<AnyEvent::Socket::tcp_connect>.

When the connection has been established, a new C<AnyEvent::MP::Transport>
is created on the socket and passed to C<$cb>. When the connection attempt
fails, C<$cb> is invoked without any arguments instead.

Defaults for fh, peername (the host as given), peerhost, peerport (as
reported by C<tcp_connect>) and tls are provided in front of the
caller-supplied constructor arguments.

=cut

sub mp_connect {
   # last argument is the callback, the remainder after host and port
   # is passed through to the constructor
   my $on_result = pop @_;
   my ($target_host, $target_port, @ctor_args) = @_;

   my $on_socket = sub {
      my ($fh, $nhost, $nport) = @_;

      # no filehandle means the connect failed: report it with an empty call
      $fh or return $on_result->();

      # NOTE(review): the tls default is "accept" even though this is the
      # connecting side - confirm against the role check in new before
      # changing it.
      my $transport = AnyEvent::MP::Transport->new (
         fh       => $fh,
         peername => $target_host,
         peerhost => $nhost,
         peerport => $nport,
         tls      => "accept",
         @ctor_args,
      );

      $on_result->($transport);
   };

   AnyEvent::Socket::tcp_connect ($target_host, $target_port, $on_socket)
}
93    
=item new AnyEvent::MP::Transport

Creates a transport on top of an already connected filehandle and
immediately starts the handshake:

=over 4

=item 1. a greeting line is written and the peer's greeting line is read
and validated (protocol tag, version, auth method, framing),

=item 2. TLS is started if it was configured locally and announced by the
peer,

=item 3. both sides exchange a HMAC computed from the shared secret and
the two greeting lines,

=item 4. once the peer's reply has been verified, C<connected> is called,
queued messages are written and every received message is passed to
C<AnyEvent::MP::_inject>.

=back

Every failure is reported via C<error>, which invokes C<on_error> as
C<< $on_error->($transport, $message) >> and destroys the handle.

   # immediately starts negotiation
   my $transport = AnyEvent::MP::Transport->new (
      # mandatory
      fh       => $filehandle,
      local_id => $identifier,
      on_recv  => sub { receive-callback },
      on_error => sub { error-callback },

      # optional
      secret     => "shared secret", # default: AnyEvent::MP::default_secret
      on_eof     => sub { clean-close-callback },
      on_connect => sub { successful-connect-callback },
      greeting   => { key => value }, # extra key=value pairs for the greeting
      peerhost   => $peerhost,        # sent to the peer in the greeting
      peerport   => $peerport,        # sent to the peer in the greeting

      # tls support
      tls      => "accept|connect",
      tls_ctx  => AnyEvent::TLS,
      peername => $peername, # for verification
   );

If a C<connect> key is present, C<tls> defaults to C<"connect"> and
C<tls_ctx> to a verifying TLSv1-only context.

=cut

sub new {
   my ($class, %arg) = @_;

   # the object is the argument hash itself, so $self->{x} and $arg{x}
   # below always refer to the same slot
   my $self = bless \%arg, $class;

   # messages collected here are written once the handshake has succeeded
   $self->{queue} = [];

   {
      # the callbacks below are stored inside the handle, which is stored
      # inside the transport - use a weak copy to avoid a reference cycle
      Scalar::Util::weaken (my $self = $self);

      if (exists $arg{connect}) {
         $arg{tls}     ||= "connect";
         $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };
      }

      $arg{secret} = AnyEvent::MP::default_secret ()
         unless exists $arg{secret};

      $self->{hdl} = AnyEvent::Handle->new (
         fh       => delete $arg{fh},
         rbuf_max => 64 * 1024, # upper bound for the greeting line
         on_error => sub {
            # handle error callbacks receive ($handle, $fatal, $message)
            $self->error ($_[2]);
         },
         peername => delete $arg{peername},
      );

      my $secret      = $arg{secret};
      my $greeting_kv = $self->{greeting} ||= {};

      # only announce tls when a role and a context are both configured
      $greeting_kv->{"tls1.0"} ||= $arg{tls}
         if exists $arg{tls} && $arg{tls_ctx};
      $greeting_kv->{provider} = "AE-$VERSION";

      # send greeting
      my $lgreeting = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
                    . ";$AnyEvent::MP::UNIQ"
                    . ";$AnyEvent::MP::NODE"
                    # random nonce, base64 without any line ending
                    . ";" . MIME::Base64::encode_base64 (AnyEvent::MP::Util::nonce (33), "")
                    . ";hmac_md6_64_256" # hardcoded atm.
                    . ";json" # hardcoded atm.
                    . ";$self->{peerhost};$self->{peerport}"
                    . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);

      $self->{hdl}->push_write ("$lgreeting\012");

      # expect greeting
      $self->{hdl}->push_read (line => sub {
         my $rgreeting = $_[1];

         # same field order as in $lgreeting above; the nonce itself is not
         # needed (it is covered by the hmac over the whole line), and the
         # host/port fields are currently unused
         my ($aemp, $version, $version_min, $uniq, $rnode, undef,
             $auth, $framing, $peerhost, $peerport, @kv)
            = split /;/, $rgreeting;

         if ($aemp ne "aemp") {
            return $self->error ("unparsable greeting");
         } elsif ($version_min > $PROTOCOL_VERSION) {
            return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
         } elsif ($auth ne "hmac_md6_64_256") {
            return $self->error ("unsupported auth method ($auth)");
         } elsif ($framing ne "json") {
            # report the offending framing, not the auth method
            return $self->error ("unsupported framing method ($framing)");
         }

         $self->{remote_uniq} = $uniq;
         $self->{remote_node} = $rnode;

         # remaining fields are "key" or "key=value" pairs
         $self->{remote_greeting} = {
            map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
               @kv
         };

         if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{remote_greeting}{"tls1.0"}) {
            # NOTE(review): this only proceeds when both sides announce the
            # *same* role, yet starttls needs one "accept" and one
            # "connect" side - confirm the intended semantics.
            if ($self->{tls} ne $self->{remote_greeting}{"tls1.0"}) {
               return $self->error ("TLS server/client mismatch");
            }
            $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
         }

         # auth
         require Digest::MD6;
         require Digest::HMAC_MD6;

         # each side proves knowledge of the secret by sending the hmac of
         # "own greeting . peer greeting"; what we expect from the peer is
         # therefore the hmac with the two greetings swapped
         my $key   = Digest::MD6::md6_hex ($secret);
         my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting$rgreeting", 64, 256);
         my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting$lgreeting", 64, 256);

         $lauth ne $rauth # echo attack?
            or return $self->error ("authentication error");

         $self->{hdl}->push_write ("$auth;$lauth;$framing\012");

         # the reply line is "method;mac;framing" - with the current
         # method name and a 256 bit mac that is already 64 bytes before
         # the line ending, so a limit of 64 is too tight
         $self->{hdl}->rbuf_max (256);
         $self->{hdl}->push_read (line => sub {
            my ($hdl, $rline) = @_;

            my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;

            if ($rauth2 ne $rauth) {
               return $self->error ("authentication failure/shared secret mismatch");
            }

            $self->{s_framing} = "json";#d#

            # authenticated peers may send messages of any size
            $hdl->rbuf_max (undef);
            my $queue = delete $self->{queue}; # we are connected

            $self->connected;

            $hdl->push_write ($self->{s_framing} => $_)
               for @$queue;

            # read messages forever: re-arm the read before processing so
            # the next message is always being waited for
            my $rmsg; $rmsg = sub {
               $_[0]->push_read ($r_framing => $rmsg);

               AnyEvent::MP::_inject ($_[1]);
            };
            $hdl->push_read ($r_framing => $rmsg);
         });
      });
   }

   $self
}
239    
# $transport->error ($msg)
#
# Reports a fatal error: invokes the on_error callback as
# $on_error->($transport, $msg) and afterwards destroys the underlying
# handle, if there (still) is one.
sub error {
   my ($self, $msg) = @_;

   $self->{on_error}($self, $msg);

   # the handle may be missing by now, so guard the call the same way
   # destroy does instead of dying on an undefined value
   $self->{hdl}->destroy
      if $self->{hdl};
}
246    
# $transport->connected
#
# Called by the constructor's handshake code once the peer has been
# authenticated: passes the remote node id from the greeting to
# AnyEvent::MP::add_node and calls set_transport with this transport on
# the result.
sub connected {
   my $self = shift;

   (AnyEvent::MP::add_node ($self->{remote_node}))
      ->set_transport ($self);
}
253    
# $transport->send ($msg)
#
# Sends a message to the peer using the negotiated send framing.
#
# While the handshake is still in progress the message is appended to
# the queue instead; the constructor writes the queue out as soon as the
# peer has been authenticated.
sub send {
   my ($self, $msg) = @_;

   # {queue} exists only until the handshake completes - before that
   # {s_framing} is not set, so writing directly cannot work
   return push @{ $self->{queue} }, $msg
      if exists $self->{queue};

   $self->{hdl}->push_write ($self->{s_framing} => $msg);
}
257    
# $transport->destroy
#
# Destroys the underlying handle, if the transport has one. Does nothing
# otherwise, so it can be called on partially constructed objects.
sub destroy {
   my $hdl = $_[0]{hdl};

   $hdl and $hdl->destroy;
}
264    
# Destructor: releases the underlying handle through the regular destroy
# method when the transport object goes away.
sub DESTROY {
   $_[0]->destroy;
}
270    
271     =back
272    
273     =head1 SEE ALSO
274    
275     L<AnyEvent>.
276    
277     =head1 AUTHOR
278    
279     Marc Lehmann <schmorp@schmorp.de>
280     http://home.schmorp.de/
281    
282     =cut
283    
284     1
285