ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/cvsroot/AnyEvent-MP/MP/Transport.pm
Revision: 1.6
Committed: Sun Aug 2 14:52:41 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
CVS Tags: rel-0_02
Changes since 1.5: +0 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::MP::Transport - actual transport protocol
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::MP::Transport;
8    
9     =head1 DESCRIPTION
10    
11     This is the superclass for MP transports, most of which is considered an
12     implementation detail.
13    
14     Future versions might document the actual protocol.
15    
16     =head1 FUNCTIONS/METHODS
17    
18     =over 4
19    
20     =cut
21    
22     package AnyEvent::MP::Transport;
23    
24     use common::sense;
25    
26     use Scalar::Util;
27     use MIME::Base64 ();
28     use Storable ();
29 root 1.2 use JSON::XS ();
30 root 1.1
31     use AE ();
32     use AnyEvent::Socket ();
33     use AnyEvent::Handle ();
34 root 1.2
35 root 1.1 use base Exporter::;
36    
37     our $VERSION = '0.0';
38 root 1.2 our $PROTOCOL_VERSION = 0;
39 root 1.1
40     =item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport)
41    
42     Creates a listener on the given host/port using
43     C<AnyEvent::Socket::tcp_server>.
44    
45     See C<new>, below, for constructor arguments.
46    
47     Defaults for peerhost, peerport, fh and tls are provided.
48    
49     =cut
50    
51     sub mp_server($$@) {
52     my $cb = pop;
53     my ($host, $port, @args) = @_;
54    
55     AnyEvent::Socket::tcp_server $host, $port, sub {
56     my ($fh, $host, $port) = @_;
57    
58     $cb->(new AnyEvent::MP::Transport
59     fh => $fh,
60     peerhost => $host,
61     peerport => $port,
62     tls => "accept",
63     @args,
64     );
65     }
66     }
67    
68 root 1.2 =item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
69    
70     =cut
71    
72     sub mp_connect {
73     my $cb = pop;
74     my ($host, $port, @args) = @_;
75    
76     AnyEvent::Socket::tcp_connect $host, $port, sub {
77     my ($fh, $nhost, $nport) = @_;
78    
79     return $cb->() unless $fh;
80    
81     $cb->(new AnyEvent::MP::Transport
82     fh => $fh,
83     peername => $host,
84     peerhost => $nhost,
85     peerport => $nport,
86     tls => "accept",
87     @args,
88     );
89     }
90     }
91    
92 root 1.1 =item new AnyEvent::MP::Transport
93    
94     # immediately starts negotiation
95     my $transport = new AnyEvent::MP::Transport
96 root 1.2 # mandatory
97 root 1.1 fh => $filehandle,
98 root 1.2 local_id => $identifier,
99 root 1.1 on_recv => sub { receive-callback },
100     on_error => sub { error-callback },
101    
102     # optional
103     secret => "shared secret",
104     on_eof => sub { clean-close-callback },
105     on_connect => sub { successful-connect-callback },
106 root 1.2 greeting => { key => value },
107 root 1.1
108     # tls support
109     tls => "accept|connect",
110     tls_ctx => AnyEvent::TLS,
111     peername => $peername, # for verification
112     ;
113    
114     =cut
115    
116 root 1.4 our @FRAMING_WANT = qw(json storable);#d##TODO#
117    
118 root 1.1 sub new {
119     my ($class, %arg) = @_;
120    
121     my $self = bless \%arg, $class;
122    
123     $self->{queue} = [];
124    
125     {
126     Scalar::Util::weaken (my $self = $self);
127    
128     if (exists $arg{connect}) {
129     $arg{tls} ||= "connect";
130 root 1.2 $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };
131 root 1.1 }
132    
133 root 1.5 $arg{secret} = AnyEvent::MP::Base::default_secret ()
134 root 1.2 unless exists $arg{secret};
135    
136 root 1.1 $self->{hdl} = new AnyEvent::Handle
137 root 1.2 fh => delete $arg{fh},
138     rbuf_max => 64 * 1024,
139 root 1.4 autocork => 1,
140     no_delay => 1,
141 root 1.1 on_error => sub {
142     $self->error ($_[2]);
143     },
144     peername => delete $arg{peername},
145     ;
146    
147 root 1.2 my $secret = $arg{secret};
148     my $greeting_kv = $self->{greeting} ||= {};
149     $greeting_kv->{"tls1.0"} ||= $arg{tls}
150     if exists $arg{tls} && $arg{tls_ctx};
151     $greeting_kv->{provider} = "AE-$VERSION";
152 root 1.1
153     # send greeting
154 root 1.2 my $lgreeting = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
155 root 1.5 . ";$AnyEvent::MP::Base::UNIQ"
156     . ";$AnyEvent::MP::Base::NODE"
157     . ";" . (MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (33), "")
158 root 1.2 . ";hmac_md6_64_256" # hardcoded atm.
159     . ";json" # hardcoded atm.
160     . ";$self->{peerhost};$self->{peerport}"
161     . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
162 root 1.1
163     $self->{hdl}->push_write ("$lgreeting\012");
164    
165     # expect greeting
166     $self->{hdl}->push_read (line => sub {
167     my $rgreeting = $_[1];
168    
169 root 1.2 my ($aemp, $version, $version_min, $uniq, $rnode, undef, $auth, $framing, $peerport, $peerhost, @kv) = split /;/, $rgreeting;
170 root 1.1
171     if ($aemp ne "aemp") {
172     return $self->error ("unparsable greeting");
173 root 1.2 } elsif ($version_min > $PROTOCOL_VERSION) {
174     return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
175 root 1.1 } elsif ($auth ne "hmac_md6_64_256") {
176     return $self->error ("unsupported auth method ($auth)");
177 root 1.2 } elsif ($framing ne "json") {
178     return $self->error ("unsupported framing method ($auth)");
179 root 1.1 }
180    
181 root 1.2 $self->{remote_uniq} = $uniq;
182     $self->{remote_node} = $rnode;
183 root 1.1
184 root 1.2 $self->{remote_greeting} = {
185     map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
186     @kv
187 root 1.1 };
188    
189 root 1.2 if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{remote_greeting}{"tls1.0"}) {
190     if ($self->{tls} ne $self->{remote_greeting}{"tls1.0"}) {
191 root 1.1 return $self->error ("TLS server/client mismatch");
192     }
193     $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
194     }
195    
196     # auth
197     require Digest::MD6;
198     require Digest::HMAC_MD6;
199    
200     my $key = Digest::MD6::md6_hex ($secret);
201 root 1.3 my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting\012$rgreeting", 64, 256);
202     my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting\012$lgreeting", 64, 256);
203 root 1.1
204 root 1.2 $lauth ne $rauth # echo attack?
205     or return $self->error ("authentication error");
206    
207     $self->{hdl}->push_write ("$auth;$lauth;$framing\012");
208    
209     $self->{hdl}->rbuf_max (64); # enough for 44 reply bytes or so
210 root 1.1 $self->{hdl}->push_read (line => sub {
211 root 1.2 my ($hdl, $rline) = @_;
212    
213     my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;
214 root 1.1
215     if ($rauth2 ne $rauth) {
216     return $self->error ("authentication failure/shared secret mismatch");
217     }
218    
219 root 1.2 $self->{s_framing} = "json";#d#
220    
221     $hdl->rbuf_max (undef);
222 root 1.1 my $queue = delete $self->{queue}; # we are connected
223    
224 root 1.2 $self->connected;
225 root 1.1
226 root 1.2 $hdl->push_write ($self->{s_framing} => $_)
227 root 1.1 for @$queue;
228    
229     my $rmsg; $rmsg = sub {
230 root 1.2 $_[0]->push_read ($r_framing => $rmsg);
231 root 1.1
232 root 1.5 AnyEvent::MP::Base::_inject ($_[1]);
233 root 1.1 };
234 root 1.2 $hdl->push_read ($r_framing => $rmsg);
235 root 1.1 });
236     });
237     }
238    
239     $self
240     }
241    
242     sub error {
243     my ($self, $msg) = @_;
244    
245 root 1.4 if ($self->{node} && $self->{node}{transport} == $self) {
246     $self->{node}->clr_transport;
247     }
248     # $self->{on_error}($self, $msg);
249     $self->destroy;
250 root 1.1 }
251    
252 root 1.2 sub connected {
253     my ($self) = @_;
254    
255 root 1.5 my $node = AnyEvent::MP::Base::add_node ($self->{remote_node});
256 root 1.4 Scalar::Util::weaken ($self->{node} = $node);
257     $node->set_transport ($self);
258 root 1.2 }
259    
260 root 1.1 sub send {
261 root 1.2 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]);
262 root 1.1 }
263    
264     sub destroy {
265     my ($self) = @_;
266    
267 root 1.2 $self->{hdl}->destroy
268     if $self->{hdl};
269 root 1.1 }
270    
271     sub DESTROY {
272     my ($self) = @_;
273    
274     $self->destroy;
275     }
276    
277     =back
278    
279     =head1 SEE ALSO
280    
281     L<AnyEvent>.
282    
283     =head1 AUTHOR
284    
285     Marc Lehmann <schmorp@schmorp.de>
286     http://home.schmorp.de/
287    
288     =cut
289    
290     1
291