… | |
… | |
34 | separate "DBI-Server" processes and sending them requests. |
34 | separate "DBI-Server" processes and sending them requests. |
35 | |
35 | |
36 | It means that you can run DBI requests in parallel to other tasks. |
36 | It means that you can run DBI requests in parallel to other tasks. |
37 | |
37 | |
38 | The overhead for very simple statements ("select 0") is somewhere |
38 | The overhead for very simple statements ("select 0") is somewhere |
39 | around 120% to 200% (dual/single core CPU) compared to an explicit |
39 | around 100% to 120% (dual/single core CPU) compared to an explicit |
40 | prepare_cached/execute/fetchrow_arrayref/finish combination. |
40 | prepare_cached/execute/fetchrow_arrayref/finish combination. |
41 | |
41 | |
42 | =head2 ERROR HANDLING |
42 | =head2 ERROR HANDLING |
43 | |
43 | |
44 | This module defines a number of functions that accept a callback |
44 | This module defines a number of functions that accept a callback |
… | |
… | |
55 | |
55 | |
56 | =cut |
56 | =cut |
57 | |
57 | |
58 | package AnyEvent::DBI; |
58 | package AnyEvent::DBI; |
59 | |
59 | |
60 | use strict qw(vars subs); |
60 | use common::sense; |
61 | no warnings; |
|
|
62 | |
61 | |
63 | use Carp; |
62 | use Carp; |
64 | use Socket (); |
63 | use Socket (); |
65 | use Scalar::Util (); |
64 | use Scalar::Util (); |
66 | use Storable (); |
65 | use Storable (); |
67 | |
66 | |
68 | use DBI (); |
67 | use DBI (); # only needed in child actually - do it before fork & !exec? |
69 | |
68 | |
70 | use AnyEvent (); |
69 | use AnyEvent (); |
71 | use AnyEvent::Util (); |
70 | use AnyEvent::Util (); |
72 | |
71 | |
73 | use Errno (); |
72 | use Errno (); |
74 | use Fcntl (); |
73 | use Fcntl (); |
75 | use POSIX (); |
74 | use POSIX (); |
76 | |
75 | |
77 | our $VERSION = '2.0'; |
76 | our $VERSION = '2.1'; |
78 | |
77 | |
79 | our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023; |
78 | our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023; |
80 | |
79 | |
81 | # this is the forked server code, could/should be bundled as it's own file |
80 | # this is the forked server code, could/should be bundled as it's own file |
82 | |
81 | |
… | |
… | |
286 | |
285 | |
287 | sub new { |
286 | sub new { |
288 | my ($class, $dbi, $user, $pass, %arg) = @_; |
287 | my ($class, $dbi, $user, $pass, %arg) = @_; |
289 | |
288 | |
290 | my ($client, $server) = AnyEvent::Util::portable_socketpair |
289 | my ($client, $server) = AnyEvent::Util::portable_socketpair |
291 | or croak "unable to create Anyevent::DBI communications pipe: $!"; |
290 | or croak "unable to create AnyEvent::DBI communications pipe: $!"; |
292 | |
291 | |
293 | my %dbi_args = %arg; |
292 | my %dbi_args = %arg; |
294 | delete @dbi_args{qw(on_connect on_error timeout exec_server)}; |
293 | delete @dbi_args{qw(on_connect on_error timeout exec_server)}; |
295 | |
294 | |
296 | my $self = bless \%arg, $class; |
295 | my $self = bless \%arg, $class; |
… | |
… | |
302 | my @caller = (caller)[1,2]; # the "default" caller |
301 | my @caller = (caller)[1,2]; # the "default" caller |
303 | |
302 | |
304 | { |
303 | { |
305 | Scalar::Util::weaken (my $self = $self); |
304 | Scalar::Util::weaken (my $self = $self); |
306 | |
305 | |
307 | $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { |
306 | $self->{rw} = AE::io $client, 0, sub { |
308 | return unless $self; |
307 | return unless $self; |
309 | |
308 | |
310 | $self->{last_activity} = AnyEvent->now; |
309 | $self->{last_activity} = AE::now; |
311 | |
310 | |
312 | my $len = sysread $client, $rbuf, 65536, length $rbuf; |
311 | my $len = sysread $client, $rbuf, 65536, length $rbuf; |
313 | |
312 | |
314 | if ($len > 0) { |
313 | if ($len > 0) { |
315 | # we received data, so reset the timer |
314 | # we received data, so reset the timer |
… | |
… | |
347 | $self->_error ("unexpected eof", @caller, 1); |
346 | $self->_error ("unexpected eof", @caller, 1); |
348 | } elsif ($! != Errno::EAGAIN) { |
347 | } elsif ($! != Errno::EAGAIN) { |
349 | # todo, caller? |
348 | # todo, caller? |
350 | $self->_error ("read error: $!", @caller, 1); |
349 | $self->_error ("read error: $!", @caller, 1); |
351 | } |
350 | } |
352 | }); |
351 | }; |
353 | |
352 | |
354 | $self->{tw_cb} = sub { |
353 | $self->{tw_cb} = sub { |
355 | if ($self->{timeout} && $self->{last_activity}) { |
354 | if ($self->{timeout} && $self->{last_activity}) { |
356 | if (AnyEvent->now > $self->{last_activity} + $self->{timeout}) { |
355 | if (AE::now > $self->{last_activity} + $self->{timeout}) { |
357 | # we did time out |
356 | # we did time out |
358 | my $req = $self->{queue}[0]; |
357 | my $req = $self->{queue}[0]; |
359 | $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal |
358 | $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal |
360 | } else { |
359 | } else { |
361 | # we need to re-set the timeout watcher |
360 | # we need to re-set the timeout watcher |
362 | $self->{tw} = AnyEvent->timer ( |
361 | $self->{tw} = AE::timer |
363 | after => $self->{last_activity} + $self->{timeout} - AnyEvent->now, |
362 | $self->{last_activity} + $self->{timeout} - AE::now, |
|
|
363 | 0, |
364 | cb => $self->{tw_cb}, |
364 | $self->{tw_cb}, |
365 | ); |
365 | ; |
366 | Scalar::Util::weaken $self; |
|
|
367 | } |
366 | } |
368 | } else { |
367 | } else { |
369 | # no timeout check wanted, or idle |
368 | # no timeout check wanted, or idle |
370 | undef $self->{tw}; |
369 | undef $self->{tw}; |
371 | } |
370 | } |
372 | }; |
371 | }; |
373 | |
372 | |
374 | $self->{ww_cb} = sub { |
373 | $self->{ww_cb} = sub { |
375 | return unless $self; |
374 | return unless $self; |
376 | |
375 | |
377 | $self->{last_activity} = AnyEvent->now; |
376 | $self->{last_activity} = AE::now; |
378 | |
377 | |
379 | my $len = syswrite $client, $self->{wbuf} |
378 | my $len = syswrite $client, $self->{wbuf} |
380 | or return delete $self->{ww}; |
379 | or return delete $self->{ww}; |
381 | |
380 | |
382 | substr $self->{wbuf}, 0, $len, ""; |
381 | substr $self->{wbuf}, 0, $len, ""; |
… | |
… | |
430 | sub _server_pid { |
429 | sub _server_pid { |
431 | shift->{child_pid} |
430 | shift->{child_pid} |
432 | } |
431 | } |
433 | |
432 | |
434 | sub kill_child { |
433 | sub kill_child { |
435 | my $self = shift; |
434 | my $self = shift; |
|
|
435 | |
436 | my $child_pid = delete $self->{child_pid}; |
436 | if (my $pid = delete $self->{child_pid}) { |
437 | if ($child_pid) { |
437 | kill TERM => $pid; |
438 | # send SIGKILL in two seconds |
438 | } |
439 | my $murder_timer = AnyEvent->timer ( |
|
|
440 | after => 2, |
|
|
441 | cb => sub { |
|
|
442 | kill 9, $child_pid; |
|
|
443 | }, |
|
|
444 | ); |
|
|
445 | |
|
|
446 | # reap process |
|
|
447 | my $kid_watcher; $kid_watcher = AnyEvent->child ( |
|
|
448 | pid => $child_pid, |
|
|
449 | cb => sub { |
|
|
450 | # just hold on to this so it won't go away |
|
|
451 | undef $kid_watcher; |
|
|
452 | # cancel SIGKILL |
|
|
453 | undef $murder_timer; |
|
|
454 | }, |
|
|
455 | ); |
|
|
456 | |
|
|
457 | close $self->{fh}; |
439 | close delete $self->{fh}; |
458 | } |
|
|
459 | } |
440 | } |
460 | |
441 | |
461 | sub DESTROY { |
442 | sub DESTROY { |
462 | shift->kill_child; |
443 | shift->kill_child; |
463 | } |
444 | } |
… | |
… | |
526 | |
507 | |
527 | push @{ $self->{queue} }, [$cb, $filename, $line]; |
508 | push @{ $self->{queue} }, [$cb, $filename, $line]; |
528 | |
509 | |
529 | # re-start timeout if necessary |
510 | # re-start timeout if necessary |
530 | if ($self->{timeout} && !$self->{tw}) { |
511 | if ($self->{timeout} && !$self->{tw}) { |
531 | $self->{last_activity} = AnyEvent->now; |
512 | $self->{last_activity} = AE::now; |
532 | $self->{tw_cb}->(); |
513 | $self->{tw_cb}->(); |
533 | } |
514 | } |
534 | |
515 | |
535 | $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; |
516 | $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; |
536 | |
517 | |
537 | unless ($self->{ww}) { |
518 | unless ($self->{ww}) { |
538 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
519 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
539 | substr $self->{wbuf}, 0, $len, ""; |
520 | substr $self->{wbuf}, 0, $len, ""; |
540 | |
521 | |
541 | # still any left? then install a write watcher |
522 | # still any left? then install a write watcher |
542 | $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) |
523 | $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb} |
543 | if length $self->{wbuf}; |
524 | if length $self->{wbuf}; |
544 | } |
525 | } |
545 | } |
526 | } |
546 | |
527 | |
547 | =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv)) |
528 | =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv)) |