ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.11 by root, Sun Jun 28 14:59:51 2009 UTC vs.
Revision 1.14 by root, Sat Oct 30 20:23:44 2010 UTC

11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", ""; 11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12 12
13 $dbh->exec ("select * from test where num=?", 10, sub { 13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($dbh, $rows, $rv) = @_; 14 my ($dbh, $rows, $rv) = @_;
15 15
16 $rows or die "failure: $@"; 16 $#_ or die "failure: $@";
17 17
18 print "@$_\n" 18 print "@$_\n"
19 for @$rows; 19 for @$rows;
20 20
21 $cv->broadcast; 21 $cv->broadcast;
34separate "DBI-Server" processes and sending them requests. 34separate "DBI-Server" processes and sending them requests.
35 35
36It means that you can run DBI requests in parallel to other tasks. 36It means that you can run DBI requests in parallel to other tasks.
37 37
38The overhead for very simple statements ("select 0") is somewhere 38The overhead for very simple statements ("select 0") is somewhere
39around 120% to 200% (dual/single core CPU) compared to an explicit 39around 100% to 120% (dual/single core CPU) compared to an explicit
40prepare_cached/execute/fetchrow_arrayref/finish combination. 40prepare_cached/execute/fetchrow_arrayref/finish combination.
41 41
42=head2 ERROR HANDLING 42=head2 ERROR HANDLING
43 43
44This module defines a number of functions that accept a callback 44This module defines a number of functions that accept a callback
55 55
56=cut 56=cut
57 57
58package AnyEvent::DBI; 58package AnyEvent::DBI;
59 59
60use strict qw(vars subs); 60use common::sense;
61no warnings;
62 61
63use Carp; 62use Carp;
64use Socket (); 63use Socket ();
65use Scalar::Util (); 64use Scalar::Util ();
66use Storable (); 65use Storable ();
67 66
68use DBI (); 67use DBI (); # only needed in child actually - do it before fork & !exec?
69 68
70use AnyEvent (); 69use AnyEvent ();
71use AnyEvent::Util (); 70use AnyEvent::Util ();
72 71
73use Errno (); 72use Errno ();
74use Fcntl (); 73use Fcntl ();
75use POSIX (); 74use POSIX ();
76 75
77our $VERSION = '1.19'; 76our $VERSION = '2.1';
78 77
79our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023; 78our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
80 79
81# this is the forked server code, could/should be bundled as it's own file 80# this is the forked server code, could/should be bundled as it's own file
82 81
124 123
125sub req_func { 124sub req_func {
126 my (undef, $arg_string, $function) = @{+shift}; 125 my (undef, $arg_string, $function) = @{+shift};
127 my @args = eval $arg_string; 126 my @args = eval $arg_string;
128 127
129 die "Bad func () arg string: $@" 128 die "error evaling \$dbh->func() arg_string: $@"
130 if $@; 129 if $@;
131 130
132 my $rv = $DBH->func (@args, $function); 131 my $rc = $DBH->func (@args, $function);
133 return [$rv, $DBH->err]; 132 return [1, $rc, $DBI::err, $DBI::errstr];
134} 133}
135 134
136sub serve_fh($$) { 135sub serve_fh($$) {
137 my ($fh, $version) = @_; 136 my ($fh, $version) = @_;
138 137
234 233
235=item exec_server => 1 234=item exec_server => 1
236 235
237If you supply an C<exec_server> argument, then the DBI server process will 236If you supply an C<exec_server> argument, then the DBI server process will
238fork and exec another perl interpreter (using C<$^X>) with just the 237fork and exec another perl interpreter (using C<$^X>) with just the
239AnyEvent::DBI proxy running. This will provide the cleanest possible porxy 238AnyEvent::DBI proxy running. This will provide the cleanest possible proxy
240for your database server. 239for your database server.
241 240
242If you do not supply the C<exec_server> argument (or supply it with a 241If you do not supply the C<exec_server> argument (or supply it with a
243false value) then the traditional method of starting the server by forking 242false value) then the traditional method of starting the server by forking
244the current process is used. The forked interpreter will try to clean 243the current process is used. The forked interpreter will try to clean
286 285
287sub new { 286sub new {
288 my ($class, $dbi, $user, $pass, %arg) = @_; 287 my ($class, $dbi, $user, $pass, %arg) = @_;
289 288
290 my ($client, $server) = AnyEvent::Util::portable_socketpair 289 my ($client, $server) = AnyEvent::Util::portable_socketpair
291 or croak "unable to create Anyevent::DBI communications pipe: $!"; 290 or croak "unable to create AnyEvent::DBI communications pipe: $!";
292 291
293 my %dbi_args = %arg; 292 my %dbi_args = %arg;
294 delete @dbi_args{qw(on_connect on_error timeout exec_server)}; 293 delete @dbi_args{qw(on_connect on_error timeout exec_server)};
295 294
296 my $self = bless \%arg, $class; 295 my $self = bless \%arg, $class;
302 my @caller = (caller)[1,2]; # the "default" caller 301 my @caller = (caller)[1,2]; # the "default" caller
303 302
304 { 303 {
305 Scalar::Util::weaken (my $self = $self); 304 Scalar::Util::weaken (my $self = $self);
306 305
307 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { 306 $self->{rw} = AE::io $client, 0, sub {
308 return unless $self; 307 return unless $self;
309 308
310 $self->{last_activity} = AnyEvent->now; 309 $self->{last_activity} = AE::now;
311 310
312 my $len = sysread $client, $rbuf, 65536, length $rbuf; 311 my $len = sysread $client, $rbuf, 65536, length $rbuf;
313 312
314 if ($len > 0) { 313 if ($len > 0) {
315 # we received data, so reset the timer 314 # we received data, so reset the timer
347 $self->_error ("unexpected eof", @caller, 1); 346 $self->_error ("unexpected eof", @caller, 1);
348 } elsif ($! != Errno::EAGAIN) { 347 } elsif ($! != Errno::EAGAIN) {
349 # todo, caller? 348 # todo, caller?
350 $self->_error ("read error: $!", @caller, 1); 349 $self->_error ("read error: $!", @caller, 1);
351 } 350 }
352 }); 351 };
353 352
354 $self->{tw_cb} = sub { 353 $self->{tw_cb} = sub {
355 if ($self->{timeout} && $self->{last_activity}) { 354 if ($self->{timeout} && $self->{last_activity}) {
356 if (AnyEvent->now > $self->{last_activity} + $self->{timeout}) { 355 if (AE::now > $self->{last_activity} + $self->{timeout}) {
357 # we did time out 356 # we did time out
358 my $req = $self->{queue}[0]; 357 my $req = $self->{queue}[0];
359 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal 358 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
360 } else { 359 } else {
361 # we need to re-set the timeout watcher 360 # we need to re-set the timeout watcher
362 $self->{tw} = AnyEvent->timer ( 361 $self->{tw} = AE::timer
363 after => $self->{last_activity} + $self->{timeout} - AnyEvent->now, 362 $self->{last_activity} + $self->{timeout} - AE::now,
363 0,
364 cb => $self->{tw_cb}, 364 $self->{tw_cb},
365 ); 365 ;
366 Scalar::Util::weaken $self;
367 } 366 }
368 } else { 367 } else {
369 # no timeout check wanted, or idle 368 # no timeout check wanted, or idle
370 undef $self->{tw}; 369 undef $self->{tw};
371 } 370 }
372 }; 371 };
373 372
374 $self->{ww_cb} = sub { 373 $self->{ww_cb} = sub {
375 return unless $self; 374 return unless $self;
376 375
377 $self->{last_activity} = AnyEvent->now; 376 $self->{last_activity} = AE::now;
378 377
379 my $len = syswrite $client, $self->{wbuf} 378 my $len = syswrite $client, $self->{wbuf}
380 or return delete $self->{ww}; 379 or return delete $self->{ww};
381 380
382 substr $self->{wbuf}, 0, $len, ""; 381 substr $self->{wbuf}, 0, $len, "";
430sub _server_pid { 429sub _server_pid {
431 shift->{child_pid} 430 shift->{child_pid}
432} 431}
433 432
434sub kill_child { 433sub kill_child {
435 my $self = shift; 434 my $self = shift;
435
436 my $child_pid = delete $self->{child_pid}; 436 if (my $pid = delete $self->{child_pid}) {
437 if ($child_pid) { 437 kill TERM => $pid;
438 # send SIGKILL in two seconds 438 }
439 my $murder_timer = AnyEvent->timer (
440 after => 2,
441 cb => sub {
442 kill 9, $child_pid;
443 },
444 );
445
446 # reap process
447 my $kid_watcher; $kid_watcher = AnyEvent->child (
448 pid => $child_pid,
449 cb => sub {
450 # just hold on to this so it won't go away
451 undef $kid_watcher;
452 # cancel SIGKILL
453 undef $murder_timer;
454 },
455 );
456
457 close $self->{fh}; 439 close delete $self->{fh};
458 }
459} 440}
460 441
461sub DESTROY { 442sub DESTROY {
462 shift->kill_child; 443 shift->kill_child;
463} 444}
526 507
527 push @{ $self->{queue} }, [$cb, $filename, $line]; 508 push @{ $self->{queue} }, [$cb, $filename, $line];
528 509
529 # re-start timeout if necessary 510 # re-start timeout if necessary
530 if ($self->{timeout} && !$self->{tw}) { 511 if ($self->{timeout} && !$self->{tw}) {
531 $self->{last_activity} = AnyEvent->now; 512 $self->{last_activity} = AE::now;
532 $self->{tw_cb}->(); 513 $self->{tw_cb}->();
533 } 514 }
534 515
535 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 516 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
536 517
537 unless ($self->{ww}) { 518 unless ($self->{ww}) {
538 my $len = syswrite $self->{fh}, $self->{wbuf}; 519 my $len = syswrite $self->{fh}, $self->{wbuf};
539 substr $self->{wbuf}, 0, $len, ""; 520 substr $self->{wbuf}, 0, $len, "";
540 521
541 # still any left? then install a write watcher 522 # still any left? then install a write watcher
542 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) 523 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
543 if length $self->{wbuf}; 524 if length $self->{wbuf};
544 } 525 }
545} 526}
546 527
547=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv)) 528=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
571if successful. 552if successful.
572 553
573If an error occurs and the C<on_error> callback returns, then only C<$dbh> 554If an error occurs and the C<on_error> callback returns, then only C<$dbh>
574will be passed and C<$@> contains the error message. 555will be passed and C<$@> contains the error message.
575 556
576=item $dbh->begin_work ($cb->($dbh[, $success])) 557=item $dbh->begin_work ($cb->($dbh[, $rc]))
577 558
578=item $dbh->commit ($cb->($dbh[, $success])) 559=item $dbh->commit ($cb->($dbh[, $rc]))
579 560
580=item $dbh->rollback ($cb->($dbh[, $success])) 561=item $dbh->rollback ($cb->($dbh[, $rc]))
581 562
582The begin_work, commit, and rollback methods expose the equivalent 563The begin_work, commit, and rollback methods expose the equivalent
583transaction control method of the DBI driver. On success, C<$success> 564transaction control method of the DBI driver. On success, C<$rc> is true.
584is true.
585 565
586If an error occurs and the C<on_error> callback returns, then only C<$dbh> 566If an error occurs and the C<on_error> callback returns, then only C<$dbh>
587will be passed and C<$@> contains the error message. 567will be passed and C<$@> contains the error message.
588 568
589=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $result, $handle_error)) 569=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
590 570
591This gives access to database driver private methods. Because they 571This gives access to database driver private methods. Because they
592are not standard you cannot always depend on the value of C<$result> 572are not standard you cannot always depend on the value of C<$rc> or
593or C<$handle_error>. Check the documentation for your specific 573C<$dbi_err>. Check the documentation for your specific driver/function
594driver/function combination to see what it returns. 574combination to see what it returns.
595 575
596Note that the first argument will be eval'ed to produce the argument list to 576Note that the first argument will be eval'ed to produce the argument list to
597the func() method. This must be done because the serialization protocol 577the func() method. This must be done because the serialization protocol
598between the AnyEvent::DBI server process and your program does not support the 578between the AnyEvent::DBI server process and your program does not support the
599passage of closures. 579passage of closures.
608 my ($string, $search) = @_; 588 my ($string, $search) = @_;
609 return index $string, $search; 589 return index $string, $search;
610 }, 590 },
611 }, 591 },
612 create_function => sub { 592 create_function => sub {
613 return $cv->send($@) 593 return $cv->send ($@)
614 unless $_[0]; 594 unless $#_;
615 $cv->send (undef, @_[1,2]); 595 $cv->send (undef, @_[1,2,3]);
616 } 596 }
617 ); 597 );
618 598
619 my ($err,$result,$handle_err) = $cv->recv; 599 my ($err,$rc,$errcode,$errstr) = $cv->recv;
620 600
601 die $err if defined $err;
621 die "EVAL failed: $err" 602 die "EVAL failed: $errstr"
622 if $err; 603 if $errcode;
623 604
624 # otherwise, we can ignore $result and $handle_err for this particular func 605 # otherwise, we can ignore $rc and $errcode for this particular func
625 606
626=cut 607=cut
627 608
628for my $cmd_name (qw(exec attr begin_work commit rollback func)) { 609for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
629 eval 'sub ' . $cmd_name . '{ 610 eval 'sub ' . $cmd_name . '{

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines