/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.18
Committed: Sun Aug 27 09:54:25 2017 UTC (6 years, 8 months ago) by root
Branch: MAIN
CVS Tags: rel-3_01
Changes since 1.17: +1 -1 lines
Log Message:
3.01

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::DBI - asynchronous DBI access
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::DBI;
8    
9 root 1.5 my $cv = AnyEvent->condvar;
10    
11     my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12    
13     $dbh->exec ("select * from test where num=?", 10, sub {
14 root 1.11 my ($dbh, $rows, $rv) = @_;
15    
16 root 1.12 $#_ or die "failure: $@";
17 root 1.5
18     print "@$_\n"
19     for @$rows;
20    
21     $cv->send;
22     });
23    
24     # asynchronously do something else here
25    
26     $cv->recv;
27    
28 root 1.1 =head1 DESCRIPTION
29    
30     This module is an L<AnyEvent> user, so you need to make sure that you use
31     and run a supported event loop.
32    
33 root 1.8 This module implements asynchronous DBI access by forking or executing
34 root 1.1 separate "DBI-Server" processes and sending them requests.
35    
36     It means that you can run DBI requests in parallel to other tasks.
37    
38 root 1.17 With DBD::mysql, the overhead for very simple statements
39     ("select 0") is somewhere around 50% compared to an explicit
40     prepare_cached/execute/fetchrow_arrayref/finish combination. With
41     DBD::SQLite, it's more like a factor of 8 for this trivial statement.
42 root 1.3
43 root 1.11 =head2 ERROR HANDLING
44    
45     This module defines a number of functions that accept a callback
46     argument. All callbacks used by this module get their AnyEvent::DBI handle
47     object passed as first argument.
48    
49     If the request was successful, then there will be more arguments,
50     otherwise there will only be the C<$dbh> argument and C<$@> contains an
51     error message.
52    
53 root 1.17 A convenient way to check whether an error occurred is to check C<$#_> -
54 root 1.11 if that is true, then the function was successful, otherwise there was an
55     error.
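
The C<$#_> convention can be tried out without a database. The following is
only a minimal sketch: C<on_result> and C<$fake_dbh> are hypothetical names
used for illustration and are not part of this module. The sketch calls a
callback once the way a successful request would and once the way a failed
request would.

```perl
use strict;
use warnings;

# Hypothetical callback following the convention described above:
# $_[0] is always the handle, further arguments exist only on success.
sub on_result {
    # $#_ is the index of the last argument, so it is 0 (false)
    # when only the handle was passed
    $#_ or return "failure: $@";

    my ($dbh, $rows, $rv) = @_;
    return "success: " . scalar (@$rows) . " row(s)";
}

my $fake_dbh = {}; # stand-in for a handle, for illustration only

# success: handle plus result arguments
my $ok = on_result ($fake_dbh, [[1, "a"], [2, "b"]], 2);

# failure: only the handle is passed and $@ carries the message
my $failed = do {
    local $@ = "no such table: test";
    on_result ($fake_dbh);
};

print "$ok\n$failed\n";
```

Checking C<$#_> before unpacking C<@_> keeps the error path short and avoids
touching result arguments that are not there.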
56    
57 root 1.1 =cut
58    
59     package AnyEvent::DBI;
60    
61 root 1.14 use common::sense;
62 root 1.1
63     use Carp;
64 root 1.17 use Convert::Scalar ();
65     use AnyEvent::Fork ();
66     use CBOR::XS ();
67 root 1.1
68     use AnyEvent ();
69     use AnyEvent::Util ();
70    
71 root 1.11 use Errno ();
72 root 1.1
73 root 1.18 our $VERSION = '3.01';
74 root 1.10
75 root 1.1 =head2 METHODS
76    
77     =over 4
78    
79     =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
80    
81     Returns a database handle for the given database. Each database handle
82     has an associated server process that executes statements in order. If
83     you want to run more than one statement in parallel, you need to create
84     additional database handles.
85    
86     The advantage of this approach is that transactions work, because state is
87     preserved in the server process.
88    
89     Example:
90    
91     $dbh = new AnyEvent::DBI
92     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
93    
94     Additional key-value pairs can be used to adjust behaviour:
95    
96     =over 4
97    
98     =item on_error => $callback->($dbh, $filename, $line, $fatal)
99    
100     When an error occurs, then this callback will be invoked. On entry, C<$@>
101     is set to the error message. C<$filename> and C<$line> identify where the
102     original request was submitted.
103    
104 root 1.11 If the fatal argument is true then the database connection is shut down
105     and your database handle becomes invalid. In addition to invoking the
106     C<on_error> callback, all of your queued request callbacks are called
107     with only the C<$dbh> argument.
108 root 1.1
109 root 1.2 If omitted, then C<die> will be called on any errors, fatal or not.
110 root 1.1
111 root 1.11 =item on_connect => $callback->($dbh[, $success])
112 root 1.10
113 root 1.11 If you supply an C<on_connect> callback, then this callback will be
114     invoked after the database connect attempt. If the connection succeeds,
115     C<$success> is true, otherwise it is missing and C<$@> contains the
116     C<$DBI::errstr>.
117    
118     Regardless of whether C<on_connect> is supplied, connect errors will result in
119     C<on_error> being called. However, if no C<on_connect> callback is supplied, then
120     connection errors are considered fatal. The client will C<die> and the C<on_error>
121     callback will be called with C<$fatal> true.
122    
123     When C<on_connect> is supplied, connect errors are not fatal and
124     AnyEvent::DBI will not C<die>. You still cannot, however, use the C<$dbh>
125     object you received from C<new> to make requests.
126 root 1.10
127 root 1.17 =item fork_template => $AnyEvent::Fork-object
128 root 1.10
129 root 1.17 C<AnyEvent::DBI> uses C<< AnyEvent::Fork->new >> to create the database
130     slave, which in turn either C<exec>'s a new process (similar to the old
131     C<exec_server> constructor argument) or uses a process forked early (see
132     L<AnyEvent::Fork::Early>).
133    
134     With this argument you can provide your own fork template. This can be
135     useful if you create a lot of C<AnyEvent::DBI> handles and want to save
136     memory (and speed up startup) by not having to load C<AnyEvent::DBI> again
137     and again into your child processes:
138    
139     my $template = AnyEvent::Fork
140     ->new # create new template
141     ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module
142    
143     for (...) {
144     $dbh = new AnyEvent::DBI ...
145     fork_template => $template;
146 root 1.10
147     =item timeout => seconds
148    
149 root 1.11 If you supply a timeout parameter (fractional values are supported), then
150     a timer is started any time the DBI handle expects a response from the
151     server. This includes connection setup as well as requests made to the
152     backend. The timeout spans the duration from the moment the first data
153     is written (or queued to be written) until all expected responses are
154     returned, but is postponed for "timeout" seconds each time more data is
155     returned from the server. If the timer ever goes off then a fatal error is
156     generated. If you have an C<on_error> handler installed, then it will be
157     called, otherwise your program will die().
158    
159     When altering your databases with timeouts it is wise to use
160     transactions. If you quit due to timeout while performing insert, update
161     or schema-altering commands you can end up not knowing if the action was
162     submitted to the database, complicating recovery.
163 root 1.10
164     Timeout errors are always fatal.
165    
166 root 1.1 =back
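
The sliding behaviour of the C<timeout> parameter described above can be
illustrated with plain arithmetic. This is only a sketch under stated
assumptions: C<remaining_timeout> is a hypothetical helper, not a function of
this module, and times are plain numbers of seconds instead of event loop
timestamps.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of AnyEvent::DBI. Given the current time,
# the time of the last activity and the timeout (all in seconds), return
#   undef  when no timer is needed (no timeout set, or the handle is idle),
#   0      when the deadline has been reached or passed,
#   n > 0  the number of seconds left until the deadline.
sub remaining_timeout {
    my ($now, $last_activity, $timeout) = @_;

    return undef unless $timeout && defined $last_activity;

    my $deadline = $last_activity + $timeout;
    return $now > $deadline ? 0 : $deadline - $now;
}

# request written at t=8 with a 5 second timeout, checked at t=10
my $before_data = remaining_timeout (10, 8, 5);

# data arrived at t=12, which moves the deadline from t=13 to t=17
my $after_data = remaining_timeout (14, 12, 5);

print "before data: $before_data, after data: $after_data\n";
```

Each chunk of data from the server updates the time of the last activity, so
a slow but steadily progressing request does not time out.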
167    
168 root 1.11 Any additional key-value pairs will be rolled into a hash reference
169     and passed as the final argument to the C<< DBI->connect (...) >>
170 root 1.17 call. For example, to suppress errors on STDERR and send them instead to an
171 root 1.11 AnyEvent::Handle you could do:
172    
173     $dbh = new AnyEvent::DBI
174     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
175     PrintError => 0,
176     on_error => sub {
177     $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
178     };
179 root 1.10
180 root 1.1 =cut
181    
182     sub new {
183     my ($class, $dbi, $user, $pass, %arg) = @_;
184    
185 root 1.17 # we use our own socketpair, so we always have a socket
186     # available, even before the forked process exists.
187     # this is mostly done so this module is compatible
188     # with versions of itself older than 3.0.
189 root 1.11 my ($client, $server) = AnyEvent::Util::portable_socketpair
190 root 1.14 or croak "unable to create AnyEvent::DBI communications pipe: $!";
191 root 1.1
192 root 1.17 my $fork = delete $arg{fork_template};
193    
194 root 1.11 my %dbi_args = %arg;
195 root 1.17 delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)};
196 root 1.10
197 root 1.1 my $self = bless \%arg, $class;
198 root 1.17
199 root 1.1 $self->{fh} = $client;
200    
201     my $rbuf;
202     my @caller = (caller)[1,2]; # the "default" caller
203    
204 root 1.17 $fork = $fork ? $fork->fork : AnyEvent::Fork->new
205     or croak "fork: $!";
206    
207     $fork->require ("AnyEvent::DBI::Slave");
208     $fork->send_arg ($VERSION);
209     $fork->send_fh ($server);
210    
211     # we don't rely on the callback, because we use our own
212     # socketpair, for better or worse.
213     $fork->run ("AnyEvent::DBI::Slave::serve", sub { });
214    
215 root 1.11 {
216 root 1.17 Convert::Scalar::weaken (my $self = $self);
217    
218     my $cbor = new CBOR::XS;
219 root 1.11
220 root 1.14 $self->{rw} = AE::io $client, 0, sub {
221 root 1.17 my $len = Convert::Scalar::extend_read $client, $rbuf, 65536;
222 root 1.11
223     if ($len > 0) {
224     # we received data, so reset the timer
225 root 1.15 $self->{last_activity} = AE::now;
226 root 1.1
227 root 1.17 for my $res ($cbor->incr_parse_multiple ($rbuf)) {
228     last unless $self;
229 root 1.1
230 root 1.11 my $req = shift @{ $self->{queue} };
231 root 1.1
232 root 1.11 if (defined $res->[0]) {
233     $res->[0] = $self;
234     $req->[0](@$res);
235     } else {
236     my $cb = shift @$req;
237     local $@ = $res->[1];
238     $cb->($self);
239     $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
240     if $self; # cb() could have deleted it
241 root 1.10 }
242 root 1.11
243     # no more queued requests, so become idle
244 root 1.15 if ($self && !@{ $self->{queue} }) {
245     undef $self->{last_activity};
246     $self->{tw_cb}->();
247     }
248 root 1.10 }
249    
250 root 1.11 } elsif (defined $len) {
251     # todo, caller?
252     $self->_error ("unexpected eof", @caller, 1);
253     } elsif ($! != Errno::EAGAIN) {
254     # todo, caller?
255     $self->_error ("read error: $!", @caller, 1);
256     }
257 root 1.14 };
258 root 1.11
259     $self->{tw_cb} = sub {
260     if ($self->{timeout} && $self->{last_activity}) {
261 root 1.14 if (AE::now > $self->{last_activity} + $self->{timeout}) {
262 root 1.11 # we did time out
263     my $req = $self->{queue}[0];
264     $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
265     } else {
266     # we need to re-set the timeout watcher
267 root 1.14 $self->{tw} = AE::timer
268     $self->{last_activity} + $self->{timeout} - AE::now,
269     0,
270     $self->{tw_cb},
271     ;
272 root 1.1 }
273 root 1.11 } else {
274     # no timeout check wanted, or idle
275     undef $self->{tw};
276 root 1.1 }
277 root 1.11 };
278 root 1.1
279 root 1.11 $self->{ww_cb} = sub {
280 root 1.14 $self->{last_activity} = AE::now;
281 root 1.1
282 root 1.11 my $len = syswrite $client, $self->{wbuf}
283     or return delete $self->{ww};
284 root 1.3
285 root 1.11 substr $self->{wbuf}, 0, $len, "";
286     };
287     }
288 root 1.3
289 root 1.17 $self->_req (
290     sub {
291     return unless $self;
292     $self->{child_pid} = $_[1];
293     },
294     (caller)[1,2],
295     "req_pid"
296     );
297 root 1.11
298 root 1.10 $self->_req (
299 root 1.17 sub {
300     return unless $self;
301     &{ $self->{on_connect} } if $self->{on_connect};
302     },
303 root 1.10 (caller)[1,2],
304     req_open => $dbi, $user, $pass, %dbi_args
305     );
306 root 1.1
307     $self
308     }
309    
310 root 1.10 sub _server_pid {
311     shift->{child_pid}
312     }
313    
314     sub kill_child {
315 root 1.14 my $self = shift;
316 root 1.10
317 root 1.14 if (my $pid = delete $self->{child_pid}) {
318 root 1.15 # kill and reap process
319     my $kid_watcher; $kid_watcher = AE::child $pid, sub {
320     undef $kid_watcher;
321     };
322 root 1.14 kill TERM => $pid;
323 root 1.10 }
324 root 1.15
325 root 1.17 delete $self->{rw};
326     delete $self->{ww};
327     delete $self->{tw};
328 root 1.14 close delete $self->{fh};
329 root 1.10 }
330    
331     sub DESTROY {
332     shift->kill_child;
333     }
334    
335 root 1.1 sub _error {
336     my ($self, $error, $filename, $line, $fatal) = @_;
337    
338 root 1.10 if ($fatal) {
339 root 1.11 delete $self->{tw};
340 root 1.10 delete $self->{rw};
341     delete $self->{ww};
342     delete $self->{fh};
343    
344     # for fatal errors call all enqueued callbacks with error
345     while (my $req = shift @{$self->{queue}}) {
346 root 1.11 local $@ = $error;
347     $req->[0]->($self);
348 root 1.10 }
349     $self->kill_child;
350     }
351 root 1.1
352 root 1.11 local $@ = $error;
353 root 1.1
354 root 1.9 if ($self->{on_error}) {
355 root 1.10 $self->{on_error}($self, $filename, $line, $fatal)
356     } else {
357     die "$error at $filename, line $line\n";
358 root 1.9 }
359 root 1.10 }
360    
361     =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
362    
363 root 1.11 Sets (or clears, with C<undef>) the C<on_error> handler.
364 root 1.10
365     =cut
366 root 1.1
367 root 1.10 sub on_error {
368     $_[0]{on_error} = $_[1];
369     }
370    
371     =item $dbh->timeout ($seconds)
372    
373     Sets (or clears, with C<undef>) the database timeout. Useful to extend the
374     timeout when you are about to make a really long query.
375    
376     =cut
377    
378     sub timeout {
379     my ($self, $timeout) = @_;
380    
381 root 1.11 $self->{timeout} = $timeout;
382 root 1.10
383 root 1.11 # reschedule timer if one was running
384     $self->{tw_cb}->();
385 root 1.1 }
386    
387     sub _req {
388 root 1.10 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
389    
390 root 1.11 unless ($self->{fh}) {
391     local $@ = my $err = 'no database connection';
392     $cb->($self);
393 root 1.10 $self->_error ($err, $filename, $line, 1);
394     return;
395     }
396    
397 root 1.11 push @{ $self->{queue} }, [$cb, $filename, $line];
398 root 1.1
399 root 1.11 # re-start timeout if necessary
400     if ($self->{timeout} && !$self->{tw}) {
401 root 1.14 $self->{last_activity} = AE::now;
402 root 1.11 $self->{tw_cb}->();
403 root 1.10 }
404 root 1.1
405 root 1.17 $self->{wbuf} .= CBOR::XS::encode_cbor \@_;
406 root 1.1
407     unless ($self->{ww}) {
408     my $len = syswrite $self->{fh}, $self->{wbuf};
409     substr $self->{wbuf}, 0, $len, "";
410    
411     # still any left? then install a write watcher
412 root 1.14 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
413 root 1.1 if length $self->{wbuf};
414     }
415     }
416    
417 root 1.17 =item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
418    
419     An accessor for the database handle attributes, such as C<AutoCommit>,
420     C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
421     (which might be C<undef>), then the given attribute will be set to that
422     value.
423    
424     The callback will be passed the database handle and the attribute's value
425     if successful.
426    
427     If an error occurs and the C<on_error> callback returns, then only C<$dbh>
428     will be passed and C<$@> contains the error message.
429    
430 root 1.11 =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
431 root 1.1
432     Executes the given SQL statement with placeholders replaced by
433 root 1.2 C<@args>. The statement will be prepared and cached on the server side, so
434 root 1.11 using placeholders is extremely important.
435 root 1.1
436 root 1.11 The callback will be called with a weakened AnyEvent::DBI object as the
437     first argument and the result of C<fetchall_arrayref> (or C<undef>
438     if the statement wasn't a select statement) as the second argument.
439 root 1.1
440 root 1.11 The third argument is the return value from the C<execute> method call on
441     the DBI statement handle.
442    
443     If an error occurs and the C<on_error> callback returns, then only C<$dbh>
444 root 1.2 will be passed and C<$@> contains the error message.
445    
446 root 1.17 =item $dbh->stattr ($attr_name, $cb->($dbh, $value))
447 root 1.11
448 root 1.17 An accessor for the statement attributes of the most recently executed
449     statement, such as C<NAME> or C<TYPE>.
450 root 1.10
451 root 1.11 The callback will be passed the database handle and the attribute's value
452     if successful.
453    
454     If an error occurs and the C<on_error> callback returns, then only C<$dbh>
455     will be passed and C<$@> contains the error message.
456 root 1.10
457 root 1.12 =item $dbh->begin_work ($cb->($dbh[, $rc]))
458 root 1.10
459 root 1.12 =item $dbh->commit ($cb->($dbh[, $rc]))
460 root 1.10
461 root 1.12 =item $dbh->rollback ($cb->($dbh[, $rc]))
462 root 1.10
463 root 1.11 The C<begin_work>, C<commit>, and C<rollback> methods expose the equivalent
464 root 1.12 transaction control methods of the DBI driver. On success, C<$rc> is true.
465 root 1.10
466 root 1.11 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
467     will be passed and C<$@> contains the error message.
468 root 1.10
469 root 1.12 =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
470 root 1.10
471 root 1.11 This gives access to database driver private methods. Because they
472 root 1.12 are not standard you cannot always depend on the value of C<$rc> or
473     C<$dbi_err>. Check the documentation for your specific driver/function
474     combination to see what it returns.
475 root 1.10
476     Note that the first argument will be eval'ed to produce the argument list to
477 root 1.11 the func() method. This must be done because the serialization protocol
478 root 1.10 between the AnyEvent::DBI server process and your program does not support the
479     passage of closures.
480    
481     Here's an example to extend the query language in SQLite so it supports an
482     instr() function:
483    
484     $cv = AnyEvent->condvar;
485 root 1.11 $dbh->func (
486 root 1.10 q{
487 root 1.11 instr => 2, sub {
488 root 1.10 my ($string, $search) = @_;
489     return index $string, $search;
490     },
491     },
492 root 1.11 create_function => sub {
493 root 1.12 return $cv->send ($@)
494     unless $#_;
495     $cv->send (undef, @_[1,2,3]);
496 root 1.11 }
497 root 1.10 );
498 root 1.11
499 root 1.12 my ($err,$rc,$errcode,$errstr) = $cv->recv;
500 root 1.11
501 root 1.12 die $err if defined $err;
502     die "EVAL failed: $errstr"
503     if $errcode;
504 root 1.11
505 root 1.12 # otherwise, we can ignore $rc and $errcode for this particular func
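
Why the first argument of C<func> is a string can be seen in isolation. The
following is a minimal sketch, assuming only that the receiving side evaluates
the string in list context; the variable names are hypothetical, and the real
server process may do more than this.

```perl
use strict;
use warnings;

# The argument list travels as source text, because a code reference
# cannot be serialized and sent to another process.
my $args_source = q{
    instr => 2, sub {
        my ($string, $search) = @_;
        return index $string, $search;
    },
};

# Roughly what the receiving side has to do: evaluate the text in list
# context to recover the arguments, including a freshly compiled sub.
my @args = eval $args_source;
die "eval failed: $@" if $@;

my ($name, $argc, $code) = @args;

# the recovered code reference behaves like the original one
my $pos = $code->("hello world", "world");

print "$name/$argc: $pos\n";
```

Because the string is compiled in the other process, the sub cannot see
lexical variables of the calling program; everything it needs has to be
written into the string itself.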
506 root 1.10
507 root 1.1 =cut
508    
509 root 1.17 for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) {
510 root 1.10 eval 'sub ' . $cmd_name . '{
511     my $cb = pop;
512     splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
513 root 1.11 &_req
514 root 1.10 }';
515 root 1.1 }
516    
517     =back
518    
519     =head1 SEE ALSO
520    
521 root 1.11 L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
522 root 1.1
523 root 1.17 =head1 AUTHOR AND CONTACT
524 root 1.1
525 root 1.17 Marc Lehmann <schmorp@schmorp.de> (current maintainer)
526 root 1.4 http://home.schmorp.de/
527 root 1.1
528 root 1.10 Adam Rosenstein <adam@redcondor.com>
529     http://www.redcondor.com/
530    
531 root 1.1 =cut
532    
533 root 1.17 1