ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.1 by root, Thu Jul 30 08:38:50 2009 UTC vs.
Revision 1.2 by root, Fri Jul 31 20:55:46 2009 UTC

24use common::sense; 24use common::sense;
25 25
26use Scalar::Util; 26use Scalar::Util;
27use MIME::Base64 (); 27use MIME::Base64 ();
28use Storable (); 28use Storable ();
29use JSON::XS ();
29 30
30use AE (); 31use AE ();
31use AnyEvent::Socket (); 32use AnyEvent::Socket ();
32use AnyEvent::Handle (); 33use AnyEvent::Handle ();
34
33use AnyEvent::MP (); 35use AnyEvent::MP::Util ();
34 36
35use base Exporter::; 37use base Exporter::;
36 38
37our $VERSION = '0.0'; 39our $VERSION = '0.0';
38our $PROTOCOL_VERSION_MAJOR = 0; 40our $PROTOCOL_VERSION = 0;
39our $PROTOCOL_VERSION_MINOR = 0;
40 41
41=item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport) 42=item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport)
42 43
43Creates a listener on the given host/port using 44Creates a listener on the given host/port using
44C<AnyEvent::Socket::tcp_server>. 45C<AnyEvent::Socket::tcp_server>.
64 @args, 65 @args,
65 ); 66 );
66 } 67 }
67} 68}
68 69
70=item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
71
72=cut
73
74sub mp_connect {
75 my $cb = pop;
76 my ($host, $port, @args) = @_;
77
78 AnyEvent::Socket::tcp_connect $host, $port, sub {
79 my ($fh, $nhost, $nport) = @_;
80
81 return $cb->() unless $fh;
82
83 $cb->(new AnyEvent::MP::Transport
84 fh => $fh,
85 peername => $host,
86 peerhost => $nhost,
87 peerport => $nport,
88 tls => "accept",
89 @args,
90 );
91 }
92}
93
69=item new AnyEvent::MP::Transport 94=item new AnyEvent::MP::Transport
70 95
71 # immediately starts negotiation 96 # immediately starts negotiation
72 my $transport = new AnyEvent::MP::Transport 97 my $transport = new AnyEvent::MP::Transport
73 # fh OR connect is mandatory 98 # mandatory
74 fh => $filehandle, 99 fh => $filehandle,
75 connect => [$host, $port], 100 local_id => $identifier,
76
77 # mandatory
78 on_recv => sub { receive-callback }, 101 on_recv => sub { receive-callback },
79 on_error => sub { error-callback }, 102 on_error => sub { error-callback },
80 103
81 # optional 104 # optional
82 local_id => $identifier,
83 secret => "shared secret", 105 secret => "shared secret",
84 on_eof => sub { clean-close-callback }, 106 on_eof => sub { clean-close-callback },
85 on_connect => sub { successful-connect-callback }, 107 on_connect => sub { successful-connect-callback },
108 greeting => { key => value },
86 109
87 # tls support 110 # tls support
88 tls => "accept|connect", 111 tls => "accept|connect",
89 tls_ctx => AnyEvent::TLS, 112 tls_ctx => AnyEvent::TLS,
90 peername => $peername, # for verification 113 peername => $peername, # for verification
102 { 125 {
103 Scalar::Util::weaken (my $self = $self); 126 Scalar::Util::weaken (my $self = $self);
104 127
105 if (exists $arg{connect}) { 128 if (exists $arg{connect}) {
106 $arg{tls} ||= "connect"; 129 $arg{tls} ||= "connect";
107 $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1, verify_peername => "https" }; 130 $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };
108 } 131 }
109 132
133 $arg{secret} = AnyEvent::MP::default_secret ()
134 unless exists $arg{secret};
135
110 $self->{hdl} = new AnyEvent::Handle 136 $self->{hdl} = new AnyEvent::Handle
111 (exists $arg{fh} ? (fh => delete $arg{fh}) : (connect => delete $arg{connect})), 137 fh => delete $arg{fh},
138 rbuf_max => 64 * 1024,
112 on_error => sub { 139 on_error => sub {
113 $self->error ($_[2]); 140 $self->error ($_[2]);
114 }, 141 },
115 peername => delete $arg{peername}, 142 peername => delete $arg{peername},
116 ; 143 ;
117 144
118 my $secret = delete $arg{secret} ? delete $arg{secret} : AnyEvent::MP::default_secret; 145 my $secret = $arg{secret};
146 my $greeting_kv = $self->{greeting} ||= {};
147 $greeting_kv->{"tls1.0"} ||= $arg{tls}
148 if exists $arg{tls} && $arg{tls_ctx};
149 $greeting_kv->{provider} = "AE-$VERSION";
119 150
120 # send greeting 151 # send greeting
121 my $lgreeting = "aemp;$PROTOCOL_VERSION_MAJOR;$PROTOCOL_VERSION_MINOR;AnyEvent::MP;$VERSION;" 152 my $lgreeting = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
153 . ";$AnyEvent::MP::UNIQ"
154 . ";$AnyEvent::MP::NODE"
122 . (MIME::Base64::encode_base64 AnyEvent::MP::nonce 33, "") . ";" 155 . ";" . (MIME::Base64::encode_base64 AnyEvent::MP::Util::nonce 33, "")
123 . "hmac_md6_64_256;" # hardcoded atm. 156 . ";hmac_md6_64_256" # hardcoded atm.
124 . "storable;" # hardcoded atm. 157 . ";json" # hardcoded atm.
125 . "$self->{local_id};" 158 . ";$self->{peerhost};$self->{peerport}"
126 . (exists $arg{tls} && $arg{tls_ctx} ? "tls1.0=$arg{tls};" : ""); 159 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
127 160
128 $self->{hdl}->push_write ("$lgreeting\012"); 161 $self->{hdl}->push_write ("$lgreeting\012");
129 162
130 # expect greeting 163 # expect greeting
131 $self->{hdl}->push_read (line => sub { 164 $self->{hdl}->push_read (line => sub {
132 my $rgreeting = $_[1]; 165 my $rgreeting = $_[1];
133 166
134 my ($aemp, $major, $minor, $provider, $provider_version, $nonce2, $auth, $framing, $rid, @kv) = split /;/, $rgreeting; 167 my ($aemp, $version, $version_min, $uniq, $rnode, undef, $auth, $framing, $peerport, $peerhost, @kv) = split /;/, $rgreeting;
135 168
136 if ($aemp ne "aemp") { 169 if ($aemp ne "aemp") {
137 return $self->error ("unparsable greeting"); 170 return $self->error ("unparsable greeting");
138 } elsif ($major != $PROTOCOL_VERSION_MAJOR) { 171 } elsif ($version_min > $PROTOCOL_VERSION) {
139 return $self->error ("major version mismatch ($PROTOCOL_VERSION_MAJOR vs. $major)"); 172 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
140 } elsif ($auth ne "hmac_md6_64_256") { 173 } elsif ($auth ne "hmac_md6_64_256") {
141 return $self->error ("unsupported auth method ($auth)"); 174 return $self->error ("unsupported auth method ($auth)");
142 } elsif ($framing ne "storable") { 175 } elsif ($framing ne "json") {
143 return $self->error ("unsupported auth method ($auth)"); 176 return $self->error ("unsupported framing method ($auth)");
144 } 177 }
145 178
179 $self->{remote_uniq} = $uniq;
146 $self->{remote_id} = $rid; 180 $self->{remote_node} = $rnode;
147 181
148 $self->{greeting} = { 182 $self->{remote_greeting} = {
149 provider => $provider, 183 map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
150 provider_version => $provider_version, 184 @kv
151 }; 185 };
152 186
153 /^([^=]+)(?:=(.*))?/ and $self->{greeting}{$1} = $2
154 for @kv;
155
156 if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{greeting}{"tls1.0"}) { 187 if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{remote_greeting}{"tls1.0"}) {
157 if ($self->{tls} ne $self->{greeting}{"tls1.0"}) { 188 if ($self->{tls} ne $self->{remote_greeting}{"tls1.0"}) {
158 return $self->error ("TLS server/client mismatch"); 189 return $self->error ("TLS server/client mismatch");
159 } 190 }
160 $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx}); 191 $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
161 } 192 }
162 193
165 require Digest::HMAC_MD6; 196 require Digest::HMAC_MD6;
166 197
167 my $key = Digest::MD6::md6_hex ($secret); 198 my $key = Digest::MD6::md6_hex ($secret);
168 my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting$rgreeting", 64, 256); 199 my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting$rgreeting", 64, 256);
169 my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting$lgreeting", 64, 256); 200 my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting$lgreeting", 64, 256);
201
202 $lauth ne $rauth # echo attack?
203 or return $self->error ("authentication error");
204
170 $self->{hdl}->push_write ("$lauth\012"); 205 $self->{hdl}->push_write ("$auth;$lauth;$framing\012");
171 206
207 $self->{hdl}->rbuf_max (64); # enough for 44 reply bytes or so
172 $self->{hdl}->push_read (line => sub { 208 $self->{hdl}->push_read (line => sub {
173 my ($hdl, $rauth2) = @_; 209 my ($hdl, $rline) = @_;
210
211 my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;
174 212
175 if ($rauth2 ne $rauth) { 213 if ($rauth2 ne $rauth) {
176 return $self->error ("authentication failure/shared secret mismatch"); 214 return $self->error ("authentication failure/shared secret mismatch");
177 } 215 }
178 216
217 $self->{s_framing} = "json";#d#
218
219 $hdl->rbuf_max (undef);
179 my $queue = delete $self->{queue}; # we are connected 220 my $queue = delete $self->{queue}; # we are connected
180 221
181 $self->{on_connect}($self) 222 $self->connected;
182 if $self->{on_connect};
183 223
184 $hdl->push_write (storable => $_) 224 $hdl->push_write ($self->{s_framing} => $_)
185 for @$queue; 225 for @$queue;
186 226
187 my $rmsg; $rmsg = sub { 227 my $rmsg; $rmsg = sub {
188 $_[0]->push_read (storable => $rmsg); 228 $_[0]->push_read ($r_framing => $rmsg);
189 229
190 $self->{on_recv}($self, $_[1]); 230 AnyEvent::MP::_inject ($_[1]);
191 }; 231 };
192 $hdl->push_read (storable => $rmsg); 232 $hdl->push_read ($r_framing => $rmsg);
193 }); 233 });
194 }); 234 });
195 } 235 }
196 236
197 $self 237 $self
202 242
203 $self->{on_error}($self, $msg); 243 $self->{on_error}($self, $msg);
204 $self->{hdl}->destroy; 244 $self->{hdl}->destroy;
205} 245}
206 246
247sub connected {
248 my ($self) = @_;
249
250 (AnyEvent::MP::add_node ($self->{remote_node}))
251 ->set_transport ($self);
252}
253
207sub send { 254sub send {
208 my ($self, $msg) = @_; 255 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]);
209
210 exists $self->{queue}
211 ? push @{ $self->{queue} }, $msg
212 : $self->{hdl}->push_write (storable => $msg);
213} 256}
214 257
215sub destroy { 258sub destroy {
216 my ($self) = @_; 259 my ($self) = @_;
217 260
218 $self->{hdl}->destroy; 261 $self->{hdl}->destroy
262 if $self->{hdl};
219} 263}
220 264
221sub DESTROY { 265sub DESTROY {
222 my ($self) = @_; 266 my ($self) = @_;
223 267

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines