ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.14
Committed: Sat Oct 30 20:23:44 2010 UTC (13 years, 6 months ago) by root
Branch: MAIN
CVS Tags: rel-2_1
Changes since 1.13: +21 -40 lines
Log Message:
2.1

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::DBI - asynchronous DBI access
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::DBI;
8    
9 root 1.5 my $cv = AnyEvent->condvar;
10    
11     my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12    
13     $dbh->exec ("select * from test where num=?", 10, sub {
14 root 1.11 my ($dbh, $rows, $rv) = @_;
15    
16 root 1.12 $#_ or die "failure: $@";
17 root 1.5
18     print "@$_\n"
19     for @$rows;
20    
21     $cv->broadcast;
22     });
23    
24     # asynchronously do sth. else here
25    
26     $cv->wait;
27    
28 root 1.1 =head1 DESCRIPTION
29    
30     This module is an L<AnyEvent> user, you need to make sure that you use and
31     run a supported event loop.
32    
33 root 1.8 This module implements asynchronous DBI access by forking or executing
34 root 1.1 separate "DBI-Server" processes and sending them requests.
35    
36     It means that you can run DBI requests in parallel to other tasks.
37    
38 root 1.3 The overhead for very simple statements ("select 0") is somewhere
39 root 1.14 around 100% to 120% (dual/single core CPU) compared to an explicit
40 root 1.3 prepare_cached/execute/fetchrow_arrayref/finish combination.
41    
42 root 1.11 =head2 ERROR HANDLING
43    
44     This module defines a number of functions that accept a callback
45     argument. All callbacks used by this module get their AnyEvent::DBI handle
46     object passed as first argument.
47    
48     If the request was successful, then there will be more arguments,
49     otherwise there will only be the C<$dbh> argument and C<$@> contains an
50     error message.
51    
52     A convenient way to check whether an error occurred is to check C<$#_> -
53     if that is true, then the function was successful, otherwise there was an
54     error.
55    
56 root 1.1 =cut
57    
58     package AnyEvent::DBI;
59    
60 root 1.14 use common::sense;
61 root 1.1
62     use Carp;
63     use Socket ();
64     use Scalar::Util ();
65     use Storable ();
66    
67 root 1.14 use DBI (); # only needed in child actually - do it before fork & !exec?
68 root 1.1
69     use AnyEvent ();
70     use AnyEvent::Util ();
71    
72 root 1.11 use Errno ();
73     use Fcntl ();
74     use POSIX ();
75    
76 root 1.14 our $VERSION = '2.1';
77 root 1.11
78     our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
79 root 1.1
80 root 1.11 # this is the forked server code, could/should be bundled as its own file
81 root 1.1
82     our $DBH;
83    
# req_open [$name, $dsn, $user, $pass, key => value...]
#
# Server-side handler for the initial connect request. Connects via DBI
# and stores the resulting handle in $DBH for all later requests. Dies
# with the plain $DBI::errstr string when the connect fails.
#
# Returns the success response [1, 1].
sub req_open {
   my $req = shift;
   my ($dsn, $login, $password, @options) = @$req[1 .. $#$req];

   # the remaining key/value pairs become the DBI attribute hash
   $DBH = DBI->connect ($dsn, $login, $password, { @options });
   die $DBI::errstr
      unless $DBH;

   return [1, 1];
}
91    
# req_exec [$name, $statement, @bind_values]
#
# Server-side handler that prepares (cached) and executes one statement
# on $DBH. Failures die with an array reference holding the error
# message, which the request loop reports as a per-request error.
#
# Returns [1, $rows, $rv] where $rows is the fetchall_arrayref result
# for statements that produce columns (undef otherwise) and $rv is the
# return value of execute.
sub req_exec {
   my ($request) = @_;
   my ($sql, @bind) = @$request[1 .. $#$request];

   my $sth = $DBH->prepare_cached ($sql, undef, 1);
   die [$DBI::errstr]
      unless $sth;

   my $rv = $sth->execute (@bind);
   die [$sth->errstr]
      unless $rv;

   # only statements with result columns have rows to fetch
   return [1, ($sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef), $rv];
}
102    
# req_attr [$name, $attr_name] or [$name, $attr_name, $attr_value]
#
# Server-side accessor for attributes of $DBH. When a value is present
# in the request (even an undefined one) the attribute is set first.
#
# Returns [1, $current_value].
sub req_attr {
   my $req  = shift;
   my $name = $req->[1];

   # a third element means "set", regardless of its value
   $DBH->{$name} = $req->[2]
      if @$req > 2;

   return [1, $DBH->{$name}];
}
111    
# req_begin_work [$name]
#
# Server-side handler that starts a transaction on $DBH. Dies with an
# array reference holding $DBI::errstr when begin_work fails.
#
# Returns [1, $rc]. The result is captured in a variable first: writing
# "[1, CALL or die ...]" lets the low-precedence "or" swallow the whole
# comma list, which drops $rc from the response.
sub req_begin_work {
   my $rc = $DBH->begin_work
      or die [$DBI::errstr];

   [1, $rc]
}
115    
# req_commit [$name]
#
# Server-side handler that commits the current transaction on $DBH.
# Dies with an array reference holding $DBI::errstr when commit fails.
#
# Returns [1, $rc]. The result is captured in a variable first: writing
# "[1, CALL or die ...]" lets the low-precedence "or" swallow the whole
# comma list, which drops $rc from the response.
sub req_commit {
   my $rc = $DBH->commit
      or die [$DBI::errstr];

   [1, $rc]
}
119    
# req_rollback [$name]
#
# Server-side handler that rolls back the current transaction on $DBH.
# Dies with an array reference holding $DBI::errstr when rollback fails.
#
# Returns [1, $rc]. The result is captured in a variable first: writing
# "[1, CALL or die ...]" lets the low-precedence "or" swallow the whole
# comma list, which drops $rc from the response.
sub req_rollback {
   my $rc = $DBH->rollback
      or die [$DBI::errstr];

   [1, $rc]
}
123    
# req_func [$name, $arg_string, $function]
#
# Server-side handler for driver-private functions. $arg_string is Perl
# source that is evaluated to produce the argument list for
# $DBH->func; it arrives as a string because the request is serialized.
# NOTE(review): this is a string eval of request data - it is only as
# trustworthy as whoever can submit requests to this server.
#
# Dies with a plain string when the evaluation fails. Otherwise returns
# [1, $rc, $DBI::err, $DBI::errstr].
sub req_func {
   my $req = shift;
   my ($arg_string, $function) = @$req[1, 2];

   my @args = eval $arg_string;
   if ($@) {
      die "error evaling \$dbh->func() arg_string: $@";
   }

   my $rc = $DBH->func (@args, $function);

   [1, $rc, $DBI::err, $DBI::errstr]
}
134    
# serve_fh $fh, $version
#
# Request loop of the DBI server process. Reads length-prefixed
# ("L/a*"), Storable-frozen requests from $fh, dispatches each one to
# the function named in its first element and writes the frozen result
# back in the same framing.
#
# If $version differs from this module's $VERSION, a single error
# response is written and the function returns without serving.
#
# A handler that dies with an array reference yields the response
# [undef, $message, $flag] taken from that array; dying with anything
# else yields [undef, $message, 1].
#
# The loop ends when sysread returns a false value (eof or error) or
# when a response cannot be written; errors inside the loop are caught
# by the outer eval and not propagated.
sub serve_fh($$) {
   my ($fh, $version) = @_;

   if ($VERSION != $version) {
      syswrite $fh,
         pack "L/a*",
            Storable::freeze
               [undef, "AnyEvent::DBI version mismatch ($VERSION vs. $version)"];
      return;
   }

   eval {
      my $rbuf;

      while () {
         sysread $fh, $rbuf, 16384, length $rbuf
            or last;

         while () {
            my $len = unpack "L", $rbuf;

            # full request available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly one frame, not whatever else is already buffered
            my $req = Storable::thaw (substr $rbuf, 4, $len);
            substr $rbuf, 0, $len + 4, ""; # remove length + request

            # $req->[0] is the handler name, called with the whole request
            my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
            $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? ("$@->[0]", $@->[1]) : ("$@", 1)]
               if $@;

            # syswrite may write only part of the buffer, so loop until done
            for (my $ofs = 0; $ofs < length $wbuf; ) {
               $ofs += (syswrite $fh, substr $wbuf, $ofs
                        or die "unable to write results");
            }
         }
      }
   };
}
174 root 1.1
# serve_fd $fileno, $version
#
# Entry point used by the exec'ed server process: wraps the inherited
# file descriptor number $fileno in a Perl file handle and runs
# serve_fh on it. Dies if the descriptor cannot be opened.
sub serve_fd($$) {
   # three-argument open keeps the descriptor out of the mode string;
   # "&=" reuses the existing descriptor instead of duplicating it
   open my $fh, ">>&=", $_[0]
      or die "Couldn't open server file descriptor: $!";

   serve_fh $fh, $_[1];
}
181    
182 root 1.1 =head2 METHODS
183    
184     =over 4
185    
186     =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
187    
188     Returns a database handle for the given database. Each database handle
189     has an associated server process that executes statements in order. If
190     you want to run more than one statement in parallel, you need to create
191     additional database handles.
192    
193     The advantage of this approach is that transactions work as state is
194     preserved.
195    
196     Example:
197    
198     $dbh = new AnyEvent::DBI
199     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
200    
201     Additional key-value pairs can be used to adjust behaviour:
202    
203     =over 4
204    
205     =item on_error => $callback->($dbh, $filename, $line, $fatal)
206    
207     When an error occurs, then this callback will be invoked. On entry, C<$@>
208     is set to the error message. C<$filename> and C<$line> are where the
209     original request was submitted.
210    
211 root 1.11 If the fatal argument is true then the database connection is shut down
212     and your database handle becomes invalid. In addition to invoking the
213     C<on_error> callback, all of your queued request callbacks are called
214     with only the C<$dbh> argument.
215 root 1.1
216 root 1.2 If omitted, then C<die> will be called on any errors, fatal or not.
217 root 1.1
218 root 1.11 =item on_connect => $callback->($dbh[, $success])
219 root 1.10
220 root 1.11 If you supply an C<on_connect> callback, then this callback will be
221     invoked after the database connect attempt. If the connection succeeds,
222     C<$success> is true, otherwise it is missing and C<$@> contains the
223     C<$DBI::errstr>.
224    
225     Regardless of whether C<on_connect> is supplied, connect errors will result in
226     C<on_error> being called. However, if no C<on_connect> callback is supplied, then
227     connection errors are considered fatal. The client will C<die> and the C<on_error>
228     callback will be called with C<$fatal> true.
229    
230     When on_connect is supplied, connect errors are not fatal and AnyEvent::DBI
231     will not C<die>. You still cannot, however, use the $dbh object you
232     received from C<new> to make requests.
233 root 1.10
234     =item exec_server => 1
235    
236 root 1.11 If you supply an C<exec_server> argument, then the DBI server process will
237     fork and exec another perl interpreter (using C<$^X>) with just the
238 root 1.13 AnyEvent::DBI proxy running. This will provide the cleanest possible proxy
239 root 1.11 for your database server.
240    
241     If you do not supply the C<exec_server> argument (or supply it with a
242     false value) then the traditional method of starting the server by forking
243     the current process is used. The forked interpreter will try to clean
244     itself up by calling POSIX::close on all file descriptors except STDIN,
245     STDOUT, and STDERR (and the socket it uses to communicate with the client,
246     of course).
247 root 1.10
248     =item timeout => seconds
249    
250 root 1.11 If you supply a timeout parameter (fractional values are supported), then
251     a timer is started any time the DBI handle expects a response from the
252     server. This includes connection setup as well as requests made to the
253     backend. The timeout spans the duration from the moment the first data
254     is written (or queued to be written) until all expected responses are
255     returned, but is postponed for "timeout" seconds each time more data is
256     returned from the server. If the timer ever goes off then a fatal error is
257     generated. If you have an C<on_error> handler installed, then it will be
258     called, otherwise your program will die().
259    
260     When altering your databases with timeouts it is wise to use
261     transactions. If you quit due to timeout while performing insert, update
262     or schema-altering commands you can end up not knowing if the action was
263     submitted to the database, complicating recovery.
264 root 1.10
265     Timeout errors are always fatal.
266    
267 root 1.1 =back
268    
269 root 1.11 Any additional key-value pairs will be rolled into a hash reference
270     and passed as the final argument to the C<< DBI->connect (...) >>
271     call. For example, to suppress errors on STDERR and send them instead to an
272     AnyEvent::Handle you could do:
273    
274     $dbh = new AnyEvent::DBI
275     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
276     PrintError => 0,
277     on_error => sub {
278     $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
279     };
280 root 1.10
281 root 1.1 =cut
282    
# Run one freeze/thaw round trip at load time. Presumably this makes
# Storable load its autoloaded routines now rather than on first use
# later - TODO confirm against the Storable version in use.
Storable::thaw (Storable::freeze ([]));
285    
# new AnyEvent::DBI $dbi, $user, $pass, key => value...
#
# Creates the client side of a database handle: sets up a socket pair,
# installs the read/write/timeout callbacks, forks the server process
# (optionally exec'ing a fresh perl when exec_server is true) and queues
# the initial req_open request.
#
# The keys on_connect, on_error, timeout and exec_server are consumed
# here; all other key/value pairs are passed on to the server as DBI
# connect attributes. Croaks if the socket pair or the fork fails.
#
# Returns the new handle object.
sub new {
   my ($class, $dbi, $user, $pass, %arg) = @_;

   my ($client, $server) = AnyEvent::Util::portable_socketpair ()
      or croak "unable to create AnyEvent::DBI communications pipe: $!";

   # everything that is not one of our own options goes to DBI->connect
   my %dbi_args = %arg;
   delete @dbi_args{qw(on_connect on_error timeout exec_server)};

   my $self = bless \%arg, $class;
   $self->{fh} = $client;

   AnyEvent::Util::fh_nonblocking ($client, 1);

   my $rbuf;
   my @caller = (caller)[1,2]; # the "default" caller

   {
      # the callbacks below are stored inside $self, so they must only
      # hold a weak reference to it
      Scalar::Util::weaken (my $self = $self);

      # read watcher: parses responses and hands them to queued callbacks
      $self->{rw} = AE::io ($client, 0, sub {
         return unless $self;

         $self->{last_activity} = AE::now ();

         my $len = sysread $client, $rbuf, 65536, length $rbuf;

         if ($len > 0) {
            # we received data, so reset the timer

            while () {
               my $len = unpack "L", $rbuf;

               # full response available?
               last unless $len && $len + 4 <= length $rbuf;

               # thaw exactly one frame, not whatever else is buffered
               my $res = Storable::thaw (substr $rbuf, 4, $len);
               substr $rbuf, 0, $len + 4, ""; # remove length + request

               last unless $self;
               my $req = shift @{ $self->{queue} };

               if (defined $res->[0]) {
                  # success: replace the status flag by the handle
                  $res->[0] = $self;
                  $req->[0](@$res);
               } else {
                  my $cb = shift @$req;
                  local $@ = $res->[1];
                  $cb->($self);
                  $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
                     if $self; # cb() could have deleted it
               }

               # no more queued requests, so become idle
               undef $self->{last_activity}
                  if $self && !@{ $self->{queue} };
            }

         } elsif (defined $len) {
            # todo, caller?
            $self->_error ("unexpected eof", @caller, 1);
         } elsif ($! != Errno::EAGAIN () && $! != Errno::EINTR ()) {
            # EAGAIN/EINTR are transient, everything else is fatal
            # todo, caller?
            $self->_error ("read error: $!", @caller, 1);
         }
      });

      # timeout check: fires a fatal error or re-arms the timer
      $self->{tw_cb} = sub {
         return unless $self;

         if ($self->{timeout} && $self->{last_activity}) {
            if (AE::now () > $self->{last_activity} + $self->{timeout}) {
               # we did time out
               my $req = $self->{queue}[0];
               $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
            } else {
               # we need to re-set the timeout watcher
               $self->{tw} = AE::timer (
                  $self->{last_activity} + $self->{timeout} - AE::now (),
                  0,
                  $self->{tw_cb},
               );
            }
         } else {
            # no timeout check wanted, or idle
            undef $self->{tw};
         }
      };

      # write watcher callback: flushes the pending write buffer
      $self->{ww_cb} = sub {
         return unless $self;

         $self->{last_activity} = AE::now ();

         my $len = syswrite $client, $self->{wbuf};

         if (defined $len) {
            substr $self->{wbuf}, 0, $len, "";

            # everything written, the watcher is no longer needed
            delete $self->{ww}
               unless length $self->{wbuf};
         } elsif ($! != Errno::EAGAIN () && $! != Errno::EINTR ()) {
            # hard write error: stop watching. a transient error keeps
            # the watcher so the buffered data is retried later.
            delete $self->{ww};
         }
      };
   }

   my $pid = fork;

   if ($pid) {
      # parent
      close $server;
   } elsif (defined $pid) {
      # child
      my $serv_fno = fileno $server;

      if ($self->{exec_server}) {
         fcntl $server, &Fcntl::F_SETFD, 0; # don't close the server side
         exec {$^X}
            "$0 dbi slave",
            -e => "require shift; AnyEvent::DBI::serve_fd ($serv_fno, $VERSION)",
            $INC{"AnyEvent/DBI.pm"};
         POSIX::_exit (124);
      } else {
         # close every descriptor above $^F except our end of the socket
         ($_ != $serv_fno) && POSIX::close ($_)
            for $^F+1..$FD_MAX;
         serve_fh ($server, $VERSION);

         # no other way on the broken windows platform, even this leaks
         # memory and might fail.
         kill 9, $$
            if AnyEvent::WIN32;

         # and this kills the parent process on windows
         POSIX::_exit (0);
      }
   } else {
      croak "fork: $!";
   }

   $self->{child_pid} = $pid;

   # the connect request is always the first one in the queue
   $self->_req (
      ($self->{on_connect} ? $self->{on_connect} : sub { }),
      @caller,
      req_open => $dbi, $user, $pass, %dbi_args
   );

   $self
}
428    
# $pid = $dbh->_server_pid
#
# Returns the process id stored under child_pid, i.e. the server
# process belonging to this handle (undef once it has been removed).
sub _server_pid {
   my ($self) = @_;

   return $self->{child_pid};
}
432    
# $dbh->kill_child
#
# Sends SIGTERM to the server process (if its pid is still recorded)
# and closes the communication socket (if it is still present). Both
# entries are removed from the object, so calling this again is a no-op.
#
# NOTE(review): the child is signalled but not waited for here; whether
# it gets reaped elsewhere cannot be seen from this function.
sub kill_child {
   my $self = shift;

   if (my $pid = delete $self->{child_pid}) {
      kill TERM => $pid;
   }

   # the handle may already be gone (the fatal-error path removes it
   # first), so only close what is actually there
   if (defined (my $fh = delete $self->{fh})) {
      close $fh;
   }
}
441    
# Destructor: shuts down the server process and socket via kill_child.
#
# Destructors can run at arbitrary points in the caller's code, so the
# global status variables are localized to keep kill_child's system
# calls from overwriting error state the caller is about to inspect.
sub DESTROY {
   my $self = shift;

   local ($., $@, $!, $^E, $?);

   $self->kill_child;
}
445    
# $dbh->_error ($error, $filename, $line, $fatal)
#
# Central error reporting. For fatal errors the watchers and the socket
# are dropped, every still-queued request callback is invoked with only
# the handle (and $@ set to the error), and the server is shut down.
#
# Afterwards the on_error handler is called with
# ($dbh, $filename, $line, $fatal) and $@ set to the error; without a
# handler the error is thrown with die.
sub _error {
   my ($self, $error, $filename, $line, $fatal) = @_;

   if ($fatal) {
      # tear down timer, read watcher, write watcher and socket
      delete $self->{$_}
         for qw(tw rw ww fh);

      # for fatal errors call all enqueued callbacks with error
      while (my $pending = shift @{ $self->{queue} }) {
         my $cb = $pending->[0];
         local $@ = $error;
         $cb->($self);
      }

      $self->kill_child;
   }

   local $@ = $error;

   my $handler = $self->{on_error};

   die "$error at $filename, line $line\n"
      unless $handler;

   $handler->($self, $filename, $line, $fatal)
}
471    
472     =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
473    
474 root 1.11 Sets (or clears, with C<undef>) the C<on_error> handler.
475 root 1.10
476     =cut
477 root 1.1
# Stores the given callback (or undef) as this handle's on_error
# handler and returns it.
sub on_error {
   my ($self, $cb) = @_;

   $self->{on_error} = $cb;
}
481    
482     =item $dbh->timeout ($seconds)
483    
484     Sets (or clears, with C<undef>) the database timeout. Useful to extend the
485     timeout when you are about to make a really long query.
486    
487     =cut
488    
# Stores the new timeout value (undef clears it) and then runs the
# handle's timeout check callback so a pending timer is re-evaluated
# against the new value.
sub timeout {
   my $self = shift;

   $self->{timeout} = shift;

   # reschedule timer if one was running
   $self->{tw_cb}->();
}
497    
# $dbh->_req ($cb, $filename, $line, $request_name, @request_args)
#
# Queues one request for the server. The callback and the caller's
# file/line are remembered in the queue; the remaining arguments are
# frozen with Storable and appended to the write buffer with a length
# prefix.
#
# Without a socket the callback is invoked with only the handle and a
# fatal 'no database connection' error is raised instead.
sub _req {
   my ($self, $cb, $filename, $line, @request) = @_;

   if (!$self->{fh}) {
      my $err = 'no database connection';
      local $@ = $err;
      $cb->($self);
      $self->_error ($err, $filename, $line, 1);
      return;
   }

   push @{ $self->{queue} }, [$cb, $filename, $line];

   # re-start timeout if necessary
   if ($self->{timeout} && !$self->{tw}) {
      $self->{last_activity} = AE::now ();
      $self->{tw_cb}->();
   }

   $self->{wbuf} .= pack "L/a*", Storable::freeze (\@request);

   if (!$self->{ww}) {
      # no write watcher active: try to send right away
      my $written = syswrite $self->{fh}, $self->{wbuf};
      substr $self->{wbuf}, 0, $written, "";

      # still any left? then install a write watcher
      $self->{ww} = AE::io ($self->{fh}, 1, $self->{ww_cb})
         if length $self->{wbuf};
   }
}
527    
528 root 1.11 =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
529 root 1.1
530     Executes the given SQL statement with placeholders replaced by
531 root 1.2 C<@args>. The statement will be prepared and cached on the server side, so
532 root 1.11 using placeholders is extremely important.
533 root 1.1
534 root 1.11 The callback will be called with a weakened AnyEvent::DBI object as the
535     first argument and the result of C<fetchall_arrayref> (or C<undef>
536     if the statement wasn't a select statement) as the second argument.
537 root 1.1
538 root 1.11 Third argument is the return value from the C<< DBI->execute >> method
539     call.
540    
541     If an error occurs and the C<on_error> callback returns, then only C<$dbh>
542 root 1.2 will be passed and C<$@> contains the error message.
543    
544 root 1.11 =item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
545    
546     An accessor for the handle attributes, such as C<AutoCommit>,
547     C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
548     (which might be C<undef>), then the given attribute will be set to that
549     value.
550 root 1.10
551 root 1.11 The callback will be passed the database handle and the attribute's value
552     if successful.
553    
554     If an error occurs and the C<on_error> callback returns, then only C<$dbh>
555     will be passed and C<$@> contains the error message.
556 root 1.10
557 root 1.12 =item $dbh->begin_work ($cb->($dbh[, $rc]))
558 root 1.10
559 root 1.12 =item $dbh->commit ($cb->($dbh[, $rc]))
560 root 1.10
561 root 1.12 =item $dbh->rollback ($cb->($dbh[, $rc]))
562 root 1.10
563 root 1.11 The begin_work, commit, and rollback methods expose the equivalent
564 root 1.12 transaction control method of the DBI driver. On success, C<$rc> is true.
565 root 1.10
566 root 1.11 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
567     will be passed and C<$@> contains the error message.
568 root 1.10
569 root 1.12 =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
570 root 1.10
571 root 1.11 This gives access to database driver private methods. Because they
572 root 1.12 are not standard you cannot always depend on the value of C<$rc> or
573     C<$dbi_err>. Check the documentation for your specific driver/function
574     combination to see what it returns.
575 root 1.10
576     Note that the first argument will be eval'ed to produce the argument list to
577 root 1.11 the func() method. This must be done because the serialization protocol
578 root 1.10 between the AnyEvent::DBI server process and your program does not support the
579     passage of closures.
580    
581     Here's an example to extend the query language in SQLite so it supports an
582     instr() function:
583    
584     $cv = AnyEvent->condvar;
585 root 1.11 $dbh->func (
586 root 1.10 q{
587 root 1.11 instr => 2, sub {
588 root 1.10 my ($string, $search) = @_;
589     return index $string, $search;
590     },
591     },
592 root 1.11 create_function => sub {
593 root 1.12 return $cv->send ($@)
594     unless $#_;
595     $cv->send (undef, @_[1,2,3]);
596 root 1.11 }
597 root 1.10 );
598 root 1.11
599 root 1.12 my ($err,$rc,$errcode,$errstr) = $cv->recv;
600 root 1.11
601 root 1.12 die $err if defined $err;
602     die "EVAL failed: $errstr"
603     if $errcode;
604 root 1.11
605 root 1.12 # otherwise, we can ignore $rc and $errcode for this particular func
606 root 1.10
607 root 1.1 =cut
608    
# Generate the public request methods exec, attr, begin_work, commit,
# rollback and func. Each one takes the callback from the end of its
# argument list, inserts it together with the caller's file name and
# line and the server-side handler name ("req_<method>") after the
# handle, and passes the result on to _req.
#
# The methods are installed as closures rather than through a string
# eval, so a mistake in the template fails at compile time instead of
# being silently ignored.
for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
   my $req_name = "req_$cmd_name";

   no strict 'refs';
   *{ __PACKAGE__ . "::$cmd_name" } = sub {
      my $cb = pop;
      splice @_, 1, 0, $cb, (caller)[1,2], $req_name;
      &_req # called with the current @_
   };
}
616    
617     =back
618    
619     =head1 SEE ALSO
620    
621 root 1.11 L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
622 root 1.1
623     =head1 AUTHOR
624    
625 root 1.4 Marc Lehmann <schmorp@schmorp.de>
626     http://home.schmorp.de/
627 root 1.1
628 root 1.10 Adam Rosenstein <adam@redcondor.com>
629     http://www.redcondor.com/
630    
631 root 1.1 =cut
632    
633 root 1.10 1;
634 root 1.1