ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.25 by root, Thu Aug 6 10:21:48 2009 UTC vs.
Revision 1.27 by root, Sun Aug 9 00:31:40 2009 UTC

22 22
23package AnyEvent::MP::Transport; 23package AnyEvent::MP::Transport;
24 24
25use common::sense; 25use common::sense;
26 26
27use Scalar::Util; 27use Scalar::Util ();
28use List::Util ();
28use MIME::Base64 (); 29use MIME::Base64 ();
29use Storable (); 30use Storable ();
30use JSON::XS (); 31use JSON::XS ();
31 32
32use Digest::MD6 (); 33use Digest::MD6 ();
33use Digest::HMAC_MD6 (); 34use Digest::HMAC_MD6 ();
34 35
35use AE (); 36use AE ();
36use AnyEvent::Socket (); 37use AnyEvent::Socket ();
37use AnyEvent::Handle (); 38use AnyEvent::Handle 4.92 ();
38 39
39use base Exporter::; 40use base Exporter::;
40 41
41our $VERSION = '0.0'; 42our $VERSION = '0.0';
42our $PROTOCOL_VERSION = 0; 43our $PROTOCOL_VERSION = 0;
112 peername => $peername, # for verification 113 peername => $peername, # for verification
113 ; 114 ;
114 115
115=cut 116=cut
116 117
118sub LATENCY() { 3 } # assumed max. network latency
119
117our @FRAMINGS = qw(json storable); # the framing types we accept and send, in order of preference 120our @FRAMINGS = qw(json storable); # the framing types we accept and send, in order of preference
118our @AUTH_SND = qw(hmac_md6_64_256); # auth types we send 121our @AUTH_SND = qw(hmac_md6_64_256); # auth types we send
119our @AUTH_RCV = (@AUTH_SND, qw(cleartext)); # auth types we accept 122our @AUTH_RCV = (@AUTH_SND, qw(cleartext)); # auth types we accept
120 123
121#AnyEvent::Handle::register_write_type mp_record => sub { 124#AnyEvent::Handle::register_write_type mp_record => sub {
135 unless exists $arg{secret}; 138 unless exists $arg{secret};
136 139
137 $arg{timeout} = 30 140 $arg{timeout} = 30
138 unless exists $arg{timeout}; 141 unless exists $arg{timeout};
139 142
140 my $keepalive = (int $arg{timeout} * 0.75) || 1; 143 $arg{timeout} = 1 + LATENCY
144 if $arg{timeout} < 1 + LATENCY;
141 145
142 my $secret = $arg{secret}; 146 my $secret = $arg{secret};
143 147
144 if ($secret =~ /-----BEGIN RSA PRIVATE KEY-----.*-----END RSA PRIVATE KEY-----.*-----BEGIN CERTIFICATE-----.*-----END CERTIFICATE-----/s) { 148 if ($secret =~ /-----BEGIN RSA PRIVATE KEY-----.*-----END RSA PRIVATE KEY-----.*-----BEGIN CERTIFICATE-----.*-----END CERTIFICATE-----/s) {
145 # assume TLS mode 149 # assume TLS mode
159 autocork => 1, 163 autocork => 1,
160 no_delay => 1, 164 no_delay => 1,
161 on_error => sub { 165 on_error => sub {
162 $self->error ($_[2]); 166 $self->error ($_[2]);
163 }, 167 },
164 timeout => $AnyEvent::MP::Base::CONNECT_TIMEOUT, 168 rtimeout => $AnyEvent::MP::Base::CONNECT_TIMEOUT,
165 peername => delete $arg{peername}, 169 peername => delete $arg{peername},
166 ; 170 ;
167 171
168 my $greeting_kv = $self->{greeting} ||= {}; 172 my $greeting_kv = $self->{greeting} ||= {};
169 173
170 $self->{local_node} = $AnyEvent::MP::Base::NODE; 174 $self->{local_node} = $AnyEvent::MP::Base::NODE;
171 175
172 $greeting_kv->{"tls"} = "1.0" if $arg{tls_ctx}; 176 $greeting_kv->{"tls"} = "1.0" if $arg{tls_ctx};
173 $greeting_kv->{provider} = "AE-$VERSION"; 177 $greeting_kv->{provider} = "AE-$VERSION";
174 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; 178 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
175 $greeting_kv->{maxidle} = $keepalive; 179 $greeting_kv->{timeout} = $arg{timeout};
176 180
177 # send greeting 181 # send greeting
178 my $lgreeting1 = "aemp;$PROTOCOL_VERSION" 182 my $lgreeting1 = "aemp;$PROTOCOL_VERSION"
179 . ";$AnyEvent::MP::Base::UNIQ"
180 . ";$self->{local_node}" 183 . ";$self->{local_node}"
181 . ";" . (join ",", @AUTH_RCV) 184 . ";" . (join ",", @AUTH_RCV)
182 . ";" . (join ",", @FRAMINGS) 185 . ";" . (join ",", @FRAMINGS)
183 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv); 186 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
184 187
189 # expect greeting 192 # expect greeting
190 $self->{hdl}->rbuf_max (4 * 1024); 193 $self->{hdl}->rbuf_max (4 * 1024);
191 $self->{hdl}->push_read (line => sub { 194 $self->{hdl}->push_read (line => sub {
192 my $rgreeting1 = $_[1]; 195 my $rgreeting1 = $_[1];
193 196
194 my ($aemp, $version, $uniq, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1; 197 my ($aemp, $version, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;
195 198
196 if ($aemp ne "aemp") { 199 if ($aemp ne "aemp") {
197 return $self->error ("unparsable greeting"); 200 return $self->error ("unparsable greeting");
198 } elsif ($version != $PROTOCOL_VERSION) { 201 } elsif ($version != $PROTOCOL_VERSION) {
199 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)"); 202 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)");
221 } 224 }
222 225
223 defined $s_framing 226 defined $s_framing
224 or return $self->error ("$framings: no common framing method supported"); 227 or return $self->error ("$framings: no common framing method supported");
225 228
226 $self->{remote_uniq} = $uniq;
227 $self->{remote_node} = $rnode; 229 $self->{remote_node} = $rnode;
228 230
229 $self->{remote_greeting} = { 231 $self->{remote_greeting} = {
230 map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (), 232 map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
231 @kv 233 @kv
272 $self->{s_framing} = $s_framing; 274 $self->{s_framing} = $s_framing;
273 275
274 $hdl->rbuf_max (undef); 276 $hdl->rbuf_max (undef);
275 my $queue = delete $self->{queue}; # we are connected 277 my $queue = delete $self->{queue}; # we are connected
276 278
277 $self->{hdl}->timeout ($self->{remote_greeting}{keepalive} + 5) 279 $self->{hdl}->rtimeout ($self->{remote_greeting}{timeout});
278 if $self->{remote_greeting}{keepalive}; 280 $self->{hdl}->wtimeout ($arg{timeout} - LATENCY);
281 $self->{hdl}->on_wtimeout (sub { $self->send (["", "devnull"]) });
279 282
280 $self->connected; 283 $self->connected;
281 284
282 my $src_node = $self->{node}; 285 # send queued messages
283
284 $self->send ($_) 286 $self->send ($_)
285 for @$queue; 287 for @$queue;
288
289 # receive handling
290 my $src_node = $self->{node};
286 291
287 my $rmsg; $rmsg = sub { 292 my $rmsg; $rmsg = sub {
288 $_[0]->push_read ($r_framing => $rmsg); 293 $_[0]->push_read ($r_framing => $rmsg);
289 294
290 local $AnyEvent::MP::Base::SRCNODE = $src_node; 295 local $AnyEvent::MP::Base::SRCNODE = $src_node;
392 397
393The protocol version supported by this end, currently C<0>. If the 398The protocol version supported by this end, currently C<0>. If the
394versions don't match then no communication is possible. Minor extensions 399versions don't match then no communication is possible. Minor extensions
395are supposed to be handled through additional key-value pairs. 400are supposed to be handled through additional key-value pairs.
396 401
397=item a token uniquely identifying the current node instance
398
399This is a string that must change between restarts. It usually contains
400things like the current time, the (OS) process id or similar values, but
401no meaning of the contents are assumed.
402
403=item the node endpoint descriptors 402=item the node endpoint descriptors
404 403
405for public nodes, this is a comma-separated list of protocol endpoints, 404for public nodes, this is a comma-separated list of protocol endpoints,
406i.e., the noderef. For slave nodes, this is a unique identifier. 405i.e., the noderef. For slave nodes, this is a unique identifier of the
406form C<slave/nonce>.
407 407
408=item the acceptable authentication methods 408=item the acceptable authentication methods
409 409
410A comma-separated list of authentication methods supported by the 410A comma-separated list of authentication methods supported by the
411node. Note that AnyEvent::MP supports a C<hex_secret> authentication 411node. Note that AnyEvent::MP supports a C<hex_secret> authentication
440=item tls=<major>.<minor> 440=item tls=<major>.<minor>
441 441
442Indicates that the other side supports TLS (version should be 1.0) and 442Indicates that the other side supports TLS (version should be 1.0) and
443wishes to do a TLS handshake. 443wishes to do a TLS handshake.
444 444
445=item maxidle=<seconds> 445=item timeout=<seconds>
446 446
447The maximum amount of time the node will not sent data, i.e., idle. This 447The amount of time after which this node should be detected as dead unless
448can be used to close the conenction when no data has been received for a 448some data has been received. The node is responsible to send traffic
449too-long time (say, maxidle + 5 seconds). 449reasonably more often than this interval (such as every timeout minus five
450seconds).
450 451
451=back 452=back
452 453
453=head3 Second Greeting Line 454=head3 Second Greeting Line
454 455

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines