=head1 NAME

AnyEvent::MP::Transport - actual transport protocol

=head1 SYNOPSIS

   use AnyEvent::MP::Transport;

=head1 DESCRIPTION

This is the superclass for MP transports, most of which is considered an
implementation detail.

Future versions might document the actual protocol.

=head1 FUNCTIONS/METHODS

=over 4

=cut

package AnyEvent::MP::Transport;

use common::sense;

use Scalar::Util;
use MIME::Base64 ();
use Storable ();

use AE ();
use AnyEvent::Socket ();
use AnyEvent::Handle ();
use AnyEvent::MP ();

use base Exporter::;

our $VERSION = '0.0';
our $PROTOCOL_VERSION_MAJOR = 0;
our $PROTOCOL_VERSION_MINOR = 0;

=item $listener = mp_server $host, $port, <constructor-args>, $cb->($transport)

Creates a listener on the given host/port using
C<AnyEvent::Socket::tcp_server>. The value returned by C<tcp_server> is
returned to the caller.

For every accepted connection a new transport object is created and passed
to C<$cb>. See C<new>, below, for constructor arguments.

Defaults for peerhost, peerport, fh and tls are provided; any of them can
be overridden by the constructor arguments given here.

=cut

sub mp_server($$@) {
    my $cb = pop;
    my ($host, $port, @args) = @_;

    AnyEvent::Socket::tcp_server $host, $port, sub {
        my ($fh, $host, $port) = @_;

        # @args comes last so that caller-supplied values win over the defaults
        $cb->(__PACKAGE__->new (
            fh       => $fh,
            peerhost => $host,
            peerport => $port,
            tls      => "accept",
            @args,
        ));
    }
}

=item new AnyEvent::MP::Transport

   # immediately starts negotiation
   my $transport = new AnyEvent::MP::Transport
      # fh OR connect is mandatory
      fh         => $filehandle,
      connect    => [$host, $port],

      # mandatory
      on_recv    => sub { receive-callback },
      on_error   => sub { error-callback },

      # optional
      local_id   => $identifier,
      secret     => "shared secret",
      on_eof     => sub { clean-close-callback },
      on_connect => sub { successful-connect-callback },

      # tls support
      tls        => "accept|connect",
      tls_ctx    => AnyEvent::TLS,
      peername   => $peername, # for verification
   ;

Messages passed to C<send> before the handshake has completed are queued
and written out once authentication has succeeded.

NOTE(review): C<on_eof> is accepted but this class does not currently
install it on the underlying handle - confirm whether that is intended.

=cut

sub new {
    my ($class, %arg) = @_;

    # the argument hash itself becomes the object, so %arg and %$self alias
    my $self = bless \%arg, $class;

    # messages sent before the handshake completes are parked here
    $self->{queue} = [];

    {
        # the callbacks below must not keep the transport alive
        Scalar::Util::weaken (my $self = $self);

        if (exists $arg{connect}) {
            $arg{tls}     ||= "connect";
            $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1, verify_peername => "https" };
        }

        $self->{hdl} = AnyEvent::Handle->new (
            (exists $arg{fh} ? (fh => delete $arg{fh}) : (connect => delete $arg{connect})),
            on_error => sub {
                $self->error ($_[2]);
            },
            peername => delete $arg{peername},
        );

        # delete exactly once (a second delete would always yield undef) and
        # fall back to the default secret when none was supplied
        my $secret = delete $arg{secret};
        $secret = AnyEvent::MP::default_secret ()
            unless $secret;

        # send greeting
        my $lgreeting = "aemp;$PROTOCOL_VERSION_MAJOR;$PROTOCOL_VERSION_MINOR;AnyEvent::MP;$VERSION;"
                      . MIME::Base64::encode_base64 (AnyEvent::MP::nonce (33), "") . ";"
                      . "hmac_md6_64_256;" # hardcoded atm.
                      . "storable;" # hardcoded atm.
                      . "$self->{local_id};"
                      . (exists $arg{tls} && $arg{tls_ctx} ? "tls1.0=$arg{tls};" : "");

        $self->{hdl}->push_write ("$lgreeting\012");

        # expect greeting
        $self->{hdl}->push_read (line => sub {
            my $rgreeting = $_[1];

            my ($aemp, $major, $minor, $provider, $provider_version, $nonce2, $auth, $framing, $rid, @kv)
                = split /;/, $rgreeting;

            if ($aemp ne "aemp") {
                return $self->error ("unparsable greeting");
            } elsif ($major != $PROTOCOL_VERSION_MAJOR) {
                return $self->error ("major version mismatch ($PROTOCOL_VERSION_MAJOR vs. $major)");
            } elsif ($auth ne "hmac_md6_64_256") {
                return $self->error ("unsupported auth method ($auth)");
            } elsif ($framing ne "storable") {
                return $self->error ("unsupported framing method ($framing)");
            }

            $self->{remote_id} = $rid;

            $self->{greeting} = {
                provider         => $provider,
                provider_version => $provider_version,
            };

            # remaining fields are "key" or "key=value" extensions
            /^([^=]+)(?:=(.*))?/ and $self->{greeting}{$1} = $2
                for @kv;

            if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{greeting}{"tls1.0"}) {
                # each side advertises its own role, so a working handshake
                # needs one "connect" and one "accept" - equal roles clash
                if ($self->{tls} eq $self->{greeting}{"tls1.0"}) {
                    return $self->error ("TLS server/client mismatch");
                }

                $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
            }

            # auth
            require Digest::MD6;
            require Digest::HMAC_MD6;

            my $key = Digest::MD6::md6_hex ($secret);

            # both sides sign the two greetings, each with its own greeting first
            my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting$rgreeting", 64, 256);
            my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting$lgreeting", 64, 256);

            $self->{hdl}->push_write ("$lauth\012");

            $self->{hdl}->push_read (line => sub {
                my ($hdl, $rauth2) = @_;

                if ($rauth2 ne $rauth) {
                    return $self->error ("authentication failure/shared secret mismatch");
                }

                # from here on send() writes directly to the handle
                my $queue = delete $self->{queue};

                # we are connected
                $self->{on_connect}($self)
                    if $self->{on_connect};

                $hdl->push_write (storable => $_)
                    for @$queue;

                # Receive loop: re-arm the read, then dispatch the message.
                # The callback has to refer to itself; that self-reference is
                # weak so the closure does not form a cycle and leak - the
                # handle's read queue holds the strong reference.
                my $rmsg_weak;
                my $rmsg = sub {
                    $_[0]->push_read (storable => $rmsg_weak);

                    $self->{on_recv}($self, $_[1]);
                };
                Scalar::Util::weaken ($rmsg_weak = $rmsg);

                $hdl->push_read (storable => $rmsg);
            });
        });
    }

    $self
}

# Reports $msg through the on_error callback, then tears down the handle.
sub error {
    my ($self, $msg) = @_;

    $self->{on_error}($self, $msg);

    # the handle may never have been created, or may already be gone
    $self->{hdl}->destroy
        if $self->{hdl};
}

# Sends $msg to the peer, or queues it while the handshake is still running.
sub send {
    my ($self, $msg) = @_;

    exists $self->{queue}
        ? push @{ $self->{queue} }, $msg
        : $self->{hdl}->push_write (storable => $msg);
}

# Destroys the underlying handle, if there is one.
sub destroy {
    my ($self) = @_;

    $self->{hdl}->destroy
        if $self->{hdl};
}

sub DESTROY {
    my ($self) = @_;

    $self->destroy;
}

=back

=head1 SEE ALSO

L<AnyEvent::MP>.

=head1 AUTHOR

 Marc Lehmann
 http://home.schmorp.de/

=cut

1