ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.1 by root, Thu Jul 30 08:38:50 2009 UTC vs.
Revision 1.5 by root, Sun Aug 2 14:44:37 2009 UTC

24use common::sense; 24use common::sense;
25 25
26use Scalar::Util; 26use Scalar::Util;
27use MIME::Base64 (); 27use MIME::Base64 ();
28use Storable (); 28use Storable ();
29use JSON::XS ();
29 30
30use AE (); 31use AE ();
31use AnyEvent::Socket (); 32use AnyEvent::Socket ();
32use AnyEvent::Handle (); 33use AnyEvent::Handle ();
34
33use AnyEvent::MP (); 35use AnyEvent::MP::Util ();
34 36
35use base Exporter::; 37use base Exporter::;
36 38
37our $VERSION = '0.0'; 39our $VERSION = '0.0';
38our $PROTOCOL_VERSION_MAJOR = 0; 40our $PROTOCOL_VERSION = 0;
39our $PROTOCOL_VERSION_MINOR = 0;
40 41
41=item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport) 42=item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport)
42 43
43Creates a listener on the given host/port using 44Creates a listener on the given host/port using
44C<AnyEvent::Socket::tcp_server>. 45C<AnyEvent::Socket::tcp_server>.
64 @args, 65 @args,
65 ); 66 );
66 } 67 }
67} 68}
68 69
70=item $guard = mp_connect $host, $port, <constructor-args>, $cb->($transport)
71
72=cut
73
74sub mp_connect {
75 my $cb = pop;
76 my ($host, $port, @args) = @_;
77
78 AnyEvent::Socket::tcp_connect $host, $port, sub {
79 my ($fh, $nhost, $nport) = @_;
80
81 return $cb->() unless $fh;
82
83 $cb->(new AnyEvent::MP::Transport
84 fh => $fh,
85 peername => $host,
86 peerhost => $nhost,
87 peerport => $nport,
88 tls => "accept",
89 @args,
90 );
91 }
92}
93
69=item new AnyEvent::MP::Transport 94=item new AnyEvent::MP::Transport
70 95
71 # immediately starts negotiation 96 # immediately starts negotiation
72 my $transport = new AnyEvent::MP::Transport 97 my $transport = new AnyEvent::MP::Transport
73 # fh OR connect is mandatory 98 # mandatory
74 fh => $filehandle, 99 fh => $filehandle,
75 connect => [$host, $port], 100 local_id => $identifier,
76
77 # mandatory
78 on_recv => sub { receive-callback }, 101 on_recv => sub { receive-callback },
79 on_error => sub { error-callback }, 102 on_error => sub { error-callback },
80 103
81 # optional 104 # optional
82 local_id => $identifier,
83 secret => "shared secret", 105 secret => "shared secret",
84 on_eof => sub { clean-close-callback }, 106 on_eof => sub { clean-close-callback },
85 on_connect => sub { successful-connect-callback }, 107 on_connect => sub { successful-connect-callback },
108 greeting => { key => value },
86 109
87 # tls support 110 # tls support
88 tls => "accept|connect", 111 tls => "accept|connect",
89 tls_ctx => AnyEvent::TLS, 112 tls_ctx => AnyEvent::TLS,
90 peername => $peername, # for verification 113 peername => $peername, # for verification
91 ; 114 ;
92 115
93=cut 116=cut
94 117
118our @FRAMING_WANT = qw(json storable);#d##TODO#
119
95sub new { 120sub new {
96 my ($class, %arg) = @_; 121 my ($class, %arg) = @_;
97 122
98 my $self = bless \%arg, $class; 123 my $self = bless \%arg, $class;
99 124
102 { 127 {
103 Scalar::Util::weaken (my $self = $self); 128 Scalar::Util::weaken (my $self = $self);
104 129
105 if (exists $arg{connect}) { 130 if (exists $arg{connect}) {
106 $arg{tls} ||= "connect"; 131 $arg{tls} ||= "connect";
107 $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1, verify_peername => "https" }; 132 $arg{tls_ctx} ||= { sslv2 => 0, sslv3 => 0, tlsv1 => 1, verify => 1 };
108 } 133 }
109 134
135 $arg{secret} = AnyEvent::MP::Base::default_secret ()
136 unless exists $arg{secret};
137
110 $self->{hdl} = new AnyEvent::Handle 138 $self->{hdl} = new AnyEvent::Handle
111 (exists $arg{fh} ? (fh => delete $arg{fh}) : (connect => delete $arg{connect})), 139 fh => delete $arg{fh},
140 rbuf_max => 64 * 1024,
141 autocork => 1,
142 no_delay => 1,
112 on_error => sub { 143 on_error => sub {
113 $self->error ($_[2]); 144 $self->error ($_[2]);
114 }, 145 },
115 peername => delete $arg{peername}, 146 peername => delete $arg{peername},
116 ; 147 ;
117 148
118 my $secret = delete $arg{secret} ? delete $arg{secret} : AnyEvent::MP::default_secret; 149 my $secret = $arg{secret};
150 my $greeting_kv = $self->{greeting} ||= {};
151 $greeting_kv->{"tls1.0"} ||= $arg{tls}
152 if exists $arg{tls} && $arg{tls_ctx};
153 $greeting_kv->{provider} = "AE-$VERSION";
119 154
120 # send greeting 155 # send greeting
121 my $lgreeting = "aemp;$PROTOCOL_VERSION_MAJOR;$PROTOCOL_VERSION_MINOR;AnyEvent::MP;$VERSION;" 156 my $lgreeting = "aemp;$PROTOCOL_VERSION;$PROTOCOL_VERSION" # version, min
157 . ";$AnyEvent::MP::Base::UNIQ"
158 . ";$AnyEvent::MP::Base::NODE"
122 . (MIME::Base64::encode_base64 AnyEvent::MP::nonce 33, "") . ";" 159 . ";" . (MIME::Base64::encode_base64 AnyEvent::MP::Base::nonce (33), "")
123 . "hmac_md6_64_256;" # hardcoded atm. 160 . ";hmac_md6_64_256" # hardcoded atm.
124 . "storable;" # hardcoded atm. 161 . ";json" # hardcoded atm.
125 . "$self->{local_id};" 162 . ";$self->{peerhost};$self->{peerport}"
126 . (exists $arg{tls} && $arg{tls_ctx} ? "tls1.0=$arg{tls};" : ""); 163 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
127 164
128 $self->{hdl}->push_write ("$lgreeting\012"); 165 $self->{hdl}->push_write ("$lgreeting\012");
129 166
130 # expect greeting 167 # expect greeting
131 $self->{hdl}->push_read (line => sub { 168 $self->{hdl}->push_read (line => sub {
132 my $rgreeting = $_[1]; 169 my $rgreeting = $_[1];
133 170
134 my ($aemp, $major, $minor, $provider, $provider_version, $nonce2, $auth, $framing, $rid, @kv) = split /;/, $rgreeting; 171 my ($aemp, $version, $version_min, $uniq, $rnode, undef, $auth, $framing, $peerport, $peerhost, @kv) = split /;/, $rgreeting;
135 172
136 if ($aemp ne "aemp") { 173 if ($aemp ne "aemp") {
137 return $self->error ("unparsable greeting"); 174 return $self->error ("unparsable greeting");
138 } elsif ($major != $PROTOCOL_VERSION_MAJOR) { 175 } elsif ($version_min > $PROTOCOL_VERSION) {
139 return $self->error ("major version mismatch ($PROTOCOL_VERSION_MAJOR vs. $major)"); 176 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version_min .. $version)");
140 } elsif ($auth ne "hmac_md6_64_256") { 177 } elsif ($auth ne "hmac_md6_64_256") {
141 return $self->error ("unsupported auth method ($auth)"); 178 return $self->error ("unsupported auth method ($auth)");
142 } elsif ($framing ne "storable") { 179 } elsif ($framing ne "json") {
143 return $self->error ("unsupported auth method ($auth)"); 180 return $self->error ("unsupported framing method ($auth)");
144 } 181 }
145 182
183 $self->{remote_uniq} = $uniq;
146 $self->{remote_id} = $rid; 184 $self->{remote_node} = $rnode;
147 185
148 $self->{greeting} = { 186 $self->{remote_greeting} = {
149 provider => $provider, 187 map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
150 provider_version => $provider_version, 188 @kv
151 }; 189 };
152 190
153 /^([^=]+)(?:=(.*))?/ and $self->{greeting}{$1} = $2
154 for @kv;
155
156 if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{greeting}{"tls1.0"}) { 191 if (exists $self->{tls} and $self->{tls_ctx} and exists $self->{remote_greeting}{"tls1.0"}) {
157 if ($self->{tls} ne $self->{greeting}{"tls1.0"}) { 192 if ($self->{tls} ne $self->{remote_greeting}{"tls1.0"}) {
158 return $self->error ("TLS server/client mismatch"); 193 return $self->error ("TLS server/client mismatch");
159 } 194 }
160 $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx}); 195 $self->{hdl}->starttls ($self->{tls}, $self->{tls_ctx});
161 } 196 }
162 197
163 # auth 198 # auth
164 require Digest::MD6; 199 require Digest::MD6;
165 require Digest::HMAC_MD6; 200 require Digest::HMAC_MD6;
166 201
167 my $key = Digest::MD6::md6_hex ($secret); 202 my $key = Digest::MD6::md6_hex ($secret);
168 my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting$rgreeting", 64, 256); 203 my $lauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$lgreeting\012$rgreeting", 64, 256);
169 my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting$lgreeting", 64, 256); 204 my $rauth = Digest::HMAC_MD6::hmac_md6_base64 ($key, "$rgreeting\012$lgreeting", 64, 256);
205
206 $lauth ne $rauth # echo attack?
207 or return $self->error ("authentication error");
208
170 $self->{hdl}->push_write ("$lauth\012"); 209 $self->{hdl}->push_write ("$auth;$lauth;$framing\012");
171 210
211 $self->{hdl}->rbuf_max (64); # enough for 44 reply bytes or so
172 $self->{hdl}->push_read (line => sub { 212 $self->{hdl}->push_read (line => sub {
173 my ($hdl, $rauth2) = @_; 213 my ($hdl, $rline) = @_;
214
215 my ($auth_method, $rauth2, $r_framing) = split /;/, $rline;
174 216
175 if ($rauth2 ne $rauth) { 217 if ($rauth2 ne $rauth) {
176 return $self->error ("authentication failure/shared secret mismatch"); 218 return $self->error ("authentication failure/shared secret mismatch");
177 } 219 }
178 220
221 $self->{s_framing} = "json";#d#
222
223 $hdl->rbuf_max (undef);
179 my $queue = delete $self->{queue}; # we are connected 224 my $queue = delete $self->{queue}; # we are connected
180 225
181 $self->{on_connect}($self) 226 $self->connected;
182 if $self->{on_connect};
183 227
184 $hdl->push_write (storable => $_) 228 $hdl->push_write ($self->{s_framing} => $_)
185 for @$queue; 229 for @$queue;
186 230
187 my $rmsg; $rmsg = sub { 231 my $rmsg; $rmsg = sub {
188 $_[0]->push_read (storable => $rmsg); 232 $_[0]->push_read ($r_framing => $rmsg);
189 233
190 $self->{on_recv}($self, $_[1]); 234 AnyEvent::MP::Base::_inject ($_[1]);
191 }; 235 };
192 $hdl->push_read (storable => $rmsg); 236 $hdl->push_read ($r_framing => $rmsg);
193 }); 237 });
194 }); 238 });
195 } 239 }
196 240
197 $self 241 $self
198} 242}
199 243
200sub error { 244sub error {
201 my ($self, $msg) = @_; 245 my ($self, $msg) = @_;
202 246
247 if ($self->{node} && $self->{node}{transport} == $self) {
248 $self->{node}->clr_transport;
249 }
203 $self->{on_error}($self, $msg); 250# $self->{on_error}($self, $msg);
204 $self->{hdl}->destroy; 251 $self->destroy;
252}
253
254sub connected {
255 my ($self) = @_;
256
257 my $node = AnyEvent::MP::Base::add_node ($self->{remote_node});
258 Scalar::Util::weaken ($self->{node} = $node);
259 $node->set_transport ($self);
205} 260}
206 261
207sub send { 262sub send {
208 my ($self, $msg) = @_; 263 $_[0]{hdl}->push_write ($_[0]{s_framing} => $_[1]);
209
210 exists $self->{queue}
211 ? push @{ $self->{queue} }, $msg
212 : $self->{hdl}->push_write (storable => $msg);
213} 264}
214 265
215sub destroy { 266sub destroy {
216 my ($self) = @_; 267 my ($self) = @_;
217 268
218 $self->{hdl}->destroy; 269 $self->{hdl}->destroy
270 if $self->{hdl};
219} 271}
220 272
221sub DESTROY { 273sub DESTROY {
222 my ($self) = @_; 274 my ($self) = @_;
223 275

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines