… | |
… | |
11 | my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", ""; |
11 | my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", ""; |
12 | |
12 | |
13 | $dbh->exec ("select * from test where num=?", 10, sub { |
13 | $dbh->exec ("select * from test where num=?", 10, sub { |
14 | my ($dbh, $rows, $rv) = @_; |
14 | my ($dbh, $rows, $rv) = @_; |
15 | |
15 | |
16 | $rows or die "failure: $@"; |
16 | $#_ or die "failure: $@"; |
17 | |
17 | |
18 | print "@$_\n" |
18 | print "@$_\n" |
19 | for @$rows; |
19 | for @$rows; |
20 | |
20 | |
21 | $cv->broadcast; |
21 | $cv->broadcast; |
… | |
… | |
34 | separate "DBI-Server" processes and sending them requests. |
34 | separate "DBI-Server" processes and sending them requests. |
35 | |
35 | |
36 | It means that you can run DBI requests in parallel to other tasks. |
36 | It means that you can run DBI requests in parallel to other tasks. |
37 | |
37 | |
38 | The overhead for very simple statements ("select 0") is somewhere |
38 | The overhead for very simple statements ("select 0") is somewhere |
39 | around 120% to 200% (dual/single core CPU) compared to an explicit |
39 | around 100% to 120% (dual/single core CPU) compared to an explicit |
40 | prepare_cached/execute/fetchrow_arrayref/finish combination. |
40 | prepare_cached/execute/fetchrow_arrayref/finish combination. |
41 | |
41 | |
42 | =head2 ERROR HANDLING |
42 | =head2 ERROR HANDLING |
43 | |
43 | |
44 | This module defines a number of functions that accept a callback |
44 | This module defines a number of functions that accept a callback |
… | |
… | |
55 | |
55 | |
56 | =cut |
56 | =cut |
57 | |
57 | |
58 | package AnyEvent::DBI; |
58 | package AnyEvent::DBI; |
59 | |
59 | |
60 | use strict qw(vars subs); |
60 | use common::sense; |
61 | no warnings; |
|
|
62 | |
61 | |
63 | use Carp; |
62 | use Carp; |
64 | use Socket (); |
63 | use Socket (); |
65 | use Scalar::Util (); |
64 | use Scalar::Util (); |
66 | use Storable (); |
65 | use Storable (); |
67 | |
66 | |
68 | use DBI (); |
67 | use DBI (); # only needed in child actually - do it before fork & !exec? |
69 | |
68 | |
70 | use AnyEvent (); |
69 | use AnyEvent (); |
71 | use AnyEvent::Util (); |
70 | use AnyEvent::Util (); |
72 | |
71 | |
73 | use Errno (); |
72 | use Errno (); |
74 | use Fcntl (); |
73 | use Fcntl (); |
75 | use POSIX (); |
74 | use POSIX (); |
76 | |
75 | |
77 | our $VERSION = '1.19'; |
76 | our $VERSION = '2.1'; |
78 | |
77 | |
79 | our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023; |
78 | our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023; |
80 | |
79 | |
81 | # this is the forked server code, could/should be bundled as it's own file |
80 | # this is the forked server code, could/should be bundled as it's own file |
82 | |
81 | |
… | |
… | |
124 | |
123 | |
125 | sub req_func { |
124 | sub req_func { |
126 | my (undef, $arg_string, $function) = @{+shift}; |
125 | my (undef, $arg_string, $function) = @{+shift}; |
127 | my @args = eval $arg_string; |
126 | my @args = eval $arg_string; |
128 | |
127 | |
129 | die "Bad func () arg string: $@" |
128 | die "error evaling \$dbh->func() arg_string: $@" |
130 | if $@; |
129 | if $@; |
131 | |
130 | |
132 | my $rv = $DBH->func (@args, $function); |
131 | my $rc = $DBH->func (@args, $function); |
133 | return [$rv, $DBH->err]; |
132 | return [1, $rc, $DBI::err, $DBI::errstr]; |
134 | } |
133 | } |
135 | |
134 | |
136 | sub serve_fh($$) { |
135 | sub serve_fh($$) { |
137 | my ($fh, $version) = @_; |
136 | my ($fh, $version) = @_; |
138 | |
137 | |
… | |
… | |
234 | |
233 | |
235 | =item exec_server => 1 |
234 | =item exec_server => 1 |
236 | |
235 | |
237 | If you supply an C<exec_server> argument, then the DBI server process will |
236 | If you supply an C<exec_server> argument, then the DBI server process will |
238 | fork and exec another perl interpreter (using C<$^X>) with just the |
237 | fork and exec another perl interpreter (using C<$^X>) with just the |
239 | AnyEvent::DBI proxy running. This will provide the cleanest possible porxy |
238 | AnyEvent::DBI proxy running. This will provide the cleanest possible proxy |
240 | for your database server. |
239 | for your database server. |
241 | |
240 | |
242 | If you do not supply the C<exec_server> argument (or supply it with a |
241 | If you do not supply the C<exec_server> argument (or supply it with a |
243 | false value) then the traditional method of starting the server by forking |
242 | false value) then the traditional method of starting the server by forking |
244 | the current process is used. The forked interpreter will try to clean |
243 | the current process is used. The forked interpreter will try to clean |
… | |
… | |
286 | |
285 | |
287 | sub new { |
286 | sub new { |
288 | my ($class, $dbi, $user, $pass, %arg) = @_; |
287 | my ($class, $dbi, $user, $pass, %arg) = @_; |
289 | |
288 | |
290 | my ($client, $server) = AnyEvent::Util::portable_socketpair |
289 | my ($client, $server) = AnyEvent::Util::portable_socketpair |
291 | or croak "unable to create Anyevent::DBI communications pipe: $!"; |
290 | or croak "unable to create AnyEvent::DBI communications pipe: $!"; |
292 | |
291 | |
293 | my %dbi_args = %arg; |
292 | my %dbi_args = %arg; |
294 | delete @dbi_args{qw(on_connect on_error timeout exec_server)}; |
293 | delete @dbi_args{qw(on_connect on_error timeout exec_server)}; |
295 | |
294 | |
296 | my $self = bless \%arg, $class; |
295 | my $self = bless \%arg, $class; |
… | |
… | |
302 | my @caller = (caller)[1,2]; # the "default" caller |
301 | my @caller = (caller)[1,2]; # the "default" caller |
303 | |
302 | |
304 | { |
303 | { |
305 | Scalar::Util::weaken (my $self = $self); |
304 | Scalar::Util::weaken (my $self = $self); |
306 | |
305 | |
307 | $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { |
306 | $self->{rw} = AE::io $client, 0, sub { |
308 | return unless $self; |
307 | return unless $self; |
309 | |
308 | |
310 | $self->{last_activity} = AnyEvent->now; |
309 | $self->{last_activity} = AE::now; |
311 | |
310 | |
312 | my $len = sysread $client, $rbuf, 65536, length $rbuf; |
311 | my $len = sysread $client, $rbuf, 65536, length $rbuf; |
313 | |
312 | |
314 | if ($len > 0) { |
313 | if ($len > 0) { |
315 | # we received data, so reset the timer |
314 | # we received data, so reset the timer |
… | |
… | |
347 | $self->_error ("unexpected eof", @caller, 1); |
346 | $self->_error ("unexpected eof", @caller, 1); |
348 | } elsif ($! != Errno::EAGAIN) { |
347 | } elsif ($! != Errno::EAGAIN) { |
349 | # todo, caller? |
348 | # todo, caller? |
350 | $self->_error ("read error: $!", @caller, 1); |
349 | $self->_error ("read error: $!", @caller, 1); |
351 | } |
350 | } |
352 | }); |
351 | }; |
353 | |
352 | |
354 | $self->{tw_cb} = sub { |
353 | $self->{tw_cb} = sub { |
355 | if ($self->{timeout} && $self->{last_activity}) { |
354 | if ($self->{timeout} && $self->{last_activity}) { |
356 | if (AnyEvent->now > $self->{last_activity} + $self->{timeout}) { |
355 | if (AE::now > $self->{last_activity} + $self->{timeout}) { |
357 | # we did time out |
356 | # we did time out |
358 | my $req = $self->{queue}[0]; |
357 | my $req = $self->{queue}[0]; |
359 | $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal |
358 | $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal |
360 | } else { |
359 | } else { |
361 | # we need to re-set the timeout watcher |
360 | # we need to re-set the timeout watcher |
362 | $self->{tw} = AnyEvent->timer ( |
361 | $self->{tw} = AE::timer |
363 | after => $self->{last_activity} + $self->{timeout} - AnyEvent->now, |
362 | $self->{last_activity} + $self->{timeout} - AE::now, |
|
|
363 | 0, |
364 | cb => $self->{tw_cb}, |
364 | $self->{tw_cb}, |
365 | ); |
365 | ; |
366 | Scalar::Util::weaken $self; |
|
|
367 | } |
366 | } |
368 | } else { |
367 | } else { |
369 | # no timeout check wanted, or idle |
368 | # no timeout check wanted, or idle |
370 | undef $self->{tw}; |
369 | undef $self->{tw}; |
371 | } |
370 | } |
372 | }; |
371 | }; |
373 | |
372 | |
374 | $self->{ww_cb} = sub { |
373 | $self->{ww_cb} = sub { |
375 | return unless $self; |
374 | return unless $self; |
376 | |
375 | |
377 | $self->{last_activity} = AnyEvent->now; |
376 | $self->{last_activity} = AE::now; |
378 | |
377 | |
379 | my $len = syswrite $client, $self->{wbuf} |
378 | my $len = syswrite $client, $self->{wbuf} |
380 | or return delete $self->{ww}; |
379 | or return delete $self->{ww}; |
381 | |
380 | |
382 | substr $self->{wbuf}, 0, $len, ""; |
381 | substr $self->{wbuf}, 0, $len, ""; |
… | |
… | |
430 | sub _server_pid { |
429 | sub _server_pid { |
431 | shift->{child_pid} |
430 | shift->{child_pid} |
432 | } |
431 | } |
433 | |
432 | |
434 | sub kill_child { |
433 | sub kill_child { |
435 | my $self = shift; |
434 | my $self = shift; |
|
|
435 | |
436 | my $child_pid = delete $self->{child_pid}; |
436 | if (my $pid = delete $self->{child_pid}) { |
437 | if ($child_pid) { |
437 | kill TERM => $pid; |
438 | # send SIGKILL in two seconds |
438 | } |
439 | my $murder_timer = AnyEvent->timer ( |
|
|
440 | after => 2, |
|
|
441 | cb => sub { |
|
|
442 | kill 9, $child_pid; |
|
|
443 | }, |
|
|
444 | ); |
|
|
445 | |
|
|
446 | # reap process |
|
|
447 | my $kid_watcher; $kid_watcher = AnyEvent->child ( |
|
|
448 | pid => $child_pid, |
|
|
449 | cb => sub { |
|
|
450 | # just hold on to this so it won't go away |
|
|
451 | undef $kid_watcher; |
|
|
452 | # cancel SIGKILL |
|
|
453 | undef $murder_timer; |
|
|
454 | }, |
|
|
455 | ); |
|
|
456 | |
|
|
457 | close $self->{fh}; |
439 | close delete $self->{fh}; |
458 | } |
|
|
459 | } |
440 | } |
460 | |
441 | |
461 | sub DESTROY { |
442 | sub DESTROY { |
462 | shift->kill_child; |
443 | shift->kill_child; |
463 | } |
444 | } |
… | |
… | |
526 | |
507 | |
527 | push @{ $self->{queue} }, [$cb, $filename, $line]; |
508 | push @{ $self->{queue} }, [$cb, $filename, $line]; |
528 | |
509 | |
529 | # re-start timeout if necessary |
510 | # re-start timeout if necessary |
530 | if ($self->{timeout} && !$self->{tw}) { |
511 | if ($self->{timeout} && !$self->{tw}) { |
531 | $self->{last_activity} = AnyEvent->now; |
512 | $self->{last_activity} = AE::now; |
532 | $self->{tw_cb}->(); |
513 | $self->{tw_cb}->(); |
533 | } |
514 | } |
534 | |
515 | |
535 | $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; |
516 | $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; |
536 | |
517 | |
537 | unless ($self->{ww}) { |
518 | unless ($self->{ww}) { |
538 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
519 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
539 | substr $self->{wbuf}, 0, $len, ""; |
520 | substr $self->{wbuf}, 0, $len, ""; |
540 | |
521 | |
541 | # still any left? then install a write watcher |
522 | # still any left? then install a write watcher |
542 | $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) |
523 | $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb} |
543 | if length $self->{wbuf}; |
524 | if length $self->{wbuf}; |
544 | } |
525 | } |
545 | } |
526 | } |
546 | |
527 | |
547 | =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv)) |
528 | =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv)) |
… | |
… | |
571 | if successful. |
552 | if successful. |
572 | |
553 | |
573 | If an error occurs and the C<on_error> callback returns, then only C<$dbh> |
554 | If an error occurs and the C<on_error> callback returns, then only C<$dbh> |
574 | will be passed and C<$@> contains the error message. |
555 | will be passed and C<$@> contains the error message. |
575 | |
556 | |
576 | =item $dbh->begin_work ($cb->($dbh[, $success])) |
557 | =item $dbh->begin_work ($cb->($dbh[, $rc])) |
577 | |
558 | |
578 | =item $dbh->commit ($cb->($dbh[, $success])) |
559 | =item $dbh->commit ($cb->($dbh[, $rc])) |
579 | |
560 | |
580 | =item $dbh->rollback ($cb->($dbh[, $success])) |
561 | =item $dbh->rollback ($cb->($dbh[, $rc])) |
581 | |
562 | |
582 | The begin_work, commit, and rollback methods expose the equivalent |
563 | The begin_work, commit, and rollback methods expose the equivalent |
583 | transaction control method of the DBI driver. On success, C<$success> |
564 | transaction control method of the DBI driver. On success, C<$rc> is true. |
584 | is true. |
|
|
585 | |
565 | |
586 | If an error occurs and the C<on_error> callback returns, then only C<$dbh> |
566 | If an error occurs and the C<on_error> callback returns, then only C<$dbh> |
587 | will be passed and C<$@> contains the error message. |
567 | will be passed and C<$@> contains the error message. |
588 | |
568 | |
589 | =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $result, $handle_error)) |
569 | =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr)) |
590 | |
570 | |
591 | This gives access to database driver private methods. Because they |
571 | This gives access to database driver private methods. Because they |
592 | are not standard you cannot always depend on the value of C<$result> |
572 | are not standard you cannot always depend on the value of C<$rc> or |
593 | or C<$handle_error>. Check the documentation for your specific |
573 | C<$dbi_err>. Check the documentation for your specific driver/function |
594 | driver/function combination to see what it returns. |
574 | combination to see what it returns. |
595 | |
575 | |
596 | Note that the first argument will be eval'ed to produce the argument list to |
576 | Note that the first argument will be eval'ed to produce the argument list to |
597 | the func() method. This must be done because the serialization protocol |
577 | the func() method. This must be done because the serialization protocol |
598 | between the AnyEvent::DBI server process and your program does not support the |
578 | between the AnyEvent::DBI server process and your program does not support the |
599 | passage of closures. |
579 | passage of closures. |
… | |
… | |
608 | my ($string, $search) = @_; |
588 | my ($string, $search) = @_; |
609 | return index $string, $search; |
589 | return index $string, $search; |
610 | }, |
590 | }, |
611 | }, |
591 | }, |
612 | create_function => sub { |
592 | create_function => sub { |
613 | return $cv->send($@) |
593 | return $cv->send ($@) |
614 | unless $_[0]; |
594 | unless $#_; |
615 | $cv->send (undef, @_[1,2]); |
595 | $cv->send (undef, @_[1,2,3]); |
616 | } |
596 | } |
617 | ); |
597 | ); |
618 | |
598 | |
619 | my ($err,$result,$handle_err) = $cv->recv; |
599 | my ($err,$rc,$errcode,$errstr) = $cv->recv; |
620 | |
600 | |
|
|
601 | die $err if defined $err; |
621 | die "EVAL failed: $err" |
602 | die "EVAL failed: $errstr" |
622 | if $err; |
603 | if $errcode; |
623 | |
604 | |
624 | # otherwise, we can ignore $result and $handle_err for this particular func |
605 | # otherwise, we can ignore $rc and $errcode for this particular func |
625 | |
606 | |
626 | =cut |
607 | =cut |
627 | |
608 | |
628 | for my $cmd_name (qw(exec attr begin_work commit rollback func)) { |
609 | for my $cmd_name (qw(exec attr begin_work commit rollback func)) { |
629 | eval 'sub ' . $cmd_name . '{ |
610 | eval 'sub ' . $cmd_name . '{ |