--- AnyEvent-MP/MP/Transport.pm 2009/07/30 08:38:50 1.1 +++ AnyEvent-MP/MP/Transport.pm 2009/08/02 14:44:37 1.5 @@ -26,17 +26,18 @@ use Scalar::Util; use MIME::Base64 (); use Storable (); +use JSON::XS (); use AE (); use AnyEvent::Socket (); use AnyEvent::Handle (); -use AnyEvent::MP (); + +use AnyEvent::MP::Util (); use base Exporter::; our $VERSION = '0.0'; -our $PROTOCOL_VERSION_MAJOR = 0; -our $PROTOCOL_VERSION_MINOR = 0; +our $PROTOCOL_VERSION = 0; =item $listener = mp_listener $host, $port, , $cb->($transport) @@ -66,23 +67,45 @@ } } +=item $guard = mp_connect $host, $port, , $cb->($transport) + +=cut + +sub mp_connect { + my $cb = pop; + my ($host, $port, @args) = @_; + + AnyEvent::Socket::tcp_connect $host, $port, sub { + my ($fh, $nhost, $nport) = @_; + + return $cb->() unless $fh; + + $cb->(new AnyEvent::MP::Transport + fh => $fh, + peername => $host, + peerhost => $nhost, + peerport => $nport, + tls => "accept", + @args, + ); + } +} + =item new AnyEvent::MP::Transport # immediately starts negotiation my $transport = new AnyEvent::MP::Transport - # fh OR connect is mandatory - fh => $filehandle, - connect => [$host, $port], - # mandatory + fh => $filehandle, + local_id => $identifier, on_recv => sub { receive-callback }, on_error => sub { error-callback }, # optional - local_id => $identifier, secret => "shared secret", on_eof => sub { clean-close-callback }, on_connect => sub { successful-connect-callback }, + greeting => { key => value }, # tls support tls => "accept|connect", @@ -92,6 +115,8 @@ =cut +our @FRAMING_WANT = qw(json storable);#d##TODO# + sub new { my ($class, %arg) = @_; @@ -104,26 +129,38 @@ if (exists $arg{connect}) { $arg{tls} ||= "connect"; - $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1, verify_peername => "https" }; + $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 }; } + $arg{secret} = AnyEvent::MP::Base::default_secret () + unless exists $arg{secret}; + $self->{hdl} = new AnyEvent::Handle - (exists $arg{fh} ? (fh => delete $arg{fh}) : (connect => delete $arg{connect})), + fh => delete $arg{fh}, + rbuf_max => 64 * 1024, + autocork => 1, + no_delay => 1, on_error => sub { $self->error ($_[2]); }, peername => delete $arg{peername}, ; - my $secret = delete $arg{secret} ? delete $arg{secret} : AnyEvent::MP::default_secret; + my $secret = $arg{secret}; + my $greeting_kv = $self->{greeting} ||= {}; + $greeting_kv->{"tls1.0"} ||= $arg{tls} + if exists $arg{tls} && $arg{tls_ctx}; + $greeting_kv->{provider} = "AE-$VERSION"; # send greeting - my $lgreeting = "aemp;$PROTOCOL_VERSION_MAJOR;$PROTOCOL_VERSION_MINOR;AnyEvent::MP;$VERSION;" - . (MIME::Base64::encode_base64 AnyEvent::MP::nonce 33, "") . ";" - . "hmac_md6_64_256;" # hardcoded atm. - . "storable;" # hardcoded atm. - . "$self->{local_id};" - . (exists $arg{tls} && $arg{tls_ctx} ? "tls1.0=$arg{tls};" : ""); + my $lgreeting = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min + . ";$AnyEvent::MP::Base::UNIQ" + . ";$AnyEvent::MP::Base::NODE" + . ";" . (MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (33), "") + . ";hmac_md6_64_256" # hardcoded atm. + . ";json" # hardcoded atm. + . ";$self->{peerhost};$self->{peerport}" + . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv); $self->{hdl}->push_write ("$lgreeting\012"); @@ -131,30 +168,28 @@ $self->{hdl}->push_read (line => sub { my $rgreeting = $_[1]; - my ($aemp, $major, $minor, $provider, $provider_version, $nonce2, $auth, $framing, $rid, @kv) = split /;/, $rgreeting; + my ($aemp, $version, $version_min, $uniq, $rnode, undef, $auth, $framing, $peerport, $peerhost, @kv) = split /;/, $rgreeting; if ($aemp ne "aemp") { return $self->error ("unparsable greeting"); - } elsif ($major != $PROTOCOL_VERSION_MAJOR) { - return $self->error ("major version mismatch ($PROTOCOL_VERSION_MAJOR vs. $major)"); + } elsif ($version_min > $PROTOCOL_VERSION) { + return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)"); } elsif ($auth ne "hmac_md6_64_256") { return $self->error ("unsupported auth method ($auth)"); - } elsif ($framing ne "storable") { - return $self->error ("unsupported auth method ($auth)"); + } elsif ($framing ne "json") { + return $self->error ("unsupported framing method ($auth)"); } - $self->{remote_id} = $rid; + $self->{remote_uniq} = $uniq; + $self->{remote_node} = $rnode; - $self->{greeting} = { - provider => $provider, - provider_version => $provider_version, + $self->{remote_greeting} = { + map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (), + @kv }; - /^([^=]+)(?:=(.*))?/ and $self->{greeting}{$1} = $2 - for @kv; - - if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{greeting}{"tls1.0"}) { - if ($self->{tls} ne $self->{greeting}{"tls1.0"}) { + if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{remote_greeting}{"tls1.0"}) { + if ($self->{tls} ne $self->{remote_greeting}{"tls1.0"}) { return $self->error ("TLS server/client mismatch"); } $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx}); @@ -165,31 +200,40 @@ require Digest::HMAC_MD6; my $key = Digest::MD6::md6_hex ($secret); - my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting$rgreeting", 64, 256); - my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting$lgreeting", 64, 256); - $self->{hdl}->push_write ("$lauth\012"); + my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting\012$rgreeting", 64, 256); + my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting\012$lgreeting", 64, 256); + + $lauth ne $rauth # echo attack? + or return $self->error ("authentication error"); + + $self->{hdl}->push_write ("$auth;$lauth;$framing\012"); + $self->{hdl}->rbuf_max (64); # enough for 44 reply bytes or so $self->{hdl}->push_read (line => sub { - my ($hdl, $rauth2) = @_; + my ($hdl, $rline) = @_; + + my ($auth_method, $rauth2, $r_framing) = split /;/, $rline; if ($rauth2 ne $rauth) { return $self->error ("authentication failure/shared secret mismatch"); } + $self->{s_framing} = "json";#d# + + $hdl->rbuf_max (undef); my $queue = delete $self->{queue}; # we are connected - $self->{on_connect}($self) - if $self->{on_connect}; + $self->connected; - $hdl->push_write (storable => $_) + $hdl->push_write ($self->{s_framing} => $_) for @$queue; my $rmsg; $rmsg = sub { - $_[0]->push_read (storable => $rmsg); + $_[0]->push_read ($r_framing => $rmsg); - $self->{on_recv}($self, $_[1]); + AnyEvent::MP::Base::_inject ($_[1]); }; - $hdl->push_read (storable => $rmsg); + $hdl->push_read ($r_framing => $rmsg); }); }); } @@ -200,22 +244,30 @@ sub error { my ($self, $msg) = @_; - $self->{on_error}($self, $msg); - $self->{hdl}->destroy; + if ($self->{node} && $self->{node}{transport} == $self) { + $self->{node}->clr_transport; + } +# $self->{on_error}($self, $msg); + $self->destroy; } -sub send { - my ($self, $msg) = @_; +sub connected { + my ($self) = @_; + + my $node = AnyEvent::MP::Base::add_node ($self->{remote_node}); + Scalar::Util::weaken ($self->{node} = $node); + $node->set_transport ($self); +} - exists $self->{queue} - ? push @{ $self->{queue} }, $msg - : $self->{hdl}->push_write (storable => $msg); +sub send { + $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]); } sub destroy { my ($self) = @_; - $self->{hdl}->destroy; + $self->{hdl}->destroy + if $self->{hdl}; } sub DESTROY {