/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.21
Committed: Thu Apr 19 04:35:26 2018 UTC (6 years, 1 month ago) by root
Branch: MAIN
CVS Tags: rel-3_03
Changes since 1.20: +1 -1 lines
Log Message:
3.03

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::DBI - asynchronous DBI access
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::DBI;
8    
9 root 1.5 my $cv = AnyEvent->condvar;
10    
11     my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12    
13     $dbh->exec ("select * from test where num=?", 10, sub {
14 root 1.11 my ($dbh, $rows, $rv) = @_;
15    
16 root 1.12 $#_ or die "failure: $@";
17 root 1.5
18     print "@$_\n"
19     for @$rows;
20    
21     $cv->send;
22     });
23    
24     # asynchronously do sth. else here
25    
26     $cv->recv;
27    
28 root 1.1 =head1 DESCRIPTION
29    
30     This module is an L<AnyEvent> user; you need to make sure that you use and
31     run a supported event loop.
32    
33 root 1.8 This module implements asynchronous DBI access by forking or executing
34 root 1.1 separate "DBI-Server" processes and sending them requests.
35    
36     This means that you can run DBI requests in parallel with other tasks.
37    
38 root 1.17 With DBD::mysql, the overhead for very simple statements
39     ("select 0") is somewhere around 50% compared to an explicit
40     prepare_cached/execute/fetchrow_arrayref/finish combination. With
41     DBD::SQLite, it's more like a factor of 8 for this trivial statement.
42 root 1.3
43 root 1.11 =head2 ERROR HANDLING
44    
45     This module defines a number of functions that accept a callback
46     argument. All callbacks used by this module get their AnyEvent::DBI handle
47     object passed as first argument.
48    
49     If the request was successful, then there will be more arguments,
50     otherwise there will only be the C<$dbh> argument and C<$@> contains an
51     error message.
52    
53 root 1.17 A convenient way to check whether an error occurred is to check C<$#_> -
54 root 1.11 if that is true, then the function was successful, otherwise there was an
55     error.
56    
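The C<$#_> convention described above can be tried without a database. The
following is a minimal sketch in plain Perl: C<on_result> and C<$fake_dbh> are
hypothetical stand-ins for a request callback and a handle, not part of this
module.

```perl
use strict;
use warnings;

# Hypothetical callback following the convention described above:
# the handle is always $_[0]; further arguments exist only on success.
sub on_result {
   my ($dbh, @result) = @_;

   # $#_ is the index of the last element of @_: 0 (false) when only
   # the handle was passed, greater than 0 (true) on success.
   return "failure: $@" unless $#_;

   return "success: " . scalar (@result) . " extra argument(s)";
}

my $fake_dbh = {}; # stand-in for an AnyEvent::DBI handle

# success: handle plus rows and return value
my $ok = on_result ($fake_dbh, [[1, "a"]], 1);

# failure: only the handle, with the error message in $@
my $failed = do {
   local $@ = "no such table: test";
   on_result ($fake_dbh);
};

print "$ok\n$failed\n";
```

Checking C<$#_> rather than the truth of the second argument matters because
a successful result may legitimately be C<undef> or 0.
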
57 root 1.1 =cut
58    
59     package AnyEvent::DBI;
60    
61 root 1.14 use common::sense;
62 root 1.1
63     use Carp;
64 root 1.17 use Convert::Scalar ();
65     use AnyEvent::Fork ();
66     use CBOR::XS ();
67 root 1.1
68     use AnyEvent ();
69     use AnyEvent::Util ();
70    
71 root 1.11 use Errno ();
72 root 1.1
73 root 1.21 our $VERSION = '3.03';
74 root 1.10
75 root 1.1 =head2 METHODS
76    
77     =over 4
78    
79     =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
80    
81     Returns a database handle for the given database. Each database handle
82     has an associated server process that executes statements in order. If
83     you want to run more than one statement in parallel, you need to create
84     additional database handles.
85    
86     The advantage of this approach is that transactions work, because each
87     handle's state is preserved in its own server process.
88    
89     Example:
90    
91     $dbh = new AnyEvent::DBI
92     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
93    
94     Additional key-value pairs can be used to adjust behaviour:
95    
96     =over 4
97    
98     =item on_error => $callback->($dbh, $filename, $line, $fatal)
99    
100     When an error occurs, then this callback will be invoked. On entry, C<$@>
101     is set to the error message. C<$filename> and C<$line> are where the
102     original request was submitted.
103    
104 root 1.11 If the fatal argument is true then the database connection is shut down
105     and your database handle becomes invalid. In addition to invoking the
106     C<on_error> callback, all of your queued request callbacks are called
107     with only the C<$dbh> argument.
108 root 1.1
109 root 1.2 If omitted, then C<die> will be called on any errors, fatal or not.
110 root 1.1
111 root 1.11 =item on_connect => $callback->($dbh[, $success])
112 root 1.10
113 root 1.11 If you supply an C<on_connect> callback, then this callback will be
114     invoked after the database connect attempt. If the connection succeeds,
115     C<$success> is true, otherwise it is missing and C<$@> contains the
116     C<$DBI::errstr>.
117    
118     Regardless of whether C<on_connect> is supplied, connect errors will result in
119     C<on_error> being called. However, if no C<on_connect> callback is supplied, then
120     connection errors are considered fatal. The client will C<die> and the C<on_error>
121     callback will be called with C<$fatal> true.
122    
123     When C<on_connect> is supplied, connect errors are not fatal and AnyEvent::DBI
124     will not C<die>. You still cannot, however, use the $dbh object you
125     received from C<new> to make requests.
126 root 1.10
127 root 1.17 =item fork_template => $AnyEvent::Fork-object
128 root 1.10
129 root 1.17 C<AnyEvent::DBI> uses C<< AnyEvent::Fork->new >> to create the database
130     slave, which in turn either C<exec>'s a new process (similar to the old
131     C<exec_server> constructor argument) or uses a process forked early (see
132     L<AnyEvent::Fork::Early>).
133    
134     With this argument you can provide your own fork template. This can be
135     useful if you create a lot of C<AnyEvent::DBI> handles and want to save
136     memory (and speed up startup) by not having to load C<AnyEvent::DBI> again
137     and again into your child processes:
138    
139     my $template = AnyEvent::Fork
140     ->new # create new template
141     ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module
142    
143     for (...) {
144     $dbh = new AnyEvent::DBI ...
145     fork_template => $template;
146 root 1.10
147     =item timeout => seconds
148    
149 root 1.11 If you supply a timeout parameter (fractional values are supported), then
150     a timer is started any time the DBI handle expects a response from the
151     server. This includes connection setup as well as requests made to the
152     backend. The timeout spans the duration from the moment the first data
153     is written (or queued to be written) until all expected responses are
154     returned, but is postponed for "timeout" seconds each time more data is
155     returned from the server. If the timer ever goes off then a fatal error is
156     generated. If you have an C<on_error> handler installed, then it will be
157     called, otherwise your program will die().
158    
159     When altering your databases with timeouts it is wise to use
160     transactions. If you quit due to timeout while performing insert, update
161     or schema-altering commands you can end up not knowing if the action was
162     submitted to the database, complicating recovery.
163 root 1.10
164     Timeout errors are always fatal.
165    
166 root 1.1 =back
167    
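The C<timeout> rule described above (a deadline that moves forward each time
data arrives) can be sketched in plain Perl. This is a simplified model of the
decision only, not the module's timer code; C<timeout_check> and its arguments
are hypothetical names chosen for illustration.

```perl
use strict;
use warnings;

# Decide what a timeout watcher should do at time $now.
# Returns "idle" when no check is needed, "timeout" when the deadline
# has passed, or the number of seconds left until the deadline.
sub timeout_check {
   my ($now, $last_activity, $timeout) = @_;

   # no timeout configured, or no request outstanding
   return "idle" unless $timeout && defined $last_activity;

   # every bit of activity pushes the deadline forward
   my $deadline = $last_activity + $timeout;

   return "timeout" if $now > $deadline;

   $deadline - $now # seconds until the next check
}

print timeout_check (100, undef, 5), "\n"; # nothing queued
print timeout_check (103, 100, 5), "\n";   # request sent at 100
print timeout_check (106, 100, 5), "\n";   # no data since 100
print timeout_check (106, 104, 5), "\n";   # data arrived at 104
```

The last two calls differ only in when data last arrived, which is why a slow
but steadily responding server does not trigger the timeout.
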
168 root 1.11 Any additional key-value pairs will be rolled into a hash reference
169     and passed as the final argument to the C<< DBI->connect (...) >>
170 root 1.17 call. For example, to suppress errors on STDERR and send them instead to an
171 root 1.11 AnyEvent::Handle you could do:
172    
173     $dbh = new AnyEvent::DBI
174     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
175     PrintError => 0,
176     on_error => sub {
177     $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
178     };
179 root 1.10
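As a further illustration, C<on_connect> and C<on_error> can be combined so
that a failed connect is reported instead of killing the program. This is a
sketch only; the in-memory SQLite DSN is an assumption made to keep it
self-contained (it requires L<DBD::SQLite>).

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::DBI;

my $cv = AnyEvent->condvar;

# DSN is an assumption for this sketch; substitute your own database
my $dbh = new AnyEvent::DBI
   "DBI:SQLite:dbname=:memory:", "", "",
   PrintError => 0,
   on_error   => sub {
      my ($dbh, $filename, $line, $fatal) = @_;
      # $@ holds the error message on entry
      warn "DBI error: $@ at $filename:$line"
         . ($fatal ? " (fatal)" : "") . "\n";
   },
   on_connect => sub {
      my ($dbh, $success) = @_;
      # $success is missing when the connect attempt failed
      $cv->send ($success ? 1 : 0);
   };

my $connected = $cv->recv;
print $connected ? "connected\n" : "connect failed\n";
```

When the connect fails, the handle must not be used for further requests, so
a real program would create a new handle before retrying.
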
180 root 1.1 =cut
181    
182     sub new {
183     my ($class, $dbi, $user, $pass, %arg) = @_;
184    
185 root 1.17 # we use our own socketpair, so we always have a socket
186     # available, even before the forked process exists.
187     # this is mostly done so this module is compatible
188     # with versions of itself older than 3.0.
189 root 1.11 my ($client, $server) = AnyEvent::Util::portable_socketpair
190 root 1.14 or croak "unable to create AnyEvent::DBI communications pipe: $!";
191 root 1.1
192 root 1.20 AnyEvent::fh_unblock $client;
193    
194 root 1.17 my $fork = delete $arg{fork_template};
195    
196 root 1.11 my %dbi_args = %arg;
197 root 1.17 delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)};
198 root 1.10
199 root 1.1 my $self = bless \%arg, $class;
200 root 1.17
201 root 1.1 $self->{fh} = $client;
202    
203     my $rbuf;
204     my @caller = (caller)[1,2]; # the "default" caller
205    
206 root 1.17 $fork = $fork ? $fork->fork : AnyEvent::Fork->new
207     or croak "fork: $!";
208    
209     $fork->require ("AnyEvent::DBI::Slave");
210     $fork->send_arg ($VERSION);
211     $fork->send_fh ($server);
212    
213     # we don't rely on the callback, because we use our own
214     # socketpair, for better or worse.
215     $fork->run ("AnyEvent::DBI::Slave::serve", sub { });
216    
217 root 1.11 {
218 root 1.17 Convert::Scalar::weaken (my $self = $self);
219    
220     my $cbor = new CBOR::XS;
221 root 1.11
222 root 1.14 $self->{rw} = AE::io $client, 0, sub {
223 root 1.17 my $len = Convert::Scalar::extend_read $client, $rbuf, 65536;
224 root 1.11
225     if ($len > 0) {
226     # we received data, so reset the timer
227 root 1.15 $self->{last_activity} = AE::now;
228 root 1.1
229 root 1.17 for my $res ($cbor->incr_parse_multiple ($rbuf)) {
230     last unless $self;
231 root 1.1
232 root 1.11 my $req = shift @{ $self->{queue} };
233 root 1.1
234 root 1.11 if (defined $res->[0]) {
235     $res->[0] = $self;
236     $req->[0](@$res);
237     } else {
238     my $cb = shift @$req;
239     local $@ = $res->[1];
240     $cb->($self);
241     $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
242     if $self; # cb() could have deleted it
243 root 1.10 }
244 root 1.11
245     # no more queued requests, so become idle
246 root 1.15 if ($self && !@{ $self->{queue} }) {
247     undef $self->{last_activity};
248     $self->{tw_cb}->();
249     }
250 root 1.10 }
251    
252 root 1.11 } elsif (defined $len) {
253     # todo, caller?
254     $self->_error ("unexpected eof", @caller, 1);
255     } elsif ($! != Errno::EAGAIN) {
256     # todo, caller?
257     $self->_error ("read error: $!", @caller, 1);
258     }
259 root 1.14 };
260 root 1.11
261     $self->{tw_cb} = sub {
262     if ($self->{timeout} && $self->{last_activity}) {
263 root 1.14 if (AE::now > $self->{last_activity} + $self->{timeout}) {
264 root 1.11 # we did time out
265     my $req = $self->{queue}[0];
266     $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
267     } else {
268     # we need to re-set the timeout watcher
269 root 1.14 $self->{tw} = AE::timer
270     $self->{last_activity} + $self->{timeout} - AE::now,
271     0,
272     $self->{tw_cb},
273     ;
274 root 1.1 }
275 root 1.11 } else {
276     # no timeout check wanted, or idle
277     undef $self->{tw};
278 root 1.1 }
279 root 1.11 };
280 root 1.1
281 root 1.11 $self->{ww_cb} = sub {
282 root 1.14 $self->{last_activity} = AE::now;
283 root 1.1
284 root 1.11 my $len = syswrite $client, $self->{wbuf}
285     or return delete $self->{ww};
286 root 1.3
287 root 1.11 substr $self->{wbuf}, 0, $len, "";
288     };
289     }
290 root 1.3
291 root 1.17 $self->_req (
292     sub {
293     return unless $self;
294     $self->{child_pid} = $_[1];
295     },
296     (caller)[1,2],
297     "req_pid"
298     );
299 root 1.11
300 root 1.10 $self->_req (
301 root 1.17 sub {
302     return unless $self;
303     &{ $self->{on_connect} } if $self->{on_connect};
304     },
305 root 1.10 (caller)[1,2],
306     req_open => $dbi, $user, $pass, %dbi_args
307     );
308 root 1.1
309     $self
310     }
311    
312 root 1.10 sub _server_pid {
313     shift->{child_pid}
314     }
315    
316     sub kill_child {
317 root 1.14 my $self = shift;
318 root 1.10
319 root 1.14 if (my $pid = delete $self->{child_pid}) {
320 root 1.15 # kill and reap process
321     my $kid_watcher; $kid_watcher = AE::child $pid, sub {
322     undef $kid_watcher;
323     };
324 root 1.14 kill TERM => $pid;
325 root 1.10 }
326 root 1.15
327 root 1.17 delete $self->{rw};
328     delete $self->{ww};
329     delete $self->{tw};
330 root 1.14 close delete $self->{fh};
331 root 1.10 }
332    
333     sub DESTROY {
334     shift->kill_child;
335     }
336    
337 root 1.1 sub _error {
338     my ($self, $error, $filename, $line, $fatal) = @_;
339    
340 root 1.10 if ($fatal) {
341 root 1.11 delete $self->{tw};
342 root 1.10 delete $self->{rw};
343     delete $self->{ww};
344     delete $self->{fh};
345    
346     # for fatal errors call all enqueued callbacks with error
347     while (my $req = shift @{$self->{queue}}) {
348 root 1.11 local $@ = $error;
349     $req->[0]->($self);
350 root 1.10 }
351     $self->kill_child;
352     }
353 root 1.1
354 root 1.11 local $@ = $error;
355 root 1.1
356 root 1.9 if ($self->{on_error}) {
357 root 1.10 $self->{on_error}($self, $filename, $line, $fatal)
358     } else {
359     die "$error at $filename, line $line\n";
360 root 1.9 }
361 root 1.10 }
362    
363     =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
364    
365 root 1.11 Sets (or clears, with C<undef>) the C<on_error> handler.
366 root 1.10
367     =cut
368 root 1.1
369 root 1.10 sub on_error {
370     $_[0]{on_error} = $_[1];
371     }
372    
373     =item $dbh->timeout ($seconds)
374    
375     Sets (or clears, with C<undef>) the database timeout. Useful to extend the
376     timeout when you are about to make a really long query.
377    
378     =cut
379    
380     sub timeout {
381     my ($self, $timeout) = @_;
382    
383 root 1.11 $self->{timeout} = $timeout;
384 root 1.10
385 root 1.11 # reschedule timer if one was running
386     $self->{tw_cb}->();
387 root 1.1 }
388    
389     sub _req {
390 root 1.10 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
391    
392 root 1.11 unless ($self->{fh}) {
393     local $@ = my $err = 'no database connection';
394     $cb->($self);
395 root 1.10 $self->_error ($err, $filename, $line, 1);
396     return;
397     }
398    
399 root 1.11 push @{ $self->{queue} }, [$cb, $filename, $line];
400 root 1.1
401 root 1.11 # re-start timeout if necessary
402     if ($self->{timeout} && !$self->{tw}) {
403 root 1.14 $self->{last_activity} = AE::now;
404 root 1.11 $self->{tw_cb}->();
405 root 1.10 }
406 root 1.1
407 root 1.17 $self->{wbuf} .= CBOR::XS::encode_cbor \@_;
408 root 1.1
409     unless ($self->{ww}) {
410     my $len = syswrite $self->{fh}, $self->{wbuf};
411     substr $self->{wbuf}, 0, $len, "";
412    
413     # still any left? then install a write watcher
414 root 1.14 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
415 root 1.1 if length $self->{wbuf};
416     }
417     }
418    
419 root 1.17 =item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
420    
421     An accessor for the database handle attributes, such as C<AutoCommit>,
422     C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
423     (which might be C<undef>), then the given attribute will be set to that
424     value.
425    
426     The callback will be passed the database handle and the attribute's value
427     if successful.
428    
429     If an error occurs and the C<on_error> callback returns, then only C<$dbh>
430     will be passed and C<$@> contains the error message.
431    
432 root 1.11 =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
433 root 1.1
434     Executes the given SQL statement with placeholders replaced by
435 root 1.2 C<@args>. The statement will be prepared and cached on the server side, so
436 root 1.11 using placeholders is extremely important.
437 root 1.1
438 root 1.11 The callback will be called with a weakened AnyEvent::DBI object as the
439     first argument and the result of C<fetchall_arrayref> (or C<undef>
440     if the statement wasn't a select statement) as the second argument.
441 root 1.1
442 root 1.11 The third argument is the return value from the statement handle's
443     C<execute> method call.
444    
445     If an error occurs and the C<on_error> callback returns, then only C<$dbh>
446 root 1.2 will be passed and C<$@> contains the error message.
447    
448 root 1.17 =item $dbh->stattr ($attr_name, $cb->($dbh, $value))
449 root 1.11
450 root 1.17 An accessor for the statement attributes of the most recently executed
451     statement, such as C<NAME> or C<TYPE>.
452 root 1.10
453 root 1.11 The callback will be passed the database handle and the attribute's value
454     if successful.
455    
456     If an error occurs and the C<on_error> callback returns, then only C<$dbh>
457     will be passed and C<$@> contains the error message.
458 root 1.10
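Because requests on one handle are executed in order, C<stattr> can simply be
queued right after C<exec>. A minimal sketch, assuming L<DBD::SQLite> is
installed; the in-memory DSN and the column aliases are chosen only for
illustration.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::DBI;

my $cv = AnyEvent->condvar;

# in-memory SQLite database, assumed only to keep the sketch self-contained
my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=:memory:", "", "";

# this becomes the "most recently executed statement"
$dbh->exec ("select 1 as one, 2 as two", sub {
   $#_ or die "exec failed: $@";
});

# queued behind the exec above, so it inspects that statement
$dbh->stattr (NAME => sub {
   my ($dbh, $names) = @_;

   $#_ or die "stattr failed: $@";

   $cv->send (@$names);
});

my @names = $cv->recv;
print "columns: @names\n";
```
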
459 root 1.12 =item $dbh->begin_work ($cb->($dbh[, $rc]))
460 root 1.10
461 root 1.12 =item $dbh->commit ($cb->($dbh[, $rc]))
462 root 1.10
463 root 1.12 =item $dbh->rollback ($cb->($dbh[, $rc]))
464 root 1.10
465 root 1.11 The begin_work, commit, and rollback methods expose the equivalent
466 root 1.12 transaction control method of the DBI driver. On success, C<$rc> is true.
467 root 1.10
468 root 1.11 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
469     will be passed and C<$@> contains the error message.
470 root 1.10
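The transaction methods are typically chained through their callbacks, so that
a failed statement can be rolled back. The following is a sketch under stated
assumptions: an in-memory SQLite database (requires L<DBD::SQLite>) and a
C<test> table created just for the example.

```perl
use strict;
use warnings;
use AnyEvent;
use AnyEvent::DBI;

my $cv = AnyEvent->condvar;

# on_error returns instead of dying, so request callbacks see the failure
my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=:memory:", "", "",
   on_error => sub { warn "DBI error: $@\n" };

# table used only by this sketch
$dbh->exec ("create table test (num integer)", sub { });

$dbh->begin_work (sub {
   $#_ or return $cv->send ("begin_work failed: $@");

   $dbh->exec ("insert into test (num) values (?)", 10, sub {
      my ($dbh, $rows, $rv) = @_;

      unless ($#_) {
         my $err = $@; # save the message before the next request
         return $dbh->rollback (sub { $cv->send ("insert failed: $err") });
      }

      $dbh->commit (sub {
         $cv->send ($#_ ? undef : "commit failed: $@");
      });
   });
});

my $err = $cv->recv;
print defined $err ? "$err\n" : "committed\n";
```

All of these requests go through the same handle, which is what keeps them in
one transaction; a second handle would run in a separate server process.
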
471 root 1.12 =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
472 root 1.10
473 root 1.11 This gives access to database driver private methods. Because they
474 root 1.12 are not standard you cannot always depend on the value of C<$rc> or
475     C<$dbi_err>. Check the documentation for your specific driver/function
476     combination to see what it returns.
477 root 1.10
478     Note that the first argument will be eval'ed to produce the argument list to
479 root 1.11 the func() method. This must be done because the serialization protocol
480 root 1.10 between the AnyEvent::DBI server process and your program does not support the
481     passage of closures.
482    
483     Here's an example to extend the query language in SQLite so it supports an
484     instr() function:
485    
486     $cv = AnyEvent->condvar;
487 root 1.11 $dbh->func (
488 root 1.10 q{
489 root 1.11 instr => 2, sub {
490 root 1.10 my ($string, $search) = @_;
491     return index $string, $search;
492     },
493     },
494 root 1.11 create_function => sub {
495 root 1.12 return $cv->send ($@)
496     unless $#_;
497     $cv->send (undef, @_[1,2,3]);
498 root 1.11 }
499 root 1.10 );
500 root 1.11
501 root 1.12 my ($err,$rc,$errcode,$errstr) = $cv->recv;
502 root 1.11
503 root 1.12 die $err if defined $err;
504     die "EVAL failed: $errstr"
505     if $errcode;
506 root 1.11
507 root 1.12 # otherwise, we can ignore $rc and $errcode for this particular func
508 root 1.10
509 root 1.1 =cut
510    
511 root 1.17 for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) {
512 root 1.10 eval 'sub ' . $cmd_name . '{
513     my $cb = pop;
514     splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
515 root 1.11 &_req
516 root 1.10 }';
517 root 1.1 }
518    
519     =back
520    
521     =head1 SEE ALSO
522    
523 root 1.11 L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
524 root 1.1
525 root 1.17 =head1 AUTHOR AND CONTACT
526 root 1.1
527 root 1.17 Marc Lehmann <schmorp@schmorp.de> (current maintainer)
528 root 1.4 http://home.schmorp.de/
529 root 1.1
530 root 1.10 Adam Rosenstein <adam@redcondor.com>
531     http://www.redcondor.com/
532    
533 root 1.1 =cut
534    
535 root 1.17 1