--- AnyEvent-DBI/DBI.pm 2010/10/30 20:23:44 1.14 +++ AnyEvent-DBI/DBI.pm 2017/08/27 09:54:25 1.18 @@ -35,9 +35,10 @@ It means that you can run DBI requests in parallel to other tasks. -The overhead for very simple statements ("select 0") is somewhere -around 100% to 120% (dual/single core CPU) compared to an explicit -prepare_cached/execute/fetchrow_arrayref/finish combination. +With DBD::mysql, the overhead for very simple statements +("select 0") is somewhere around 50% compared to an explicit +prepare_cached/execute/fetchrow_arrayref/finish combination. With +DBD::SQlite3, it's more like a factor of 8 for this trivial statement. =head2 ERROR HANDLING @@ -49,7 +50,7 @@ otherwise there will only be the C<$dbh> argument and C<$@> contains an error message. -A convinient way to check whether an error occured is to check C<$#_> - +A convenient way to check whether an error occurred is to check C<$#_> - if that is true, then the function was successful, otherwise there was an error. @@ -60,124 +61,16 @@ use common::sense; use Carp; -use Socket (); -use Scalar::Util (); -use Storable (); - -use DBI (); # only needed in child actually - do it before fork & !exec? +use Convert::Scalar (); +use AnyEvent::Fork (); +use CBOR::XS (); use AnyEvent (); use AnyEvent::Util (); use Errno (); -use Fcntl (); -use POSIX (); - -our $VERSION = '2.1'; - -our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023; - -# this is the forked server code, could/should be bundled as it's own file - -our $DBH; - -sub req_open { - my (undef, $dbi, $user, $pass, %attr) = @{+shift}; - - $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr; - - [1, 1] -} - -sub req_exec { - my (undef, $st, @args) = @{+shift}; - my $sth = $DBH->prepare_cached ($st, undef, 1) - or die [$DBI::errstr]; - - my $rv = $sth->execute (@args) - or die [$sth->errstr]; - - [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, $rv] -} - -sub req_attr { - my (undef, $attr_name, @attr_val) = @{+shift}; - - $DBH->{$attr_name} = $attr_val[0] - if @attr_val; - - [1, $DBH->{$attr_name}] -} - -sub req_begin_work { - [1, $DBH->begin_work or die [$DBI::errstr]] -} - -sub req_commit { - [1, $DBH->commit or die [$DBI::errstr]] -} - -sub req_rollback { - [1, $DBH->rollback or die [$DBI::errstr]] -} -sub req_func { - my (undef, $arg_string, $function) = @{+shift}; - my @args = eval $arg_string; - - die "error evaling \$dbh->func() arg_string: $@" - if $@; - - my $rc = $DBH->func (@args, $function); - return [1, $rc, $DBI::err, $DBI::errstr]; -} - -sub serve_fh($$) { - my ($fh, $version) = @_; - - if ($VERSION != $version) { - syswrite $fh, - pack "L/a*", - Storable::freeze - [undef, "AnyEvent::DBI version mismatch ($VERSION vs. $version)"]; - return; - } - - eval { - my $rbuf; - - while () { - sysread $fh, $rbuf, 16384, length $rbuf - or last; - - while () { - my $len = unpack "L", $rbuf; - - # full request available? - last unless $len && $len + 4 <= length $rbuf; - - my $req = Storable::thaw substr $rbuf, 4; - substr $rbuf, 0, $len + 4, ""; # remove length + request - - my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) }; - $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? ("$@->[0]", $@->[1]) : ("$@", 1)] - if $@; - - for (my $ofs = 0; $ofs < length $wbuf; ) { - $ofs += (syswrite $fh, substr $wbuf, $ofs - or die "unable to write results"); - } - } - } - }; -} - -sub serve_fd($$) { - open my $fh, ">>&=$_[0]" - or die "Couldn't open server file descriptor: $!"; - - serve_fh $fh, $_[1]; -} +our $VERSION = '3.01'; =head2 METHODS @@ -231,19 +124,25 @@ will not C. You still cannot, however, use the $dbh object you received from C to make requests. -=item exec_server => 1 +=item fork_template => $AnyEvent::Fork-object -If you supply an C argument, then the DBI server process will -fork and exec another perl interpreter (using C<$^X>) with just the -AnyEvent::DBI proxy running. This will provide the cleanest possible proxy -for your database server. - -If you do not supply the C argument (or supply it with a -false value) then the traditional method of starting the server by forking -the current process is used. The forked interpreter will try to clean -itself up by calling POSIX::close on all file descriptors except STDIN, -STDOUT, and STDERR (and the socket it uses to communicate with the cilent, -of course). +C uses C<< AnyEvent::Fork->new >> to create the database +slave, which in turn either C's a new process (similar to the old +C constructor argument) or uses a process forked early (see +L). + +With this argument you can provide your own fork template. This can be +useful if you create a lot of C handles and want to save +memory (And speed up startup) by not having to load C again +and again into your child processes: + + my $template = AnyEvent::Fork + ->new # create new template + ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module + + for (...) { + $dbh = new AnyEvent::DBI ... + fork_template => $template; =item timeout => seconds @@ -268,7 +167,7 @@ Any additional key-value pairs will be rolled into a hash reference and passed as the final argument to the C<< DBI->connect (...) >> -call. For example, to supress errors on STDERR and send them instead to an +call. For example, to suppress errors on STDERR and send them instead to an AnyEvent::Handle you could do: $dbh = new AnyEvent::DBI @@ -280,49 +179,54 @@ =cut -# stupid Storable autoloading, total loss-loss situation -Storable::thaw Storable::freeze []; - sub new { my ($class, $dbi, $user, $pass, %arg) = @_; + # we use our own socketpair, so we always have a socket + # available, even before the forked process exsist. + # this is mostly done so this module is compatible + # to versions of itself older than 3.0. my ($client, $server) = AnyEvent::Util::portable_socketpair or croak "unable to create AnyEvent::DBI communications pipe: $!"; + my $fork = delete $arg{fork_template}; + my %dbi_args = %arg; - delete @dbi_args{qw(on_connect on_error timeout exec_server)}; + delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)}; my $self = bless \%arg, $class; - $self->{fh} = $client; - AnyEvent::Util::fh_nonblocking $client, 1; + $self->{fh} = $client; my $rbuf; my @caller = (caller)[1,2]; # the "default" caller - { - Scalar::Util::weaken (my $self = $self); + $fork = $fork ? $fork->fork : AnyEvent::Fork->new + or croak "fork: $!"; - $self->{rw} = AE::io $client, 0, sub { - return unless $self; + $fork->require ("AnyEvent::DBI::Slave"); + $fork->send_arg ($VERSION); + $fork->send_fh ($server); + + # we don't rely on the callback, because we use our own + # socketpair, for better or worse. + $fork->run ("AnyEvent::DBI::Slave::serve", sub { }); - $self->{last_activity} = AE::now; + { + Convert::Scalar::weaken (my $self = $self); + + my $cbor = new CBOR::XS; - my $len = sysread $client, $rbuf, 65536, length $rbuf; + $self->{rw} = AE::io $client, 0, sub { + my $len = Convert::Scalar::extend_read $client, $rbuf, 65536; if ($len > 0) { # we received data, so reset the timer + $self->{last_activity} = AE::now; - while () { - my $len = unpack "L", $rbuf; - - # full response available? - last unless $len && $len + 4 <= length $rbuf; - - my $res = Storable::thaw substr $rbuf, 4; - substr $rbuf, 0, $len + 4, ""; # remove length + request - + for my $res ($cbor->incr_parse_multiple ($rbuf)) { last unless $self; + my $req = shift @{ $self->{queue} }; if (defined $res->[0]) { @@ -337,8 +241,10 @@ } # no more queued requests, so become idle - undef $self->{last_activity} - if $self && !@{ $self->{queue} }; + if ($self && !@{ $self->{queue} }) { + undef $self->{last_activity}; + $self->{tw_cb}->(); + } } } elsif (defined $len) { @@ -371,8 +277,6 @@ }; $self->{ww_cb} = sub { - return unless $self; - $self->{last_activity} = AE::now; my $len = syswrite $client, $self->{wbuf} @@ -382,43 +286,20 @@ }; } - my $pid = fork; - - if ($pid) { - # parent - close $server; - } elsif (defined $pid) { - # child - my $serv_fno = fileno $server; - - if ($self->{exec_server}) { - fcntl $server, &Fcntl::F_SETFD, 0; # don't close the server side - exec {$^X} - "$0 dbi slave", - -e => "require shift; AnyEvent::DBI::serve_fd ($serv_fno, $VERSION)", - $INC{"AnyEvent/DBI.pm"}; - POSIX::_exit 124; - } else { - ($_ != $serv_fno) && POSIX::close $_ - for $^F+1..$FD_MAX; - serve_fh $server, $VERSION; - - # no other way on the broken windows platform, even this leaks - # memory and might fail. - kill 9, $$ - if AnyEvent::WIN32; - - # and this kills the parent process on windows - POSIX::_exit 0; - } - } else { - croak "fork: $!"; - } - - $self->{child_pid} = $pid; + $self->_req ( + sub { + return unless $self; + $self->{child_pid} = $_[1]; + }, + (caller)[1,2], + "req_pid" + ); $self->_req ( - ($self->{on_connect} ? $self->{on_connect} : sub { }), + sub { + return unless $self; + &{ $self->{on_connect} } if $self->{on_connect}; + }, (caller)[1,2], req_open => $dbi, $user, $pass, %dbi_args ); @@ -434,8 +315,16 @@ my $self = shift; if (my $pid = delete $self->{child_pid}) { + # kill and reap process + my $kid_watcher; $kid_watcher = AE::child $pid, sub { + undef $kid_watcher; + }; kill TERM => $pid; } + + delete $self->{rw}; + delete $self->{ww}; + delete $self->{tw}; close delete $self->{fh}; } @@ -513,7 +402,7 @@ $self->{tw_cb}->(); } - $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; + $self->{wbuf} .= CBOR::XS::encode_cbor \@_; unless ($self->{ww}) { my $len = syswrite $self->{fh}, $self->{wbuf}; @@ -525,6 +414,19 @@ } } +=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value)) + +An accessor for the database handle attributes, such as C, +C, C and so on. If you provide an C<$attr_value> +(which might be C), then the given attribute will be set to that +value. + +The callback will be passed the database handle and the attribute's value +if successful. + +If an error occurs and the C callback returns, then only C<$dbh> +will be passed and C<$@> contains the error message. + =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv)) Executes the given SQL statement with placeholders replaced by @@ -541,12 +443,10 @@ If an error occurs and the C callback returns, then only C<$dbh> will be passed and C<$@> contains the error message. -=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value)) +=item $dbh->stattr ($attr_name, $cb->($dbh, $value)) -An accessor for the handle attributes, such as C, -C, C and so on. If you provide an C<$attr_value> -(which might be C), then the given attribute will be set to that -value. +An accessor for the statement attributes of the most recently executed +statement, such as C or C. The callback will be passed the database handle and the attribute's value if successful. @@ -606,7 +506,7 @@ =cut -for my $cmd_name (qw(exec attr begin_work commit rollback func)) { +for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) { eval 'sub ' . $cmd_name . '{ my $cb = pop; splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '"; @@ -620,9 +520,9 @@ L, L, L. -=head1 AUTHOR +=head1 AUTHOR AND CONTACT - Marc Lehmann + Marc Lehmann (current maintainer) http://home.schmorp.de/ Adam Rosenstein @@ -630,5 +530,4 @@ =cut -1; - +1