--- AnyEvent-DBI/DBI.pm 2008/07/21 02:34:40 1.7 +++ AnyEvent-DBI/DBI.pm 2018/04/19 04:35:26 1.21 @@ -11,7 +11,9 @@ my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", ""; $dbh->exec ("select * from test where num=?", 10, sub { - my ($rows, $rv) = @_; + my ($dbh, $rows, $rv) = @_; + + $#_ or die "failure: $@"; print "@$_\n" for @$rows; @@ -28,101 +30,47 @@ This module is an L user, you need to make sure that you use and run a supported event loop. -This module implements asynchronous DBI access my forking or executing +This module implements asynchronous DBI access by forking or executing separate "DBI-Server" processes and sending them requests. It means that you can run DBI requests in parallel to other tasks. -The overhead for very simple statements ("select 0") is somewhere -around 120% to 200% (dual/single core CPU) compared to an explicit -prepare_cached/execute/fetchrow_arrayref/finish combination. +With DBD::mysql, the overhead for very simple statements +("select 0") is somewhere around 50% compared to an explicit +prepare_cached/execute/fetchrow_arrayref/finish combination. With +DBD::SQlite3, it's more like a factor of 8 for this trivial statement. + +=head2 ERROR HANDLING + +This module defines a number of functions that accept a callback +argument. All callbacks used by this module get their AnyEvent::DBI handle +object passed as first argument. + +If the request was successful, then there will be more arguments, +otherwise there will only be the C<$dbh> argument and C<$@> contains an +error message. + +A convenient way to check whether an error occurred is to check C<$#_> - +if that is true, then the function was successful, otherwise there was an +error. =cut package AnyEvent::DBI; -use strict; -no warnings; +use common::sense; use Carp; -use Socket (); -use Scalar::Util (); -use Storable (); - -use DBI (); +use Convert::Scalar (); +use AnyEvent::Fork (); +use CBOR::XS (); use AnyEvent (); use AnyEvent::Util (); -our $VERSION = '1.1'; - -# this is the forked server code - -our $DBH; - -sub req_open { - my (undef, $dbi, $user, $pass, %attr) = @{+shift}; - - $DBH = DBI->connect ($dbi, $user, $pass, \%attr); - - [1] -} - -sub req_exec { - my (undef, $st, @args) = @{+shift}; - - my $sth = $DBH->prepare_cached ($st, undef, 1); - - my $rv = $sth->execute (@args) - or die $sth->errstr; - - [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }] -} - -sub serve { - my ($fh) = @_; +use Errno (); - no strict; - - eval { - my $rbuf; - - while () { - sysread $fh, $rbuf, 16384, length $rbuf - or last; - - while () { - my $len = unpack "L", $rbuf; - - # full request available? - last unless $len && $len + 4 <= length $rbuf; - - my $req = Storable::thaw substr $rbuf, 4; - substr $rbuf, 0, $len + 4, ""; # remove length + request - - my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) }; - - $wbuf = pack "L/a*", Storable::freeze [undef, "$@"] - if $@; - - for (my $ofs = 0; $ofs < length $wbuf; ) { - $ofs += (syswrite $fh, substr $wbuf, $ofs - or die "unable to write results"); - } - } - } - }; - - if (AnyEvent::WIN32) { - kill 9, $$; # no other way on the broken windows platform - # and the above doesn't even work on windows, it seems the only - # way to is to leak memory and kill 9 from the parent. yay. - } - - require POSIX; - POSIX::_exit (0); - # and the above kills the parent process on windows -} +our $VERSION = '3.03'; =head2 METHODS @@ -153,163 +101,435 @@ is set to the error message. C<$filename> and C<$line> is where the original request was submitted. -If this callback returns and this was a fatal error (C<$fatal> is true) -then AnyEvent::DBI die's, otherwise it calls the original request callback -without any arguments. +If the fatal argument is true then the database connection is shut down +and your database handle became invalid. In addition to invoking the +C callback, all of your queued request callbacks are called +without only the C<$dbh> argument. If omitted, then C will be called on any errors, fatal or not. +=item on_connect => $callback->($dbh[, $success]) + +If you supply an C callback, then this callback will be +invoked after the database connect attempt. If the connection succeeds, +C<$success> is true, otherwise it is missing and C<$@> contains the +C<$DBI::errstr>. + +Regardless of whether C is supplied, connect errors will result in +C being called. However, if no C callback is supplied, then +connection errors are considered fatal. The client will C and the C +callback will be called with C<$fatal> true. + +When on_connect is supplied, connect error are not fatal and AnyEvent::DBI +will not C. You still cannot, however, use the $dbh object you +received from C to make requests. + +=item fork_template => $AnyEvent::Fork-object + +C uses C<< AnyEvent::Fork->new >> to create the database +slave, which in turn either C's a new process (similar to the old +C constructor argument) or uses a process forked early (see +L). + +With this argument you can provide your own fork template. This can be +useful if you create a lot of C handles and want to save +memory (And speed up startup) by not having to load C again +and again into your child processes: + + my $template = AnyEvent::Fork + ->new # create new template + ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module + + for (...) { + $dbh = new AnyEvent::DBI ... + fork_template => $template; + +=item timeout => seconds + +If you supply a timeout parameter (fractional values are supported), then +a timer is started any time the DBI handle expects a response from the +server. This includes connection setup as well as requests made to the +backend. The timeout spans the duration from the moment the first data +is written (or queued to be written) until all expected responses are +returned, but is postponed for "timeout" seconds each time more data is +returned from the server. If the timer ever goes off then a fatal error is +generated. If you have an C handler installed, then it will be +called, otherwise your program will die(). + +When altering your databases with timeouts it is wise to use +transactions. If you quit due to timeout while performing insert, update +or schema-altering commands you can end up not knowing if the action was +submitted to the database, complicating recovery. + +Timeout errors are always fatal. + =back -=cut +Any additional key-value pairs will be rolled into a hash reference +and passed as the final argument to the C<< DBI->connect (...) >> +call. For example, to suppress errors on STDERR and send them instead to an +AnyEvent::Handle you could do: -# stupid Storable autoloading, total loss-loss situation -Storable::thaw Storable::freeze []; + $dbh = new AnyEvent::DBI + "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "", + PrintError => 0, + on_error => sub { + $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n"); + }; + +=cut sub new { my ($class, $dbi, $user, $pass, %arg) = @_; - socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC - or croak "unable to create dbi communicaiton pipe: $!"; + # we use our own socketpair, so we always have a socket + # available, even before the forked process exsist. + # this is mostly done so this module is compatible + # to versions of itself older than 3.0. + my ($client, $server) = AnyEvent::Util::portable_socketpair + or croak "unable to create AnyEvent::DBI communications pipe: $!"; - my $self = bless \%arg, $class; + AnyEvent::fh_unblock $client; - $self->{fh} = $client; + my $fork = delete $arg{fork_template}; + + my %dbi_args = %arg; + delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)}; - Scalar::Util::weaken (my $wself = $self); + my $self = bless \%arg, $class; - AnyEvent::Util::fh_nonblocking $client, 1; + $self->{fh} = $client; my $rbuf; my @caller = (caller)[1,2]; # the "default" caller - $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { - my $len = sysread $client, $rbuf, 65536, length $rbuf; - - if ($len > 0) { - - while () { - my $len = unpack "L", $rbuf; - - # full request available? - last unless $len && $len + 4 <= length $rbuf; + $fork = $fork ? $fork->fork : AnyEvent::Fork->new + or croak "fork: $!"; - my $res = Storable::thaw substr $rbuf, 4; - substr $rbuf, 0, $len + 4, ""; # remove length + request + $fork->require ("AnyEvent::DBI::Slave"); + $fork->send_arg ($VERSION); + $fork->send_fh ($server); + + # we don't rely on the callback, because we use our own + # socketpair, for better or worse. + $fork->run ("AnyEvent::DBI::Slave::serve", sub { }); + + { + Convert::Scalar::weaken (my $self = $self); + + my $cbor = new CBOR::XS; + + $self->{rw} = AE::io $client, 0, sub { + my $len = Convert::Scalar::extend_read $client, $rbuf, 65536; + + if ($len > 0) { + # we received data, so reset the timer + $self->{last_activity} = AE::now; + + for my $res ($cbor->incr_parse_multiple ($rbuf)) { + last unless $self; + + my $req = shift @{ $self->{queue} }; + + if (defined $res->[0]) { + $res->[0] = $self; + $req->[0](@$res); + } else { + my $cb = shift @$req; + local $@ = $res->[1]; + $cb->($self); + $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal + if $self; # cb() could have deleted it + } + + # no more queued requests, so become idle + if ($self && !@{ $self->{queue} }) { + undef $self->{last_activity}; + $self->{tw_cb}->(); + } + } - my $req = shift @{ $wself->{queue} }; + } elsif (defined $len) { + # todo, caller? + $self->_error ("unexpected eof", @caller, 1); + } elsif ($! != Errno::EAGAIN) { + # todo, caller? + $self->_error ("read error: $!", @caller, 1); + } + }; - if (defined $res->[0]) { - $req->[0](@$res); + $self->{tw_cb} = sub { + if ($self->{timeout} && $self->{last_activity}) { + if (AE::now > $self->{last_activity} + $self->{timeout}) { + # we did time out + my $req = $self->{queue}[0]; + $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal } else { - my $cb = shift @$req; - $wself->_error ($res->[1], @$req); - $cb->(); + # we need to re-set the timeout watcher + $self->{tw} = AE::timer + $self->{last_activity} + $self->{timeout} - AE::now, + 0, + $self->{tw_cb}, + ; } + } else { + # no timeout check wanted, or idle + undef $self->{tw}; } + }; - } elsif (defined $len) { - $wself->_error ("unexpected eof", @caller, 1); - } else { - $wself->_error ("read error: $!", @caller, 1); - } - }); + $self->{ww_cb} = sub { + $self->{last_activity} = AE::now; - $self->{ww_cb} = sub { - my $len = syswrite $client, $wself->{wbuf} - or return delete $wself->{ww}; - - substr $wself->{wbuf}, 0, $len, ""; - }; - - my $pid = fork; - - if ($pid) { - # parent - close $server; - - } elsif (defined $pid) { - # child - close $client; - @_ = $server; - goto &serve; + my $len = syswrite $client, $self->{wbuf} + or return delete $self->{ww}; - } else { - croak "fork: $!"; + substr $self->{wbuf}, 0, $len, ""; + }; } - $self->_req (sub { }, (caller)[1,2], 1, req_open => $dbi, $user, $pass); + $self->_req ( + sub { + return unless $self; + $self->{child_pid} = $_[1]; + }, + (caller)[1,2], + "req_pid" + ); + + $self->_req ( + sub { + return unless $self; + &{ $self->{on_connect} } if $self->{on_connect}; + }, + (caller)[1,2], + req_open => $dbi, $user, $pass, %dbi_args + ); $self } -sub _error { - my ($self, $error, $filename, $line, $fatal) = @_; +sub _server_pid { + shift->{child_pid} +} + +sub kill_child { + my $self = shift; + + if (my $pid = delete $self->{child_pid}) { + # kill and reap process + my $kid_watcher; $kid_watcher = AE::child $pid, sub { + undef $kid_watcher; + }; + kill TERM => $pid; + } delete $self->{rw}; delete $self->{ww}; - delete $self->{fh}; + delete $self->{tw}; + close delete $self->{fh}; +} + +sub DESTROY { + shift->kill_child; +} + +sub _error { + my ($self, $error, $filename, $line, $fatal) = @_; - $@ = $error; + if ($fatal) { + delete $self->{tw}; + delete $self->{rw}; + delete $self->{ww}; + delete $self->{fh}; + + # for fatal errors call all enqueued callbacks with error + while (my $req = shift @{$self->{queue}}) { + local $@ = $error; + $req->[0]->($self); + } + $self->kill_child; + } - $self->{on_error}($self, $filename, $line, $fatal) - if $self->{on_error}; + local $@ = $error; - die "$error at $filename, line $line\n"; + if ($self->{on_error}) { + $self->{on_error}($self, $filename, $line, $fatal) + } else { + die "$error at $filename, line $line\n"; + } +} + +=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal)) + +Sets (or clears, with C) the C handler. + +=cut + +sub on_error { + $_[0]{on_error} = $_[1]; +} + +=item $dbh->timeout ($seconds) + +Sets (or clears, with C) the database timeout. Useful to extend the +timeout when you are about to make a really long query. + +=cut + +sub timeout { + my ($self, $timeout) = @_; + + $self->{timeout} = $timeout; + + # reschedule timer if one was running + $self->{tw_cb}->(); } sub _req { - my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, (); + my ($self, $cb, $filename, $line) = splice @_, 0, 4, (); - push @{ $self->{queue} }, [$cb, $filename, $line, $fatal]; + unless ($self->{fh}) { + local $@ = my $err = 'no database connection'; + $cb->($self); + $self->_error ($err, $filename, $line, 1); + return; + } - $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; + push @{ $self->{queue} }, [$cb, $filename, $line]; + + # re-start timeout if necessary + if ($self->{timeout} && !$self->{tw}) { + $self->{last_activity} = AE::now; + $self->{tw_cb}->(); + } + + $self->{wbuf} .= CBOR::XS::encode_cbor \@_; unless ($self->{ww}) { my $len = syswrite $self->{fh}, $self->{wbuf}; substr $self->{wbuf}, 0, $len, ""; # still any left? then install a write watcher - $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) + $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb} if length $self->{wbuf}; } } -=item $dbh->exec ("statement", @args, $cb->($rows, $rv, ...)) +=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value)) + +An accessor for the database handle attributes, such as C, +C, C and so on. If you provide an C<$attr_value> +(which might be C), then the given attribute will be set to that +value. + +The callback will be passed the database handle and the attribute's value +if successful. + +If an error occurs and the C callback returns, then only C<$dbh> +will be passed and C<$@> contains the error message. + +=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv)) Executes the given SQL statement with placeholders replaced by C<@args>. The statement will be prepared and cached on the server side, so -using placeholders is compulsory. +using placeholders is extremely important. + +The callback will be called with a weakened AnyEvent::DBI object as the +first argument and the result of C as (or C +if the statement wasn't a select statement) as the second argument. -The callback will be called with the result of C as -first argument (or C if the statement wasn't a select statement) -and the return value of C as second argument. Additional -arguments might get passed as well. +Third argument is the return value from the C<< DBI->execute >> method +call. -If an error occurs and the C callback returns, then no arguments +If an error occurs and the C callback returns, then only C<$dbh> will be passed and C<$@> contains the error message. -=cut +=item $dbh->stattr ($attr_name, $cb->($dbh, $value)) + +An accessor for the statement attributes of the most recently executed +statement, such as C or C. + +The callback will be passed the database handle and the attribute's value +if successful. + +If an error occurs and the C callback returns, then only C<$dbh> +will be passed and C<$@> contains the error message. + +=item $dbh->begin_work ($cb->($dbh[, $rc])) + +=item $dbh->commit ($cb->($dbh[, $rc])) + +=item $dbh->rollback ($cb->($dbh[, $rc])) + +The begin_work, commit, and rollback methods expose the equivalent +transaction control method of the DBI driver. On success, C<$rc> is true. + +If an error occurs and the C callback returns, then only C<$dbh> +will be passed and C<$@> contains the error message. -sub exec { - my $cb = pop; - splice @_, 1, 0, $cb, (caller)[1,2], 0, "req_exec"; +=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr)) - goto &_req; +This gives access to database driver private methods. Because they +are not standard you cannot always depend on the value of C<$rc> or +C<$dbi_err>. Check the documentation for your specific driver/function +combination to see what it returns. + +Note that the first argument will be eval'ed to produce the argument list to +the func() method. This must be done because the serialization protocol +between the AnyEvent::DBI server process and your program does not support the +passage of closures. + +Here's an example to extend the query language in SQLite so it supports an +intstr() function: + + $cv = AnyEvent->condvar; + $dbh->func ( + q{ + instr => 2, sub { + my ($string, $search) = @_; + return index $string, $search; + }, + }, + create_function => sub { + return $cv->send ($@) + unless $#_; + $cv->send (undef, @_[1,2,3]); + } + ); + + my ($err,$rc,$errcode,$errstr) = $cv->recv; + + die $err if defined $err; + die "EVAL failed: $errstr" + if $errcode; + + # otherwise, we can ignore $rc and $errcode for this particular func + +=cut + +for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) { + eval 'sub ' . $cmd_name . '{ + my $cb = pop; + splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '"; + &_req + }'; } =back =head1 SEE ALSO -L, L. +L, L, L. -=head1 AUTHOR +=head1 AUTHOR AND CONTACT - Marc Lehmann + Marc Lehmann (current maintainer) http://home.schmorp.de/ + Adam Rosenstein + http://www.redcondor.com/ + =cut 1 -