ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.15
Committed: Thu May 17 02:15:58 2012 UTC (12 years ago) by root
Branch: MAIN
CVS Tags: rel-2_2
Changes since 1.14: +11 -5 lines
Log Message:
2.2

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::DBI - asynchronous DBI access
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::DBI;
8
9 my $cv = AnyEvent->condvar;
10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12
13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($dbh, $rows, $rv) = @_;
15
16 $#_ or die "failure: $@";
17
18 print "@$_\n"
19 for @$rows;
20
21 $cv->broadcast;
22 });
23
24 # asynchronously do sth. else here
25
26 $cv->wait;
27
28 =head1 DESCRIPTION
29
30 This module is an L<AnyEvent> user; you need to make sure that you use and
31 run a supported event loop.
32
33 This module implements asynchronous DBI access by forking or executing
34 separate "DBI-Server" processes and sending them requests.
35
36 It means that you can run DBI requests in parallel to other tasks.
37
38 The overhead for very simple statements ("select 0") is somewhere
39 around 100% to 120% (dual/single core CPU) compared to an explicit
40 prepare_cached/execute/fetchrow_arrayref/finish combination.
41
42 =head2 ERROR HANDLING
43
44 This module defines a number of functions that accept a callback
45 argument. All callbacks used by this module get their AnyEvent::DBI handle
46 object passed as first argument.
47
48 If the request was successful, then there will be more arguments,
49 otherwise there will only be the C<$dbh> argument and C<$@> contains an
50 error message.
51
52 A convenient way to check whether an error occurred is to check C<$#_> -
53 if that is true, then the function was successful, otherwise there was an
54 error.
55
56 =cut
57
58 package AnyEvent::DBI;
59
60 use common::sense;
61
62 use Carp;
63 use Socket ();
64 use Scalar::Util ();
65 use Storable ();
66
67 use DBI (); # only needed in child actually - do it before fork & !exec?
68
69 use AnyEvent ();
70 use AnyEvent::Util ();
71
72 use Errno ();
73 use Fcntl ();
74 use POSIX ();
75
our $VERSION = '2.2';

# highest file descriptor number the forked server will try to close;
# falls back to 1023 when the sysconf lookup dies or yields a false value
our $FD_MAX = eval { POSIX::sysconf (POSIX::_SC_OPEN_MAX ()) - 1 } || 1023;

# this is the forked server code, could/should be bundled as its own file

# server-side database handle: assigned by req_open, used by the other
# req_* handlers
our $DBH;
83
# Server-side handler for the "req_open" request.
#
# The request is an array reference of the form
# [name, $dsn, $user, $pass, attribute key/value pairs...]. Connects via
# DBI->connect and stores the handle in $DBH.
#
# Returns [1, 1] on success. On failure it dies with the plain
# $DBI::errstr string.
sub req_open {
    my $req = shift;
    my ($name, $dsn, $login, $password, @attr_pairs) = @$req;
    my %connect_attr = @attr_pairs;

    $DBH = DBI->connect ($dsn, $login, $password, \%connect_attr);
    die $DBI::errstr
        unless $DBH;

    return [1, 1];
}
91
# Server-side handler for the "req_exec" request.
#
# The request is [name, $statement, @bind_values]. The statement is prepared
# with prepare_cached and executed with the bind values.
#
# Returns [1, $rows, $rv], where $rows is the fetchall_arrayref result when
# the statement handle reports result columns (NUM_OF_FIELDS) and undef
# otherwise, and $rv is the return value of execute.
#
# Dies with a one-element array reference holding the error string when
# prepare or execute fails.
sub req_exec {
    my $req = shift;
    my ($name, $statement, @bind) = @$req;

    my $sth = $DBH->prepare_cached ($statement, undef, 1);
    die [$DBI::errstr]
        unless $sth;

    my $rv = $sth->execute (@bind);
    die [$sth->errstr]
        unless $rv;

    # kept in list context, exactly like the original array constructor
    my @rows = $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef;

    return [1, @rows, $rv];
}
102
# Server-side handler for the "req_attr" request.
#
# The request is [name, $attr_name] to read an attribute of $DBH, or
# [name, $attr_name, $value] to set it first (the value may be undef).
#
# Returns [1, $current_value].
sub req_attr {
    my $req       = shift;
    my $attr_name = $req->[1];

    # a third element, even an undefined one, means "assign"
    if (@$req > 2) {
        $DBH->{$attr_name} = $req->[2];
    }

    return [1, $DBH->{$attr_name}];
}
111
# Server-side handler for the "req_begin_work" request.
#
# Calls begin_work on $DBH. Returns [1, $rc] on success and dies with
# [$DBI::errstr] when begin_work returns a false value.
#
# The status is fetched in its own statement: written inline as
# "[1, CALL or die ...]" the low-precedence "or" swallows the leading 1 and
# the reply shrinks to a single element, so the client never sees $rc.
sub req_begin_work {
    my $rc = $DBH->begin_work
        or die [$DBI::errstr];

    return [1, $rc];
}
115
# Server-side handler for the "req_commit" request.
#
# Calls commit on $DBH. Returns [1, $rc] on success and dies with
# [$DBI::errstr] when commit returns a false value.
#
# The status is fetched in its own statement: written inline as
# "[1, CALL or die ...]" the low-precedence "or" swallows the leading 1 and
# the reply shrinks to a single element, so the client never sees $rc.
sub req_commit {
    my $rc = $DBH->commit
        or die [$DBI::errstr];

    return [1, $rc];
}
119
# Server-side handler for the "req_rollback" request.
#
# Calls rollback on $DBH. Returns [1, $rc] on success and dies with
# [$DBI::errstr] when rollback returns a false value.
#
# The status is fetched in its own statement: written inline as
# "[1, CALL or die ...]" the low-precedence "or" swallows the leading 1 and
# the reply shrinks to a single element, so the client never sees $rc.
sub req_rollback {
    my $rc = $DBH->rollback
        or die [$DBI::errstr];

    return [1, $rc];
}
123
# Server-side handler for the "req_func" request.
#
# The request is [name, $arg_string, $function]. $arg_string is Perl source
# that is evaluated to produce the argument list, which is then passed,
# followed by $function, to the func method of $DBH.
#
# Returns [1, $rc, $DBI::err, $DBI::errstr]. Dies with a plain string when
# evaluating $arg_string fails.
#
# NOTE(review): $arg_string is run through string eval inside the server
# process, so it must only ever come from trusted callers.
sub req_func {
    my (undef, $arg_string, $function) = @{ shift () };

    my @args = eval $arg_string;

    if ($@) {
        die "error evaling \$dbh->func() arg_string: $@";
    }

    my $rc = $DBH->func (@args, $function);

    [1, $rc, $DBI::err, $DBI::errstr]
}
134
# Server main loop: reads requests from $fh, dispatches them and writes the
# replies back until end of file or a read/write failure.
#
# Arguments:
#   $fh      - connected handle shared with the client
#   $version - module version the client was started with
#
# Wire format in both directions: a native unsigned long length ("L")
# followed by that many bytes of Storable::freeze output.
#
# Each request is an array reference whose first element names the handler
# function to call; the handler receives the whole request. A handler reply
# is sent as-is. When a handler dies, the reply is [undef, $message, $flag]:
# for an array-reference exception, $message and $flag are its first two
# elements; for any other exception, $message is the stringified exception
# and $flag is 1.
#
# On a version mismatch a single error reply is written and the function
# returns immediately.
sub serve_fh($$) {
    my ($fh, $version) = @_;

    if ($VERSION != $version) {
        syswrite $fh,
            pack "L/a*",
                Storable::freeze (
                    [undef, "AnyEvent::DBI version mismatch ($VERSION vs. $version)"]
                );
        return;
    }

    eval {
        my $rbuf;

        while () {
            # append to whatever partial frame is still buffered
            sysread $fh, $rbuf, 16384, length $rbuf
                or last;

            while () {
                my $len = unpack "L", $rbuf;

                # full request available?
                last unless $len && $len + 4 <= length $rbuf;

                # thaw exactly this frame; the buffer may already hold
                # further pipelined requests behind it
                my $req = Storable::thaw (substr $rbuf, 4, $len);
                substr $rbuf, 0, $len + 4, ""; # remove length + request

                my $wbuf = eval { pack "L/a*", Storable::freeze ($req->[0]($req)) };
                $wbuf = pack "L/a*", Storable::freeze ([undef, ref $@ ? ("$@->[0]", $@->[1]) : ("$@", 1)])
                    if $@;

                # syswrite may be partial, so loop until the reply is out
                for (my $ofs = 0; $ofs < length $wbuf; ) {
                    $ofs += (syswrite ($fh, substr ($wbuf, $ofs))
                        or die "unable to write results");
                }
            }
        }
    };
}
174
# Entry point for an exec'ed server process: wraps the inherited file
# descriptor number in a Perl handle and runs the server loop on it.
#
# Arguments:
#   $fd      - numeric file descriptor of the server side of the socket
#   $version - module version of the client, handed on to serve_fh
#
# Dies when the descriptor cannot be opened.
sub serve_fd($$) {
    my ($fd, $version) = @_;

    # three-argument form: the descriptor is passed as data and can never
    # be read as part of the open mode
    open my $fh, ">>&=", $fd
        or die "Couldn't open server file descriptor: $!";

    serve_fh ($fh, $version);
}
181
182 =head2 METHODS
183
184 =over 4
185
186 =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
187
188 Returns a database handle for the given database. Each database handle
189 has an associated server process that executes statements in order. If
190 you want to run more than one statement in parallel, you need to create
191 additional database handles.
192
193 The advantage of this approach is that transactions work as state is
194 preserved.
195
196 Example:
197
198 $dbh = new AnyEvent::DBI
199 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
200
201 Additional key-value pairs can be used to adjust behaviour:
202
203 =over 4
204
205 =item on_error => $callback->($dbh, $filename, $line, $fatal)
206
207 When an error occurs, then this callback will be invoked. On entry, C<$@>
208 is set to the error message. C<$filename> and C<$line> are where the
209 original request was submitted.
210
211 If the fatal argument is true then the database connection is shut down
212 and your database handle became invalid. In addition to invoking the
213 C<on_error> callback, all of your queued request callbacks are called
214 with only the C<$dbh> argument.
215
216 If omitted, then C<die> will be called on any errors, fatal or not.
217
218 =item on_connect => $callback->($dbh[, $success])
219
220 If you supply an C<on_connect> callback, then this callback will be
221 invoked after the database connect attempt. If the connection succeeds,
222 C<$success> is true, otherwise it is missing and C<$@> contains the
223 C<$DBI::errstr>.
224
225 Regardless of whether C<on_connect> is supplied, connect errors will result in
226 C<on_error> being called. However, if no C<on_connect> callback is supplied, then
227 connection errors are considered fatal. The client will C<die> and the C<on_error>
228 callback will be called with C<$fatal> true.
229
230 When C<on_connect> is supplied, connect errors are not fatal and AnyEvent::DBI
231 will not C<die>. You still cannot, however, use the $dbh object you
232 received from C<new> to make requests.
233
234 =item exec_server => 1
235
236 If you supply an C<exec_server> argument, then the DBI server process will
237 fork and exec another perl interpreter (using C<$^X>) with just the
238 AnyEvent::DBI proxy running. This will provide the cleanest possible proxy
239 for your database server.
240
241 If you do not supply the C<exec_server> argument (or supply it with a
242 false value) then the traditional method of starting the server by forking
243 the current process is used. The forked interpreter will try to clean
244 itself up by calling POSIX::close on all file descriptors except STDIN,
245 STDOUT, and STDERR (and the socket it uses to communicate with the client,
246 of course).
247
248 =item timeout => seconds
249
250 If you supply a timeout parameter (fractional values are supported), then
251 a timer is started any time the DBI handle expects a response from the
252 server. This includes connection setup as well as requests made to the
253 backend. The timeout spans the duration from the moment the first data
254 is written (or queued to be written) until all expected responses are
255 returned, but is postponed for "timeout" seconds each time more data is
256 returned from the server. If the timer ever goes off then a fatal error is
257 generated. If you have an C<on_error> handler installed, then it will be
258 called, otherwise your program will die().
259
260 When altering your databases with timeouts it is wise to use
261 transactions. If you quit due to timeout while performing insert, update
262 or schema-altering commands you can end up not knowing if the action was
263 submitted to the database, complicating recovery.
264
265 Timeout errors are always fatal.
266
267 =back
268
269 Any additional key-value pairs will be rolled into a hash reference
270 and passed as the final argument to the C<< DBI->connect (...) >>
271 call. For example, to suppress errors on STDERR and send them instead to an
272 AnyEvent::Handle you could do:
273
274 $dbh = new AnyEvent::DBI
275 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
276 PrintError => 0,
277 on_error => sub {
278 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
279 };
280
281 =cut
282
# Storable loads its routines lazily; run one freeze/thaw round trip at
# load time (presumably so both are already loaded before any server
# process is forked - TODO confirm)
Storable::thaw (Storable::freeze ([]));
285
# Constructor: creates the client/server socket pair, installs the read,
# write and timeout callbacks on the client side, forks (and optionally
# execs) the server process and queues the initial "req_open" request.
#
# Arguments: ($class, $dbi, $user, $pass, %arg). The keys on_connect,
# on_error, timeout and exec_server are consumed by this module; every
# other key/value pair is sent along with the open request as a DBI connect
# attribute.
#
# Returns the new handle. Croaks when the socket pair cannot be created or
# fork fails.
sub new {
    my ($class, $dbi, $user, $pass, %arg) = @_;

    my ($client, $server) = AnyEvent::Util::portable_socketpair
        or croak "unable to create AnyEvent::DBI communications pipe: $!";

    # whatever is not one of our own options is meant for DBI->connect
    my %dbi_args = %arg;
    delete @dbi_args{qw(on_connect on_error timeout exec_server)};

    my $self = bless \%arg, $class;
    $self->{fh} = $client;

    AnyEvent::Util::fh_nonblocking $client, 1;

    my $rbuf;
    my @caller = (caller)[1,2]; # the "default" caller

    {
        # the callbacks below are stored inside the object itself, so they
        # only hold a weak reference to it
        Scalar::Util::weaken (my $self = $self);

        # read watcher: collects reply frames and hands each one to the
        # callback of the oldest queued request
        $self->{rw} = AE::io $client, 0, sub {
            return unless $self;

            my $len = sysread $client, $rbuf, 65536, length $rbuf;

            if ($len > 0) {
                # we received data, so reset the timer
                $self->{last_activity} = AE::now;

                while () {
                    my $len = unpack "L", $rbuf;

                    # full response available?
                    last unless $len && $len + 4 <= length $rbuf;

                    # thaw exactly this frame, not the rest of the buffer
                    my $res = Storable::thaw (substr $rbuf, 4, $len);
                    substr $rbuf, 0, $len + 4, ""; # remove length + request

                    last unless $self;
                    my $req = shift @{ $self->{queue} };

                    if (defined $res->[0]) {
                        # success: first element is replaced by the handle
                        $res->[0] = $self;
                        $req->[0](@$res);
                    } else {
                        my $cb = shift @$req;
                        local $@ = $res->[1];
                        $cb->($self);
                        $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
                            if $self; # cb() could have deleted it
                    }

                    # no more queued requests, so become idle
                    if ($self && !@{ $self->{queue} }) {
                        undef $self->{last_activity};
                        $self->{tw_cb}->();
                    }
                }

            } elsif (defined $len) {
                # todo, caller?
                $self->_error ("unexpected eof", @caller, 1);
            } elsif ($! != Errno::EAGAIN) {
                # todo, caller?
                $self->_error ("read error: $!", @caller, 1);
            }
        };

        # timeout check: raises a fatal timeout error, re-arms the timer
        # for the remaining time, or drops the timer when idle
        $self->{tw_cb} = sub {
            if ($self->{timeout} && $self->{last_activity}) {
                if (AE::now > $self->{last_activity} + $self->{timeout}) {
                    # we did time out
                    my $req = $self->{queue}[0];
                    $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
                } else {
                    # we need to re-set the timeout watcher
                    $self->{tw} = AE::timer
                        $self->{last_activity} + $self->{timeout} - AE::now,
                        0,
                        $self->{tw_cb},
                    ;
                }
            } else {
                # no timeout check wanted, or idle
                undef $self->{tw};
            }
        };

        # write watcher callback: flushes buffered request data
        $self->{ww_cb} = sub {
            return unless $self;

            $self->{last_activity} = AE::now;

            my $len = syswrite $client, $self->{wbuf};

            unless (defined $len) {
                # a transient failure must keep the watcher alive,
                # otherwise the buffered request would never be sent
                return if $! == Errno::EAGAIN () || $! == Errno::EINTR ();

                # any other write failure: stop watching for writability
                return delete $self->{ww};
            }

            # nothing written (empty buffer): the watcher is done
            $len
                or return delete $self->{ww};

            substr $self->{wbuf}, 0, $len, "";
        };
    }

    my $pid = fork;

    if ($pid) {
        # parent
        close $server;
    } elsif (defined $pid) {
        # child
        my $serv_fno = fileno $server;

        if ($self->{exec_server}) {
            fcntl $server, &Fcntl::F_SETFD, 0; # don't close the server side
            exec {$^X}
                "$0 dbi slave",
                -e => "require shift; AnyEvent::DBI::serve_fd ($serv_fno, $VERSION)",
                $INC{"AnyEvent/DBI.pm"};
            POSIX::_exit (124);
        } else {
            # close every descriptor above $^F except our socket
            ($_ != $serv_fno) && POSIX::close ($_)
                for $^F+1..$FD_MAX;
            serve_fh ($server, $VERSION);

            # no other way on the broken windows platform, even this leaks
            # memory and might fail.
            kill 9, $$
                if AnyEvent::WIN32;

            # and this kills the parent process on windows
            POSIX::_exit (0);
        }
    } else {
        croak "fork: $!";
    }

    $self->{child_pid} = $pid;

    # the first request opens the database connection; on_connect (or a
    # no-op) is its callback
    $self->_req (
        ($self->{on_connect} ? $self->{on_connect} : sub { }),
        (caller)[1,2],
        req_open => $dbi, $user, $pass, %dbi_args
    );

    $self
}
429
# Returns the process id stored under child_pid, or undef when none is
# stored.
sub _server_pid {
    my ($self) = @_;

    return $self->{child_pid};
}
433
# Shuts the connection down: signals and reaps the server process (if one
# is still recorded), drops the event watchers and closes the client
# socket.
#
# Safe to call more than once, and safe when the handle was already removed
# (a fatal error deletes it before calling this method).
#
# Returns the result of close, or nothing when there was no handle left.
sub kill_child {
    my $self = shift;

    if (my $pid = delete $self->{child_pid}) {
        # kill and reap process; the watcher keeps itself alive until the
        # child has been collected
        my $kid_watcher; $kid_watcher = AE::child ($pid, sub {
            undef $kid_watcher;
        });
        kill TERM => $pid;
    }

    # watchers must not outlive the handle they are watching
    delete $self->{rw};
    delete $self->{ww};
    delete $self->{tw};

    my $fh = delete $self->{fh}
        or return;

    close $fh;
}
447
# Destructor: shuts down the server process and the connection.
sub DESTROY {
    my ($self) = @_;

    $self->kill_child;
}
451
# Central error reporting.
#
# Arguments:
#   $error           - error message
#   $filename, $line - where the failing request was submitted
#   $fatal           - true when the connection can no longer be used
#
# For fatal errors the watchers and the handle are dropped, every queued
# request callback is invoked with only the handle (and $@ set to the
# message) and the server process is killed.
#
# Afterwards the on_error callback is invoked with $@ set to the message;
# without such a callback the function dies with
# "<error> at <filename>, line <line>".
sub _error {
    my ($self, $error, $filename, $line, $fatal) = @_;

    if ($fatal) {
        delete $self->{$_}
            for qw(tw rw ww fh);

        # for fatal errors call all enqueued callbacks with error
        while (my $pending = shift @{ $self->{queue} }) {
            local $@ = $error;
            my $cb = $pending->[0];
            $cb->($self);
        }

        $self->kill_child;
    }

    local $@ = $error;

    die "$error at $filename, line $line\n"
        unless $self->{on_error};

    $self->{on_error}($self, $filename, $line, $fatal)
}
477
478 =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
479
480 Sets (or clears, with C<undef>) the C<on_error> handler.
481
482 =cut
483
# Stores the given callback (or undef) as the on_error handler and returns
# it.
sub on_error {
    my ($self, $cb) = @_;

    $self->{on_error} = $cb;
}
487
488 =item $dbh->timeout ($seconds)
489
490 Sets (or clears, with C<undef>) the database timeout. Useful to extend the
491 timeout when you are about to make a really long query.
492
493 =cut
494
# Stores the new timeout value (undef clears it) and then runs the timeout
# callback so that a running timer is rescheduled.
sub timeout {
    my $self = shift;

    $self->{timeout} = shift;

    # reschedule timer if one was running
    my $reschedule = $self->{tw_cb};
    $reschedule->();
}
503
# Queues one request and starts sending it to the server.
#
# Arguments: ($self, $cb, $filename, $line, @request). The first four are
# removed from @_; what remains (the handler name and its arguments) is
# serialised with Storable and appended to the write buffer with a length
# prefix.
#
# Without a connection the callback is invoked with only the handle and $@
# set, and a fatal 'no database connection' error is reported.
sub _req {
    my $self     = shift;
    my $cb       = shift;
    my $filename = shift;
    my $line     = shift;

    unless ($self->{fh}) {
        my $err = 'no database connection';
        local $@ = $err;
        $cb->($self);
        $self->_error ($err, $filename, $line, 1);
        return;
    }

    # replies arrive in request order, so the callback just goes to the
    # end of the queue
    push @{ $self->{queue} }, [$cb, $filename, $line];

    # re-start timeout if necessary
    if ($self->{timeout} && !$self->{tw}) {
        $self->{last_activity} = AE::now;
        $self->{tw_cb}->();
    }

    $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;

    unless ($self->{ww}) {
        # no write watcher active: try to send right away
        my $written = syswrite $self->{fh}, $self->{wbuf};
        substr $self->{wbuf}, 0, $written, "";

        # still any left? then install a write watcher
        $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
            if length $self->{wbuf};
    }
}
533
534 =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
535
536 Executes the given SQL statement with placeholders replaced by
537 C<@args>. The statement will be prepared and cached on the server side, so
538 using placeholders is extremely important.
539
540 The callback will be called with a weakened AnyEvent::DBI object as the
541 first argument and the result of C<fetchall_arrayref> (or C<undef>
542 if the statement wasn't a select statement) as the second argument.
543
544 Third argument is the return value from the C<< DBI->execute >> method
545 call.
546
547 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
548 will be passed and C<$@> contains the error message.
549
550 =item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
551
552 An accessor for the handle attributes, such as C<AutoCommit>,
553 C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
554 (which might be C<undef>), then the given attribute will be set to that
555 value.
556
557 The callback will be passed the database handle and the attribute's value
558 if successful.
559
560 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
561 will be passed and C<$@> contains the error message.
562
563 =item $dbh->begin_work ($cb->($dbh[, $rc]))
564
565 =item $dbh->commit ($cb->($dbh[, $rc]))
566
567 =item $dbh->rollback ($cb->($dbh[, $rc]))
568
569 The begin_work, commit, and rollback methods expose the equivalent
570 transaction control method of the DBI driver. On success, C<$rc> is true.
571
572 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
573 will be passed and C<$@> contains the error message.
574
575 =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
576
577 This gives access to database driver private methods. Because they
578 are not standard you cannot always depend on the value of C<$rc> or
579 C<$dbi_err>. Check the documentation for your specific driver/function
580 combination to see what it returns.
581
582 Note that the first argument will be eval'ed to produce the argument list to
583 the func() method. This must be done because the serialization protocol
584 between the AnyEvent::DBI server process and your program does not support the
585 passage of closures.
586
587 Here's an example to extend the query language in SQLite so it supports an
588 instr() function:
589
590 $cv = AnyEvent->condvar;
591 $dbh->func (
592 q{
593 instr => 2, sub {
594 my ($string, $search) = @_;
595 return index $string, $search;
596 },
597 },
598 create_function => sub {
599 return $cv->send ($@)
600 unless $#_;
601 $cv->send (undef, @_[1,2,3]);
602 }
603 );
604
605 my ($err,$rc,$errcode,$errstr) = $cv->recv;
606
607 die $err if defined $err;
608 die "EVAL failed: $errstr"
609 if $errcode;
610
611 # otherwise, we can ignore $rc and $errcode for this particular func
612
613 =cut
614
# Install the public request methods exec, attr, begin_work, commit,
# rollback and func. Each one takes the callback from the end of its
# argument list and forwards to _req with the argument list rearranged to
# ($self, $cb, $filename, $line, "req_<name>", remaining arguments...),
# where $filename and $line identify the caller of the method.
#
# The methods are closures assigned into the symbol table rather than
# string-eval'ed source, so a mistake in this code fails when the module is
# compiled instead of being silently discarded by eval.
for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
    my $req_name = "req_$cmd_name";

    no strict 'refs';

    *{ __PACKAGE__ . "::$cmd_name" } = sub {
        my $cb = pop;
        splice @_, 1, 0, $cb, (caller)[1,2], $req_name;
        &_req # called with the current, rearranged @_
    };
}
622
623 =back
624
625 =head1 SEE ALSO
626
627 L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
628
629 =head1 AUTHOR
630
631 Marc Lehmann <schmorp@schmorp.de>
632 http://home.schmorp.de/
633
634 Adam Rosenstein <adam@redcondor.com>
635 http://www.redcondor.com/
636
637 =cut
638
639 1;
640