ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
Revision: 1.4
Committed: Sat Aug 1 15:04:31 2009 UTC (14 years, 10 months ago) by root
Branch: MAIN
Changes since 1.3: +12 -4 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Transport - actual transport protocol
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Transport;
8    
9     =head1 DESCRIPTION
10    
11     This is the superclass for MP transports, most of which is considered an
12     implementation detail.
13    
14     Future versions might document the actual protocol.
15    
16     =head1 FUNCTIONS/METHODS
17    
18     =over 4
19    
20     =cut
21    
22     package AnyEvent::MP::Transport;
23    
24     use common::sense;
25    
26     use Scalar::Util;
27     use MIME::Base64 ();
28     use Storable ();
29 root 1.2 use JSON::XS ();
30 root 1.1
31     use AE ();
32     use AnyEvent::Socket ();
33     use AnyEvent::Handle ();
34 root 1.2
35     use AnyEvent::MP::Util ();
36 root 1.1
37     use base Exporter::;
38    
39     our $VERSION = '0.0';
40 root 1.2 our $PROTOCOL_VERSION = 0;
41 root 1.1
42     =item $listener = mp_server $host, $port, <constructor-args>, $cb->($transport)
43    
44     Creates a listener on the given host/port using
45     C<AnyEvent::Socket::tcp_server>.
46    
47     See C<new>, below, for constructor arguments.
48    
49     Defaults for peerhost, peerport, fh and tls are provided.
50    
51     =cut
52    
# mp_server $host, $port, <constructor-args>, $cb->($transport)
#
# Listens on $host/$port via AnyEvent::Socket::tcp_server. For every
# connection handed to the accept callback a new AnyEvent::MP::Transport is
# constructed and passed to $cb.
#
# The callback is always the LAST argument; everything between $port and the
# callback is forwarded to the transport constructor. Because those
# arguments are appended after the defaults (fh, peerhost, peerport, tls),
# a caller-supplied key wins over the default with the same name.
#
# Returns whatever tcp_server returns.
sub mp_server($$@) {
   my $cb = pop @_;
   my ($host, $port, @args) = @_;

   AnyEvent::Socket::tcp_server ($host, $port, sub {
      my ($fh, $peerhost, $peerport) = @_;

      $cb->(AnyEvent::MP::Transport->new (
         fh       => $fh,
         peerhost => $peerhost,
         peerport => $peerport,
         tls      => "accept",
         @args,
      ));
   });
}
69    
70 root 1.2 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
71    
72     =cut
73    
# mp_connect $host, $port, <constructor-args>, $cb->($transport)
#
# Connects to $host/$port via AnyEvent::Socket::tcp_connect and, once a file
# handle is available, wraps it in a new AnyEvent::MP::Transport which is
# passed to $cb.
#
# The callback is always the LAST argument; everything between $port and the
# callback is forwarded to the transport constructor, after the defaults, so
# caller-supplied keys override them. $host is additionally passed on as
# "peername".
#
# If the connect callback is invoked without a file handle, $cb is called
# with no arguments at all (presumably signalling a failed connect).
#
# Returns whatever tcp_connect returns.
#
# NOTE(review): the default TLS role here is "accept", the same as in
# mp_server, although this is the connecting side - confirm this is intended.
sub mp_connect {
   my $cb = pop @_;
   my ($host, $port, @args) = @_;

   AnyEvent::Socket::tcp_connect ($host, $port, sub {
      my ($fh, $nhost, $nport) = @_;

      $fh or return $cb->();

      $cb->(AnyEvent::MP::Transport->new (
         fh       => $fh,
         peername => $host,
         peerhost => $nhost,
         peerport => $nport,
         tls      => "accept",
         @args,
      ));
   });
}
93    
94 root 1.1 =item new AnyEvent::MP::Transport
95    
96     # immediately starts negotiation
97     my $transport = new AnyEvent::MP::Transport
98 root 1.2 # mandatory
99 root 1.1 fh => $filehandle,
100 root 1.2 local_id => $identifier,
101 root 1.1 on_recv => sub { receive-callback },
102     on_error => sub { error-callback },
103    
104     # optional
105     secret => "shared secret",
106     on_eof => sub { clean-close-callback },
107     on_connect => sub { successful-connect-callback },
108 root 1.2 greeting => { key => value },
109 root 1.1
110     # tls support
111     tls => "accept|connect",
112     tls_ctx => AnyEvent::TLS,
113     peername => $peername, # for verification
114     ;
115    
116     =cut
117    
# Framing formats we would like to use. Not consulted by the handshake
# below, which still hardcodes "json".
our @FRAMING_WANT = qw(json storable);#d##TODO#

# new AnyEvent::MP::Transport key => value...
#
# Creates a transport object from the given key/value pairs (the object IS
# the argument hash), wraps the "fh" argument in an AnyEvent::Handle and
# immediately starts the handshake:
#
#   1. send our greeting line, read the peer's greeting line
#   2. validate protocol version, auth method and framing
#   3. optionally start TLS
#   4. exchange HMAC-MD6 authentication lines derived from the shared secret
#      and both greetings
#   5. call ->connected, flush {queue} and start reading messages, each of
#      which is handed to AnyEvent::MP::_inject
#
# Any handshake failure is reported through ->error. The object is returned
# right away, before the handshake has completed.
sub new {
   my ($class, %arg) = @_;

   # the object and %arg are the same hash, so every change to %arg below is
   # also a change to the object
   my $self = bless \%arg, $class;

   # messages stored here are written out once the handshake has completed
   $self->{queue} = [];

   {
      # the callbacks below are stored inside the handle, which in turn is
      # stored inside the object, so they only get a weak reference to it
      Scalar::Util::weaken (my $self = $self);

      if (exists $arg{connect}) {
         $arg{tls}     ||= "connect";
         $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };
      }

      $arg{secret} = AnyEvent::MP::default_secret ()
         unless exists $arg{secret};

      $self->{hdl} = AnyEvent::Handle->new (
         fh       => (delete $arg{fh}),
         rbuf_max => 64 * 1024,
         autocork => 1,
         no_delay => 1,
         on_error => sub {
            $self->error ($_[2]);
         },
         peername => (delete $arg{peername}),
      );

      my $secret      = $arg{secret};
      my $greeting_kv = $self->{greeting} ||= {};

      # only advertise our TLS role when we actually have a TLS context
      $greeting_kv->{"tls1.0"} ||= $arg{tls}
         if exists $arg{tls} && $arg{tls_ctx};
      $greeting_kv->{provider} = "AE-$VERSION";

      # send greeting
      my $lgreeting = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
         . ";$AnyEvent::MP::UNIQ"
         . ";$AnyEvent::MP::NODE"
         # explicit parens: the empty eol argument belongs to encode_base64,
         # the greeting must stay on a single line
         . ";" . MIME::Base64::encode_base64 (AnyEvent::MP::Util::nonce (33), "")
         . ";hmac_md6_64_256" # hardcoded atm.
         . ";json" # hardcoded atm.
         . ";$self->{peerhost};$self->{peerport}"
         . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);

      $self->{hdl}->push_write ("$lgreeting\012");

      # expect greeting
      $self->{hdl}->push_read (line => sub {
         my $rgreeting = $_[1];

         # same field order as in $lgreeting above. the three skipped fields
         # are the nonce and the host and port the peer sees us as - none of
         # them is used here.
         my ($aemp, $version, $version_min, $uniq, $rnode, undef, $auth, $framing, undef, undef, @kv)
            = split /;/, $rgreeting;

         if ($aemp ne "aemp") {
            return $self->error ("unparsable greeting");
         } elsif ($version_min > $PROTOCOL_VERSION) {
            return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
         } elsif ($auth ne "hmac_md6_64_256") {
            return $self->error ("unsupported auth method ($auth)");
         } elsif ($framing ne "json") {
            # report the offending framing (this used to print $auth)
            return $self->error ("unsupported framing method ($framing)");
         }

         $self->{remote_uniq} = $uniq;
         $self->{remote_node} = $rnode;

         # trailing "key=value" (or bare "key") greeting fields
         $self->{remote_greeting} = {
            map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
               @kv
         };

         if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{remote_greeting}{"tls1.0"}) {
            # NOTE(review): this rejects the peer unless both ends advertise
            # the SAME role, after which both ends call starttls with that
            # same role - confirm whether the comparison is inverted.
            if ($self->{tls} ne $self->{remote_greeting}{"tls1.0"}) {
               return $self->error ("TLS server/client mismatch");
            }
            $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
         }

         # auth
         require Digest::MD6;
         require Digest::HMAC_MD6;

         # both sides compute both values: $lauth is what we send, $rauth is
         # what we expect to receive. they differ only in greeting order.
         my $key   = Digest::MD6::md6_hex ($secret);
         my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting\012$rgreeting", 64, 256);
         my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting\012$lgreeting", 64, 256);

         $lauth ne $rauth # echo attack?
            or return $self->error ("authentication error");

         $self->{hdl}->push_write ("$auth;$lauth;$framing\012");

         $self->{hdl}->rbuf_max (64); # enough for 44 reply bytes or so
         $self->{hdl}->push_read (line => sub {
            my ($hdl, $rline) = @_;

            my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;

            if ($rauth2 ne $rauth) {
               return $self->error ("authentication failure/shared secret mismatch");
            }

            # $r_framing is chosen by the peer and used as the read type for
            # push_read below, so only accept the one framing our greeting
            # advertised instead of passing an arbitrary string on.
            $r_framing eq "json"
               or return $self->error ("unsupported framing method ($r_framing)");

            $self->{s_framing} = "json";#d#

            $hdl->rbuf_max (undef);
            my $queue = delete $self->{queue}; # we are connected

            $self->connected;

            $hdl->push_write ($self->{s_framing} => $_)
               for @$queue;

            # read one message, re-arm the read, then dispatch the message.
            # NOTE(review): $rmsg refers to itself, which forms a reference
            # cycle - check whether this closure can ever be freed.
            my $rmsg; $rmsg = sub {
               $_[0]->push_read ($r_framing => $rmsg);

               AnyEvent::MP::_inject ($_[1]);
            };
            $hdl->push_read ($r_framing => $rmsg);
         });
      });
   }

   $self
}
243    
# $transport->error ($msg)
#
# Tears the transport down after a failure: if a node is attached and that
# node currently uses this very transport, the node is told to clear it;
# afterwards the transport is destroyed.
#
# $msg is accepted but currently unused, as the on_error invocation is
# commented out.
sub error {
   my ($self, $msg) = @_;

   my $node = $self->{node};

   $node->clr_transport
      if $node && $node->{transport} == $self;

   # $self->{on_error}($self, $msg);
   $self->destroy;
}
253    
# $transport->connected
#
# Registers the peer as a node: looks up/creates the node for
# {remote_node} via AnyEvent::MP::add_node, remembers it in {node} (as a
# weak reference only) and hands this transport to the node.
#
# Returns whatever set_transport returns.
sub connected {
   my $self = shift;

   # keep a strong reference in a lexical while {node} itself is weakened
   my $peer = AnyEvent::MP::add_node ($self->{remote_node});

   $self->{node} = $peer;
   Scalar::Util::weaken ($self->{node});

   $peer->set_transport ($self);
}
261    
# $transport->send ($msg)
#
# Writes $msg to the handle using the send framing stored in {s_framing}.
# Returns whatever push_write returns.
sub send {
   my ($self, $msg) = @_;

   my $hdl = $self->{hdl};

   $hdl->push_write ($self->{s_framing}, $msg);
}
265    
# $transport->destroy
#
# Destroys the underlying handle, if there is one. Does nothing when no
# handle is stored in {hdl}.
sub destroy {
   my $self = shift;

   my $hdl = $self->{hdl};

   $hdl && $hdl->destroy;
}
272    
# Perl destructor: delegates to ->destroy.
sub DESTROY {
   $_[0]->destroy;
}
278    
279     =back
280    
281     =head1 SEE ALSO
282    
283     L<AnyEvent>.
284    
285     =head1 AUTHOR
286    
287     Marc Lehmann <schmorp@schmorp.de>
288     http://home.schmorp.de/
289    
290     =cut
291    
292     1
293