/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.23
Committed: Sat Feb 1 21:43:28 2020 UTC (4 years, 3 months ago) by root
Branch: MAIN
CVS Tags: HEAD
Changes since 1.22: +3 -0 lines

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::DBI - asynchronous DBI access
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::DBI;
8
9 my $cv = AnyEvent->condvar;
10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12
13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($dbh, $rows, $rv) = @_;
15
16 $#_ or die "failure: $@";
17
18 print "@$_\n"
19 for @$rows;
20
21 $cv->send;
22 });
23
24 # asynchronously do sth. else here
25
26 $cv->recv;
27
28 =head1 DESCRIPTION
29
30 This module is an L<AnyEvent> user, so you need to make sure that you use
31 and run a supported event loop.
32
33 This module implements asynchronous DBI access by forking or executing
34 separate "DBI-Server" processes and sending them requests.
35
36 This means that you can run DBI requests in parallel with other tasks.
37
38 With DBD::mysql, the overhead for very simple statements
39 ("select 0") is somewhere around 50% compared to an explicit
40 prepare_cached/execute/fetchrow_arrayref/finish combination. With
41 DBD::SQLite, it's more like a factor of 8 for this trivial statement.
42
43 =head2 ERROR HANDLING
44
45 This module defines a number of functions that accept a callback
46 argument. All callbacks used by this module get their AnyEvent::DBI handle
47 object passed as first argument.
48
49 If the request was successful, then there will be more arguments,
50 otherwise there will only be the C<$dbh> argument and C<$@> contains an
51 error message.
52
53 A convenient way to check whether an error occurred is to check C<$#_> -
54 if that is true, then the function was successful, otherwise there was an
55 error.
56
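The C<$#_> convention described above can be illustrated without a database.
This is a minimal sketch in plain Perl: C<on_result> and C<$fake_dbh> are
hypothetical stand-ins, not part of this module, and the error message is
made up for illustration.

```perl
use strict;
use warnings;

# Hypothetical callback following the convention described above: the
# first argument is always the handle, extra arguments mean success.
sub on_result {
    my ($dbh, @result) = @_;

    return "error: $@" unless $#_;    # only $dbh was passed
    return "ok: " . scalar (@result) . " result value(s)";
}

my $fake_dbh = {};    # stand-in for an AnyEvent::DBI handle (assumption)

# success: handle plus rows and return value
my $ok = on_result ($fake_dbh, [[1, "a"]], 1);

# failure: handle only, with $@ set by the caller
my $failed = do {
    local $@ = "no such table: test";    # illustrative message
    on_result ($fake_dbh);
};

print "$ok\n$failed\n";
```

Because C<$#_> is the index of the last argument, it is 0 (false) exactly
when the handle is the only argument.
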
57 =cut
58
59 package AnyEvent::DBI;
60
61 use common::sense;
62
63 use Carp;
64 use Convert::Scalar ();
65 use AnyEvent::Fork ();
66 use CBOR::XS ();
67
68 use AnyEvent ();
69 use AnyEvent::Util ();
70
71 use Errno ();
72
73 our $VERSION = '3.04';
74
75 =head2 METHODS
76
77 =over 4
78
79 =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
80
81 Returns a database handle for the given database. Each database handle
82 has an associated server process that executes statements in order. If
83 you want to run more than one statement in parallel, you need to create
84 additional database handles.
85
86 The advantage of this approach is that transactions work, because the
87 state of each connection is preserved between statements.
88
89 Example:
90
91 $dbh = new AnyEvent::DBI
92 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
93
94 Additional key-value pairs can be used to adjust behaviour:
95
96 =over 4
97
98 =item on_error => $callback->($dbh, $filename, $line, $fatal)
99
100 When an error occurs, this callback will be invoked. On entry, C<$@>
101 is set to the error message. C<$filename> and C<$line> indicate where the
102 original request was submitted.
103
104 If the fatal argument is true then the database connection is shut down
105 and your database handle becomes invalid. In addition to invoking the
106 C<on_error> callback, all of your queued request callbacks are called
107 with only the C<$dbh> argument.
108
109 If omitted, then C<die> will be called on any errors, fatal or not.
110
111 Note that AnyEvent::DBI will not catch errors in user-provided callbacks:
112 if you die in your callback, things might malfunction.
113
114 =item on_connect => $callback->($dbh[, $success])
115
116 If you supply an C<on_connect> callback, then this callback will be
117 invoked after the database connect attempt. If the connection succeeds,
118 C<$success> is true, otherwise it is missing and C<$@> contains the
119 C<$DBI::errstr>.
120
121 Regardless of whether C<on_connect> is supplied, connect errors will result in
122 C<on_error> being called. However, if no C<on_connect> callback is supplied, then
123 connection errors are considered fatal. The client will C<die> and the C<on_error>
124 callback will be called with C<$fatal> true.
125
126 When C<on_connect> is supplied, connect errors are not fatal and
127 AnyEvent::DBI will not C<die>. You still cannot, however, use the C<$dbh>
128 object you received from C<new> to make requests.
129
130 =item fork_template => $AnyEvent::Fork-object
131
132 C<AnyEvent::DBI> uses C<< AnyEvent::Fork->new >> to create the database
133 slave, which in turn either C<exec>'s a new process (similar to the old
134 C<exec_server> constructor argument) or uses a process forked early (see
135 L<AnyEvent::Fork::Early>).
136
137 With this argument you can provide your own fork template. This can be
138 useful if you create a lot of C<AnyEvent::DBI> handles and want to save
139 memory (and speed up startup) by not having to load C<AnyEvent::DBI> again
140 and again into your child processes:
141
142 my $template = AnyEvent::Fork
143 ->new # create new template
144 ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module
145
146 for (...) {
147 $dbh = new AnyEvent::DBI ...
148 fork_template => $template;
149
150 =item timeout => seconds
151
152 If you supply a timeout parameter (fractional values are supported), then
153 a timer is started any time the DBI handle expects a response from the
154 server. This includes connection setup as well as requests made to the
155 backend. The timeout spans the duration from the moment the first data
156 is written (or queued to be written) until all expected responses are
157 returned, but is postponed for "timeout" seconds each time more data is
158 returned from the server. If the timer ever goes off then a fatal error is
159 generated. If you have an C<on_error> handler installed, then it will be
160 called, otherwise your program will die().
161
162 When altering your databases with timeouts it is wise to use
163 transactions. If you quit due to timeout while performing insert, update
164 or schema-altering commands you can end up not knowing if the action was
165 submitted to the database, complicating recovery.
166
167 Timeout errors are always fatal.
168
169 =back
170
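The timeout rule described for the C<timeout> option above (the deadline
moves forward whenever data arrives) can be sketched in plain Perl. This is
only an illustration: C<seconds_left> is a hypothetical helper, not part of
this module, and the timestamps are made-up numbers.

```perl
use strict;
use warnings;

# Returns the number of seconds until the timeout would fire, or undef
# when no timeout applies (none configured, or the handle is idle).
# A result of zero or less means the request has timed out.
sub seconds_left {
    my ($now, $last_activity, $timeout) = @_;

    return unless $timeout && defined $last_activity;
    return $last_activity + $timeout - $now;
}

my $timeout = 10;

my $left_a = seconds_left (103, 100,   $timeout);  # request sent at t=100
my $left_b = seconds_left (112, 108,   $timeout);  # data arrived at t=108
my $left_c = seconds_left (119, 108,   $timeout);  # silence since t=108
my $left_d = seconds_left (119, undef, $timeout);  # idle, no request queued

print defined $_ ? "$_\n" : "no timeout pending\n"
    for $left_a, $left_b, $left_c, $left_d;
```

In the second call the request is already 12 seconds old, yet it has not
timed out, because the last data from the server arrived 4 seconds ago.
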
171 Any additional key-value pairs will be rolled into a hash reference
172 and passed as the final argument to the C<< DBI->connect (...) >>
173 call. For example, to suppress errors on STDERR and send them instead to an
174 AnyEvent::Handle you could do:
175
176 $dbh = new AnyEvent::DBI
177 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
178 PrintError => 0,
179 on_error => sub {
180 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
181 };
182
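How the remaining key-value pairs are separated from the constructor's own
options can be sketched as follows. The key list is the one deleted in
C<new>; C<split_args> is a hypothetical helper used only for illustration.

```perl
use strict;
use warnings;

# keys consumed by the constructor itself (as listed in sub new)
my @own_keys = qw(on_connect on_error timeout fork_template exec_server);

# hypothetical helper: returns the attributes left over for DBI->connect
sub split_args {
    my (%arg) = @_;

    my %dbi_args = %arg;
    delete @dbi_args{@own_keys};    # whatever remains is a DBI attribute

    return \%dbi_args;
}

my $attrs = split_args (
    PrintError => 0,
    RaiseError => 1,
    timeout    => 5,
    on_error   => sub { },
);

print join (", ", map "$_=$attrs->{$_}", sort keys %$attrs), "\n";
```

Only C<PrintError> and C<RaiseError> survive here, so those two would end up
in the attribute hash.
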
183 =cut
184
185 sub new {
186 my ($class, $dbi, $user, $pass, %arg) = @_;
187
188 # we use our own socketpair, so we always have a socket
189 # available, even before the forked process exists.
190 # this is mostly done so this module is compatible
191 # with versions of itself older than 3.0.
192 my ($client, $server) = AnyEvent::Util::portable_socketpair
193 or croak "unable to create AnyEvent::DBI communications pipe: $!";
194
195 AnyEvent::fh_unblock $client;
196
197 my $fork = delete $arg{fork_template};
198
199 my %dbi_args = %arg;
200 delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)};
201
202 my $self = bless \%arg, $class;
203
204 $self->{fh} = $client;
205
206 my $rbuf;
207 my @caller = (caller)[1,2]; # the "default" caller
208
209 $fork = $fork ? $fork->fork : AnyEvent::Fork->new
210 or croak "fork: $!";
211
212 $fork->require ("AnyEvent::DBI::Slave");
213 $fork->send_arg ($VERSION);
214 $fork->send_fh ($server);
215
216 # we don't rely on the callback, because we use our own
217 # socketpair, for better or worse.
218 $fork->run ("AnyEvent::DBI::Slave::serve", sub { });
219
220 {
221 Convert::Scalar::weaken (my $self = $self);
222
223 my $cbor = new CBOR::XS;
224
225 $self->{rw} = AE::io $client, 0, sub {
226 my $len = Convert::Scalar::extend_read $client, $rbuf, 65536;
227
228 if ($len > 0) {
229 # we received data, so reset the timer
230 $self->{last_activity} = AE::now;
231
232 for my $res ($cbor->incr_parse_multiple ($rbuf)) {
233 last unless $self;
234
235 my $req = shift @{ $self->{queue} };
236
237 if (defined $res->[0]) {
238 $res->[0] = $self;
239 $req->[0](@$res);
240 } else {
241 my $cb = shift @$req;
242 local $@ = $res->[1];
243 $cb->($self);
244 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
245 if $self; # cb() could have deleted it
246 }
247
248 # no more queued requests, so become idle
249 if ($self && !@{ $self->{queue} }) {
250 undef $self->{last_activity};
251 $self->{tw_cb}->();
252 }
253 }
254
255 } elsif (defined $len) {
256 # todo, caller?
257 $self->_error ("unexpected eof", @caller, 1);
258 } elsif ($! != Errno::EAGAIN) {
259 # todo, caller?
260 $self->_error ("read error: $!", @caller, 1);
261 }
262 };
263
264 $self->{tw_cb} = sub {
265 if ($self->{timeout} && $self->{last_activity}) {
266 if (AE::now > $self->{last_activity} + $self->{timeout}) {
267 # we did time out
268 my $req = $self->{queue}[0];
269 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
270 } else {
271 # we need to re-set the timeout watcher
272 $self->{tw} = AE::timer
273 $self->{last_activity} + $self->{timeout} - AE::now,
274 0,
275 $self->{tw_cb},
276 ;
277 }
278 } else {
279 # no timeout check wanted, or idle
280 undef $self->{tw};
281 }
282 };
283
284 $self->{ww_cb} = sub {
285 $self->{last_activity} = AE::now;
286
287 my $len = syswrite $client, $self->{wbuf}
288 or return delete $self->{ww};
289
290 substr $self->{wbuf}, 0, $len, "";
291 };
292 }
293
294 $self->_req (
295 sub {
296 return unless $self;
297 $self->{child_pid} = $_[1];
298 },
299 (caller)[1,2],
300 "req_pid"
301 );
302
303 $self->_req (
304 sub {
305 return unless $self;
306 &{ $self->{on_connect} } if $self->{on_connect};
307 },
308 (caller)[1,2],
309 req_open => $dbi, $user, $pass, %dbi_args
310 );
311
312 $self
313 }
314
315 sub _server_pid {
316 shift->{child_pid}
317 }
318
319 sub kill_child {
320 my $self = shift;
321
322 if (my $pid = delete $self->{child_pid}) {
323 # kill and reap process
324 my $kid_watcher; $kid_watcher = AE::child $pid, sub {
325 undef $kid_watcher;
326 };
327 kill TERM => $pid;
328 }
329
330 delete $self->{rw};
331 delete $self->{ww};
332 delete $self->{tw};
333 close delete $self->{fh};
334 }
335
336 sub DESTROY {
337 shift->kill_child;
338 }
339
340 sub _error {
341 my ($self, $error, $filename, $line, $fatal) = @_;
342
343 if ($fatal) {
344 delete $self->{tw};
345 delete $self->{rw};
346 delete $self->{ww};
347 delete $self->{fh};
348
349 # for fatal errors call all enqueued callbacks with error
350 while (my $req = shift @{$self->{queue}}) {
351 local $@ = $error;
352 $req->[0]->($self);
353 }
354 $self->kill_child;
355 }
356
357 local $@ = $error;
358
359 if ($self->{on_error}) {
360 $self->{on_error}($self, $filename, $line, $fatal)
361 } else {
362 die "$error at $filename, line $line\n";
363 }
364 }
365
366 =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
367
368 Sets (or clears, with C<undef>) the C<on_error> handler.
369
370 =cut
371
372 sub on_error {
373 $_[0]{on_error} = $_[1];
374 }
375
376 =item $dbh->timeout ($seconds)
377
378 Sets (or clears, with C<undef>) the database timeout. Useful to extend the
379 timeout when you are about to make a really long query.
380
381 =cut
382
383 sub timeout {
384 my ($self, $timeout) = @_;
385
386 $self->{timeout} = $timeout;
387
388 # reschedule timer if one was running
389 $self->{tw_cb}->();
390 }
391
392 sub _req {
393 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
394
395 unless ($self->{fh}) {
396 local $@ = my $err = 'no database connection';
397 $cb->($self);
398 $self->_error ($err, $filename, $line, 1);
399 return;
400 }
401
402 push @{ $self->{queue} }, [$cb, $filename, $line];
403
404 # re-start timeout if necessary
405 if ($self->{timeout} && !$self->{tw}) {
406 $self->{last_activity} = AE::now;
407 $self->{tw_cb}->();
408 }
409
410 $self->{wbuf} .= CBOR::XS::encode_cbor \@_;
411
412 unless ($self->{ww}) {
413 my $len = syswrite $self->{fh}, $self->{wbuf};
414 substr $self->{wbuf}, 0, $len, "";
415
416 # still any left? then install a write watcher
417 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
418 if length $self->{wbuf};
419 }
420 }
421
422 =item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
423
424 An accessor for the database handle attributes, such as C<AutoCommit>,
425 C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
426 (which might be C<undef>), then the given attribute will be set to that
427 value.
428
429 The callback will be passed the database handle and the attribute's value
430 if successful.
431
432 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
433 will be passed and C<$@> contains the error message.
434
435 =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
436
437 Executes the given SQL statement with placeholders replaced by
438 C<@args>. The statement will be prepared and cached on the server side, so
439 using placeholders is extremely important.
440
441 The callback will be called with a weakened AnyEvent::DBI object as the
442 first argument and the result of C<fetchall_arrayref> (or C<undef>
443 if the statement wasn't a select statement) as the second argument.
444
445 The third argument is the return value from the statement handle's
446 C<execute> method call.
447
448 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
449 will be passed and C<$@> contains the error message.
450
451 =item $dbh->stattr ($attr_name, $cb->($dbh, $value))
452
453 An accessor for the statement attributes of the most recently executed
454 statement, such as C<NAME> or C<TYPE>.
455
456 The callback will be passed the database handle and the attribute's value
457 if successful.
458
459 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
460 will be passed and C<$@> contains the error message.
461
462 =item $dbh->begin_work ($cb->($dbh[, $rc]))
463
464 =item $dbh->commit ($cb->($dbh[, $rc]))
465
466 =item $dbh->rollback ($cb->($dbh[, $rc]))
467
468 The C<begin_work>, C<commit>, and C<rollback> methods expose the equivalent
469 transaction control methods of the DBI driver. On success, C<$rc> is true.
470
471 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
472 will be passed and C<$@> contains the error message.
473
474 =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
475
476 This gives access to database driver private methods. Because they
477 are not standard you cannot always depend on the value of C<$rc> or
478 C<$dbi_err>. Check the documentation for your specific driver/function
479 combination to see what it returns.
480
481 Note that the first argument will be eval'ed to produce the argument list to
482 the func() method. This must be done because the serialization protocol
483 between the AnyEvent::DBI server process and your program does not support the
484 passage of closures.
485
486 Here's an example to extend the query language in SQLite so it supports an
487 instr() function:
488
489 $cv = AnyEvent->condvar;
490 $dbh->func (
491 q{
492 instr => 2, sub {
493 my ($string, $search) = @_;
494 return index $string, $search;
495 },
496 },
497 create_function => sub {
498 return $cv->send ($@)
499 unless $#_;
500 $cv->send (undef, @_[1,2,3]);
501 }
502 );
503
504 my ($err,$rc,$errcode,$errstr) = $cv->recv;
505
506 die $err if defined $err;
507 die "EVAL failed: $errstr"
508 if $errcode;
509
510 # otherwise, we can ignore $rc and $errcode for this particular func
511
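Why the argument list is passed as a string can be seen in isolation. The
sketch below only assumes what is stated above, namely that the string is
eval'ed to produce the argument list; it needs no database, and the variable
names are illustrative.

```perl
use strict;
use warnings;

# the argument list travels as plain text, so it can be serialized ...
my $arg_string = q{
    instr => 2, sub {
        my ($string, $search) = @_;
        return index $string, $search;
    },
};

# ... and is turned back into a list, closure included, by eval
my @args = eval $arg_string;
die "EVAL failed: $@" if $@;

my ($name, $argc, $code) = @args;

print "$name/$argc: ", $code->("hello world", "world"), "\n";
```

The code reference only comes into existence where the string is evaluated,
which is why it never has to cross the process boundary.
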
512 =cut
513
514 for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) {
515 eval 'sub ' . $cmd_name . '{
516 my $cb = pop;
517 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
518 &_req
519 }';
520 }
521
522 =back
523
524 =head1 SEE ALSO
525
526 L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
527
528 =head1 AUTHOR AND CONTACT
529
530 Marc Lehmann <schmorp@schmorp.de> (current maintainer)
531 http://home.schmorp.de/
532
533 Adam Rosenstein <adam@redcondor.com>
534 http://www.redcondor.com/
535
536 =cut
537
538 1