ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.14
Committed: Sat Oct 30 20:23:44 2010 UTC (13 years, 6 months ago) by root
Branch: MAIN
CVS Tags: rel-2_1
Changes since 1.13: +21 -40 lines
Log Message:
2.1

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::DBI - asynchronous DBI access
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::DBI;
8
9 my $cv = AnyEvent->condvar;
10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12
13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($dbh, $rows, $rv) = @_;
15
16 $#_ or die "failure: $@";
17
18 print "@$_\n"
19 for @$rows;
20
21 $cv->broadcast;
22 });
23
24 # asynchronously do sth. else here
25
26 $cv->wait;
27
28 =head1 DESCRIPTION
29
30 This module is an L<AnyEvent> user, you need to make sure that you use and
31 run a supported event loop.
32
33 This module implements asynchronous DBI access by forking or executing
34 separate "DBI-Server" processes and sending them requests.
35
36 It means that you can run DBI requests in parallel to other tasks.
37
38 The overhead for very simple statements ("select 0") is somewhere
39 around 100% to 120% (dual/single core CPU) compared to an explicit
40 prepare_cached/execute/fetchrow_arrayref/finish combination.
41
42 =head2 ERROR HANDLING
43
44 This module defines a number of functions that accept a callback
45 argument. All callbacks used by this module get their AnyEvent::DBI handle
46 object passed as first argument.
47
48 If the request was successful, then there will be more arguments,
49 otherwise there will only be the C<$dbh> argument and C<$@> contains an
50 error message.
51
52 A convinient way to check whether an error occured is to check C<$#_> -
53 if that is true, then the function was successful, otherwise there was an
54 error.
55
56 =cut
57
58 package AnyEvent::DBI;
59
60 use common::sense;
61
62 use Carp;
63 use Socket ();
64 use Scalar::Util ();
65 use Storable ();
66
67 use DBI (); # only needed in child actually - do it before fork & !exec?
68
69 use AnyEvent ();
70 use AnyEvent::Util ();
71
72 use Errno ();
73 use Fcntl ();
74 use POSIX ();
75
76 our $VERSION = '2.1';
77
78 our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
79
80 # this is the forked server code, could/should be bundled as it's own file
81
82 our $DBH;
83
84 sub req_open {
85 my (undef, $dbi, $user, $pass, %attr) = @{+shift};
86
87 $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr;
88
89 [1, 1]
90 }
91
92 sub req_exec {
93 my (undef, $st, @args) = @{+shift};
94 my $sth = $DBH->prepare_cached ($st, undef, 1)
95 or die [$DBI::errstr];
96
97 my $rv = $sth->execute (@args)
98 or die [$sth->errstr];
99
100 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, $rv]
101 }
102
103 sub req_attr {
104 my (undef, $attr_name, @attr_val) = @{+shift};
105
106 $DBH->{$attr_name} = $attr_val[0]
107 if @attr_val;
108
109 [1, $DBH->{$attr_name}]
110 }
111
112 sub req_begin_work {
113 [1, $DBH->begin_work or die [$DBI::errstr]]
114 }
115
116 sub req_commit {
117 [1, $DBH->commit or die [$DBI::errstr]]
118 }
119
120 sub req_rollback {
121 [1, $DBH->rollback or die [$DBI::errstr]]
122 }
123
124 sub req_func {
125 my (undef, $arg_string, $function) = @{+shift};
126 my @args = eval $arg_string;
127
128 die "error evaling \$dbh->func() arg_string: $@"
129 if $@;
130
131 my $rc = $DBH->func (@args, $function);
132 return [1, $rc, $DBI::err, $DBI::errstr];
133 }
134
135 sub serve_fh($$) {
136 my ($fh, $version) = @_;
137
138 if ($VERSION != $version) {
139 syswrite $fh,
140 pack "L/a*",
141 Storable::freeze
142 [undef, "AnyEvent::DBI version mismatch ($VERSION vs. $version)"];
143 return;
144 }
145
146 eval {
147 my $rbuf;
148
149 while () {
150 sysread $fh, $rbuf, 16384, length $rbuf
151 or last;
152
153 while () {
154 my $len = unpack "L", $rbuf;
155
156 # full request available?
157 last unless $len && $len + 4 <= length $rbuf;
158
159 my $req = Storable::thaw substr $rbuf, 4;
160 substr $rbuf, 0, $len + 4, ""; # remove length + request
161
162 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
163 $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? ("$@->[0]", $@->[1]) : ("$@", 1)]
164 if $@;
165
166 for (my $ofs = 0; $ofs < length $wbuf; ) {
167 $ofs += (syswrite $fh, substr $wbuf, $ofs
168 or die "unable to write results");
169 }
170 }
171 }
172 };
173 }
174
175 sub serve_fd($$) {
176 open my $fh, ">>&=$_[0]"
177 or die "Couldn't open server file descriptor: $!";
178
179 serve_fh $fh, $_[1];
180 }
181
182 =head2 METHODS
183
184 =over 4
185
186 =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
187
188 Returns a database handle for the given database. Each database handle
189 has an associated server process that executes statements in order. If
190 you want to run more than one statement in parallel, you need to create
191 additional database handles.
192
193 The advantage of this approach is that transactions work as state is
194 preserved.
195
196 Example:
197
198 $dbh = new AnyEvent::DBI
199 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
200
201 Additional key-value pairs can be used to adjust behaviour:
202
203 =over 4
204
205 =item on_error => $callback->($dbh, $filename, $line, $fatal)
206
207 When an error occurs, then this callback will be invoked. On entry, C<$@>
208 is set to the error message. C<$filename> and C<$line> is where the
209 original request was submitted.
210
211 If the fatal argument is true then the database connection is shut down
212 and your database handle became invalid. In addition to invoking the
213 C<on_error> callback, all of your queued request callbacks are called
214 without only the C<$dbh> argument.
215
216 If omitted, then C<die> will be called on any errors, fatal or not.
217
218 =item on_connect => $callback->($dbh[, $success])
219
220 If you supply an C<on_connect> callback, then this callback will be
221 invoked after the database connect attempt. If the connection succeeds,
222 C<$success> is true, otherwise it is missing and C<$@> contains the
223 C<$DBI::errstr>.
224
225 Regardless of whether C<on_connect> is supplied, connect errors will result in
226 C<on_error> being called. However, if no C<on_connect> callback is supplied, then
227 connection errors are considered fatal. The client will C<die> and the C<on_error>
228 callback will be called with C<$fatal> true.
229
230 When on_connect is supplied, connect error are not fatal and AnyEvent::DBI
231 will not C<die>. You still cannot, however, use the $dbh object you
232 received from C<new> to make requests.
233
234 =item exec_server => 1
235
236 If you supply an C<exec_server> argument, then the DBI server process will
237 fork and exec another perl interpreter (using C<$^X>) with just the
238 AnyEvent::DBI proxy running. This will provide the cleanest possible proxy
239 for your database server.
240
241 If you do not supply the C<exec_server> argument (or supply it with a
242 false value) then the traditional method of starting the server by forking
243 the current process is used. The forked interpreter will try to clean
244 itself up by calling POSIX::close on all file descriptors except STDIN,
245 STDOUT, and STDERR (and the socket it uses to communicate with the cilent,
246 of course).
247
248 =item timeout => seconds
249
250 If you supply a timeout parameter (fractional values are supported), then
251 a timer is started any time the DBI handle expects a response from the
252 server. This includes connection setup as well as requests made to the
253 backend. The timeout spans the duration from the moment the first data
254 is written (or queued to be written) until all expected responses are
255 returned, but is postponed for "timeout" seconds each time more data is
256 returned from the server. If the timer ever goes off then a fatal error is
257 generated. If you have an C<on_error> handler installed, then it will be
258 called, otherwise your program will die().
259
260 When altering your databases with timeouts it is wise to use
261 transactions. If you quit due to timeout while performing insert, update
262 or schema-altering commands you can end up not knowing if the action was
263 submitted to the database, complicating recovery.
264
265 Timeout errors are always fatal.
266
267 =back
268
269 Any additional key-value pairs will be rolled into a hash reference
270 and passed as the final argument to the C<< DBI->connect (...) >>
271 call. For example, to supress errors on STDERR and send them instead to an
272 AnyEvent::Handle you could do:
273
274 $dbh = new AnyEvent::DBI
275 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
276 PrintError => 0,
277 on_error => sub {
278 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
279 };
280
281 =cut
282
283 # stupid Storable autoloading, total loss-loss situation
284 Storable::thaw Storable::freeze [];
285
286 sub new {
287 my ($class, $dbi, $user, $pass, %arg) = @_;
288
289 my ($client, $server) = AnyEvent::Util::portable_socketpair
290 or croak "unable to create AnyEvent::DBI communications pipe: $!";
291
292 my %dbi_args = %arg;
293 delete @dbi_args{qw(on_connect on_error timeout exec_server)};
294
295 my $self = bless \%arg, $class;
296 $self->{fh} = $client;
297
298 AnyEvent::Util::fh_nonblocking $client, 1;
299
300 my $rbuf;
301 my @caller = (caller)[1,2]; # the "default" caller
302
303 {
304 Scalar::Util::weaken (my $self = $self);
305
306 $self->{rw} = AE::io $client, 0, sub {
307 return unless $self;
308
309 $self->{last_activity} = AE::now;
310
311 my $len = sysread $client, $rbuf, 65536, length $rbuf;
312
313 if ($len > 0) {
314 # we received data, so reset the timer
315
316 while () {
317 my $len = unpack "L", $rbuf;
318
319 # full response available?
320 last unless $len && $len + 4 <= length $rbuf;
321
322 my $res = Storable::thaw substr $rbuf, 4;
323 substr $rbuf, 0, $len + 4, ""; # remove length + request
324
325 last unless $self;
326 my $req = shift @{ $self->{queue} };
327
328 if (defined $res->[0]) {
329 $res->[0] = $self;
330 $req->[0](@$res);
331 } else {
332 my $cb = shift @$req;
333 local $@ = $res->[1];
334 $cb->($self);
335 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
336 if $self; # cb() could have deleted it
337 }
338
339 # no more queued requests, so become idle
340 undef $self->{last_activity}
341 if $self && !@{ $self->{queue} };
342 }
343
344 } elsif (defined $len) {
345 # todo, caller?
346 $self->_error ("unexpected eof", @caller, 1);
347 } elsif ($! != Errno::EAGAIN) {
348 # todo, caller?
349 $self->_error ("read error: $!", @caller, 1);
350 }
351 };
352
353 $self->{tw_cb} = sub {
354 if ($self->{timeout} && $self->{last_activity}) {
355 if (AE::now > $self->{last_activity} + $self->{timeout}) {
356 # we did time out
357 my $req = $self->{queue}[0];
358 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
359 } else {
360 # we need to re-set the timeout watcher
361 $self->{tw} = AE::timer
362 $self->{last_activity} + $self->{timeout} - AE::now,
363 0,
364 $self->{tw_cb},
365 ;
366 }
367 } else {
368 # no timeout check wanted, or idle
369 undef $self->{tw};
370 }
371 };
372
373 $self->{ww_cb} = sub {
374 return unless $self;
375
376 $self->{last_activity} = AE::now;
377
378 my $len = syswrite $client, $self->{wbuf}
379 or return delete $self->{ww};
380
381 substr $self->{wbuf}, 0, $len, "";
382 };
383 }
384
385 my $pid = fork;
386
387 if ($pid) {
388 # parent
389 close $server;
390 } elsif (defined $pid) {
391 # child
392 my $serv_fno = fileno $server;
393
394 if ($self->{exec_server}) {
395 fcntl $server, &Fcntl::F_SETFD, 0; # don't close the server side
396 exec {$^X}
397 "$0 dbi slave",
398 -e => "require shift; AnyEvent::DBI::serve_fd ($serv_fno, $VERSION)",
399 $INC{"AnyEvent/DBI.pm"};
400 POSIX::_exit 124;
401 } else {
402 ($_ != $serv_fno) && POSIX::close $_
403 for $^F+1..$FD_MAX;
404 serve_fh $server, $VERSION;
405
406 # no other way on the broken windows platform, even this leaks
407 # memory and might fail.
408 kill 9, $$
409 if AnyEvent::WIN32;
410
411 # and this kills the parent process on windows
412 POSIX::_exit 0;
413 }
414 } else {
415 croak "fork: $!";
416 }
417
418 $self->{child_pid} = $pid;
419
420 $self->_req (
421 ($self->{on_connect} ? $self->{on_connect} : sub { }),
422 (caller)[1,2],
423 req_open => $dbi, $user, $pass, %dbi_args
424 );
425
426 $self
427 }
428
429 sub _server_pid {
430 shift->{child_pid}
431 }
432
433 sub kill_child {
434 my $self = shift;
435
436 if (my $pid = delete $self->{child_pid}) {
437 kill TERM => $pid;
438 }
439 close delete $self->{fh};
440 }
441
442 sub DESTROY {
443 shift->kill_child;
444 }
445
446 sub _error {
447 my ($self, $error, $filename, $line, $fatal) = @_;
448
449 if ($fatal) {
450 delete $self->{tw};
451 delete $self->{rw};
452 delete $self->{ww};
453 delete $self->{fh};
454
455 # for fatal errors call all enqueued callbacks with error
456 while (my $req = shift @{$self->{queue}}) {
457 local $@ = $error;
458 $req->[0]->($self);
459 }
460 $self->kill_child;
461 }
462
463 local $@ = $error;
464
465 if ($self->{on_error}) {
466 $self->{on_error}($self, $filename, $line, $fatal)
467 } else {
468 die "$error at $filename, line $line\n";
469 }
470 }
471
472 =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
473
474 Sets (or clears, with C<undef>) the C<on_error> handler.
475
476 =cut
477
478 sub on_error {
479 $_[0]{on_error} = $_[1];
480 }
481
482 =item $dbh->timeout ($seconds)
483
484 Sets (or clears, with C<undef>) the database timeout. Useful to extend the
485 timeout when you are about to make a really long query.
486
487 =cut
488
489 sub timeout {
490 my ($self, $timeout) = @_;
491
492 $self->{timeout} = $timeout;
493
494 # reschedule timer if one was running
495 $self->{tw_cb}->();
496 }
497
498 sub _req {
499 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
500
501 unless ($self->{fh}) {
502 local $@ = my $err = 'no database connection';
503 $cb->($self);
504 $self->_error ($err, $filename, $line, 1);
505 return;
506 }
507
508 push @{ $self->{queue} }, [$cb, $filename, $line];
509
510 # re-start timeout if necessary
511 if ($self->{timeout} && !$self->{tw}) {
512 $self->{last_activity} = AE::now;
513 $self->{tw_cb}->();
514 }
515
516 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
517
518 unless ($self->{ww}) {
519 my $len = syswrite $self->{fh}, $self->{wbuf};
520 substr $self->{wbuf}, 0, $len, "";
521
522 # still any left? then install a write watcher
523 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
524 if length $self->{wbuf};
525 }
526 }
527
528 =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
529
530 Executes the given SQL statement with placeholders replaced by
531 C<@args>. The statement will be prepared and cached on the server side, so
532 using placeholders is extremely important.
533
534 The callback will be called with a weakened AnyEvent::DBI object as the
535 first argument and the result of C<fetchall_arrayref> as (or C<undef>
536 if the statement wasn't a select statement) as the second argument.
537
538 Third argument is the return value from the C<< DBI->execute >> method
539 call.
540
541 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
542 will be passed and C<$@> contains the error message.
543
544 =item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
545
546 An accessor for the handle attributes, such as C<AutoCommit>,
547 C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
548 (which might be C<undef>), then the given attribute will be set to that
549 value.
550
551 The callback will be passed the database handle and the attribute's value
552 if successful.
553
554 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
555 will be passed and C<$@> contains the error message.
556
557 =item $dbh->begin_work ($cb->($dbh[, $rc]))
558
559 =item $dbh->commit ($cb->($dbh[, $rc]))
560
561 =item $dbh->rollback ($cb->($dbh[, $rc]))
562
563 The begin_work, commit, and rollback methods expose the equivalent
564 transaction control method of the DBI driver. On success, C<$rc> is true.
565
566 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
567 will be passed and C<$@> contains the error message.
568
569 =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
570
571 This gives access to database driver private methods. Because they
572 are not standard you cannot always depend on the value of C<$rc> or
573 C<$dbi_err>. Check the documentation for your specific driver/function
574 combination to see what it returns.
575
576 Note that the first argument will be eval'ed to produce the argument list to
577 the func() method. This must be done because the serialization protocol
578 between the AnyEvent::DBI server process and your program does not support the
579 passage of closures.
580
581 Here's an example to extend the query language in SQLite so it supports an
582 intstr() function:
583
584 $cv = AnyEvent->condvar;
585 $dbh->func (
586 q{
587 instr => 2, sub {
588 my ($string, $search) = @_;
589 return index $string, $search;
590 },
591 },
592 create_function => sub {
593 return $cv->send ($@)
594 unless $#_;
595 $cv->send (undef, @_[1,2,3]);
596 }
597 );
598
599 my ($err,$rc,$errcode,$errstr) = $cv->recv;
600
601 die $err if defined $err;
602 die "EVAL failed: $errstr"
603 if $errcode;
604
605 # otherwise, we can ignore $rc and $errcode for this particular func
606
607 =cut
608
609 for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
610 eval 'sub ' . $cmd_name . '{
611 my $cb = pop;
612 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
613 &_req
614 }';
615 }
616
617 =back
618
619 =head1 SEE ALSO
620
621 L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
622
623 =head1 AUTHOR
624
625 Marc Lehmann <schmorp@schmorp.de>
626 http://home.schmorp.de/
627
628 Adam Rosenstein <adam@redcondor.com>
629 http://www.redcondor.com/
630
631 =cut
632
633 1;
634