--- AnyEvent-DBI/DBI.pm 2008/06/12 11:56:59 1.6 +++ AnyEvent-DBI/DBI.pm 2009/06/29 08:25:05 1.12 @@ -11,7 +11,9 @@ my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", ""; $dbh->exec ("select * from test where num=?", 10, sub { - my ($rows) = @_; + my ($dbh, $rows, $rv) = @_; + + $#_ or die "failure: $@"; print "@$_\n" for @$rows; @@ -28,7 +30,7 @@ This module is an L user, you need to make sure that you use and run a supported event loop. -This module implements asynchronous DBI access my forking or executing +This module implements asynchronous DBI access by forking or executing separate "DBI-Server" processes and sending them requests. It means that you can run DBI requests in parallel to other tasks. @@ -37,11 +39,25 @@ around 120% to 200% (dual/single core CPU) compared to an explicit prepare_cached/execute/fetchrow_arrayref/finish combination. +=head2 ERROR HANDLING + +This module defines a number of functions that accept a callback +argument. All callbacks used by this module get their AnyEvent::DBI handle +object passed as first argument. + +If the request was successful, then there will be more arguments, +otherwise there will only be the C<$dbh> argument and C<$@> contains an +error message. + +A convinient way to check whether an error occured is to check C<$#_> - +if that is true, then the function was successful, otherwise there was an +error. + =cut package AnyEvent::DBI; -use strict; +use strict qw(vars subs); no warnings; use Carp; @@ -54,35 +70,79 @@ use AnyEvent (); use AnyEvent::Util (); -our $VERSION = '1.0'; +use Errno (); +use Fcntl (); +use POSIX (); + +our $VERSION = '2.0'; -# this is the forked server code +our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023; + +# this is the forked server code, could/should be bundled as it's own file our $DBH; sub req_open { my (undef, $dbi, $user, $pass, %attr) = @{+shift}; - $DBH = DBI->connect ($dbi, $user, $pass, \%attr); + $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr; - [1] + [1, 1] } sub req_exec { my (undef, $st, @args) = @{+shift}; + my $sth = $DBH->prepare_cached ($st, undef, 1) + or die [$DBI::errstr]; + + my $rv = $sth->execute (@args) + or die [$sth->errstr]; + + [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, $rv] +} + +sub req_attr { + my (undef, $attr_name, @attr_val) = @{+shift}; + + $DBH->{$attr_name} = $attr_val[0] + if @attr_val; + + [1, $DBH->{$attr_name}] +} + +sub req_begin_work { + [1, $DBH->begin_work or die [$DBI::errstr]] +} + +sub req_commit { + [1, $DBH->commit or die [$DBI::errstr]] +} - my $sth = $DBH->prepare_cached ($st, undef, 1); +sub req_rollback { + [1, $DBH->rollback or die [$DBI::errstr]] +} - $sth->execute (@args) - or die $sth->errstr; +sub req_func { + my (undef, $arg_string, $function) = @{+shift}; + my @args = eval $arg_string; - [$sth->fetchall_arrayref] + die "error evaling \$dbh->func() arg_string: $@" + if $@; + + my $rc = $DBH->func (@args, $function); + return [1, $rc, $DBI::err, $DBI::errstr]; } -sub serve { - my ($fh) = @_; +sub serve_fh($$) { + my ($fh, $version) = @_; - no strict; + if ($VERSION != $version) { + syswrite $fh, + pack "L/a*", + Storable::freeze + [undef, "AnyEvent::DBI version mismatch ($VERSION vs. $version)"]; + return; + } eval { my $rbuf; @@ -101,8 +161,7 @@ substr $rbuf, 0, $len + 4, ""; # remove length + request my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) }; - - $wbuf = pack "L/a*", Storable::freeze [undef, "$@"] + $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? ("$@->[0]", $@->[1]) : ("$@", 1)] if $@; for (my $ofs = 0; $ofs < length $wbuf; ) { @@ -112,8 +171,13 @@ } } }; +} + +sub serve_fd($$) { + open my $fh, ">>&=$_[0]" + or die "Couldn't open server file descriptor: $!"; - kill 9, $$; # no other way on the broken windows platform + serve_fh $fh, $_[1]; } =head2 METHODS @@ -145,14 +209,76 @@ is set to the error message. C<$filename> and C<$line> is where the original request was submitted. -If this callback returns and this was a fatal error (C<$fatal> is true) -then AnyEvent::DBI die's, otherwise it calls the original request callback -without any arguments. +If the fatal argument is true then the database connection is shut down +and your database handle became invalid. In addition to invoking the +C callback, all of your queued request callbacks are called +without only the C<$dbh> argument. If omitted, then C will be called on any errors, fatal or not. +=item on_connect => $callback->($dbh[, $success]) + +If you supply an C callback, then this callback will be +invoked after the database connect attempt. If the connection succeeds, +C<$success> is true, otherwise it is missing and C<$@> contains the +C<$DBI::errstr>. + +Regardless of whether C is supplied, connect errors will result in +C being called. However, if no C callback is supplied, then +connection errors are considered fatal. The client will C and the C +callback will be called with C<$fatal> true. + +When on_connect is supplied, connect error are not fatal and AnyEvent::DBI +will not C. You still cannot, however, use the $dbh object you +received from C to make requests. + +=item exec_server => 1 + +If you supply an C argument, then the DBI server process will +fork and exec another perl interpreter (using C<$^X>) with just the +AnyEvent::DBI proxy running. This will provide the cleanest possible porxy +for your database server. + +If you do not supply the C argument (or supply it with a +false value) then the traditional method of starting the server by forking +the current process is used. The forked interpreter will try to clean +itself up by calling POSIX::close on all file descriptors except STDIN, +STDOUT, and STDERR (and the socket it uses to communicate with the cilent, +of course). + +=item timeout => seconds + +If you supply a timeout parameter (fractional values are supported), then +a timer is started any time the DBI handle expects a response from the +server. This includes connection setup as well as requests made to the +backend. The timeout spans the duration from the moment the first data +is written (or queued to be written) until all expected responses are +returned, but is postponed for "timeout" seconds each time more data is +returned from the server. If the timer ever goes off then a fatal error is +generated. If you have an C handler installed, then it will be +called, otherwise your program will die(). + +When altering your databases with timeouts it is wise to use +transactions. If you quit due to timeout while performing insert, update +or schema-altering commands you can end up not knowing if the action was +submitted to the database, complicating recovery. + +Timeout errors are always fatal. + =back +Any additional key-value pairs will be rolled into a hash reference +and passed as the final argument to the C<< DBI->connect (...) >> +call. For example, to supress errors on STDERR and send them instead to an +AnyEvent::Handle you could do: + + $dbh = new AnyEvent::DBI + "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "", + PrintError => 0, + on_error => sub { + $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n"); + }; + =cut # stupid Storable autoloading, total loss-loss situation @@ -161,99 +287,250 @@ sub new { my ($class, $dbi, $user, $pass, %arg) = @_; - socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC - or croak "unable to create dbi communicaiton pipe: $!"; + my ($client, $server) = AnyEvent::Util::portable_socketpair + or croak "unable to create Anyevent::DBI communications pipe: $!"; - my $self = bless \%arg, $class; + my %dbi_args = %arg; + delete @dbi_args{qw(on_connect on_error timeout exec_server)}; + my $self = bless \%arg, $class; $self->{fh} = $client; - Scalar::Util::weaken (my $wself = $self); - AnyEvent::Util::fh_nonblocking $client, 1; my $rbuf; my @caller = (caller)[1,2]; # the "default" caller - $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { - my $len = sysread $client, $rbuf, 65536, length $rbuf; + { + Scalar::Util::weaken (my $self = $self); - if ($len > 0) { + $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { + return unless $self; - while () { - my $len = unpack "L", $rbuf; + $self->{last_activity} = AnyEvent->now; - # full request available? - last unless $len && $len + 4 <= length $rbuf; + my $len = sysread $client, $rbuf, 65536, length $rbuf; - my $res = Storable::thaw substr $rbuf, 4; - substr $rbuf, 0, $len + 4, ""; # remove length + request + if ($len > 0) { + # we received data, so reset the timer + + while () { + my $len = unpack "L", $rbuf; + + # full response available? + last unless $len && $len + 4 <= length $rbuf; + + my $res = Storable::thaw substr $rbuf, 4; + substr $rbuf, 0, $len + 4, ""; # remove length + request - my $req = shift @{ $wself->{queue} }; + last unless $self; + my $req = shift @{ $self->{queue} }; - if (defined $res->[0]) { - $req->[0](@$res); + if (defined $res->[0]) { + $res->[0] = $self; + $req->[0](@$res); + } else { + my $cb = shift @$req; + local $@ = $res->[1]; + $cb->($self); + $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal + if $self; # cb() could have deleted it + } + + # no more queued requests, so become idle + undef $self->{last_activity} + if $self && !@{ $self->{queue} }; + } + + } elsif (defined $len) { + # todo, caller? + $self->_error ("unexpected eof", @caller, 1); + } elsif ($! != Errno::EAGAIN) { + # todo, caller? + $self->_error ("read error: $!", @caller, 1); + } + }); + + $self->{tw_cb} = sub { + if ($self->{timeout} && $self->{last_activity}) { + if (AnyEvent->now > $self->{last_activity} + $self->{timeout}) { + # we did time out + my $req = $self->{queue}[0]; + $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal } else { - my $cb = shift @$req; - $wself->_error ($res->[1], @$req); - $cb->(); + # we need to re-set the timeout watcher + $self->{tw} = AnyEvent->timer ( + after => $self->{last_activity} + $self->{timeout} - AnyEvent->now, + cb => $self->{tw_cb}, + ); + Scalar::Util::weaken $self; } + } else { + # no timeout check wanted, or idle + undef $self->{tw}; } + }; - } elsif (defined $len) { - $wself->_error ("unexpected eof", @caller, 1); - } else { - $wself->_error ("read error: $!", @caller, 1); - } - }); + $self->{ww_cb} = sub { + return unless $self; - $self->{ww_cb} = sub { - my $len = syswrite $client, $wself->{wbuf} - or return delete $wself->{ww}; + $self->{last_activity} = AnyEvent->now; - substr $wself->{wbuf}, 0, $len, ""; - }; + my $len = syswrite $client, $self->{wbuf} + or return delete $self->{ww}; + + substr $self->{wbuf}, 0, $len, ""; + }; + } my $pid = fork; if ($pid) { # parent close $server; - } elsif (defined $pid) { # child - close $client; - @_ = $server; - goto &serve; + my $serv_fno = fileno $server; + if ($self->{exec_server}) { + fcntl $server, &Fcntl::F_SETFD, 0; # don't close the server side + exec {$^X} + "$0 dbi slave", + -e => "require shift; AnyEvent::DBI::serve_fd ($serv_fno, $VERSION)", + $INC{"AnyEvent/DBI.pm"}; + POSIX::_exit 124; + } else { + ($_ != $serv_fno) && POSIX::close $_ + for $^F+1..$FD_MAX; + serve_fh $server, $VERSION; + + # no other way on the broken windows platform, even this leaks + # memory and might fail. + kill 9, $$ + if AnyEvent::WIN32; + + # and this kills the parent process on windows + POSIX::_exit 0; + } } else { croak "fork: $!"; } - $self->_req (sub { }, (caller)[1,2], 1, req_open => $dbi, $user, $pass); + $self->{child_pid} = $pid; + + $self->_req ( + ($self->{on_connect} ? $self->{on_connect} : sub { }), + (caller)[1,2], + req_open => $dbi, $user, $pass, %dbi_args + ); $self } +sub _server_pid { + shift->{child_pid} +} + +sub kill_child { + my $self = shift; + my $child_pid = delete $self->{child_pid}; + if ($child_pid) { + # send SIGKILL in two seconds + my $murder_timer = AnyEvent->timer ( + after => 2, + cb => sub { + kill 9, $child_pid; + }, + ); + + # reap process + my $kid_watcher; $kid_watcher = AnyEvent->child ( + pid => $child_pid, + cb => sub { + # just hold on to this so it won't go away + undef $kid_watcher; + # cancel SIGKILL + undef $murder_timer; + }, + ); + + close $self->{fh}; + } +} + +sub DESTROY { + shift->kill_child; +} + sub _error { my ($self, $error, $filename, $line, $fatal) = @_; - delete $self->{rw}; - delete $self->{ww}; - delete $self->{fh}; + if ($fatal) { + delete $self->{tw}; + delete $self->{rw}; + delete $self->{ww}; + delete $self->{fh}; + + # for fatal errors call all enqueued callbacks with error + while (my $req = shift @{$self->{queue}}) { + local $@ = $error; + $req->[0]->($self); + } + $self->kill_child; + } + + local $@ = $error; + + if ($self->{on_error}) { + $self->{on_error}($self, $filename, $line, $fatal) + } else { + die "$error at $filename, line $line\n"; + } +} + +=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal)) + +Sets (or clears, with C) the C handler. + +=cut + +sub on_error { + $_[0]{on_error} = $_[1]; +} + +=item $dbh->timeout ($seconds) + +Sets (or clears, with C) the database timeout. Useful to extend the +timeout when you are about to make a really long query. + +=cut - $@ = $error; +sub timeout { + my ($self, $timeout) = @_; - $self->{on_error}($self, $filename, $line, $fatal) - if $self->{on_error}; + $self->{timeout} = $timeout; - die "$error at $filename, line $line\n"; + # reschedule timer if one was running + $self->{tw_cb}->(); } sub _req { - my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, (); + my ($self, $cb, $filename, $line) = splice @_, 0, 4, (); - push @{ $self->{queue} }, [$cb, $filename, $line, $fatal]; + unless ($self->{fh}) { + local $@ = my $err = 'no database connection'; + $cb->($self); + $self->_error ($err, $filename, $line, 1); + return; + } + + push @{ $self->{queue} }, [$cb, $filename, $line]; + + # re-start timeout if necessary + if ($self->{timeout} && !$self->{tw}) { + $self->{last_activity} = AnyEvent->now; + $self->{tw_cb}->(); + } $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; @@ -267,39 +544,110 @@ } } -=item $dbh->exec ("statement", @args, $cb->($rows, %extra)) +=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv)) Executes the given SQL statement with placeholders replaced by C<@args>. The statement will be prepared and cached on the server side, so -using placeholders is compulsory. +using placeholders is extremely important. + +The callback will be called with a weakened AnyEvent::DBI object as the +first argument and the result of C as (or C +if the statement wasn't a select statement) as the second argument. -The callback will be called with the result of C as -first argument and possibly a hash reference with additional information. +Third argument is the return value from the C<< DBI->execute >> method +call. -If an error occurs and the C callback returns, then no arguments +If an error occurs and the C callback returns, then only C<$dbh> will be passed and C<$@> contains the error message. -=cut +=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value)) -sub exec { - my $cb = pop; - splice @_, 1, 0, $cb, (caller)[1,2], 0, "req_exec"; +An accessor for the handle attributes, such as C, +C, C and so on. If you provide an C<$attr_value> +(which might be C), then the given attribute will be set to that +value. + +The callback will be passed the database handle and the attribute's value +if successful. + +If an error occurs and the C callback returns, then only C<$dbh> +will be passed and C<$@> contains the error message. - goto &_req; +=item $dbh->begin_work ($cb->($dbh[, $rc])) + +=item $dbh->commit ($cb->($dbh[, $rc])) + +=item $dbh->rollback ($cb->($dbh[, $rc])) + +The begin_work, commit, and rollback methods expose the equivalent +transaction control method of the DBI driver. On success, C<$rc> is true. + +If an error occurs and the C callback returns, then only C<$dbh> +will be passed and C<$@> contains the error message. + +=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr)) + +This gives access to database driver private methods. Because they +are not standard you cannot always depend on the value of C<$rc> or +C<$dbi_err>. Check the documentation for your specific driver/function +combination to see what it returns. + +Note that the first argument will be eval'ed to produce the argument list to +the func() method. This must be done because the serialization protocol +between the AnyEvent::DBI server process and your program does not support the +passage of closures. + +Here's an example to extend the query language in SQLite so it supports an +intstr() function: + + $cv = AnyEvent->condvar; + $dbh->func ( + q{ + instr => 2, sub { + my ($string, $search) = @_; + return index $string, $search; + }, + }, + create_function => sub { + return $cv->send ($@) + unless $#_; + $cv->send (undef, @_[1,2,3]); + } + ); + + my ($err,$rc,$errcode,$errstr) = $cv->recv; + + die $err if defined $err; + die "EVAL failed: $errstr" + if $errcode; + + # otherwise, we can ignore $rc and $errcode for this particular func + +=cut + +for my $cmd_name (qw(exec attr begin_work commit rollback func)) { + eval 'sub ' . $cmd_name . '{ + my $cb = pop; + splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '"; + &_req + }'; } =back =head1 SEE ALSO -L, L. +L, L, L. =head1 AUTHOR Marc Lehmann http://home.schmorp.de/ + Adam Rosenstein + http://www.redcondor.com/ + =cut -1 +1;