/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.20
Committed: Sat Mar 24 23:22:25 2018 UTC (6 years, 1 month ago) by root
Branch: MAIN
Changes since 1.19: +2 -0 lines
Log Message:
*** empty log message ***

1 =head1 NAME
2
3 AnyEvent::DBI - asynchronous DBI access
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::DBI;
8
9 my $cv = AnyEvent->condvar;
10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12
13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($dbh, $rows, $rv) = @_;
15
16 $#_ or die "failure: $@";
17
18 print "@$_\n"
19 for @$rows;
20
21 $cv->send;
22 });
23
24 # asynchronously do sth. else here
25
26 $cv->recv;
27
28 =head1 DESCRIPTION
29
30 This module is an L<AnyEvent> user; you need to make sure that you use and
31 run a supported event loop.
32
33 This module implements asynchronous DBI access by forking or executing
34 separate "DBI-Server" processes and sending them requests.
35
36 This means that you can run DBI requests in parallel with other tasks.
37
38 With DBD::mysql, the overhead for very simple statements
39 ("select 0") is somewhere around 50% compared to an explicit
40 prepare_cached/execute/fetchrow_arrayref/finish combination. With
41 DBD::SQLite, it's more like a factor of 8 for this trivial statement.
42
43 =head2 ERROR HANDLING
44
45 This module defines a number of functions that accept a callback
46 argument. All callbacks used by this module get their AnyEvent::DBI handle
47 object passed as first argument.
48
49 If the request was successful, then there will be more arguments,
50 otherwise there will only be the C<$dbh> argument and C<$@> contains an
51 error message.
52
53 A convenient way to check whether an error occurred is to check C<$#_> -
54 if that is true, then the function was successful, otherwise there was an
55 error.
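
The C<$#_> check can be tried without a database. The following is a minimal sketch in plain Perl: C<outcome> is a hypothetical helper and C<$dbh> is only a placeholder string, not a real AnyEvent::DBI handle.

```perl
use strict;
use warnings;

# Hypothetical helper: classify a callback invocation by its argument count.
sub outcome {
    # $#_ is the index of the last argument; 0 means only $dbh was passed.
    return $#_ ? "ok" : "error: $@";
}

my $dbh = "handle";   # placeholder standing in for an AnyEvent::DBI object

# Success: the handle plus result arguments.
my $success = outcome ($dbh, [[1, "a"]], 1);

# Failure: the handle only, with the message in $@.
my $failure = do {
    local $@ = "no such table: test";
    outcome ($dbh);
};

print "$success\n$failure\n";
```

Because the handle is always the first argument, C<$#_> is 0 (false) exactly when nothing else was passed.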
56
57 =cut
58
59 package AnyEvent::DBI;
60
61 use common::sense;
62
63 use Carp;
64 use Convert::Scalar ();
65 use AnyEvent::Fork ();
66 use CBOR::XS ();
67
68 use AnyEvent ();
69 use AnyEvent::Util ();
70
71 use Errno ();
72
73 our $VERSION = '3.02';
74
75 =head2 METHODS
76
77 =over 4
78
79 =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
80
81 Returns a database handle for the given database. Each database handle
82 has an associated server process that executes statements in order. If
83 you want to run more than one statement in parallel, you need to create
84 additional database handles.
85
86 The advantage of this approach is that transactions work, because state
87 is preserved.
88
89 Example:
90
91 $dbh = new AnyEvent::DBI
92 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
93
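
The in-order behaviour of a single handle can be pictured as a first-in, first-out queue of callbacks, which is roughly what the implementation further down does with its C<queue> array. This is a simplified sketch in plain Perl; C<submit> and C<deliver> are hypothetical names, not part of the module.

```perl
use strict;
use warnings;

my @queue;   # pending callbacks, oldest first
my @seen;    # what each callback received

# Queue a request: remember its callback.
sub submit {
    my ($cb) = @_;
    push @queue, $cb;
}

# A response arrives: it belongs to the oldest pending request.
sub deliver {
    my (@result) = @_;
    my $cb = shift @queue;
    $cb->(@result);
}

submit (sub { push @seen, "first got $_[0]" });
submit (sub { push @seen, "second got $_[0]" });

deliver ("A");
deliver ("B");

print "$_\n" for @seen;
```

Since responses are matched to requests purely by order, one slow statement delays everything queued behind it on the same handle, which is why parallel statements need separate handles.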
94 Additional key-value pairs can be used to adjust behaviour:
95
96 =over 4
97
98 =item on_error => $callback->($dbh, $filename, $line, $fatal)
99
100 When an error occurs, then this callback will be invoked. On entry, C<$@>
101 is set to the error message. C<$filename> and C<$line> identify where the
102 original request was submitted.
103
104 If the fatal argument is true then the database connection is shut down
105 and your database handle becomes invalid. In addition to invoking the
106 C<on_error> callback, all of your queued request callbacks are called
107 with only the C<$dbh> argument.
108
109 If omitted, then C<die> will be called on any errors, fatal or not.
110
111 =item on_connect => $callback->($dbh[, $success])
112
113 If you supply an C<on_connect> callback, then this callback will be
114 invoked after the database connect attempt. If the connection succeeds,
115 C<$success> is true, otherwise it is missing and C<$@> contains the
116 C<$DBI::errstr>.
117
118 Regardless of whether C<on_connect> is supplied, connect errors will result in
119 C<on_error> being called. However, if no C<on_connect> callback is supplied, then
120 connection errors are considered fatal. The client will C<die> and the C<on_error>
121 callback will be called with C<$fatal> true.
122
123 When C<on_connect> is supplied, connect errors are not fatal and AnyEvent::DBI
124 will not C<die>. You still cannot, however, use the $dbh object you
125 received from C<new> to make requests.
126
127 =item fork_template => $AnyEvent::Fork-object
128
129 C<AnyEvent::DBI> uses C<< AnyEvent::Fork->new >> to create the database
130 slave, which in turn either C<exec>'s a new process (similar to the old
131 C<exec_server> constructor argument) or uses a process forked early (see
132 L<AnyEvent::Fork::Early>).
133
134 With this argument you can provide your own fork template. This can be
135 useful if you create a lot of C<AnyEvent::DBI> handles and want to save
136 memory (and speed up startup) by not having to load C<AnyEvent::DBI> again
137 and again into your child processes:
138
139 my $template = AnyEvent::Fork
140 ->new # create new template
141 ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module
142
143 for (...) {
144 $dbh = new AnyEvent::DBI ...
145 fork_template => $template;
146
147 =item timeout => seconds
148
149 If you supply a timeout parameter (fractional values are supported), then
150 a timer is started any time the DBI handle expects a response from the
151 server. This includes connection setup as well as requests made to the
152 backend. The timeout spans the duration from the moment the first data
153 is written (or queued to be written) until all expected responses are
154 returned, but is postponed for "timeout" seconds each time more data is
155 returned from the server. If the timer ever goes off then a fatal error is
156 generated. If you have an C<on_error> handler installed, then it will be
157 called, otherwise your program will die().
158
159 When altering your databases with timeouts it is wise to use
160 transactions. If you quit due to timeout while performing insert, update
161 or schema-altering commands you can end up not knowing if the action was
162 submitted to the database, complicating recovery.
163
164 Timeout errors are always fatal.
165
166 =back
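
The way the timeout is postponed by incoming data can be illustrated with a little arithmetic. This is a sketch in plain Perl that mirrors the "last activity plus timeout" rule described above; C<seconds_left> is a hypothetical helper and the timestamps are made up.

```perl
use strict;
use warnings;

# Hypothetical helper: seconds remaining before a timeout error would fire.
# A negative value means the request has timed out.
sub seconds_left {
    my ($last_activity, $timeout, $now) = @_;
    return $last_activity + $timeout - $now;
}

my $timeout = 5;

# Request written at t=0, nothing received by t=4: one second left.
my $waiting = seconds_left (0, $timeout, 4);

# Data arrived at t=4, so the deadline moves from t=5 to t=9.
my $postponed = seconds_left (4, $timeout, 6);

# No further data until t=10: the deadline has passed.
my $expired = seconds_left (4, $timeout, 10);

printf "%d %d %d\n", $waiting, $postponed, $expired;
```

The deadline is therefore a limit on silence from the server rather than on the total duration of a request.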
167
168 Any additional key-value pairs will be rolled into a hash reference
169 and passed as the final argument to the C<< DBI->connect (...) >>
170 call. For example, to suppress errors on STDERR and send them instead to an
171 AnyEvent::Handle you could do:
172
173 $dbh = new AnyEvent::DBI
174 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
175 PrintError => 0,
176 on_error => sub {
177 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
178 };
179
180 =cut
181
182 sub new {
183 my ($class, $dbi, $user, $pass, %arg) = @_;
184
185 # we use our own socketpair, so we always have a socket
186 # available, even before the forked process exists.
187 # this is mostly done so this module is compatible
188 # with versions of itself older than 3.0.
189 my ($client, $server) = AnyEvent::Util::portable_socketpair
190 or croak "unable to create AnyEvent::DBI communications pipe: $!";
191
192 AnyEvent::fh_unblock $client;
193
194 my $fork = delete $arg{fork_template};
195
196 my %dbi_args = %arg;
197 delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)};
198
199 my $self = bless \%arg, $class;
200
201 $self->{fh} = $client;
202
203 my $rbuf;
204 my @caller = (caller)[1,2]; # the "default" caller
205
206 $fork = $fork ? $fork->fork : AnyEvent::Fork->new
207 or croak "fork: $!";
208
209 $fork->require ("AnyEvent::DBI::Slave");
210 $fork->send_arg ($VERSION);
211 $fork->send_fh ($server);
212
213 # we don't rely on the callback, because we use our own
214 # socketpair, for better or worse.
215 $fork->run ("AnyEvent::DBI::Slave::serve", sub { });
216
217 {
218 Convert::Scalar::weaken (my $self = $self);
219
220 my $cbor = new CBOR::XS;
221
222 $self->{rw} = AE::io $client, 0, sub {
223 my $len = Convert::Scalar::extend_read $client, $rbuf, 65536;
224
225 if ($len > 0) {
226 # we received data, so reset the timer
227 $self->{last_activity} = AE::now;
228
229 for my $res ($cbor->incr_parse_multiple ($rbuf)) {
230 last unless $self;
231
232 my $req = shift @{ $self->{queue} };
233
234 if (defined $res->[0]) {
235 $res->[0] = $self;
236 $req->[0](@$res);
237 } else {
238 my $cb = shift @$req;
239 local $@ = $res->[1];
240 $cb->($self);
241 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
242 if $self; # cb() could have deleted it
243 }
244
245 # no more queued requests, so become idle
246 if ($self && !@{ $self->{queue} }) {
247 undef $self->{last_activity};
248 $self->{tw_cb}->();
249 }
250 }
251
252 } elsif (defined $len) {
253 # todo, caller?
254 $self->_error ("unexpected eof", @caller, 1);
255 } elsif ($! != Errno::EAGAIN) {
256 # todo, caller?
257 $self->_error ("read error: $!", @caller, 1);
258 }
259 };
260
261 $self->{tw_cb} = sub {
262 if ($self->{timeout} && $self->{last_activity}) {
263 if (AE::now > $self->{last_activity} + $self->{timeout}) {
264 # we did time out
265 my $req = $self->{queue}[0];
266 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
267 } else {
268 # we need to re-set the timeout watcher
269 $self->{tw} = AE::timer
270 $self->{last_activity} + $self->{timeout} - AE::now,
271 0,
272 $self->{tw_cb},
273 ;
274 }
275 } else {
276 # no timeout check wanted, or idle
277 undef $self->{tw};
278 }
279 };
280
281 $self->{ww_cb} = sub {
282 $self->{last_activity} = AE::now;
283
284 my $len = syswrite $client, $self->{wbuf}
285 or return delete $self->{ww};
286
287 substr $self->{wbuf}, 0, $len, "";
288 };
289 }
290
291 $self->_req (
292 sub {
293 return unless $self;
294 $self->{child_pid} = $_[1];
295 },
296 (caller)[1,2],
297 "req_pid"
298 );
299
300 $self->_req (
301 sub {
302 return unless $self;
303 &{ $self->{on_connect} } if $self->{on_connect};
304 },
305 (caller)[1,2],
306 req_open => $dbi, $user, $pass, %dbi_args
307 );
308
309 $self
310 }
311
312 sub _server_pid {
313 shift->{child_pid}
314 }
315
316 sub kill_child {
317 my $self = shift;
318
319 if (my $pid = delete $self->{child_pid}) {
320 # kill and reap process
321 my $kid_watcher; $kid_watcher = AE::child $pid, sub {
322 undef $kid_watcher;
323 };
324 kill TERM => $pid;
325 }
326
327 delete $self->{rw};
328 delete $self->{ww};
329 delete $self->{tw};
330 close delete $self->{fh};
331 }
332
333 sub DESTROY {
334 shift->kill_child;
335 }
336
337 sub _error {
338 my ($self, $error, $filename, $line, $fatal) = @_;
339
340 if ($fatal) {
341 delete $self->{tw};
342 delete $self->{rw};
343 delete $self->{ww};
344 delete $self->{fh};
345
346 # for fatal errors call all enqueued callbacks with error
347 while (my $req = shift @{$self->{queue}}) {
348 local $@ = $error;
349 $req->[0]->($self);
350 }
351 $self->kill_child;
352 }
353
354 local $@ = $error;
355
356 if ($self->{on_error}) {
357 $self->{on_error}($self, $filename, $line, $fatal)
358 } else {
359 die "$error at $filename, line $line\n";
360 }
361 }
362
363 =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
364
365 Sets (or clears, with C<undef>) the C<on_error> handler.
366
367 =cut
368
369 sub on_error {
370 $_[0]{on_error} = $_[1];
371 }
372
373 =item $dbh->timeout ($seconds)
374
375 Sets (or clears, with C<undef>) the database timeout. Useful to extend the
376 timeout when you are about to make a really long query.
377
378 =cut
379
380 sub timeout {
381 my ($self, $timeout) = @_;
382
383 $self->{timeout} = $timeout;
384
385 # reschedule timer if one was running
386 $self->{tw_cb}->();
387 }
388
389 sub _req {
390 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
391
392 unless ($self->{fh}) {
393 local $@ = my $err = 'no database connection';
394 $cb->($self);
395 $self->_error ($err, $filename, $line, 1);
396 return;
397 }
398
399 push @{ $self->{queue} }, [$cb, $filename, $line];
400
401 # re-start timeout if necessary
402 if ($self->{timeout} && !$self->{tw}) {
403 $self->{last_activity} = AE::now;
404 $self->{tw_cb}->();
405 }
406
407 $self->{wbuf} .= CBOR::XS::encode_cbor \@_;
408
409 unless ($self->{ww}) {
410 my $len = syswrite $self->{fh}, $self->{wbuf};
411 substr $self->{wbuf}, 0, $len, "";
412
413 # still any left? then install a write watcher
414 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
415 if length $self->{wbuf};
416 }
417 }
418
419 =item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
420
421 An accessor for the database handle attributes, such as C<AutoCommit>,
422 C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
423 (which might be C<undef>), then the given attribute will be set to that
424 value.
425
426 The callback will be passed the database handle and the attribute's value
427 if successful.
428
429 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
430 will be passed and C<$@> contains the error message.
431
432 =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
433
434 Executes the given SQL statement with placeholders replaced by
435 C<@args>. The statement will be prepared and cached on the server side, so
436 using placeholders is extremely important.
437
438 The callback will be called with a weakened AnyEvent::DBI object as the
439 first argument and the result of C<fetchall_arrayref> (or C<undef>
440 if the statement wasn't a select statement) as the second argument.
441
442 The third argument is the return value from the C<< DBI->execute >> method
443 call.
444
445 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
446 will be passed and C<$@> contains the error message.
447
448 =item $dbh->stattr ($attr_name, $cb->($dbh, $value))
449
450 An accessor for the statement attributes of the most recently executed
451 statement, such as C<NAME> or C<TYPE>.
452
453 The callback will be passed the database handle and the attribute's value
454 if successful.
455
456 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
457 will be passed and C<$@> contains the error message.
458
459 =item $dbh->begin_work ($cb->($dbh[, $rc]))
460
461 =item $dbh->commit ($cb->($dbh[, $rc]))
462
463 =item $dbh->rollback ($cb->($dbh[, $rc]))
464
465 The begin_work, commit, and rollback methods expose the equivalent
466 transaction control method of the DBI driver. On success, C<$rc> is true.
467
468 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
469 will be passed and C<$@> contains the error message.
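
The three transaction methods might be chained through callbacks as in the following sketch. C<Local::MockDBH> and C<transfer> are hypothetical names invented for this illustration: the mock calls back immediately and only imitates the documented argument convention; it is not AnyEvent::DBI. With the real module an C<on_error> handler would presumably have to be installed so that errors do not C<die>.

```perl
use strict;
use warnings;

{
    # Stand-in for illustration only, NOT AnyEvent::DBI.
    package Local::MockDBH;

    sub new { bless {}, shift }

    # begin_work, commit and rollback always succeed in this mock.
    for my $m (qw(begin_work commit rollback)) {
        no strict 'refs';
        *$m = sub { my ($self, $cb) = @_; $cb->($self, 1) };
    }

    # exec fails for statements starting with "fail", otherwise succeeds.
    sub exec {
        my $cb = pop;
        my ($self, $sql, @args) = @_;
        if ($sql =~ /^fail/) {
            local $@ = "simulated failure";
            return $cb->($self);           # error: only the handle is passed
        }
        $cb->($self, undef, 1);            # success: rows (none) and $rv
    }
}

# Run one statement inside a transaction and report the outcome to $done.
sub transfer {
    my ($dbh, $sql, $done) = @_;

    $dbh->begin_work (sub {
        $#_ or return $done->("begin_work failed: $@");

        $dbh->exec ($sql, 10, sub {
            if ($#_) {
                $dbh->commit (sub {
                    $done->($#_ ? "committed" : "commit failed: $@");
                });
            } else {
                my $err = $@;              # save before the next request
                $dbh->rollback (sub { $done->("rolled back: $err") });
            }
        });
    });
}

my @results;
for my $sql ("update test set num = ?", "fail on purpose") {
    transfer (Local::MockDBH->new, $sql, sub { push @results, shift });
}

print "$_\n" for @results;
```

Copying C<$@> into a lexical before issuing the rollback keeps the original error message available to the later callback.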
470
471 =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
472
473 This gives access to database driver private methods. Because they
474 are not standard you cannot always depend on the value of C<$rc> or
475 C<$dbi_err>. Check the documentation for your specific driver/function
476 combination to see what it returns.
477
478 Note that the first argument will be eval'ed to produce the argument list to
479 the func() method. This must be done because the serialization protocol
480 between the AnyEvent::DBI server process and your program does not support the
481 passage of closures.
482
483 Here's an example to extend the query language in SQLite so it supports an
484 instr() function:
485
486 $cv = AnyEvent->condvar;
487 $dbh->func (
488 q{
489 instr => 2, sub {
490 my ($string, $search) = @_;
491 return index $string, $search;
492 },
493 },
494 create_function => sub {
495 return $cv->send ($@)
496 unless $#_;
497 $cv->send (undef, @_[1,2,3]);
498 }
499 );
500
501 my ($err,$rc,$errcode,$errstr) = $cv->recv;
502
503 die $err if defined $err;
504 die "EVAL failed: $errstr"
505 if $errcode;
506
507 # otherwise, we can ignore $rc and $errcode for this particular func
508
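
Why the argument list is passed as a string can be seen in isolation. This is a sketch in plain Perl of what the receiving side presumably does with it; the variable names are illustrative and no database is involved.

```perl
use strict;
use warnings;

# The argument list travels as a plain string ...
my $arg_string = q{
    instr => 2, sub {
        my ($string, $search) = @_;
        return index $string, $search;
    },
};

# ... and a string eval turns it back into a real list,
# including the code reference, on the receiving side.
my @args = eval $arg_string;
die "eval failed: $@" if $@;

my ($name, $argc, $code) = @args;
my $pos = $code->("hello world", "world");

print "$name/$argc returns $pos\n";
```

A code reference cannot be serialized and sent to another process, but source text can, so the closure is only created once the string has been evaluated.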
509 =cut
510
511 for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) {
512 eval 'sub ' . $cmd_name . '{
513 my $cb = pop;
514 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
515 &_req
516 }';
517 }
518
519 =back
520
521 =head1 SEE ALSO
522
523 L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
524
525 =head1 AUTHOR AND CONTACT
526
527 Marc Lehmann <schmorp@schmorp.de> (current maintainer)
528 http://home.schmorp.de/
529
530 Adam Rosenstein <adam@redcondor.com>
531 http://www.redcondor.com/
532
533 =cut
534
535 1