ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.15
Committed: Thu May 17 02:15:58 2012 UTC (12 years ago) by root
Branch: MAIN
CVS Tags: rel-2_2
Changes since 1.14: +11 -5 lines
Log Message:
2.2

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::DBI - asynchronous DBI access
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::DBI;
8    
9 root 1.5 my $cv = AnyEvent->condvar;
10    
11     my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12    
13     $dbh->exec ("select * from test where num=?", 10, sub {
14 root 1.11 my ($dbh, $rows, $rv) = @_;
15    
16 root 1.12 $#_ or die "failure: $@";
17 root 1.5
18     print "@$_\n"
19     for @$rows;
20    
21     $cv->broadcast;
22     });
23    
24     # asynchronously do sth. else here
25    
26     $cv->wait;
27    
28 root 1.1 =head1 DESCRIPTION
29    
30     This module is an L<AnyEvent> user, you need to make sure that you use and
31     run a supported event loop.
32    
33 root 1.8 This module implements asynchronous DBI access by forking or executing
34 root 1.1 separate "DBI-Server" processes and sending them requests.
35    
36     It means that you can run DBI requests in parallel to other tasks.
37    
38 root 1.3 The overhead for very simple statements ("select 0") is somewhere
39 root 1.14 around 100% to 120% (dual/single core CPU) compared to an explicit
40 root 1.3 prepare_cached/execute/fetchrow_arrayref/finish combination.
41    
42 root 1.11 =head2 ERROR HANDLING
43    
44     This module defines a number of functions that accept a callback
45     argument. All callbacks used by this module get their AnyEvent::DBI handle
46     object passed as first argument.
47    
48     If the request was successful, then there will be more arguments,
49     otherwise there will only be the C<$dbh> argument and C<$@> contains an
50     error message.
51    
52     A convenient way to check whether an error occurred is to check C<$#_> -
53     if that is true, then the function was successful, otherwise there was an
54     error.
55    
56 root 1.1 =cut
57    
58     package AnyEvent::DBI;
59    
60 root 1.14 use common::sense;
61 root 1.1
62     use Carp;
63     use Socket ();
64     use Scalar::Util ();
65     use Storable ();
66    
67 root 1.14 use DBI (); # only needed in child actually - do it before fork & !exec?
68 root 1.1
69     use AnyEvent ();
70     use AnyEvent::Util ();
71    
72 root 1.11 use Errno ();
73     use Fcntl ();
74     use POSIX ();
75    
76 root 1.15 our $VERSION = '2.2';
77 root 1.11
78     our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
79 root 1.1
80 root 1.11 # this is the forked server code, could/should be bundled as its own file
81 root 1.1
82     our $DBH;
83    
# Server-side handler for the initial connect request.
#
# The request is an array reference of the form
#    [$handler_name, $dsn, $user, $pass, key => value, ...]
# where the trailing pairs become the attribute hash handed to
# DBI->connect. The resulting handle is stored in the package-wide $DBH,
# which all other request handlers operate on.
#
# Returns [1, 1] on success and dies with the plain $DBI::errstr string
# when the connect fails.
sub req_open {
   my $req = shift;
   my ($dsn, $login, $password, %connect_attr) = @$req[1 .. $#$req];

   $DBH = DBI->connect ($dsn, $login, $password, \%connect_attr);
   die $DBI::errstr unless $DBH;

   [1, 1]
}
91    
# Server-side handler that runs one SQL statement.
#
# The request is [$handler_name, $statement, @bind_values]. The statement
# is prepared through prepare_cached and executed with the bind values.
#
# Returns [1, $rows, $rv] where $rows is the fetchall_arrayref result when
# the statement handle reports result columns (NUM_OF_FIELDS) and undef
# otherwise, and $rv is the return value of execute.
#
# Dies with a one-element array reference holding the error message when
# preparing or executing fails.
sub req_exec {
   my ($req) = @_;
   my ($sql, @bind) = @$req[1 .. $#$req];

   my $sth = $DBH->prepare_cached ($sql, undef, 1);
   die [$DBI::errstr] unless $sth;

   my $rv = $sth->execute (@bind);
   die [$sth->errstr] unless $rv;

   [
      1,
      ($sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef),
      $rv,
   ]
}
102    
# Server-side accessor for attributes of the database handle in $DBH.
#
# The request is [$handler_name, $attribute_name] to read an attribute or
# [$handler_name, $attribute_name, $new_value] to set it first (the new
# value may be undef - only its presence matters).
#
# Returns [1, $current_value].
sub req_attr {
   my $req = shift;
   my ($name, @new_value) = @$req[1 .. $#$req];

   if (@new_value) {
      $DBH->{$name} = $new_value[0];
   }

   [1, $DBH->{$name}]
}
111    
# Server-side handler that starts a transaction on the handle in $DBH.
#
# Returns [1, $rc] where $rc is the (true) return value of begin_work.
# Dies with a one-element array reference holding $DBI::errstr on failure.
#
# The call is checked in a statement of its own: "or" binds more loosely
# than the comma, so writing "[1, CALL or die ...]" would collapse the
# list to the call's value alone and drop the leading status element.
sub req_begin_work {
   my $rc = $DBH->begin_work
      or die [$DBI::errstr];

   [1, $rc]
}
115    
# Server-side handler that commits the current transaction on $DBH.
#
# Returns [1, $rc] where $rc is the (true) return value of commit.
# Dies with a one-element array reference holding $DBI::errstr on failure.
#
# The call is checked in a statement of its own: "or" binds more loosely
# than the comma, so writing "[1, CALL or die ...]" would collapse the
# list to the call's value alone and drop the leading status element.
sub req_commit {
   my $rc = $DBH->commit
      or die [$DBI::errstr];

   [1, $rc]
}
119    
# Server-side handler that rolls back the current transaction on $DBH.
#
# Returns [1, $rc] where $rc is the (true) return value of rollback.
# Dies with a one-element array reference holding $DBI::errstr on failure.
#
# The call is checked in a statement of its own: "or" binds more loosely
# than the comma, so writing "[1, CALL or die ...]" would collapse the
# list to the call's value alone and drop the leading status element.
sub req_rollback {
   my $rc = $DBH->rollback
      or die [$DBI::errstr];

   [1, $rc]
}
123    
# Server-side handler for driver-private functions ($dbh->func).
#
# The request is [$handler_name, $arg_string, $function]. $arg_string is
# Perl source that is string-eval'ed in list context to produce the
# argument list; it arrives over the request socket from the process that
# created the handle.
# NOTE(review): the string eval executes whatever the peer sends - this is
# only acceptable as long as the socket peer is the trusted parent process.
#
# Returns [1, $rc, $DBI::err, $DBI::errstr]; dies with a plain string when
# the argument string fails to evaluate.
sub req_func {
   my (undef, $arg_string, $function) = @{ shift @_ };

   my @args = eval $arg_string;

   if ($@) {
      die "error evaling \$dbh->func() arg_string: $@";
   }

   my $rc = $DBH->func (@args, $function);

   [1, $rc, $DBI::err, $DBI::errstr]
}
134    
# Main loop of the database server process.
#
# Arguments:
#    $fh      - the socket connected to the client side
#    $version - the module version the client speaks
#
# If $version differs (numerically) from $VERSION, a single error response
# is written and the function returns immediately.
#
# Otherwise requests are read until end-of-file or a read error. Every
# request and every response is framed as a native 32 bit length ("L")
# followed by that many bytes of Storable-frozen data. A request is an
# array reference whose first element names the handler function to call
# with the request itself as only argument.
#
# When a handler dies, the response is [undef, $message, $fatal]: for an
# array reference exception the message and fatal flag are taken from its
# first two elements, any other exception is stringified and flagged
# fatal. A failure to write a response silently ends the loop.
sub serve_fh($$) {
   my ($fh, $version) = @_;

   if ($VERSION != $version) {
      syswrite $fh,
         pack "L/a*",
            Storable::freeze
               [undef, "AnyEvent::DBI version mismatch ($VERSION vs. $version)"];
      return;
   }

   eval {
      my $rbuf;

      while () {
         sysread $fh, $rbuf, 16384, length $rbuf
            or last;

         while () {
            my $len = unpack "L", $rbuf;

            # full request available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly the framed payload, not the rest of the buffer,
            # which may already contain further pipelined requests
            my $req = Storable::thaw (substr $rbuf, 4, $len);
            substr $rbuf, 0, $len + 4, ""; # remove length + request

            my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
            $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? ("$@->[0]", $@->[1]) : ("$@", 1)]
               if $@;

            # syswrite may write less than requested, so loop until done
            for (my $ofs = 0; $ofs < length $wbuf; ) {
               $ofs += (syswrite $fh, substr $wbuf, $ofs
                        or die "unable to write results");
            }
         }
      }
   };
}
174 root 1.1
# Entry point for a separately exec'ed server process: wraps an inherited
# file descriptor number in a perl file handle and serves requests on it.
#
# Arguments:
#    the numeric file descriptor of the server side socket
#    the module version the client speaks (passed through to serve_fh)
#
# Dies when the descriptor cannot be opened.
sub serve_fd($$) {
   my ($fd, $version) = @_;

   open my $fh, ">>&=$fd"
      or die "Couldn't open server file descriptor: $!";

   serve_fh ($fh, $version);
}
181    
182 root 1.1 =head2 METHODS
183    
184     =over 4
185    
186     =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
187    
188     Returns a database handle for the given database. Each database handle
189     has an associated server process that executes statements in order. If
190     you want to run more than one statement in parallel, you need to create
191     additional database handles.
192    
193     The advantage of this approach is that transactions work as state is
194     preserved.
195    
196     Example:
197    
198     $dbh = new AnyEvent::DBI
199     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
200    
201     Additional key-value pairs can be used to adjust behaviour:
202    
203     =over 4
204    
205     =item on_error => $callback->($dbh, $filename, $line, $fatal)
206    
207     When an error occurs, then this callback will be invoked. On entry, C<$@>
208     is set to the error message. C<$filename> and C<$line> is where the
209     original request was submitted.
210    
211 root 1.11 If the fatal argument is true then the database connection is shut down
212     and your database handle became invalid. In addition to invoking the
213     C<on_error> callback, all of your queued request callbacks are called
214     with only the C<$dbh> argument.
215 root 1.1
216 root 1.2 If omitted, then C<die> will be called on any errors, fatal or not.
217 root 1.1
218 root 1.11 =item on_connect => $callback->($dbh[, $success])
219 root 1.10
220 root 1.11 If you supply an C<on_connect> callback, then this callback will be
221     invoked after the database connect attempt. If the connection succeeds,
222     C<$success> is true, otherwise it is missing and C<$@> contains the
223     C<$DBI::errstr>.
224    
225     Regardless of whether C<on_connect> is supplied, connect errors will result in
226     C<on_error> being called. However, if no C<on_connect> callback is supplied, then
227     connection errors are considered fatal. The client will C<die> and the C<on_error>
228     callback will be called with C<$fatal> true.
229    
230     When on_connect is supplied, connect errors are not fatal and AnyEvent::DBI
231     will not C<die>. You still cannot, however, use the $dbh object you
232     received from C<new> to make requests.
233 root 1.10
234     =item exec_server => 1
235    
236 root 1.11 If you supply an C<exec_server> argument, then the DBI server process will
237     fork and exec another perl interpreter (using C<$^X>) with just the
238 root 1.13 AnyEvent::DBI proxy running. This will provide the cleanest possible proxy
239 root 1.11 for your database server.
240    
241     If you do not supply the C<exec_server> argument (or supply it with a
242     false value) then the traditional method of starting the server by forking
243     the current process is used. The forked interpreter will try to clean
244     itself up by calling POSIX::close on all file descriptors except STDIN,
245     STDOUT, and STDERR (and the socket it uses to communicate with the client,
246     of course).
247 root 1.10
248     =item timeout => seconds
249    
250 root 1.11 If you supply a timeout parameter (fractional values are supported), then
251     a timer is started any time the DBI handle expects a response from the
252     server. This includes connection setup as well as requests made to the
253     backend. The timeout spans the duration from the moment the first data
254     is written (or queued to be written) until all expected responses are
255     returned, but is postponed for "timeout" seconds each time more data is
256     returned from the server. If the timer ever goes off then a fatal error is
257     generated. If you have an C<on_error> handler installed, then it will be
258     called, otherwise your program will die().
259    
260     When altering your databases with timeouts it is wise to use
261     transactions. If you quit due to timeout while performing insert, update
262     or schema-altering commands you can end up not knowing if the action was
263     submitted to the database, complicating recovery.
264 root 1.10
265     Timeout errors are always fatal.
266    
267 root 1.1 =back
268    
269 root 1.11 Any additional key-value pairs will be rolled into a hash reference
270     and passed as the final argument to the C<< DBI->connect (...) >>
271     call. For example, to suppress errors on STDERR and send them instead to an
272     AnyEvent::Handle you could do:
273    
274     $dbh = new AnyEvent::DBI
275     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
276     PrintError => 0,
277     on_error => sub {
278     $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
279     };
280 root 1.10
281 root 1.1 =cut
282    
283     # stupid Storable autoloading, total loss-loss situation
# Runs one freeze/thaw round trip on an empty array at module load time and
# discards the result - presumably so that Storable's freeze and thaw code is
# already loaded before any server process is forked (TODO confirm).
284     Storable::thaw Storable::freeze [];
285    
# Constructor: new AnyEvent::DBI $dbi, $user, $pass, key => value...
#
# Creates a socket pair, blesses the key/value arguments into the object,
# installs the read watcher plus the timeout and write callbacks, forks the
# server process and finally queues the initial "req_open" request.
#
# The keys on_connect, on_error, timeout and exec_server are consumed here;
# all remaining pairs are sent along with the connect request.
#
# Croaks when the socket pair cannot be created or when fork fails.
# Returns the new object.
286     sub new {
287     my ($class, $dbi, $user, $pass, %arg) = @_;
288    
289 root 1.11 my ($client, $server) = AnyEvent::Util::portable_socketpair
290 root 1.14 or croak "unable to create AnyEvent::DBI communications pipe: $!";
291 root 1.1
# copy of the arguments without the keys this module handles itself
292 root 1.11 my %dbi_args = %arg;
293     delete @dbi_args{qw(on_connect on_error timeout exec_server)};
294 root 1.10
# the object is the argument hash itself, so on_error, timeout etc. are
# directly available as $self->{...}
295 root 1.1 my $self = bless \%arg, $class;
296     $self->{fh} = $client;
297    
298     AnyEvent::Util::fh_nonblocking $client, 1;
299    
# read buffer shared by all invocations of the read watcher below
300     my $rbuf;
301     my @caller = (caller)[1,2]; # the "default" caller
302    
303 root 1.11 {
# the callbacks below are stored inside the object, so they only get a
# weakened copy of $self
304     Scalar::Util::weaken (my $self = $self);
305    
# read watcher: collects response data and dispatches complete responses
306 root 1.14 $self->{rw} = AE::io $client, 0, sub {
307 root 1.11 return unless $self;
308    
# append up to 64k of new data to the read buffer
309     my $len = sysread $client, $rbuf, 65536, length $rbuf;
310    
311     if ($len > 0) {
312     # we received data, so reset the timer
313 root 1.15 $self->{last_activity} = AE::now;
314 root 1.1
# process every complete response currently in the buffer
315 root 1.11 while () {
# each response starts with its length as native 32 bit integer
316     my $len = unpack "L", $rbuf;
317 root 1.1
318 root 1.11 # full response available?
319     last unless $len && $len + 4 <= length $rbuf;
320 root 1.1
321 root 1.11 my $res = Storable::thaw substr $rbuf, 4;
322     substr $rbuf, 0, $len + 4, ""; # remove length + request
323 root 1.1
324 root 1.11 last unless $self;
# responses are matched to requests in queue order;
# a queue entry is [$cb, $filename, $line]
325     my $req = shift @{ $self->{queue} };
326 root 1.1
327 root 1.11 if (defined $res->[0]) {
# success: replace the status element by the handle and pass
# the whole response to the request callback
328     $res->[0] = $self;
329     $req->[0](@$res);
330     } else {
# failure: $res is [undef, $message, $fatal] - call the request
# callback with only the handle and $@ set to the message
331     my $cb = shift @$req;
332     local $@ = $res->[1];
333     $cb->($self);
334     $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
335     if $self; # cb() could have deleted it
336 root 1.10 }
337 root 1.11
338     # no more queued requests, so become idle
339 root 1.15 if ($self && !@{ $self->{queue} }) {
# with last_activity cleared, the timeout callback removes the timer
340     undef $self->{last_activity};
341     $self->{tw_cb}->();
342     }
343 root 1.10 }
344    
345 root 1.11 } elsif (defined $len) {
# sysread returned 0: the other side closed the connection
346     # todo, caller?
347     $self->_error ("unexpected eof", @caller, 1);
348     } elsif ($! != Errno::EAGAIN) {
# sysread failed with something other than "try again"
349     # todo, caller?
350     $self->_error ("read error: $!", @caller, 1);
351     }
352 root 1.14 };
353 root 1.11
# timeout callback: invoked by the timer and also called directly whenever
# the timeout state may have changed
354     $self->{tw_cb} = sub {
355     if ($self->{timeout} && $self->{last_activity}) {
356 root 1.14 if (AE::now > $self->{last_activity} + $self->{timeout}) {
357 root 1.11 # we did time out
# report the error at the location of the oldest queued request
358     my $req = $self->{queue}[0];
359     $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
360     } else {
361     # we need to re-set the timeout watcher
# one-shot timer for the time remaining until the deadline
362 root 1.14 $self->{tw} = AE::timer
363     $self->{last_activity} + $self->{timeout} - AE::now,
364     0,
365     $self->{tw_cb},
366     ;
367 root 1.1 }
368 root 1.11 } else {
369     # no timeout check wanted, or idle
370     undef $self->{tw};
371 root 1.1 }
372 root 1.11 };
373 root 1.1
# write callback: used by the write watcher that _req installs when it
# could not write the whole buffer at once
374 root 1.11 $self->{ww_cb} = sub {
375     return unless $self;
376    
377 root 1.14 $self->{last_activity} = AE::now;
378 root 1.1
# a false result (nothing written or error) removes the write watcher
379 root 1.11 my $len = syswrite $client, $self->{wbuf}
380     or return delete $self->{ww};
381 root 1.3
# drop the bytes that were written from the buffer
382 root 1.11 substr $self->{wbuf}, 0, $len, "";
383     };
384     }
385 root 1.3
386 root 1.1 my $pid = fork;
387    
388     if ($pid) {
389     # parent
390     close $server;
391     } elsif (defined $pid) {
392     # child
393 root 1.10 my $serv_fno = fileno $server;
394 root 1.1
395 root 1.10 if ($self->{exec_server}) {
# run the server in a fresh perl interpreter: clear the descriptor flags
# so the socket stays open across exec, then load this module by file
# name and serve on the inherited descriptor number
396 root 1.11 fcntl $server, &Fcntl::F_SETFD, 0; # don't close the server side
397     exec {$^X}
398     "$0 dbi slave",
399     -e => "require shift; AnyEvent::DBI::serve_fd ($serv_fno, $VERSION)",
400     $INC{"AnyEvent/DBI.pm"};
# only reached when exec itself failed
401 root 1.10 POSIX::_exit 124;
402     } else {
# serve from the forked copy of this process: close every descriptor
# above $^F except the server socket first
403     ($_ != $serv_fno) && POSIX::close $_
404 root 1.11 for $^F+1..$FD_MAX;
405     serve_fh $server, $VERSION;
406    
407     # no other way on the broken windows platform, even this leaks
408     # memory and might fail.
409     kill 9, $$
410     if AnyEvent::WIN32;
411    
412     # and this kills the parent process on windows
413     POSIX::_exit 0;
414 root 1.10 }
415 root 1.1 } else {
# fork returned undef
416     croak "fork: $!";
417     }
418    
419 root 1.10 $self->{child_pid} = $pid;
420 root 1.11
# queue the connect request; its response goes to on_connect when given,
# otherwise to an empty callback
421 root 1.10 $self->_req (
422     ($self->{on_connect} ? $self->{on_connect} : sub { }),
423     (caller)[1,2],
424     req_open => $dbi, $user, $pass, %dbi_args
425     );
426 root 1.1
427     $self
428     }
429    
# Returns the process id stored for the server process of this handle
# (the "child_pid" member), or undef if there is none.
sub _server_pid {
   my ($self) = @_;

   return $self->{child_pid};
}
433    
# Shuts down the server process and the connection to it.
#
# If a server pid is still recorded, it is removed from the object, a child
# watcher is installed to reap the process and SIGTERM is sent to it. The
# client side file handle, if still present, is removed and closed.
#
# The method may run more than once for the same object (for example from
# a fatal _error, which removes the file handle itself, and later from
# DESTROY), so a missing pid or file handle is simply skipped instead of
# calling close on an undefined value.
#
# Returns the result of close when a file handle was closed.
sub kill_child {
   my $self = shift;

   if (my $pid = delete $self->{child_pid}) {
      # kill and reap process
      my $kid_watcher; $kid_watcher = AE::child ($pid, sub {
         # the watcher only has to live until the process is reaped
         undef $kid_watcher;
      });
      kill TERM => $pid;
   }

   my $fh = delete $self->{fh}
      or return;

   close $fh
}
447    
# Destructor: shuts down the server process and connection of the handle
# by delegating to kill_child.
sub DESTROY {
   my ($self) = @_;

   $self->kill_child;
}
451    
# Central error reporting.
#
# Arguments: the error message, the file name and line the failing request
# was submitted from, and a flag telling whether the error is fatal.
#
# For fatal errors the timer, both I/O watchers and the file handle are
# removed, every still-queued request callback is invoked with only the
# handle (and $@ set to the message), and the server process is shut down.
#
# Afterwards the on_error handler is called with
# ($dbh, $filename, $line, $fatal) and $@ set to the message. Without an
# on_error handler the method dies with "$error at $filename, line $line".
sub _error {
   my ($self, $message, $file, $lineno, $is_fatal) = @_;

   if ($is_fatal) {
      delete $self->{$_}
         for qw(tw rw ww fh);

      # for fatal errors call all enqueued callbacks with error
      while (my $pending = shift @{ $self->{queue} }) {
         local $@ = $message;
         $pending->[0]->($self);
      }

      $self->kill_child;
   }

   local $@ = $message;

   my $handler = $self->{on_error}
      or die "$message at $file, line $lineno\n";

   $handler->($self, $file, $lineno, $is_fatal)
}
477    
478     =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
479    
480 root 1.11 Sets (or clears, with C<undef>) the C<on_error> handler.
481 root 1.10
482     =cut
483 root 1.1
# Installs $cb as the on_error handler of the handle; passing undef
# removes the handler. Returns the value that was stored.
sub on_error {
   my ($self, $cb) = @_;

   $self->{on_error} = $cb
}
487    
488     =item $dbh->timeout ($seconds)
489    
490     Sets (or clears, with C<undef>) the database timeout. Useful to extend the
491     timeout when you are about to make a really long query.
492    
493     =cut
494    
# Stores a new timeout (in seconds, undef to disable) in the handle and
# then invokes the timeout callback so that it can take the new value
# into account.
sub timeout {
   my $self = shift;

   $self->{timeout} = shift;

   # reschedule timer if one was running
   my $reschedule = $self->{tw_cb};
   $reschedule->();
}
503    
# Queues one request and starts sending it to the server process.
#
# Arguments: the handle, the callback for the response, the file name and
# line the request originates from, followed by the request itself (handler
# name and its arguments).
#
# Without a file handle the callback is invoked at once with only the
# handle and $@ set to 'no database connection', and a fatal error is
# reported. Otherwise the callback is appended to the queue, the timeout is
# started if one is configured and no timer is active, and the frozen
# request is appended to the write buffer. Unless a write watcher is
# already active, one write is attempted directly and a write watcher is
# installed for whatever could not be written.
sub _req {
   my $self = shift;
   my $cb   = shift;
   my $file = shift;
   my $line = shift;
   # @_ now holds only the request proper

   if (!$self->{fh}) {
      local $@ = my $err = 'no database connection';
      $cb->($self);
      $self->_error ($err, $file, $line, 1);
      return;
   }

   push @{ $self->{queue} }, [$cb, $file, $line];

   # re-start timeout if necessary
   if ($self->{timeout} && !$self->{tw}) {
      $self->{last_activity} = AE::now ();
      $self->{tw_cb}->();
   }

   # length-prefixed frame: native 32 bit length, then the frozen request
   $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;

   unless ($self->{ww}) {
      my $written = syswrite $self->{fh}, $self->{wbuf};
      substr $self->{wbuf}, 0, $written, "";

      # still any left? then install a write watcher
      if (length $self->{wbuf}) {
         $self->{ww} = AE::io ($self->{fh}, 1, $self->{ww_cb});
      }
   }
}
533    
534 root 1.11 =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
535 root 1.1
536     Executes the given SQL statement with placeholders replaced by
537 root 1.2 C<@args>. The statement will be prepared and cached on the server side, so
538 root 1.11 using placeholders is extremely important.
539 root 1.1
540 root 1.11 The callback will be called with a weakened AnyEvent::DBI object as the
541     first argument and the result of C<fetchall_arrayref> (or C<undef>
542     if the statement wasn't a select statement) as the second argument.
543 root 1.1
544 root 1.11 Third argument is the return value from the C<< DBI->execute >> method
545     call.
546    
547     If an error occurs and the C<on_error> callback returns, then only C<$dbh>
548 root 1.2 will be passed and C<$@> contains the error message.
549    
550 root 1.11 =item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
551    
552     An accessor for the handle attributes, such as C<AutoCommit>,
553     C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
554     (which might be C<undef>), then the given attribute will be set to that
555     value.
556 root 1.10
557 root 1.11 The callback will be passed the database handle and the attribute's value
558     if successful.
559    
560     If an error occurs and the C<on_error> callback returns, then only C<$dbh>
561     will be passed and C<$@> contains the error message.
562 root 1.10
563 root 1.12 =item $dbh->begin_work ($cb->($dbh[, $rc]))
564 root 1.10
565 root 1.12 =item $dbh->commit ($cb->($dbh[, $rc]))
566 root 1.10
567 root 1.12 =item $dbh->rollback ($cb->($dbh[, $rc]))
568 root 1.10
569 root 1.11 The begin_work, commit, and rollback methods expose the equivalent
570 root 1.12 transaction control method of the DBI driver. On success, C<$rc> is true.
571 root 1.10
572 root 1.11 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
573     will be passed and C<$@> contains the error message.
574 root 1.10
575 root 1.12 =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
576 root 1.10
577 root 1.11 This gives access to database driver private methods. Because they
578 root 1.12 are not standard you cannot always depend on the value of C<$rc> or
579     C<$dbi_err>. Check the documentation for your specific driver/function
580     combination to see what it returns.
581 root 1.10
582     Note that the first argument will be eval'ed to produce the argument list to
583 root 1.11 the func() method. This must be done because the serialization protocol
584 root 1.10 between the AnyEvent::DBI server process and your program does not support the
585     passage of closures.
586    
587     Here's an example to extend the query language in SQLite so it supports an
588     instr() function:
589    
590     $cv = AnyEvent->condvar;
591 root 1.11 $dbh->func (
592 root 1.10 q{
593 root 1.11 instr => 2, sub {
594 root 1.10 my ($string, $search) = @_;
595     return index $string, $search;
596     },
597     },
598 root 1.11 create_function => sub {
599 root 1.12 return $cv->send ($@)
600     unless $#_;
601     $cv->send (undef, @_[1,2,3]);
602 root 1.11 }
603 root 1.10 );
604 root 1.11
605 root 1.12 my ($err,$rc,$errcode,$errstr) = $cv->recv;
606 root 1.11
607 root 1.12 die $err if defined $err;
608     die "EVAL failed: $errstr"
609     if $errcode;
610 root 1.11
611 root 1.12 # otherwise, we can ignore $rc and $errcode for this particular func
612 root 1.10
613 root 1.1 =cut
614    
# Generate the public request methods (exec, attr, begin_work, commit,
# rollback and func). Each generated method removes the trailing callback
# from its arguments, inserts the callback, the caller's file name and line
# and the name of the matching server-side "req_*" handler after the
# handle, and then passes the modified argument list on to _req.
for my $method (qw(exec attr begin_work commit rollback func)) {
   my $source = join "\n",
      "sub $method {",
      '   my $cb = pop;',
      '   splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $method . '";',
      '   &_req',
      '}';

   eval $source;
}
622    
623     =back
624    
625     =head1 SEE ALSO
626    
627 root 1.11 L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
628 root 1.1
629     =head1 AUTHOR
630    
631 root 1.4 Marc Lehmann <schmorp@schmorp.de>
632     http://home.schmorp.de/
633 root 1.1
634 root 1.10 Adam Rosenstein <adam@redcondor.com>
635     http://www.redcondor.com/
636    
637 root 1.1 =cut
638    
639 root 1.10 1;
640 root 1.1