ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.19
Committed: Mon Sep 4 11:46:30 2017 UTC (6 years, 9 months ago) by root
Branch: MAIN
CVS Tags: rel-3_02
Changes since 1.18: +1 -1 lines
Log Message:
3.02

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::DBI - asynchronous DBI access
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::DBI;
8
9 my $cv = AnyEvent->condvar;
10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12
13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($dbh, $rows, $rv) = @_;
15
16 $#_ or die "failure: $@";
17
18 print "@$_\n"
19 for @$rows;
20
21 $cv->broadcast;
22 });
23
24 # asynchronously do sth. else here
25
26 $cv->wait;
27
28 =head1 DESCRIPTION
29
30 This module is an L<AnyEvent> user, you need to make sure that you use and
31 run a supported event loop.
32
33 This module implements asynchronous DBI access by forking or executing
34 separate "DBI-Server" processes and sending them requests.
35
36 It means that you can run DBI requests in parallel to other tasks.
37
38 With DBD::mysql, the overhead for very simple statements
39 ("select 0") is somewhere around 50% compared to an explicit
40 prepare_cached/execute/fetchrow_arrayref/finish combination. With
41 DBD::SQlite3, it's more like a factor of 8 for this trivial statement.
42
43 =head2 ERROR HANDLING
44
45 This module defines a number of functions that accept a callback
46 argument. All callbacks used by this module get their AnyEvent::DBI handle
47 object passed as first argument.
48
49 If the request was successful, then there will be more arguments,
50 otherwise there will only be the C<$dbh> argument and C<$@> contains an
51 error message.
52
53 A convenient way to check whether an error occurred is to check C<$#_> -
54 if that is true, then the function was successful, otherwise there was an
55 error.
56
57 =cut
58
59 package AnyEvent::DBI;
60
61 use common::sense;
62
63 use Carp;
64 use Convert::Scalar ();
65 use AnyEvent::Fork ();
66 use CBOR::XS ();
67
68 use AnyEvent ();
69 use AnyEvent::Util ();
70
71 use Errno ();
72
73 our $VERSION = '3.02';
74
75 =head2 METHODS
76
77 =over 4
78
79 =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
80
81 Returns a database handle for the given database. Each database handle
82 has an associated server process that executes statements in order. If
83 you want to run more than one statement in parallel, you need to create
84 additional database handles.
85
86 The advantage of this approach is that transactions work as state is
87 preserved.
88
89 Example:
90
91 $dbh = new AnyEvent::DBI
92 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
93
94 Additional key-value pairs can be used to adjust behaviour:
95
96 =over 4
97
98 =item on_error => $callback->($dbh, $filename, $line, $fatal)
99
100 When an error occurs, then this callback will be invoked. On entry, C<$@>
101 is set to the error message. C<$filename> and C<$line> is where the
102 original request was submitted.
103
104 If the fatal argument is true then the database connection is shut down
105 and your database handle became invalid. In addition to invoking the
106 C<on_error> callback, all of your queued request callbacks are called
107 without only the C<$dbh> argument.
108
109 If omitted, then C<die> will be called on any errors, fatal or not.
110
111 =item on_connect => $callback->($dbh[, $success])
112
113 If you supply an C<on_connect> callback, then this callback will be
114 invoked after the database connect attempt. If the connection succeeds,
115 C<$success> is true, otherwise it is missing and C<$@> contains the
116 C<$DBI::errstr>.
117
118 Regardless of whether C<on_connect> is supplied, connect errors will result in
119 C<on_error> being called. However, if no C<on_connect> callback is supplied, then
120 connection errors are considered fatal. The client will C<die> and the C<on_error>
121 callback will be called with C<$fatal> true.
122
123 When on_connect is supplied, connect error are not fatal and AnyEvent::DBI
124 will not C<die>. You still cannot, however, use the $dbh object you
125 received from C<new> to make requests.
126
127 =item fork_template => $AnyEvent::Fork-object
128
129 C<AnyEvent::DBI> uses C<< AnyEvent::Fork->new >> to create the database
130 slave, which in turn either C<exec>'s a new process (similar to the old
131 C<exec_server> constructor argument) or uses a process forked early (see
132 L<AnyEvent::Fork::Early>).
133
134 With this argument you can provide your own fork template. This can be
135 useful if you create a lot of C<AnyEvent::DBI> handles and want to save
136 memory (And speed up startup) by not having to load C<AnyEvent::DBI> again
137 and again into your child processes:
138
139 my $template = AnyEvent::Fork
140 ->new # create new template
141 ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module
142
143 for (...) {
144 $dbh = new AnyEvent::DBI ...
145 fork_template => $template;
146
147 =item timeout => seconds
148
149 If you supply a timeout parameter (fractional values are supported), then
150 a timer is started any time the DBI handle expects a response from the
151 server. This includes connection setup as well as requests made to the
152 backend. The timeout spans the duration from the moment the first data
153 is written (or queued to be written) until all expected responses are
154 returned, but is postponed for "timeout" seconds each time more data is
155 returned from the server. If the timer ever goes off then a fatal error is
156 generated. If you have an C<on_error> handler installed, then it will be
157 called, otherwise your program will die().
158
159 When altering your databases with timeouts it is wise to use
160 transactions. If you quit due to timeout while performing insert, update
161 or schema-altering commands you can end up not knowing if the action was
162 submitted to the database, complicating recovery.
163
164 Timeout errors are always fatal.
165
166 =back
167
168 Any additional key-value pairs will be rolled into a hash reference
169 and passed as the final argument to the C<< DBI->connect (...) >>
170 call. For example, to suppress errors on STDERR and send them instead to an
171 AnyEvent::Handle you could do:
172
173 $dbh = new AnyEvent::DBI
174 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
175 PrintError => 0,
176 on_error => sub {
177 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
178 };
179
180 =cut
181
182 sub new {
183 my ($class, $dbi, $user, $pass, %arg) = @_;
184
185 # we use our own socketpair, so we always have a socket
186 # available, even before the forked process exsist.
187 # this is mostly done so this module is compatible
188 # to versions of itself older than 3.0.
189 my ($client, $server) = AnyEvent::Util::portable_socketpair
190 or croak "unable to create AnyEvent::DBI communications pipe: $!";
191
192 my $fork = delete $arg{fork_template};
193
194 my %dbi_args = %arg;
195 delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)};
196
197 my $self = bless \%arg, $class;
198
199 $self->{fh} = $client;
200
201 my $rbuf;
202 my @caller = (caller)[1,2]; # the "default" caller
203
204 $fork = $fork ? $fork->fork : AnyEvent::Fork->new
205 or croak "fork: $!";
206
207 $fork->require ("AnyEvent::DBI::Slave");
208 $fork->send_arg ($VERSION);
209 $fork->send_fh ($server);
210
211 # we don't rely on the callback, because we use our own
212 # socketpair, for better or worse.
213 $fork->run ("AnyEvent::DBI::Slave::serve", sub { });
214
215 {
216 Convert::Scalar::weaken (my $self = $self);
217
218 my $cbor = new CBOR::XS;
219
220 $self->{rw} = AE::io $client, 0, sub {
221 my $len = Convert::Scalar::extend_read $client, $rbuf, 65536;
222
223 if ($len > 0) {
224 # we received data, so reset the timer
225 $self->{last_activity} = AE::now;
226
227 for my $res ($cbor->incr_parse_multiple ($rbuf)) {
228 last unless $self;
229
230 my $req = shift @{ $self->{queue} };
231
232 if (defined $res->[0]) {
233 $res->[0] = $self;
234 $req->[0](@$res);
235 } else {
236 my $cb = shift @$req;
237 local $@ = $res->[1];
238 $cb->($self);
239 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
240 if $self; # cb() could have deleted it
241 }
242
243 # no more queued requests, so become idle
244 if ($self && !@{ $self->{queue} }) {
245 undef $self->{last_activity};
246 $self->{tw_cb}->();
247 }
248 }
249
250 } elsif (defined $len) {
251 # todo, caller?
252 $self->_error ("unexpected eof", @caller, 1);
253 } elsif ($! != Errno::EAGAIN) {
254 # todo, caller?
255 $self->_error ("read error: $!", @caller, 1);
256 }
257 };
258
259 $self->{tw_cb} = sub {
260 if ($self->{timeout} && $self->{last_activity}) {
261 if (AE::now > $self->{last_activity} + $self->{timeout}) {
262 # we did time out
263 my $req = $self->{queue}[0];
264 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
265 } else {
266 # we need to re-set the timeout watcher
267 $self->{tw} = AE::timer
268 $self->{last_activity} + $self->{timeout} - AE::now,
269 0,
270 $self->{tw_cb},
271 ;
272 }
273 } else {
274 # no timeout check wanted, or idle
275 undef $self->{tw};
276 }
277 };
278
279 $self->{ww_cb} = sub {
280 $self->{last_activity} = AE::now;
281
282 my $len = syswrite $client, $self->{wbuf}
283 or return delete $self->{ww};
284
285 substr $self->{wbuf}, 0, $len, "";
286 };
287 }
288
289 $self->_req (
290 sub {
291 return unless $self;
292 $self->{child_pid} = $_[1];
293 },
294 (caller)[1,2],
295 "req_pid"
296 );
297
298 $self->_req (
299 sub {
300 return unless $self;
301 &{ $self->{on_connect} } if $self->{on_connect};
302 },
303 (caller)[1,2],
304 req_open => $dbi, $user, $pass, %dbi_args
305 );
306
307 $self
308 }
309
310 sub _server_pid {
311 shift->{child_pid}
312 }
313
314 sub kill_child {
315 my $self = shift;
316
317 if (my $pid = delete $self->{child_pid}) {
318 # kill and reap process
319 my $kid_watcher; $kid_watcher = AE::child $pid, sub {
320 undef $kid_watcher;
321 };
322 kill TERM => $pid;
323 }
324
325 delete $self->{rw};
326 delete $self->{ww};
327 delete $self->{tw};
328 close delete $self->{fh};
329 }
330
331 sub DESTROY {
332 shift->kill_child;
333 }
334
335 sub _error {
336 my ($self, $error, $filename, $line, $fatal) = @_;
337
338 if ($fatal) {
339 delete $self->{tw};
340 delete $self->{rw};
341 delete $self->{ww};
342 delete $self->{fh};
343
344 # for fatal errors call all enqueued callbacks with error
345 while (my $req = shift @{$self->{queue}}) {
346 local $@ = $error;
347 $req->[0]->($self);
348 }
349 $self->kill_child;
350 }
351
352 local $@ = $error;
353
354 if ($self->{on_error}) {
355 $self->{on_error}($self, $filename, $line, $fatal)
356 } else {
357 die "$error at $filename, line $line\n";
358 }
359 }
360
361 =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
362
363 Sets (or clears, with C<undef>) the C<on_error> handler.
364
365 =cut
366
367 sub on_error {
368 $_[0]{on_error} = $_[1];
369 }
370
371 =item $dbh->timeout ($seconds)
372
373 Sets (or clears, with C<undef>) the database timeout. Useful to extend the
374 timeout when you are about to make a really long query.
375
376 =cut
377
378 sub timeout {
379 my ($self, $timeout) = @_;
380
381 $self->{timeout} = $timeout;
382
383 # reschedule timer if one was running
384 $self->{tw_cb}->();
385 }
386
387 sub _req {
388 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
389
390 unless ($self->{fh}) {
391 local $@ = my $err = 'no database connection';
392 $cb->($self);
393 $self->_error ($err, $filename, $line, 1);
394 return;
395 }
396
397 push @{ $self->{queue} }, [$cb, $filename, $line];
398
399 # re-start timeout if necessary
400 if ($self->{timeout} && !$self->{tw}) {
401 $self->{last_activity} = AE::now;
402 $self->{tw_cb}->();
403 }
404
405 $self->{wbuf} .= CBOR::XS::encode_cbor \@_;
406
407 unless ($self->{ww}) {
408 my $len = syswrite $self->{fh}, $self->{wbuf};
409 substr $self->{wbuf}, 0, $len, "";
410
411 # still any left? then install a write watcher
412 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
413 if length $self->{wbuf};
414 }
415 }
416
417 =item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
418
419 An accessor for the database handle attributes, such as C<AutoCommit>,
420 C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
421 (which might be C<undef>), then the given attribute will be set to that
422 value.
423
424 The callback will be passed the database handle and the attribute's value
425 if successful.
426
427 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
428 will be passed and C<$@> contains the error message.
429
430 =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
431
432 Executes the given SQL statement with placeholders replaced by
433 C<@args>. The statement will be prepared and cached on the server side, so
434 using placeholders is extremely important.
435
436 The callback will be called with a weakened AnyEvent::DBI object as the
437 first argument and the result of C<fetchall_arrayref> as (or C<undef>
438 if the statement wasn't a select statement) as the second argument.
439
440 Third argument is the return value from the C<< DBI->execute >> method
441 call.
442
443 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
444 will be passed and C<$@> contains the error message.
445
446 =item $dbh->stattr ($attr_name, $cb->($dbh, $value))
447
448 An accessor for the statement attributes of the most recently executed
449 statement, such as C<NAME> or C<TYPE>.
450
451 The callback will be passed the database handle and the attribute's value
452 if successful.
453
454 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
455 will be passed and C<$@> contains the error message.
456
457 =item $dbh->begin_work ($cb->($dbh[, $rc]))
458
459 =item $dbh->commit ($cb->($dbh[, $rc]))
460
461 =item $dbh->rollback ($cb->($dbh[, $rc]))
462
463 The begin_work, commit, and rollback methods expose the equivalent
464 transaction control method of the DBI driver. On success, C<$rc> is true.
465
466 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
467 will be passed and C<$@> contains the error message.
468
469 =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
470
471 This gives access to database driver private methods. Because they
472 are not standard you cannot always depend on the value of C<$rc> or
473 C<$dbi_err>. Check the documentation for your specific driver/function
474 combination to see what it returns.
475
476 Note that the first argument will be eval'ed to produce the argument list to
477 the func() method. This must be done because the serialization protocol
478 between the AnyEvent::DBI server process and your program does not support the
479 passage of closures.
480
481 Here's an example to extend the query language in SQLite so it supports an
482 intstr() function:
483
484 $cv = AnyEvent->condvar;
485 $dbh->func (
486 q{
487 instr => 2, sub {
488 my ($string, $search) = @_;
489 return index $string, $search;
490 },
491 },
492 create_function => sub {
493 return $cv->send ($@)
494 unless $#_;
495 $cv->send (undef, @_[1,2,3]);
496 }
497 );
498
499 my ($err,$rc,$errcode,$errstr) = $cv->recv;
500
501 die $err if defined $err;
502 die "EVAL failed: $errstr"
503 if $errcode;
504
505 # otherwise, we can ignore $rc and $errcode for this particular func
506
507 =cut
508
509 for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) {
510 eval 'sub ' . $cmd_name . '{
511 my $cb = pop;
512 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
513 &_req
514 }';
515 }
516
517 =back
518
519 =head1 SEE ALSO
520
521 L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
522
523 =head1 AUTHOR AND CONTACT
524
525 Marc Lehmann <schmorp@schmorp.de> (current maintainer)
526 http://home.schmorp.de/
527
528 Adam Rosenstein <adam@redcondor.com>
529 http://www.redcondor.com/
530
531 =cut
532
533 1