ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.10
Committed: Tue Jun 2 16:16:03 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.9: +372 -40 lines
Log Message:
big patch by adam

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::DBI - asynchronous DBI access
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::DBI;
8    
9 root 1.5 my $cv = AnyEvent->condvar;
10    
11     my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12    
13     $dbh->exec ("select * from test where num=?", 10, sub {
14 root 1.7 my ($rows, $rv) = @_;
15 root 1.5
16     print "@$_\n"
17     for @$rows;
18    
19     $cv->broadcast;
20     });
21    
22     # asynchronously do sth. else here
23    
24     $cv->wait;
25    
26 root 1.1 =head1 DESCRIPTION
27    
28     This module is an L<AnyEvent> user, you need to make sure that you use and
29     run a supported event loop.
30    
31 root 1.8 This module implements asynchronous DBI access by forking or executing
32 root 1.1 separate "DBI-Server" processes and sending them requests.
33    
34     It means that you can run DBI requests in parallel to other tasks.
35    
36 root 1.3 The overhead for very simple statements ("select 0") is somewhere
37 root 1.6 around 120% to 200% (dual/single core CPU) compared to an explicit
38 root 1.3 prepare_cached/execute/fetchrow_arrayref/finish combination.
39    
40 root 1.1 =cut
41    
42     package AnyEvent::DBI;
43    
44     use strict;
45     no warnings;
46    
47     use Carp;
48     use Socket ();
49     use Scalar::Util ();
50     use Storable ();
51    
52     use DBI ();
53    
54     use AnyEvent ();
55     use AnyEvent::Util ();
56    
57 root 1.10 use Errno qw(:POSIX);
58     use Fcntl qw(F_SETFD);
59     use POSIX qw(sysconf _SC_OPEN_MAX);
60    
61     our $VERSION = '1.2';
62     my $fd_max = 1023; # default
63     eval { $fd_max = sysconf _SC_OPEN_MAX - 1; };
64 root 1.1
65     # this is the forked server code
66    
67     our $DBH;
68    
69     sub req_open {
70     my (undef, $dbi, $user, $pass, %attr) = @{+shift};
71    
72 root 1.10 $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr;
73 root 1.1
74     [1]
75     }
76    
77 root 1.2 sub req_exec {
78     my (undef, $st, @args) = @{+shift};
79 root 1.10 my $sth = $DBH->prepare_cached ($st, undef, 1)
80     or die [$DBI::errstr];
81 root 1.2
82 root 1.7 my $rv = $sth->execute (@args)
83 root 1.10 or die [$sth->errstr];
84 root 1.2
85 root 1.7 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }]
86 root 1.2 }
87    
88 root 1.10 sub req_attr {
89     my (undef, $attr_name, $attr_val) = @{+shift};
90    
91     if (defined $attr_val) {
92     $DBH->{$attr_name} = $attr_val;
93     }
94    
95     [1, $DBH->{$attr_name}]
96     }
97    
98     sub req_begin_work {
99     [scalar $DBH->begin_work or die $DBI::errstr]
100     }
101    
102     sub req_commit {
103     [scalar $DBH->commit or die $DBI::errstr]
104     }
105    
106     sub req_rollback {
107     [scalar $DBH->rollback or die $DBI::errstr]
108     }
109    
110     sub req_func {
111     my (undef, $arg_string, $function) = @{+shift};
112     my @args = eval $arg_string;
113    
114     if ($@) {
115     die "Bad func() arg string: $@";
116     }
117    
118     my $rv = $DBH->func (@args, $function);
119     return [$rv . $DBH->err];
120     }
121    
122 root 1.1 sub serve {
123 root 1.10 my ($fileno) = @_;
124    
125     open my $fh, ">>&=$fileno"
126     or die "Couldn't open service socket: $!";
127 root 1.1
128     no strict;
129    
130     eval {
131     my $rbuf;
132    
133     while () {
134     sysread $fh, $rbuf, 16384, length $rbuf
135     or last;
136    
137     while () {
138     my $len = unpack "L", $rbuf;
139    
140     # full request available?
141     last unless $len && $len + 4 <= length $rbuf;
142    
143     my $req = Storable::thaw substr $rbuf, 4;
144     substr $rbuf, 0, $len + 4, ""; # remove length + request
145    
146     my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
147 root 1.10 $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? "$@->[0]" : $@ , ref $@ ? $@->[1] : 1]
148 root 1.1 if $@;
149    
150     for (my $ofs = 0; $ofs < length $wbuf; ) {
151     $ofs += (syswrite $fh, substr $wbuf, $ofs
152     or die "unable to write results");
153     }
154     }
155     }
156     };
157    
158 root 1.7 if (AnyEvent::WIN32) {
159     kill 9, $$; # no other way on the broken windows platform
160     # and the above doesn't even work on windows, it seems the only
161     # way to is to leak memory and kill 9 from the parent. yay.
162     }
163    
164     require POSIX;
165 root 1.10 POSIX::_exit 0;
166 root 1.7 # and the above kills the parent process on windows
167 root 1.1 }
168    
169 root 1.10 sub start_server {
170     serve shift @ARGV;
171     }
172    
173 root 1.1 =head2 METHODS
174    
175     =over 4
176    
177     =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
178    
179     Returns a database handle for the given database. Each database handle
180     has an associated server process that executes statements in order. If
181     you want to run more than one statement in parallel, you need to create
182     additional database handles.
183    
184     The advantage of this approach is that transactions work as state is
185     preserved.
186    
187     Example:
188    
189     $dbh = new AnyEvent::DBI
190     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
191    
192     Additional key-value pairs can be used to adjust behaviour:
193    
194     =over 4
195    
196     =item on_error => $callback->($dbh, $filename, $line, $fatal)
197    
198     When an error occurs, then this callback will be invoked. On entry, C<$@>
199     is set to the error message. C<$filename> and C<$line> is where the
200     original request was submitted.
201    
202 root 1.10 If the fatal argument is true then the database connection shuts down and your
203     database handle becomes invalid. All of your queued request callbacks are
204     called without any arguments.
205 root 1.1
206 root 1.2 If omitted, then C<die> will be called on any errors, fatal or not.
207 root 1.1
208 root 1.10 The C<$dbh> argument is always a weak reference to the AnyEvent::DBI object.
209    
210     =item on_connect => $callback->($dbh)
211    
212     If you supply an on_connect callback, then this callback will be invoked after
213     the database connection is attempted. If the connection succeeds, C<$dbh>
214     contains a weak reference to the AnyEvent::DBI object. If the connection fails
215     for any reason, no arguments are passed to the callback and C<$@> contains
216     $DBI::errstr.
217    
218     Regardless of whether on_connect is supplied, connect errors will result in
219     on_error being called. However, if no on_connect callback is supplied, then
220     connection errors are considered fatal. The client will die() and the on_error
221     callback will be called with C<$fatal> true. When on_connect is supplied,
222     connect error are not fatal and AnyEvent::DBI will not die(). You still
223     cannot, however, use the $dbh object you recived from new() to make requests.
224    
225     =item exec_server => 1
226    
227     If you supply an exec_server argument, then the DBI server process will call
228     something like:
229    
230     exec "$^X -MAnyEvent::DBI -e AnyEvent::DBI::start_server"
231    
232     after forking. This will provide the cleanest possible interpreter for your
233     database server. There are special provisions to include C<-Mblib> if the
234     current interpreter is running with blib.
235    
236     If you do not supply the exec_server argument (or supply it with a false value)
237     then the traditional method of starting the server within the same forked
238     interpreter context is used. The forked interpreter will try to clean itself
239     up by calling POSIX::close on all filedescriptors except STDIN, STDOUT, and
240     STDERR (and the socket it uses to communicate with the cilent, of course).
241    
242     =item timeout => seconds
243    
244     If you supply a timeout parameter (floating point number of seconds), then a
245     timer is started any time the DBI handle expects a response from the server.
246     This includes connection setup as well as requests made to the backend. The
247     timeout spans the duration from the moment the first data is written (or queued
248     to be written) until all expected responses are returned, but is postponed for
249     "timeout" seconds each time more data is returned from the server. If the
250     timer ever goes off then a fatal error is generated. If you have an on_error
251     handler installed, then it will be called, otherwise your program will die().
252    
253     When altering your databases with timeouts it is wise to use transactions. If
254     you quit due to timeout while performing insert, update or schema-altering
255     commands you can end up not knowing if the action was submitted to the
256     database, complicating recovery.
257    
258     Timeout errors are always fatal.
259    
260 root 1.1 =back
261    
262 root 1.10 Any additional key-value pairs will be rolled into a hash reference and passed
263     as the final argument to the DBI->connect(...) call. For example, to supress
264     errors on STDERR and send them instead to an AnyEvent::Handle you could do:
265    
266     $dbh = new AnyEvent::DBI
267     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
268     PrintError => 0,
269     on_error => sub { $log_handle->push_write("DBI Error: $@ at $_[1]:$_[2]\n"); }
270    
271 root 1.1 =cut
272    
273     # stupid Storable autoloading, total loss-loss situation
274     Storable::thaw Storable::freeze [];
275    
276     sub new {
277     my ($class, $dbi, $user, $pass, %arg) = @_;
278    
279     socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC
280     or croak "unable to create dbi communicaiton pipe: $!";
281    
282 root 1.10 my %dbi_args = ( %arg ) ;
283     delete @dbi_args{qw( on_connect on_error timeout exec_server )};
284    
285 root 1.1 my $self = bless \%arg, $class;
286     $self->{fh} = $client;
287    
288     Scalar::Util::weaken (my $wself = $self);
289    
290     AnyEvent::Util::fh_nonblocking $client, 1;
291    
292     my $rbuf;
293     my @caller = (caller)[1,2]; # the "default" caller
294    
295     $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
296 root 1.10 return unless $wself;
297 root 1.1 my $len = sysread $client, $rbuf, 65536, length $rbuf;
298 root 1.10 my $err = $!;
299 root 1.1
300     if ($len > 0) {
301 root 1.10 # we received data, so reset the timer
302     delete $wself->{timer};
303     if ($wself->{timeout}) {
304     $wself->{timer} = AnyEvent->timer (
305     after => $wself->{timeout},
306     cb => sub { $wself && $wself->_timedout },
307     );
308     }
309 root 1.1
310     while () {
311     my $len = unpack "L", $rbuf;
312    
313 root 1.10 # full response available?
314 root 1.1 last unless $len && $len + 4 <= length $rbuf;
315    
316     my $res = Storable::thaw substr $rbuf, 4;
317     substr $rbuf, 0, $len + 4, ""; # remove length + request
318    
319 root 1.10 last unless $wself;
320 root 1.1 my $req = shift @{ $wself->{queue} };
321    
322     if (defined $res->[0]) {
323 root 1.10 $res->[0] = $wself;
324 root 1.1 $req->[0](@$res);
325     } else {
326     my $cb = shift @$req;
327 root 1.10 $@=$res->[1];
328 root 1.2 $cb->();
329 root 1.10 if ($wself) { # cb() could have deleted it
330     $wself->_error ($res->[1], @$req, $res->[2]); # error, request record, is_fatal
331     }
332     }
333    
334     # no more queued requests, so cancel timeout
335     if ($wself) {
336     delete $wself->{timer}
337     unless @{ $wself->{queue} };
338 root 1.1 }
339     }
340    
341     } elsif (defined $len) {
342     $wself->_error ("unexpected eof", @caller, 1);
343     } else {
344 root 1.10 return if $err == EAGAIN;
345     $wself->_error ("read error: $err", @caller, 1);
346 root 1.1 }
347     });
348    
349 root 1.3 $self->{ww_cb} = sub {
350 root 1.10 return unless $wself;
351 root 1.3 my $len = syswrite $client, $wself->{wbuf}
352     or return delete $wself->{ww};
353    
354     substr $wself->{wbuf}, 0, $len, "";
355     };
356    
357 root 1.1 my $pid = fork;
358    
359     if ($pid) {
360     # parent
361     close $server;
362     } elsif (defined $pid) {
363     # child
364 root 1.10 my $serv_fno = fileno $server;
365 root 1.1
366 root 1.10 if ($self->{exec_server}) {
367     fcntl $server, F_SETFD, 0; # don't close the server side
368     exec "$^X -MAnyEvent::DBI -e AnyEvent::DBI::start_server $serv_fno";
369     POSIX::_exit 124;
370     } else {
371     ($_ != $serv_fno) && POSIX::close $_
372     for $^F+1..$fd_max;
373     serve $serv_fno;
374     POSIX::_exit 0; # not reachable
375     }
376 root 1.1 } else {
377     croak "fork: $!";
378     }
379    
380 root 1.10 $self->{child_pid} = $pid;
381     # set a connect timeout
382     if ($self->{timeout}) {
383     $self->{timer} = AnyEvent->timer (
384     after => $self->{timeout},
385     cb => sub { $wself && $wself->_timedout },
386     );
387     }
388     $self->_req (
389     ($self->{on_connect} ? $self->{on_connect} : sub { }),
390     (caller)[1,2],
391     req_open => $dbi, $user, $pass, %dbi_args
392     );
393 root 1.1
394     $self
395     }
396    
397 root 1.10 sub _server_pid {
398     shift->{child_pid}
399     }
400    
401     sub kill_child {
402     my $self = shift;
403     my $child_pid = delete $self->{child_pid};
404     if ($child_pid) {
405     # send SIGKILL in two seconds
406     my $murder_timer = AnyEvent->timer (
407     after => 2,
408     cb => sub {
409     kill 9, $child_pid;
410     },
411     );
412    
413     # reap process
414     my $kid_watcher;
415     $kid_watcher = AnyEvent->child (
416     pid => $child_pid ,
417     cb => sub {
418     # just hold on to this so it won't go away
419     undef $kid_watcher;
420     # cancel SIGKILL
421     undef $murder_timer;
422     },
423     );
424    
425     # SIGTERM = the beginning of the end
426     kill TERM => $child_pid;
427     }
428     }
429    
430     sub DESTROY {
431     shift->kill_child;
432     }
433    
434 root 1.1 sub _error {
435     my ($self, $error, $filename, $line, $fatal) = @_;
436    
437 root 1.10 if ($fatal) {
438     delete $self->{rw};
439     delete $self->{ww};
440     delete $self->{fh};
441     delete $self->{timer};
442    
443     # for fatal errors call all enqueued callbacks with error
444     while (my $req = shift @{$self->{queue}}) {
445     $@ = $error;
446     $req->[0]->();
447     }
448     $self->kill_child;
449     }
450 root 1.1
451     $@ = $error;
452    
453 root 1.9 if ($self->{on_error}) {
454 root 1.10 $self->{on_error}($self, $filename, $line, $fatal)
455     } else {
456     die "$error at $filename, line $line\n";
457 root 1.9 }
458 root 1.10 }
459    
460     =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
461    
462     Sets (or clears, with C<undef>) the on_error handler.
463    
464     =cut
465 root 1.1
466 root 1.10 sub on_error {
467     $_[0]{on_error} = $_[1];
468     }
469    
470     =item $dbh->on_connect ($cb->($dbh))
471    
472     Sets (or clears, with C<undef>) the on_connect handler.
473    
474     =cut
475    
476     sub on_connect {
477     $_[0]{on_connect} = $_[1];
478     }
479    
480     =item $dbh->timeout ($seconds)
481    
482     Sets (or clears, with C<undef>) the database timeout. Useful to extend the
483     timeout when you are about to make a really long query.
484    
485     =cut
486    
487     sub timeout {
488     my ($self, $timeout) = @_;
489    
490     if ($timeout) {
491     $self->{timeout} = $timeout;
492     # reschedule timer if one was running
493     if ($self->{timer}) {
494     Scalar::Util::weaken (my $wself = $self);
495     $self->{timer} = AnyEvent->timer (
496     after => $self->{timeout},
497     cb => sub { $wself && $wself->_timedout },
498     );
499     }
500     } else {
501     delete @{%$self}[qw(timer timeout)];
502     }
503     }
504    
505     sub _timedout {
506     my ($self) = @_;
507    
508     my $req = shift @{ $self->{queue} };
509    
510     if ($req) {
511     my $cb = shift @$req;
512     $@ = 'TIMEOUT';
513     $cb->();
514     $self->_error ('TIMEOUT', @$req, 1); # timeouts are always fatal
515     } else {
516     # shouldn't be possible to timeout without a pending request
517     $self->_error ('TIMEOUT', 'NO_PENDING_WTF', 0, 1);
518     }
519 root 1.1 }
520    
521     sub _req {
522 root 1.10 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
523    
524     if (!$self->{fh}) {
525     my $err = $@ = 'NO DATABASE CONNECTION';
526     $cb->();
527     $self->_error ($err, $filename, $line, 1);
528     return;
529     }
530    
531     push @{ $self->{queue} }, [$cb, $filename, $line ];
532 root 1.1
533 root 1.10 if ($self->{timeout} && !$self->{timer}) {
534     Scalar::Util::weaken (my $wself = $self);
535     $self->{timer} = AnyEvent->timer (
536     after => $self->{timeout},
537     cb => sub { $wself && $wself->_timedout },
538     );
539     }
540 root 1.1
541     $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
542    
543     unless ($self->{ww}) {
544     my $len = syswrite $self->{fh}, $self->{wbuf};
545     substr $self->{wbuf}, 0, $len, "";
546    
547     # still any left? then install a write watcher
548     $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
549     if length $self->{wbuf};
550     }
551     }
552    
553 root 1.10 =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, \%metadata))
554 root 1.1
555     Executes the given SQL statement with placeholders replaced by
556 root 1.2 C<@args>. The statement will be prepared and cached on the server side, so
557     using placeholders is compulsory.
558 root 1.1
559 root 1.10 The callback will be called with a weakened AnyEvent::DBI object as the first
560     argument and the result of C<fetchall_arrayref> as (or C<undef> if the
561     statement wasn't a select statement) as the second argument. Third argument is
562     a hash reference holding metadata about the request. Currently, the only key
563     defined is C<$metadata->{rv}> holding the return value of
564     C<execute>. Additional metadata might be added.
565 root 1.1
566 root 1.2 If an error occurs and the C<on_error> callback returns, then no arguments
567     will be passed and C<$@> contains the error message.
568    
569 root 1.10 =item $dbh->attr (attr_name, [ $attr_value ], $cb->($dbh, $new_value))
570    
571     An accessor for the handle attributes, such as AutoCommit, RaiseError,
572     PrintError, etc. If you provide an $attr_value, then the given attribute will
573     be set to that value.
574    
575     The callback will be passed the database handle and the
576     attribute's value if successful. If accessing the attribute fails, then no
577     arguments are passed to your callback, and $@ contains a description of the
578     problem instead.
579    
580     =item $dbh->begin_work ($cb->($dbh))
581    
582     =item $dbh->commit ($cb->($dbh))
583    
584     =item $dbh->rollback ($cb->($dbh))
585    
586     The begin_work, commit, and rollback methods exopose the equivelant transaction
587     control methods of the DBI. If something goes wrong, you will get no $dbh in
588     your callaback, and will instead have an error to examine in $@.
589    
590     =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $result, $handle_error))
591    
592     This gives access to database driver private methods. Because they are not
593     standard you cannot always depend on the value of $result or $handle_error.
594     Check the documentation for your specific driver/function combination to see
595     what it returns.
596    
597     Note that the first argument will be eval'ed to produce the argument list to
598     the func() method. This must be done because the searialization protocol
599     between the AnyEvent::DBI server process and your program does not support the
600     passage of closures.
601    
602     Here's an example to extend the query language in SQLite so it supports an
603     intstr() function:
604    
605     $cv = AnyEvent->condvar;
606     $dbh->func(
607     q{
608     'instr',
609     2,
610     sub {
611     my ($string, $search) = @_;
612     return index $string, $search;
613     },
614     },
615     'create_function',
616     sub {return $cv->send($@) unless $_[0];$cv->send(undef,@_[1,2]);}
617     );
618     my ($err,$result,$handle_err) = $cv->recv();
619     die "EVAL failed: $err" if $err;
620     # otherwise, we can ignore $result and $handle_err for this particular func
621    
622 root 1.1 =cut
623    
624 root 1.10 for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
625     eval 'sub ' . $cmd_name . '{
626     my $cb = pop;
627     splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
628     goto &_req;
629     }';
630 root 1.1 }
631    
632     =back
633    
634     =head1 SEE ALSO
635    
636     L<AnyEvent>, L<DBI>.
637    
638     =head1 AUTHOR
639    
640 root 1.4 Marc Lehmann <schmorp@schmorp.de>
641     http://home.schmorp.de/
642 root 1.1
643 root 1.10 Adam Rosenstein <adam@redcondor.com>
644     http://www.redcondor.com/
645    
646 root 1.1 =cut
647    
648 root 1.10 1;
649 root 1.1