ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.13 by root, Sat Oct 23 21:47:13 2010 UTC vs.
Revision 1.14 by root, Sat Oct 30 20:23:44 2010 UTC

34separate "DBI-Server" processes and sending them requests. 34separate "DBI-Server" processes and sending them requests.
35 35
36It means that you can run DBI requests in parallel to other tasks. 36It means that you can run DBI requests in parallel to other tasks.
37 37
38The overhead for very simple statements ("select 0") is somewhere 38The overhead for very simple statements ("select 0") is somewhere
39around 120% to 200% (dual/single core CPU) compared to an explicit 39around 100% to 120% (dual/single core CPU) compared to an explicit
40prepare_cached/execute/fetchrow_arrayref/finish combination. 40prepare_cached/execute/fetchrow_arrayref/finish combination.
41 41
42=head2 ERROR HANDLING 42=head2 ERROR HANDLING
43 43
44This module defines a number of functions that accept a callback 44This module defines a number of functions that accept a callback
55 55
56=cut 56=cut
57 57
58package AnyEvent::DBI; 58package AnyEvent::DBI;
59 59
60use strict qw(vars subs); 60use common::sense;
61no warnings;
62 61
63use Carp; 62use Carp;
64use Socket (); 63use Socket ();
65use Scalar::Util (); 64use Scalar::Util ();
66use Storable (); 65use Storable ();
67 66
68use DBI (); 67use DBI (); # only needed in child actually - do it before fork & !exec?
69 68
70use AnyEvent (); 69use AnyEvent ();
71use AnyEvent::Util (); 70use AnyEvent::Util ();
72 71
73use Errno (); 72use Errno ();
74use Fcntl (); 73use Fcntl ();
75use POSIX (); 74use POSIX ();
76 75
77our $VERSION = '2.0'; 76our $VERSION = '2.1';
78 77
79our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023; 78our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
80 79
81# this is the forked server code, could/should be bundled as it's own file 80# this is the forked server code, could/should be bundled as it's own file
82 81
286 285
287sub new { 286sub new {
288 my ($class, $dbi, $user, $pass, %arg) = @_; 287 my ($class, $dbi, $user, $pass, %arg) = @_;
289 288
290 my ($client, $server) = AnyEvent::Util::portable_socketpair 289 my ($client, $server) = AnyEvent::Util::portable_socketpair
291 or croak "unable to create Anyevent::DBI communications pipe: $!"; 290 or croak "unable to create AnyEvent::DBI communications pipe: $!";
292 291
293 my %dbi_args = %arg; 292 my %dbi_args = %arg;
294 delete @dbi_args{qw(on_connect on_error timeout exec_server)}; 293 delete @dbi_args{qw(on_connect on_error timeout exec_server)};
295 294
296 my $self = bless \%arg, $class; 295 my $self = bless \%arg, $class;
302 my @caller = (caller)[1,2]; # the "default" caller 301 my @caller = (caller)[1,2]; # the "default" caller
303 302
304 { 303 {
305 Scalar::Util::weaken (my $self = $self); 304 Scalar::Util::weaken (my $self = $self);
306 305
307 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { 306 $self->{rw} = AE::io $client, 0, sub {
308 return unless $self; 307 return unless $self;
309 308
310 $self->{last_activity} = AnyEvent->now; 309 $self->{last_activity} = AE::now;
311 310
312 my $len = sysread $client, $rbuf, 65536, length $rbuf; 311 my $len = sysread $client, $rbuf, 65536, length $rbuf;
313 312
314 if ($len > 0) { 313 if ($len > 0) {
315 # we received data, so reset the timer 314 # we received data, so reset the timer
347 $self->_error ("unexpected eof", @caller, 1); 346 $self->_error ("unexpected eof", @caller, 1);
348 } elsif ($! != Errno::EAGAIN) { 347 } elsif ($! != Errno::EAGAIN) {
349 # todo, caller? 348 # todo, caller?
350 $self->_error ("read error: $!", @caller, 1); 349 $self->_error ("read error: $!", @caller, 1);
351 } 350 }
352 }); 351 };
353 352
354 $self->{tw_cb} = sub { 353 $self->{tw_cb} = sub {
355 if ($self->{timeout} && $self->{last_activity}) { 354 if ($self->{timeout} && $self->{last_activity}) {
356 if (AnyEvent->now > $self->{last_activity} + $self->{timeout}) { 355 if (AE::now > $self->{last_activity} + $self->{timeout}) {
357 # we did time out 356 # we did time out
358 my $req = $self->{queue}[0]; 357 my $req = $self->{queue}[0];
359 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal 358 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
360 } else { 359 } else {
361 # we need to re-set the timeout watcher 360 # we need to re-set the timeout watcher
362 $self->{tw} = AnyEvent->timer ( 361 $self->{tw} = AE::timer
363 after => $self->{last_activity} + $self->{timeout} - AnyEvent->now, 362 $self->{last_activity} + $self->{timeout} - AE::now,
363 0,
364 cb => $self->{tw_cb}, 364 $self->{tw_cb},
365 ); 365 ;
366 Scalar::Util::weaken $self;
367 } 366 }
368 } else { 367 } else {
369 # no timeout check wanted, or idle 368 # no timeout check wanted, or idle
370 undef $self->{tw}; 369 undef $self->{tw};
371 } 370 }
372 }; 371 };
373 372
374 $self->{ww_cb} = sub { 373 $self->{ww_cb} = sub {
375 return unless $self; 374 return unless $self;
376 375
377 $self->{last_activity} = AnyEvent->now; 376 $self->{last_activity} = AE::now;
378 377
379 my $len = syswrite $client, $self->{wbuf} 378 my $len = syswrite $client, $self->{wbuf}
380 or return delete $self->{ww}; 379 or return delete $self->{ww};
381 380
382 substr $self->{wbuf}, 0, $len, ""; 381 substr $self->{wbuf}, 0, $len, "";
430sub _server_pid { 429sub _server_pid {
431 shift->{child_pid} 430 shift->{child_pid}
432} 431}
433 432
434sub kill_child { 433sub kill_child {
435 my $self = shift; 434 my $self = shift;
435
436 my $child_pid = delete $self->{child_pid}; 436 if (my $pid = delete $self->{child_pid}) {
437 if ($child_pid) { 437 kill TERM => $pid;
438 # send SIGKILL in two seconds 438 }
439 my $murder_timer = AnyEvent->timer (
440 after => 2,
441 cb => sub {
442 kill 9, $child_pid;
443 },
444 );
445
446 # reap process
447 my $kid_watcher; $kid_watcher = AnyEvent->child (
448 pid => $child_pid,
449 cb => sub {
450 # just hold on to this so it won't go away
451 undef $kid_watcher;
452 # cancel SIGKILL
453 undef $murder_timer;
454 },
455 );
456
457 close $self->{fh}; 439 close delete $self->{fh};
458 }
459} 440}
460 441
461sub DESTROY { 442sub DESTROY {
462 shift->kill_child; 443 shift->kill_child;
463} 444}
526 507
527 push @{ $self->{queue} }, [$cb, $filename, $line]; 508 push @{ $self->{queue} }, [$cb, $filename, $line];
528 509
529 # re-start timeout if necessary 510 # re-start timeout if necessary
530 if ($self->{timeout} && !$self->{tw}) { 511 if ($self->{timeout} && !$self->{tw}) {
531 $self->{last_activity} = AnyEvent->now; 512 $self->{last_activity} = AE::now;
532 $self->{tw_cb}->(); 513 $self->{tw_cb}->();
533 } 514 }
534 515
535 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 516 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
536 517
537 unless ($self->{ww}) { 518 unless ($self->{ww}) {
538 my $len = syswrite $self->{fh}, $self->{wbuf}; 519 my $len = syswrite $self->{fh}, $self->{wbuf};
539 substr $self->{wbuf}, 0, $len, ""; 520 substr $self->{wbuf}, 0, $len, "";
540 521
541 # still any left? then install a write watcher 522 # still any left? then install a write watcher
542 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) 523 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
543 if length $self->{wbuf}; 524 if length $self->{wbuf};
544 } 525 }
545} 526}
546 527
547=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv)) 528=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines