/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.23
Committed: Sat Feb 1 21:43:28 2020 UTC (4 years, 3 months ago) by root
Branch: MAIN
CVS Tags: HEAD
Changes since 1.22: +3 -0 lines

File Contents

=head1 NAME

AnyEvent::DBI - asynchronous DBI access

=head1 SYNOPSIS

   use AnyEvent::DBI;

   my $cv = AnyEvent->condvar;

   my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";

   $dbh->exec ("select * from test where num=?", 10, sub {
      my ($dbh, $rows, $rv) = @_;

      $#_ or die "failure: $@";

      print "@$_\n"
         for @$rows;

      $cv->broadcast;
   });

   # asynchronously do something else here

   $cv->wait;

=head1 DESCRIPTION

This module is an L<AnyEvent> user; you need to make sure that you use and
run a supported event loop.

This module implements asynchronous DBI access by forking or executing
separate "DBI-Server" processes and sending them requests.

This means that you can run DBI requests in parallel to other tasks.

With DBD::mysql, the overhead for very simple statements
("select 0") is somewhere around 50% compared to an explicit
prepare_cached/execute/fetchrow_arrayref/finish combination. With
DBD::SQLite, it's more like a factor of 8 for this trivial statement.

=head2 ERROR HANDLING

This module defines a number of functions that accept a callback
argument. All callbacks used by this module get their AnyEvent::DBI handle
object passed as first argument.

If the request was successful, then there will be more arguments,
otherwise there will only be the C<$dbh> argument and C<$@> contains an
error message.

A convenient way to check whether an error occurred is to check C<$#_> -
if that is true, then the function was successful, otherwise there was an
error.

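The C<$#_> convention can be tried out without a database. The following is
a minimal sketch in plain Perl; C<classify_callback_args> and C<$fake_dbh>
are hypothetical names used only for this illustration and are not part of
AnyEvent::DBI:

```perl
use strict;
use warnings;

# $#_ is the index of the last argument: 0 when only the handle was
# passed (error), greater than 0 when results follow (success).
sub classify_callback_args {
   return $#_ ? "ok" : "error: $@";
}

my $fake_dbh = {};   # stand-in for an AnyEvent::DBI handle

# success: handle plus rows and return value
my $ok = classify_callback_args ($fake_dbh, [[1, "a"]], 1);

# failure: handle only, error text in $@
my $failed = do {
   local $@ = "no such table";
   classify_callback_args ($fake_dbh);
};

print "$ok\n$failed\n";
```

Note that the check has to be made on the callback's own C<@_>, so it should
happen before anything shifts arguments off it.
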
=cut

package AnyEvent::DBI;

use common::sense;

use Carp;
use Convert::Scalar ();
use AnyEvent::Fork ();
use CBOR::XS ();

use AnyEvent ();
use AnyEvent::Util ();

use Errno ();

our $VERSION = '3.04';

=head2 METHODS

=over 4

=item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...

Returns a database handle for the given database. Each database handle
has an associated server process that executes statements in order. If
you want to run more than one statement in parallel, you need to create
additional database handles.

The advantage of this approach is that transactions work as state is
preserved.

Example:

   $dbh = new AnyEvent::DBI
             "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";

Additional key-value pairs can be used to adjust behaviour:

=over 4

=item on_error => $callback->($dbh, $filename, $line, $fatal)

When an error occurs, then this callback will be invoked. On entry, C<$@>
is set to the error message. C<$filename> and C<$line> identify where the
original request was submitted.

If the fatal argument is true then the database connection is shut down
and your database handle becomes invalid. In addition to invoking the
C<on_error> callback, all of your queued request callbacks are called
with only the C<$dbh> argument.

If omitted, then C<die> will be called on any errors, fatal or not.

Note that AnyEvent::DBI will not catch errors in user-provided callbacks:
if you die in your callback, things might malfunction.

=item on_connect => $callback->($dbh[, $success])

If you supply an C<on_connect> callback, then this callback will be
invoked after the database connect attempt. If the connection succeeds,
C<$success> is true, otherwise it is missing and C<$@> contains the
C<$DBI::errstr>.

Regardless of whether C<on_connect> is supplied, connect errors will result in
C<on_error> being called. However, if no C<on_connect> callback is supplied, then
connection errors are considered fatal. The client will C<die> and the C<on_error>
callback will be called with C<$fatal> true.

When C<on_connect> is supplied, connect errors are not fatal and AnyEvent::DBI
will not C<die>. You still cannot, however, use the C<$dbh> object you
received from C<new> to make requests.

=item fork_template => $AnyEvent::Fork-object

C<AnyEvent::DBI> uses C<< AnyEvent::Fork->new >> to create the database
slave, which in turn either C<exec>'s a new process (similar to the old
C<exec_server> constructor argument) or uses a process forked early (see
L<AnyEvent::Fork::Early>).

With this argument you can provide your own fork template. This can be
useful if you create a lot of C<AnyEvent::DBI> handles and want to save
memory (and speed up startup) by not having to load C<AnyEvent::DBI> again
and again into your child processes:

   my $template = AnyEvent::Fork
      ->new                               # create new template
      ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module

   for (...) {
      $dbh = new AnyEvent::DBI ...
         fork_template => $template;
   }

=item timeout => seconds

If you supply a timeout parameter (fractional values are supported), then
a timer is started any time the DBI handle expects a response from the
server. This includes connection setup as well as requests made to the
backend. The timeout spans the duration from the moment the first data
is written (or queued to be written) until all expected responses are
returned, but is postponed for "timeout" seconds each time more data is
returned from the server. If the timer ever goes off then a fatal error is
generated. If you have an C<on_error> handler installed, then it will be
called, otherwise your program will die().

When altering your databases with timeouts it is wise to use
transactions. If you quit due to timeout while performing insert, update
or schema-altering commands you can end up not knowing if the action was
submitted to the database, complicating recovery.

Timeout errors are always fatal.

=back

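The way the C<timeout> deadline is postponed whenever data arrives can be
sketched as simple arithmetic on the time of the last activity. This is an
illustration only: C<seconds_until_timeout> is a hypothetical helper, not a
function of this module, and the timestamps are made-up numbers.

```perl
use strict;
use warnings;

# Seconds left until the timeout fires. Returns undef when no timer is
# needed (no timeout configured, or nothing outstanding); a negative
# result means the timeout has expired.
sub seconds_until_timeout {
   my ($now, $last_activity, $timeout) = @_;

   return undef unless $timeout && defined $last_activity;

   $last_activity + $timeout - $now
}

# request written at t=100 with a 10 second timeout, checked at t=104
my $first = seconds_until_timeout (104, 100, 10);

# the server sent data at t=104, so the deadline moves to t=114
my $postponed = seconds_until_timeout (112, 104, 10);

# no further data arrives: at t=115 the deadline has passed
my $expired = seconds_until_timeout (115, 104, 10);

print "first=$first postponed=$postponed expired=$expired\n";
```

A long-running statement that keeps returning data therefore never trips the
timeout, while a server that goes silent for longer than the timeout does.
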
Any additional key-value pairs will be rolled into a hash reference
and passed as the final argument to the C<< DBI->connect (...) >>
call. For example, to suppress errors on STDERR and send them instead to an
AnyEvent::Handle you could do:

   $dbh = new AnyEvent::DBI
             "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
             PrintError => 0,
             on_error   => sub {
                $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
             };

=cut

sub new {
   my ($class, $dbi, $user, $pass, %arg) = @_;

   # we use our own socketpair, so we always have a socket
   # available, even before the forked process exists.
   # this is mostly done so this module is compatible
   # with versions of itself older than 3.0.
   my ($client, $server) = AnyEvent::Util::portable_socketpair
      or croak "unable to create AnyEvent::DBI communications pipe: $!";

   AnyEvent::fh_unblock $client;

   my $fork = delete $arg{fork_template};

   my %dbi_args = %arg;
   delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)};

   my $self = bless \%arg, $class;

   $self->{fh} = $client;

   my $rbuf;
   my @caller = (caller)[1,2]; # the "default" caller

   $fork = $fork ? $fork->fork : AnyEvent::Fork->new
      or croak "fork: $!";

   $fork->require ("AnyEvent::DBI::Slave");
   $fork->send_arg ($VERSION);
   $fork->send_fh ($server);

   # we don't rely on the callback, because we use our own
   # socketpair, for better or worse.
   $fork->run ("AnyEvent::DBI::Slave::serve", sub { });

   {
      Convert::Scalar::weaken (my $self = $self);

      my $cbor = new CBOR::XS;

      $self->{rw} = AE::io $client, 0, sub {
         my $len = Convert::Scalar::extend_read $client, $rbuf, 65536;

         if ($len > 0) {
            # we received data, so reset the timer
            $self->{last_activity} = AE::now;

            for my $res ($cbor->incr_parse_multiple ($rbuf)) {
               last unless $self;

               my $req = shift @{ $self->{queue} };

               if (defined $res->[0]) {
                  $res->[0] = $self;
                  $req->[0](@$res);
               } else {
                  my $cb = shift @$req;
                  local $@ = $res->[1];
                  $cb->($self);
                  $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
                     if $self; # cb() could have deleted it
               }

               # no more queued requests, so become idle
               if ($self && !@{ $self->{queue} }) {
                  undef $self->{last_activity};
                  $self->{tw_cb}->();
               }
            }

         } elsif (defined $len) {
            # todo, caller?
            $self->_error ("unexpected eof", @caller, 1);
         } elsif ($! != Errno::EAGAIN) {
            # todo, caller?
            $self->_error ("read error: $!", @caller, 1);
         }
      };

      $self->{tw_cb} = sub {
         if ($self->{timeout} && $self->{last_activity}) {
            if (AE::now > $self->{last_activity} + $self->{timeout}) {
               # we did time out
               my $req = $self->{queue}[0];
               $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
            } else {
               # we need to re-set the timeout watcher
               $self->{tw} = AE::timer
                  $self->{last_activity} + $self->{timeout} - AE::now,
                  0,
                  $self->{tw_cb},
               ;
            }
         } else {
            # no timeout check wanted, or idle
            undef $self->{tw};
         }
      };

      $self->{ww_cb} = sub {
         $self->{last_activity} = AE::now;

         my $len = syswrite $client, $self->{wbuf}
            or return delete $self->{ww};

         substr $self->{wbuf}, 0, $len, "";
      };
   }

   $self->_req (
      sub {
         return unless $self;
         $self->{child_pid} = $_[1];
      },
      (caller)[1,2],
      "req_pid"
   );

   $self->_req (
      sub {
         return unless $self;
         &{ $self->{on_connect} } if $self->{on_connect};
      },
      (caller)[1,2],
      req_open => $dbi, $user, $pass, %dbi_args
   );

   $self
}

sub _server_pid {
   shift->{child_pid}
}

sub kill_child {
   my $self = shift;

   if (my $pid = delete $self->{child_pid}) {
      # kill and reap process
      my $kid_watcher; $kid_watcher = AE::child $pid, sub {
         undef $kid_watcher;
      };
      kill TERM => $pid;
   }

   delete $self->{rw};
   delete $self->{ww};
   delete $self->{tw};
   close delete $self->{fh};
}

sub DESTROY {
   shift->kill_child;
}

sub _error {
   my ($self, $error, $filename, $line, $fatal) = @_;

   if ($fatal) {
      delete $self->{tw};
      delete $self->{rw};
      delete $self->{ww};
      delete $self->{fh};

      # for fatal errors call all enqueued callbacks with error
      while (my $req = shift @{$self->{queue}}) {
         local $@ = $error;
         $req->[0]->($self);
      }
      $self->kill_child;
   }

   local $@ = $error;

   if ($self->{on_error}) {
      $self->{on_error}($self, $filename, $line, $fatal)
   } else {
      die "$error at $filename, line $line\n";
   }
}

=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))

Sets (or clears, with C<undef>) the C<on_error> handler.

=cut

sub on_error {
   $_[0]{on_error} = $_[1];
}

=item $dbh->timeout ($seconds)

Sets (or clears, with C<undef>) the database timeout. Useful to extend the
timeout when you are about to make a really long query.

=cut

sub timeout {
   my ($self, $timeout) = @_;

   $self->{timeout} = $timeout;

   # reschedule timer if one was running
   $self->{tw_cb}->();
}

sub _req {
   my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();

   unless ($self->{fh}) {
      local $@ = my $err = 'no database connection';
      $cb->($self);
      $self->_error ($err, $filename, $line, 1);
      return;
   }

   push @{ $self->{queue} }, [$cb, $filename, $line];

   # re-start timeout if necessary
   if ($self->{timeout} && !$self->{tw}) {
      $self->{last_activity} = AE::now;
      $self->{tw_cb}->();
   }

   $self->{wbuf} .= CBOR::XS::encode_cbor \@_;

   unless ($self->{ww}) {
      my $len = syswrite $self->{fh}, $self->{wbuf};
      substr $self->{wbuf}, 0, $len, "";

      # still any left? then install a write watcher
      $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
         if length $self->{wbuf};
   }
}

=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))

An accessor for the database handle attributes, such as C<AutoCommit>,
C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
(which might be C<undef>), then the given attribute will be set to that
value.

The callback will be passed the database handle and the attribute's value
if successful.

If an error occurs and the C<on_error> callback returns, then only C<$dbh>
will be passed and C<$@> contains the error message.

=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))

Executes the given SQL statement with placeholders replaced by
C<@args>. The statement will be prepared and cached on the server side, so
using placeholders is extremely important.

The callback will be called with a weakened AnyEvent::DBI object as the
first argument and the result of C<fetchall_arrayref> (or C<undef>
if the statement wasn't a select statement) as the second argument.

The third argument is the return value from the C<< DBI->execute >> method
call.

If an error occurs and the C<on_error> callback returns, then only C<$dbh>
will be passed and C<$@> contains the error message.

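As an illustration of the C<exec> calling convention, here is a small sketch
of a wrapper that passes a value as a bind argument and reports either the
rows or the error. C<fetch_by_num> and its C<$done> callback are hypothetical
names introduced for this example; the table and column are borrowed from the
SYNOPSIS. It assumes an C<on_error> handler that returns, since otherwise the
module will C<die> on errors.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::DBI: fetch the rows matching
# $num and pass them to $done. On failure, $done receives undef and the
# error message instead.
sub fetch_by_num {
   my ($dbh, $num, $done) = @_;

   # the value travels as a bind argument, so the statement text stays
   # constant and the server-side statement cache can be reused
   $dbh->exec ("select * from test where num = ?", $num, sub {
      my ($dbh, $rows, $rv) = @_;

      # only $dbh was passed: the request failed, $@ holds the message
      return $done->(undef, $@) unless $#_;

      $done->($rows);
   });
}
```

With a real handle this might be called as
C<< fetch_by_num ($dbh, 10, sub { my ($rows, $error) = @_; ... }) >>.
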
=item $dbh->stattr ($attr_name, $cb->($dbh, $value))

An accessor for the statement attributes of the most recently executed
statement, such as C<NAME> or C<TYPE>.

The callback will be passed the database handle and the attribute's value
if successful.

If an error occurs and the C<on_error> callback returns, then only C<$dbh>
will be passed and C<$@> contains the error message.

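One possible use of C<stattr> is to combine the column names with the rows
returned by C<exec>. The sketch below is an assumption-laden illustration:
C<fetch_hashes> is a hypothetical helper, and it assumes that the C<NAME>
attribute arrives as an array reference of column names, as it does in plain
DBI. Because C<stattr> refers to the most recently executed statement, the
result is only meaningful if no other request was already queued on the same
handle behind the C<exec>.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::DBI: run a select and pass
# the rows to $done as hash references keyed by column name.
# On failure $done is called without arguments and $@ holds the error.
sub fetch_hashes {
   my ($dbh, $sql, $args, $done) = @_;

   $dbh->exec ($sql, @$args, sub {
      return $done->() unless $#_;
      my $rows = $_[1];

      $dbh->stattr (NAME => sub {
         return $done->() unless $#_;
         my $names = $_[1];   # assumed: array reference of column names

         $done->([
            map { my %row; @row{@$names} = @$_; \%row } @{ $rows || [] }
         ]);
      });
   });
}
```
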
=item $dbh->begin_work ($cb->($dbh[, $rc]))

=item $dbh->commit ($cb->($dbh[, $rc]))

=item $dbh->rollback ($cb->($dbh[, $rc]))

The begin_work, commit, and rollback methods expose the equivalent
transaction control method of the DBI driver. On success, C<$rc> is true.

If an error occurs and the C<on_error> callback returns, then only C<$dbh>
will be passed and C<$@> contains the error message.

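The transaction methods can be chained through their callbacks. The following
is a minimal sketch, not a recommended pattern from the authors:
C<insert_in_transaction> and C<$done> are hypothetical names, the table comes
from the SYNOPSIS, and an C<on_error> handler that returns is assumed so that
the callbacks get to see failures.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::DBI: insert one row inside a
# transaction. $done receives undef on success or an error string.
sub insert_in_transaction {
   my ($dbh, $num, $done) = @_;

   $dbh->begin_work (sub {
      return $done->("begin_work failed: $@") unless $#_;

      $dbh->exec ("insert into test (num) values (?)", $num, sub {
         unless ($#_) {
            my $err = $@;   # keep the message, later calls may change $@
            return $dbh->rollback (sub { $done->("insert failed: $err") });
         }

         $dbh->commit (sub {
            $done->($#_ ? undef : "commit failed: $@");
         });
      });
   });
}
```

Since each handle executes its statements in order, the three requests could
also be queued back to back; nesting them merely makes it explicit that each
step depends on the outcome of the previous one.
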
=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))

This gives access to database driver private methods. Because they
are not standard you cannot always depend on the value of C<$rc> or
C<$dbi_err>. Check the documentation for your specific driver/function
combination to see what it returns.

Note that the first argument will be eval'ed to produce the argument list to
the func() method. This must be done because the serialization protocol
between the AnyEvent::DBI server process and your program does not support the
passage of closures.

Here's an example to extend the query language in SQLite so it supports an
instr() function:

   $cv = AnyEvent->condvar;
   $dbh->func (
      q{
         instr => 2, sub {
            my ($string, $search) = @_;
            return index $string, $search;
         },
      },
      create_function => sub {
         return $cv->send ($@)
            unless $#_;
         $cv->send (undef, @_[1,2,3]);
      }
   );

   my ($err,$rc,$errcode,$errstr) = $cv->recv;

   die $err if defined $err;
   die "EVAL failed: $errstr"
      if $errcode;

   # otherwise, we can ignore $rc and $errcode for this particular func

=cut

for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) {
   eval 'sub ' . $cmd_name . '{
      my $cb = pop;
      splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
      &_req
   }';
}

=back

=head1 SEE ALSO

L<AnyEvent>, L<DBI>, L<Coro::Mysql>.

=head1 AUTHOR AND CONTACT

   Marc Lehmann <schmorp@schmorp.de> (current maintainer)
   http://home.schmorp.de/

   Adam Rosenstein <adam@redcondor.com>
   http://www.redcondor.com/

=cut

1