ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-MP/MP/Transport.pm
(Generate patch)

Comparing AnyEvent-MP/MP/Transport.pm (file contents):
Revision 1.25 by root, Thu Aug 6 10:21:48 2009 UTC vs.
Revision 1.28 by root, Sun Aug 9 16:08:16 2009 UTC

22 22
23package AnyEvent::MP::Transport; 23package AnyEvent::MP::Transport;
24 24
25use common::sense; 25use common::sense;
26 26
27use Scalar::Util; 27use Scalar::Util ();
28use List::Util ();
28use MIME::Base64 (); 29use MIME::Base64 ();
29use Storable (); 30use Storable ();
30use JSON::XS (); 31use JSON::XS ();
31 32
32use Digest::MD6 (); 33use Digest::MD6 ();
33use Digest::HMAC_MD6 (); 34use Digest::HMAC_MD6 ();
34 35
35use AE (); 36use AE ();
36use AnyEvent::Socket (); 37use AnyEvent::Socket ();
37use AnyEvent::Handle (); 38use AnyEvent::Handle 4.92 ();
38 39
39use base Exporter::; 40use base Exporter::;
40 41
41our $VERSION = '0.0';
42our $PROTOCOL_VERSION = 0; 42our $PROTOCOL_VERSION = 0;
43 43
44=item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport) 44=item $listener = mp_listener $host, $port, <constructor-args>, $cb->($transport)
45 45
46Creates a listener on the given host/port using 46Creates a listener on the given host/port using
112 peername => $peername, # for verification 112 peername => $peername, # for verification
113 ; 113 ;
114 114
115=cut 115=cut
116 116
117sub LATENCY() { 3 } # assumed max. network latency
118
117our @FRAMINGS = qw(json storable); # the framing types we accept and send, in order of preference 119our @FRAMINGS = qw(json storable); # the framing types we accept and send, in order of preference
118our @AUTH_SND = qw(hmac_md6_64_256); # auth types we send 120our @AUTH_SND = qw(hmac_md6_64_256); # auth types we send
119our @AUTH_RCV = (@AUTH_SND, qw(cleartext)); # auth types we accept 121our @AUTH_RCV = (@AUTH_SND, qw(cleartext)); # auth types we accept
120 122
121#AnyEvent::Handle::register_write_type mp_record => sub { 123#AnyEvent::Handle::register_write_type mp_record => sub {
135 unless exists $arg{secret}; 137 unless exists $arg{secret};
136 138
137 $arg{timeout} = 30 139 $arg{timeout} = 30
138 unless exists $arg{timeout}; 140 unless exists $arg{timeout};
139 141
140 my $keepalive = (int $arg{timeout} * 0.75) || 1; 142 $arg{timeout} = 1 + LATENCY
143 if $arg{timeout} < 1 + LATENCY;
141 144
142 my $secret = $arg{secret}; 145 my $secret = $arg{secret};
143 146
144 if ($secret =~ /-----BEGIN RSA PRIVATE KEY-----.*-----END RSA PRIVATE KEY-----.*-----BEGIN CERTIFICATE-----.*-----END CERTIFICATE-----/s) { 147 if ($secret =~ /-----BEGIN RSA PRIVATE KEY-----.*-----END RSA PRIVATE KEY-----.*-----BEGIN CERTIFICATE-----.*-----END CERTIFICATE-----/s) {
145 # assume TLS mode 148 # assume TLS mode
159 autocork => 1, 162 autocork => 1,
160 no_delay => 1, 163 no_delay => 1,
161 on_error => sub { 164 on_error => sub {
162 $self->error ($_[2]); 165 $self->error ($_[2]);
163 }, 166 },
164 timeout => $AnyEvent::MP::Base::CONNECT_TIMEOUT, 167 rtimeout => $AnyEvent::MP::Base::CONNECT_TIMEOUT,
165 peername => delete $arg{peername}, 168 peername => delete $arg{peername},
166 ; 169 ;
167 170
168 my $greeting_kv = $self->{greeting} ||= {}; 171 my $greeting_kv = $self->{greeting} ||= {};
169 172
170 $self->{local_node} = $AnyEvent::MP::Base::NODE; 173 $self->{local_node} = $AnyEvent::MP::Base::NODE;
171 174
172 $greeting_kv->{"tls"} = "1.0" if $arg{tls_ctx}; 175 $greeting_kv->{"tls"} = "1.0" if $arg{tls_ctx};
173 $greeting_kv->{provider} = "AE-$VERSION"; 176 $greeting_kv->{provider} = "AE-$AnyEvent::MP::Base::VERSION";
174 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport}; 177 $greeting_kv->{peeraddr} = AnyEvent::Socket::format_hostport $self->{peerhost}, $self->{peerport};
175 $greeting_kv->{maxidle} = $keepalive; 178 $greeting_kv->{timeout} = $arg{timeout};
176 179
177 # send greeting 180 # send greeting
178 my $lgreeting1 = "aemp;$PROTOCOL_VERSION" 181 my $lgreeting1 = "aemp;$PROTOCOL_VERSION"
179 . ";$AnyEvent::MP::Base::UNIQ"
180 . ";$self->{local_node}" 182 . ";$self->{local_node}"
181 . ";" . (join ",", @AUTH_RCV) 183 . ";" . (join ",", @AUTH_RCV)
182 . ";" . (join ",", @FRAMINGS) 184 . ";" . (join ",", @FRAMINGS)
183 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv); 185 . (join "", map ";$_=$greeting_kv->{$_}", keys %$greeting_kv);
184 186
189 # expect greeting 191 # expect greeting
190 $self->{hdl}->rbuf_max (4 * 1024); 192 $self->{hdl}->rbuf_max (4 * 1024);
191 $self->{hdl}->push_read (line => sub { 193 $self->{hdl}->push_read (line => sub {
192 my $rgreeting1 = $_[1]; 194 my $rgreeting1 = $_[1];
193 195
194 my ($aemp, $version, $uniq, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1; 196 my ($aemp, $version, $rnode, $auths, $framings, @kv) = split /;/, $rgreeting1;
195 197
196 if ($aemp ne "aemp") { 198 if ($aemp ne "aemp") {
197 return $self->error ("unparsable greeting"); 199 return $self->error ("unparsable greeting");
198 } elsif ($version != $PROTOCOL_VERSION) { 200 } elsif ($version != $PROTOCOL_VERSION) {
199 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)"); 201 return $self->error ("version mismatch (we: $PROTOCOL_VERSION, they: $version)");
221 } 223 }
222 224
223 defined $s_framing 225 defined $s_framing
224 or return $self->error ("$framings: no common framing method supported"); 226 or return $self->error ("$framings: no common framing method supported");
225 227
226 $self->{remote_uniq} = $uniq;
227 $self->{remote_node} = $rnode; 228 $self->{remote_node} = $rnode;
228 229
229 $self->{remote_greeting} = { 230 $self->{remote_greeting} = {
230 map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (), 231 map /^([^=]+)(?:=(.*))?/ ? ($1 => $2) : (),
231 @kv 232 @kv
272 $self->{s_framing} = $s_framing; 273 $self->{s_framing} = $s_framing;
273 274
274 $hdl->rbuf_max (undef); 275 $hdl->rbuf_max (undef);
275 my $queue = delete $self->{queue}; # we are connected 276 my $queue = delete $self->{queue}; # we are connected
276 277
277 $self->{hdl}->timeout ($self->{remote_greeting}{keepalive} + 5) 278 $self->{hdl}->rtimeout ($self->{remote_greeting}{timeout});
278 if $self->{remote_greeting}{keepalive}; 279 $self->{hdl}->wtimeout ($arg{timeout} - LATENCY);
280 $self->{hdl}->on_wtimeout (sub { $self->send (["", "devnull"]) });
279 281
280 $self->connected; 282 $self->connected;
281 283
282 my $src_node = $self->{node}; 284 # send queued messages
283
284 $self->send ($_) 285 $self->send ($_)
285 for @$queue; 286 for @$queue;
287
288 # receive handling
289 my $src_node = $self->{node};
286 290
287 my $rmsg; $rmsg = sub { 291 my $rmsg; $rmsg = sub {
288 $_[0]->push_read ($r_framing => $rmsg); 292 $_[0]->push_read ($r_framing => $rmsg);
289 293
290 local $AnyEvent::MP::Base::SRCNODE = $src_node; 294 local $AnyEvent::MP::Base::SRCNODE = $src_node;
392 396
393The protocol version supported by this end, currently C<0>. If the 397The protocol version supported by this end, currently C<0>. If the
394versions don't match then no communication is possible. Minor extensions 398versions don't match then no communication is possible. Minor extensions
395are supposed to be handled through additional key-value pairs. 399are supposed to be handled through additional key-value pairs.
396 400
397=item a token uniquely identifying the current node instance
398
399This is a string that must change between restarts. It usually contains
400things like the current time, the (OS) process id or similar values, but
401no meaning of the contents are assumed.
402
403=item the node endpoint descriptors 401=item the node endpoint descriptors
404 402
405for public nodes, this is a comma-separated list of protocol endpoints, 403for public nodes, this is a comma-separated list of protocol endpoints,
406i.e., the noderef. For slave nodes, this is a unique identifier. 404i.e., the noderef. For slave nodes, this is a unique identifier of the
405form C<slave/nonce>.
407 406
408=item the acceptable authentication methods 407=item the acceptable authentication methods
409 408
410A comma-separated list of authentication methods supported by the 409A comma-separated list of authentication methods supported by the
411node. Note that AnyEvent::MP supports a C<hex_secret> authentication 410node. Note that AnyEvent::MP supports a C<hex_secret> authentication
440=item tls=<major>.<minor> 439=item tls=<major>.<minor>
441 440
442Indicates that the other side supports TLS (version should be 1.0) and 441Indicates that the other side supports TLS (version should be 1.0) and
443wishes to do a TLS handshake. 442wishes to do a TLS handshake.
444 443
445=item maxidle=<seconds> 444=item timeout=<seconds>
446 445
447The maximum amount of time the node will not sent data, i.e., idle. This 446The amount of time after which this node should be detected as dead unless
448can be used to close the conenction when no data has been received for a 447some data has been received. The node is responsible to send traffic
449too-long time (say, maxidle + 5 seconds). 448reasonably more often than this interval (such as every timeout minus five
449seconds).
450 450
451=back 451=back
452 452
453=head3 Second Greeting Line 453=head3 Second Greeting Line
454 454

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines