ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.13
Committed: Sat Oct 23 21:47:13 2010 UTC (13 years, 7 months ago) by root
Branch: MAIN
Changes since 1.12: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::DBI - asynchronous DBI access
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::DBI;
8    
9 root 1.5 my $cv = AnyEvent->condvar;
10    
11     my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12    
13     $dbh->exec ("select * from test where num=?", 10, sub {
14 root 1.11 my ($dbh, $rows, $rv) = @_;
15    
16 root 1.12 $#_ or die "failure: $@";
17 root 1.5
18     print "@$_\n"
19     for @$rows;
20    
21     $cv->broadcast;
22     });
23    
24     # asynchronously do sth. else here
25    
26     $cv->wait;
27    
28 root 1.1 =head1 DESCRIPTION
29    
30     This module is an L<AnyEvent> user, you need to make sure that you use and
31     run a supported event loop.
32    
33 root 1.8 This module implements asynchronous DBI access by forking or executing
34 root 1.1 separate "DBI-Server" processes and sending them requests.
35    
36     It means that you can run DBI requests in parallel to other tasks.
37    
38 root 1.3 The overhead for very simple statements ("select 0") is somewhere
39 root 1.6 around 120% to 200% (dual/single core CPU) compared to an explicit
40 root 1.3 prepare_cached/execute/fetchrow_arrayref/finish combination.
41    
42 root 1.11 =head2 ERROR HANDLING
43    
44     This module defines a number of functions that accept a callback
45     argument. All callbacks used by this module get their AnyEvent::DBI handle
46     object passed as first argument.
47    
48     If the request was successful, then there will be more arguments,
49     otherwise there will only be the C<$dbh> argument and C<$@> contains an
50     error message.
51    
52     A convinient way to check whether an error occured is to check C<$#_> -
53     if that is true, then the function was successful, otherwise there was an
54     error.
55    
56 root 1.1 =cut
57    
58     package AnyEvent::DBI;
59    
60 root 1.11 use strict qw(vars subs);
61 root 1.1 no warnings;
62    
63     use Carp;
64     use Socket ();
65     use Scalar::Util ();
66     use Storable ();
67    
68     use DBI ();
69    
70     use AnyEvent ();
71     use AnyEvent::Util ();
72    
73 root 1.11 use Errno ();
74     use Fcntl ();
75     use POSIX ();
76    
77 root 1.12 our $VERSION = '2.0';
78 root 1.11
79     our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
80 root 1.1
81 root 1.11 # this is the forked server code, could/should be bundled as it's own file
82 root 1.1
83     our $DBH;
84    
85     sub req_open {
86     my (undef, $dbi, $user, $pass, %attr) = @{+shift};
87    
88 root 1.10 $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr;
89 root 1.1
90 root 1.11 [1, 1]
91 root 1.1 }
92    
93 root 1.2 sub req_exec {
94     my (undef, $st, @args) = @{+shift};
95 root 1.10 my $sth = $DBH->prepare_cached ($st, undef, 1)
96     or die [$DBI::errstr];
97 root 1.2
98 root 1.7 my $rv = $sth->execute (@args)
99 root 1.10 or die [$sth->errstr];
100 root 1.2
101 root 1.11 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, $rv]
102 root 1.2 }
103    
104 root 1.10 sub req_attr {
105 root 1.11 my (undef, $attr_name, @attr_val) = @{+shift};
106 root 1.10
107 root 1.11 $DBH->{$attr_name} = $attr_val[0]
108     if @attr_val;
109 root 1.10
110     [1, $DBH->{$attr_name}]
111     }
112    
113     sub req_begin_work {
114 root 1.11 [1, $DBH->begin_work or die [$DBI::errstr]]
115 root 1.10 }
116    
117     sub req_commit {
118 root 1.11 [1, $DBH->commit or die [$DBI::errstr]]
119 root 1.10 }
120    
121     sub req_rollback {
122 root 1.11 [1, $DBH->rollback or die [$DBI::errstr]]
123 root 1.10 }
124    
125     sub req_func {
126     my (undef, $arg_string, $function) = @{+shift};
127     my @args = eval $arg_string;
128    
129 root 1.12 die "error evaling \$dbh->func() arg_string: $@"
130 root 1.11 if $@;
131 root 1.10
132 root 1.12 my $rc = $DBH->func (@args, $function);
133     return [1, $rc, $DBI::err, $DBI::errstr];
134 root 1.10 }
135    
136 root 1.11 sub serve_fh($$) {
137     my ($fh, $version) = @_;
138 root 1.10
139 root 1.11 if ($VERSION != $version) {
140     syswrite $fh,
141     pack "L/a*",
142     Storable::freeze
143     [undef, "AnyEvent::DBI version mismatch ($VERSION vs. $version)"];
144     return;
145     }
146 root 1.1
147     eval {
148     my $rbuf;
149    
150     while () {
151     sysread $fh, $rbuf, 16384, length $rbuf
152     or last;
153    
154     while () {
155     my $len = unpack "L", $rbuf;
156    
157     # full request available?
158     last unless $len && $len + 4 <= length $rbuf;
159    
160     my $req = Storable::thaw substr $rbuf, 4;
161     substr $rbuf, 0, $len + 4, ""; # remove length + request
162    
163     my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
164 root 1.11 $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? ("$@->[0]", $@->[1]) : ("$@", 1)]
165 root 1.1 if $@;
166    
167     for (my $ofs = 0; $ofs < length $wbuf; ) {
168     $ofs += (syswrite $fh, substr $wbuf, $ofs
169     or die "unable to write results");
170     }
171     }
172     }
173     };
174 root 1.11 }
175 root 1.1
176 root 1.11 sub serve_fd($$) {
177     open my $fh, ">>&=$_[0]"
178     or die "Couldn't open server file descriptor: $!";
179 root 1.1
180 root 1.11 serve_fh $fh, $_[1];
181 root 1.10 }
182    
183 root 1.1 =head2 METHODS
184    
185     =over 4
186    
187     =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
188    
189     Returns a database handle for the given database. Each database handle
190     has an associated server process that executes statements in order. If
191     you want to run more than one statement in parallel, you need to create
192     additional database handles.
193    
194     The advantage of this approach is that transactions work as state is
195     preserved.
196    
197     Example:
198    
199     $dbh = new AnyEvent::DBI
200     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
201    
202     Additional key-value pairs can be used to adjust behaviour:
203    
204     =over 4
205    
206     =item on_error => $callback->($dbh, $filename, $line, $fatal)
207    
208     When an error occurs, then this callback will be invoked. On entry, C<$@>
209     is set to the error message. C<$filename> and C<$line> is where the
210     original request was submitted.
211    
212 root 1.11 If the fatal argument is true then the database connection is shut down
213     and your database handle became invalid. In addition to invoking the
214     C<on_error> callback, all of your queued request callbacks are called
215     without only the C<$dbh> argument.
216 root 1.1
217 root 1.2 If omitted, then C<die> will be called on any errors, fatal or not.
218 root 1.1
219 root 1.11 =item on_connect => $callback->($dbh[, $success])
220 root 1.10
221 root 1.11 If you supply an C<on_connect> callback, then this callback will be
222     invoked after the database connect attempt. If the connection succeeds,
223     C<$success> is true, otherwise it is missing and C<$@> contains the
224     C<$DBI::errstr>.
225    
226     Regardless of whether C<on_connect> is supplied, connect errors will result in
227     C<on_error> being called. However, if no C<on_connect> callback is supplied, then
228     connection errors are considered fatal. The client will C<die> and the C<on_error>
229     callback will be called with C<$fatal> true.
230    
231     When on_connect is supplied, connect error are not fatal and AnyEvent::DBI
232     will not C<die>. You still cannot, however, use the $dbh object you
233     received from C<new> to make requests.
234 root 1.10
235     =item exec_server => 1
236    
237 root 1.11 If you supply an C<exec_server> argument, then the DBI server process will
238     fork and exec another perl interpreter (using C<$^X>) with just the
239 root 1.13 AnyEvent::DBI proxy running. This will provide the cleanest possible proxy
240 root 1.11 for your database server.
241    
242     If you do not supply the C<exec_server> argument (or supply it with a
243     false value) then the traditional method of starting the server by forking
244     the current process is used. The forked interpreter will try to clean
245     itself up by calling POSIX::close on all file descriptors except STDIN,
246     STDOUT, and STDERR (and the socket it uses to communicate with the cilent,
247     of course).
248 root 1.10
249     =item timeout => seconds
250    
251 root 1.11 If you supply a timeout parameter (fractional values are supported), then
252     a timer is started any time the DBI handle expects a response from the
253     server. This includes connection setup as well as requests made to the
254     backend. The timeout spans the duration from the moment the first data
255     is written (or queued to be written) until all expected responses are
256     returned, but is postponed for "timeout" seconds each time more data is
257     returned from the server. If the timer ever goes off then a fatal error is
258     generated. If you have an C<on_error> handler installed, then it will be
259     called, otherwise your program will die().
260    
261     When altering your databases with timeouts it is wise to use
262     transactions. If you quit due to timeout while performing insert, update
263     or schema-altering commands you can end up not knowing if the action was
264     submitted to the database, complicating recovery.
265 root 1.10
266     Timeout errors are always fatal.
267    
268 root 1.1 =back
269    
270 root 1.11 Any additional key-value pairs will be rolled into a hash reference
271     and passed as the final argument to the C<< DBI->connect (...) >>
272     call. For example, to supress errors on STDERR and send them instead to an
273     AnyEvent::Handle you could do:
274    
275     $dbh = new AnyEvent::DBI
276     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
277     PrintError => 0,
278     on_error => sub {
279     $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
280     };
281 root 1.10
282 root 1.1 =cut
283    
284     # stupid Storable autoloading, total loss-loss situation
285     Storable::thaw Storable::freeze [];
286    
287     sub new {
288     my ($class, $dbi, $user, $pass, %arg) = @_;
289    
290 root 1.11 my ($client, $server) = AnyEvent::Util::portable_socketpair
291     or croak "unable to create Anyevent::DBI communications pipe: $!";
292 root 1.1
293 root 1.11 my %dbi_args = %arg;
294     delete @dbi_args{qw(on_connect on_error timeout exec_server)};
295 root 1.10
296 root 1.1 my $self = bless \%arg, $class;
297     $self->{fh} = $client;
298    
299     AnyEvent::Util::fh_nonblocking $client, 1;
300    
301     my $rbuf;
302     my @caller = (caller)[1,2]; # the "default" caller
303    
304 root 1.11 {
305     Scalar::Util::weaken (my $self = $self);
306    
307     $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
308     return unless $self;
309    
310     $self->{last_activity} = AnyEvent->now;
311    
312     my $len = sysread $client, $rbuf, 65536, length $rbuf;
313    
314     if ($len > 0) {
315     # we received data, so reset the timer
316 root 1.1
317 root 1.11 while () {
318     my $len = unpack "L", $rbuf;
319 root 1.1
320 root 1.11 # full response available?
321     last unless $len && $len + 4 <= length $rbuf;
322 root 1.1
323 root 1.11 my $res = Storable::thaw substr $rbuf, 4;
324     substr $rbuf, 0, $len + 4, ""; # remove length + request
325 root 1.1
326 root 1.11 last unless $self;
327     my $req = shift @{ $self->{queue} };
328 root 1.1
329 root 1.11 if (defined $res->[0]) {
330     $res->[0] = $self;
331     $req->[0](@$res);
332     } else {
333     my $cb = shift @$req;
334     local $@ = $res->[1];
335     $cb->($self);
336     $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
337     if $self; # cb() could have deleted it
338 root 1.10 }
339 root 1.11
340     # no more queued requests, so become idle
341     undef $self->{last_activity}
342     if $self && !@{ $self->{queue} };
343 root 1.10 }
344    
345 root 1.11 } elsif (defined $len) {
346     # todo, caller?
347     $self->_error ("unexpected eof", @caller, 1);
348     } elsif ($! != Errno::EAGAIN) {
349     # todo, caller?
350     $self->_error ("read error: $!", @caller, 1);
351     }
352     });
353    
354     $self->{tw_cb} = sub {
355     if ($self->{timeout} && $self->{last_activity}) {
356     if (AnyEvent->now > $self->{last_activity} + $self->{timeout}) {
357     # we did time out
358     my $req = $self->{queue}[0];
359     $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
360     } else {
361     # we need to re-set the timeout watcher
362     $self->{tw} = AnyEvent->timer (
363     after => $self->{last_activity} + $self->{timeout} - AnyEvent->now,
364     cb => $self->{tw_cb},
365     );
366     Scalar::Util::weaken $self;
367 root 1.1 }
368 root 1.11 } else {
369     # no timeout check wanted, or idle
370     undef $self->{tw};
371 root 1.1 }
372 root 1.11 };
373 root 1.1
374 root 1.11 $self->{ww_cb} = sub {
375     return unless $self;
376    
377     $self->{last_activity} = AnyEvent->now;
378 root 1.1
379 root 1.11 my $len = syswrite $client, $self->{wbuf}
380     or return delete $self->{ww};
381 root 1.3
382 root 1.11 substr $self->{wbuf}, 0, $len, "";
383     };
384     }
385 root 1.3
386 root 1.1 my $pid = fork;
387    
388     if ($pid) {
389     # parent
390     close $server;
391     } elsif (defined $pid) {
392     # child
393 root 1.10 my $serv_fno = fileno $server;
394 root 1.1
395 root 1.10 if ($self->{exec_server}) {
396 root 1.11 fcntl $server, &Fcntl::F_SETFD, 0; # don't close the server side
397     exec {$^X}
398     "$0 dbi slave",
399     -e => "require shift; AnyEvent::DBI::serve_fd ($serv_fno, $VERSION)",
400     $INC{"AnyEvent/DBI.pm"};
401 root 1.10 POSIX::_exit 124;
402     } else {
403     ($_ != $serv_fno) && POSIX::close $_
404 root 1.11 for $^F+1..$FD_MAX;
405     serve_fh $server, $VERSION;
406    
407     # no other way on the broken windows platform, even this leaks
408     # memory and might fail.
409     kill 9, $$
410     if AnyEvent::WIN32;
411    
412     # and this kills the parent process on windows
413     POSIX::_exit 0;
414 root 1.10 }
415 root 1.1 } else {
416     croak "fork: $!";
417     }
418    
419 root 1.10 $self->{child_pid} = $pid;
420 root 1.11
421 root 1.10 $self->_req (
422     ($self->{on_connect} ? $self->{on_connect} : sub { }),
423     (caller)[1,2],
424     req_open => $dbi, $user, $pass, %dbi_args
425     );
426 root 1.1
427     $self
428     }
429    
430 root 1.10 sub _server_pid {
431     shift->{child_pid}
432     }
433    
434     sub kill_child {
435     my $self = shift;
436     my $child_pid = delete $self->{child_pid};
437     if ($child_pid) {
438     # send SIGKILL in two seconds
439     my $murder_timer = AnyEvent->timer (
440     after => 2,
441     cb => sub {
442     kill 9, $child_pid;
443     },
444     );
445    
446     # reap process
447 root 1.11 my $kid_watcher; $kid_watcher = AnyEvent->child (
448     pid => $child_pid,
449 root 1.10 cb => sub {
450     # just hold on to this so it won't go away
451     undef $kid_watcher;
452     # cancel SIGKILL
453     undef $murder_timer;
454     },
455     );
456    
457 root 1.11 close $self->{fh};
458 root 1.10 }
459     }
460    
461     sub DESTROY {
462     shift->kill_child;
463     }
464    
465 root 1.1 sub _error {
466     my ($self, $error, $filename, $line, $fatal) = @_;
467    
468 root 1.10 if ($fatal) {
469 root 1.11 delete $self->{tw};
470 root 1.10 delete $self->{rw};
471     delete $self->{ww};
472     delete $self->{fh};
473    
474     # for fatal errors call all enqueued callbacks with error
475     while (my $req = shift @{$self->{queue}}) {
476 root 1.11 local $@ = $error;
477     $req->[0]->($self);
478 root 1.10 }
479     $self->kill_child;
480     }
481 root 1.1
482 root 1.11 local $@ = $error;
483 root 1.1
484 root 1.9 if ($self->{on_error}) {
485 root 1.10 $self->{on_error}($self, $filename, $line, $fatal)
486     } else {
487     die "$error at $filename, line $line\n";
488 root 1.9 }
489 root 1.10 }
490    
491     =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
492    
493 root 1.11 Sets (or clears, with C<undef>) the C<on_error> handler.
494 root 1.10
495     =cut
496 root 1.1
497 root 1.10 sub on_error {
498     $_[0]{on_error} = $_[1];
499     }
500    
501     =item $dbh->timeout ($seconds)
502    
503     Sets (or clears, with C<undef>) the database timeout. Useful to extend the
504     timeout when you are about to make a really long query.
505    
506     =cut
507    
508     sub timeout {
509     my ($self, $timeout) = @_;
510    
511 root 1.11 $self->{timeout} = $timeout;
512 root 1.10
513 root 1.11 # reschedule timer if one was running
514     $self->{tw_cb}->();
515 root 1.1 }
516    
517     sub _req {
518 root 1.10 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
519    
520 root 1.11 unless ($self->{fh}) {
521     local $@ = my $err = 'no database connection';
522     $cb->($self);
523 root 1.10 $self->_error ($err, $filename, $line, 1);
524     return;
525     }
526    
527 root 1.11 push @{ $self->{queue} }, [$cb, $filename, $line];
528 root 1.1
529 root 1.11 # re-start timeout if necessary
530     if ($self->{timeout} && !$self->{tw}) {
531     $self->{last_activity} = AnyEvent->now;
532     $self->{tw_cb}->();
533 root 1.10 }
534 root 1.1
535     $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
536    
537     unless ($self->{ww}) {
538     my $len = syswrite $self->{fh}, $self->{wbuf};
539     substr $self->{wbuf}, 0, $len, "";
540    
541     # still any left? then install a write watcher
542     $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
543     if length $self->{wbuf};
544     }
545     }
546    
547 root 1.11 =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
548 root 1.1
549     Executes the given SQL statement with placeholders replaced by
550 root 1.2 C<@args>. The statement will be prepared and cached on the server side, so
551 root 1.11 using placeholders is extremely important.
552 root 1.1
553 root 1.11 The callback will be called with a weakened AnyEvent::DBI object as the
554     first argument and the result of C<fetchall_arrayref> as (or C<undef>
555     if the statement wasn't a select statement) as the second argument.
556 root 1.1
557 root 1.11 Third argument is the return value from the C<< DBI->execute >> method
558     call.
559    
560     If an error occurs and the C<on_error> callback returns, then only C<$dbh>
561 root 1.2 will be passed and C<$@> contains the error message.
562    
563 root 1.11 =item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
564    
565     An accessor for the handle attributes, such as C<AutoCommit>,
566     C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
567     (which might be C<undef>), then the given attribute will be set to that
568     value.
569 root 1.10
570 root 1.11 The callback will be passed the database handle and the attribute's value
571     if successful.
572    
573     If an error occurs and the C<on_error> callback returns, then only C<$dbh>
574     will be passed and C<$@> contains the error message.
575 root 1.10
576 root 1.12 =item $dbh->begin_work ($cb->($dbh[, $rc]))
577 root 1.10
578 root 1.12 =item $dbh->commit ($cb->($dbh[, $rc]))
579 root 1.10
580 root 1.12 =item $dbh->rollback ($cb->($dbh[, $rc]))
581 root 1.10
582 root 1.11 The begin_work, commit, and rollback methods expose the equivalent
583 root 1.12 transaction control method of the DBI driver. On success, C<$rc> is true.
584 root 1.10
585 root 1.11 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
586     will be passed and C<$@> contains the error message.
587 root 1.10
588 root 1.12 =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
589 root 1.10
590 root 1.11 This gives access to database driver private methods. Because they
591 root 1.12 are not standard you cannot always depend on the value of C<$rc> or
592     C<$dbi_err>. Check the documentation for your specific driver/function
593     combination to see what it returns.
594 root 1.10
595     Note that the first argument will be eval'ed to produce the argument list to
596 root 1.11 the func() method. This must be done because the serialization protocol
597 root 1.10 between the AnyEvent::DBI server process and your program does not support the
598     passage of closures.
599    
600     Here's an example to extend the query language in SQLite so it supports an
601     intstr() function:
602    
603     $cv = AnyEvent->condvar;
604 root 1.11 $dbh->func (
605 root 1.10 q{
606 root 1.11 instr => 2, sub {
607 root 1.10 my ($string, $search) = @_;
608     return index $string, $search;
609     },
610     },
611 root 1.11 create_function => sub {
612 root 1.12 return $cv->send ($@)
613     unless $#_;
614     $cv->send (undef, @_[1,2,3]);
615 root 1.11 }
616 root 1.10 );
617 root 1.11
618 root 1.12 my ($err,$rc,$errcode,$errstr) = $cv->recv;
619 root 1.11
620 root 1.12 die $err if defined $err;
621     die "EVAL failed: $errstr"
622     if $errcode;
623 root 1.11
624 root 1.12 # otherwise, we can ignore $rc and $errcode for this particular func
625 root 1.10
626 root 1.1 =cut
627    
628 root 1.10 for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
629     eval 'sub ' . $cmd_name . '{
630     my $cb = pop;
631     splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
632 root 1.11 &_req
633 root 1.10 }';
634 root 1.1 }
635    
636     =back
637    
638     =head1 SEE ALSO
639    
640 root 1.11 L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
641 root 1.1
642     =head1 AUTHOR
643    
644 root 1.4 Marc Lehmann <schmorp@schmorp.de>
645     http://home.schmorp.de/
646 root 1.1
647 root 1.10 Adam Rosenstein <adam@redcondor.com>
648     http://www.redcondor.com/
649    
650 root 1.1 =cut
651    
652 root 1.10 1;
653 root 1.1