ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.11 by root, Sun Jun 28 14:59:51 2009 UTC vs.
Revision 1.15 by root, Thu May 17 02:15:58 2012 UTC

11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", ""; 11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12 12
13 $dbh->exec ("select * from test where num=?", 10, sub { 13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($dbh, $rows, $rv) = @_; 14 my ($dbh, $rows, $rv) = @_;
15 15
16 $rows or die "failure: $@"; 16 $#_ or die "failure: $@";
17 17
18 print "@$_\n" 18 print "@$_\n"
19 for @$rows; 19 for @$rows;
20 20
21 $cv->broadcast; 21 $cv->broadcast;
34separate "DBI-Server" processes and sending them requests. 34separate "DBI-Server" processes and sending them requests.
35 35
36It means that you can run DBI requests in parallel to other tasks. 36It means that you can run DBI requests in parallel to other tasks.
37 37
38The overhead for very simple statements ("select 0") is somewhere 38The overhead for very simple statements ("select 0") is somewhere
39around 120% to 200% (dual/single core CPU) compared to an explicit 39around 100% to 120% (dual/single core CPU) compared to an explicit
40prepare_cached/execute/fetchrow_arrayref/finish combination. 40prepare_cached/execute/fetchrow_arrayref/finish combination.
41 41
42=head2 ERROR HANDLING 42=head2 ERROR HANDLING
43 43
44This module defines a number of functions that accept a callback 44This module defines a number of functions that accept a callback
55 55
56=cut 56=cut
57 57
58package AnyEvent::DBI; 58package AnyEvent::DBI;
59 59
60use strict qw(vars subs); 60use common::sense;
61no warnings;
62 61
63use Carp; 62use Carp;
64use Socket (); 63use Socket ();
65use Scalar::Util (); 64use Scalar::Util ();
66use Storable (); 65use Storable ();
67 66
68use DBI (); 67use DBI (); # only needed in child actually - do it before fork & !exec?
69 68
70use AnyEvent (); 69use AnyEvent ();
71use AnyEvent::Util (); 70use AnyEvent::Util ();
72 71
73use Errno (); 72use Errno ();
74use Fcntl (); 73use Fcntl ();
75use POSIX (); 74use POSIX ();
76 75
77our $VERSION = '1.19'; 76our $VERSION = '2.2';
78 77
79our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023; 78our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
80 79
81# this is the forked server code, could/should be bundled as it's own file 80# this is the forked server code, could/should be bundled as it's own file
82 81
124 123
125sub req_func { 124sub req_func {
126 my (undef, $arg_string, $function) = @{+shift}; 125 my (undef, $arg_string, $function) = @{+shift};
127 my @args = eval $arg_string; 126 my @args = eval $arg_string;
128 127
129 die "Bad func () arg string: $@" 128 die "error evaling \$dbh->func() arg_string: $@"
130 if $@; 129 if $@;
131 130
132 my $rv = $DBH->func (@args, $function); 131 my $rc = $DBH->func (@args, $function);
133 return [$rv, $DBH->err]; 132 return [1, $rc, $DBI::err, $DBI::errstr];
134} 133}
135 134
136sub serve_fh($$) { 135sub serve_fh($$) {
137 my ($fh, $version) = @_; 136 my ($fh, $version) = @_;
138 137
234 233
235=item exec_server => 1 234=item exec_server => 1
236 235
237If you supply an C<exec_server> argument, then the DBI server process will 236If you supply an C<exec_server> argument, then the DBI server process will
238fork and exec another perl interpreter (using C<$^X>) with just the 237fork and exec another perl interpreter (using C<$^X>) with just the
239AnyEvent::DBI proxy running. This will provide the cleanest possible porxy 238AnyEvent::DBI proxy running. This will provide the cleanest possible proxy
240for your database server. 239for your database server.
241 240
242If you do not supply the C<exec_server> argument (or supply it with a 241If you do not supply the C<exec_server> argument (or supply it with a
243false value) then the traditional method of starting the server by forking 242false value) then the traditional method of starting the server by forking
244the current process is used. The forked interpreter will try to clean 243the current process is used. The forked interpreter will try to clean
286 285
287sub new { 286sub new {
288 my ($class, $dbi, $user, $pass, %arg) = @_; 287 my ($class, $dbi, $user, $pass, %arg) = @_;
289 288
290 my ($client, $server) = AnyEvent::Util::portable_socketpair 289 my ($client, $server) = AnyEvent::Util::portable_socketpair
291 or croak "unable to create Anyevent::DBI communications pipe: $!"; 290 or croak "unable to create AnyEvent::DBI communications pipe: $!";
292 291
293 my %dbi_args = %arg; 292 my %dbi_args = %arg;
294 delete @dbi_args{qw(on_connect on_error timeout exec_server)}; 293 delete @dbi_args{qw(on_connect on_error timeout exec_server)};
295 294
296 my $self = bless \%arg, $class; 295 my $self = bless \%arg, $class;
302 my @caller = (caller)[1,2]; # the "default" caller 301 my @caller = (caller)[1,2]; # the "default" caller
303 302
304 { 303 {
305 Scalar::Util::weaken (my $self = $self); 304 Scalar::Util::weaken (my $self = $self);
306 305
307 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { 306 $self->{rw} = AE::io $client, 0, sub {
308 return unless $self; 307 return unless $self;
309
310 $self->{last_activity} = AnyEvent->now;
311 308
312 my $len = sysread $client, $rbuf, 65536, length $rbuf; 309 my $len = sysread $client, $rbuf, 65536, length $rbuf;
313 310
314 if ($len > 0) { 311 if ($len > 0) {
315 # we received data, so reset the timer 312 # we received data, so reset the timer
313 $self->{last_activity} = AE::now;
316 314
317 while () { 315 while () {
318 my $len = unpack "L", $rbuf; 316 my $len = unpack "L", $rbuf;
319 317
320 # full response available? 318 # full response available?
336 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal 334 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
337 if $self; # cb() could have deleted it 335 if $self; # cb() could have deleted it
338 } 336 }
339 337
340 # no more queued requests, so become idle 338 # no more queued requests, so become idle
339 if ($self && !@{ $self->{queue} }) {
341 undef $self->{last_activity} 340 undef $self->{last_activity};
342 if $self && !@{ $self->{queue} }; 341 $self->{tw_cb}->();
342 }
343 } 343 }
344 344
345 } elsif (defined $len) { 345 } elsif (defined $len) {
346 # todo, caller? 346 # todo, caller?
347 $self->_error ("unexpected eof", @caller, 1); 347 $self->_error ("unexpected eof", @caller, 1);
348 } elsif ($! != Errno::EAGAIN) { 348 } elsif ($! != Errno::EAGAIN) {
349 # todo, caller? 349 # todo, caller?
350 $self->_error ("read error: $!", @caller, 1); 350 $self->_error ("read error: $!", @caller, 1);
351 } 351 }
352 }); 352 };
353 353
354 $self->{tw_cb} = sub { 354 $self->{tw_cb} = sub {
355 if ($self->{timeout} && $self->{last_activity}) { 355 if ($self->{timeout} && $self->{last_activity}) {
356 if (AnyEvent->now > $self->{last_activity} + $self->{timeout}) { 356 if (AE::now > $self->{last_activity} + $self->{timeout}) {
357 # we did time out 357 # we did time out
358 my $req = $self->{queue}[0]; 358 my $req = $self->{queue}[0];
359 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal 359 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
360 } else { 360 } else {
361 # we need to re-set the timeout watcher 361 # we need to re-set the timeout watcher
362 $self->{tw} = AnyEvent->timer ( 362 $self->{tw} = AE::timer
363 after => $self->{last_activity} + $self->{timeout} - AnyEvent->now, 363 $self->{last_activity} + $self->{timeout} - AE::now,
364 0,
364 cb => $self->{tw_cb}, 365 $self->{tw_cb},
365 ); 366 ;
366 Scalar::Util::weaken $self;
367 } 367 }
368 } else { 368 } else {
369 # no timeout check wanted, or idle 369 # no timeout check wanted, or idle
370 undef $self->{tw}; 370 undef $self->{tw};
371 } 371 }
372 }; 372 };
373 373
374 $self->{ww_cb} = sub { 374 $self->{ww_cb} = sub {
375 return unless $self; 375 return unless $self;
376 376
377 $self->{last_activity} = AnyEvent->now; 377 $self->{last_activity} = AE::now;
378 378
379 my $len = syswrite $client, $self->{wbuf} 379 my $len = syswrite $client, $self->{wbuf}
380 or return delete $self->{ww}; 380 or return delete $self->{ww};
381 381
382 substr $self->{wbuf}, 0, $len, ""; 382 substr $self->{wbuf}, 0, $len, "";
430sub _server_pid { 430sub _server_pid {
431 shift->{child_pid} 431 shift->{child_pid}
432} 432}
433 433
434sub kill_child { 434sub kill_child {
435 my $self = shift; 435 my $self = shift;
436
436 my $child_pid = delete $self->{child_pid}; 437 if (my $pid = delete $self->{child_pid}) {
437 if ($child_pid) {
438 # send SIGKILL in two seconds
439 my $murder_timer = AnyEvent->timer (
440 after => 2,
441 cb => sub {
442 kill 9, $child_pid;
443 },
444 );
445
446 # reap process 438 # kill and reap process
447 my $kid_watcher; $kid_watcher = AnyEvent->child ( 439 my $kid_watcher; $kid_watcher = AE::child $pid, sub {
448 pid => $child_pid,
449 cb => sub {
450 # just hold on to this so it won't go away
451 undef $kid_watcher; 440 undef $kid_watcher;
452 # cancel SIGKILL
453 undef $murder_timer;
454 },
455 ); 441 };
442 kill TERM => $pid;
443 }
456 444
457 close $self->{fh}; 445 close delete $self->{fh};
458 }
459} 446}
460 447
461sub DESTROY { 448sub DESTROY {
462 shift->kill_child; 449 shift->kill_child;
463} 450}
526 513
527 push @{ $self->{queue} }, [$cb, $filename, $line]; 514 push @{ $self->{queue} }, [$cb, $filename, $line];
528 515
529 # re-start timeout if necessary 516 # re-start timeout if necessary
530 if ($self->{timeout} && !$self->{tw}) { 517 if ($self->{timeout} && !$self->{tw}) {
531 $self->{last_activity} = AnyEvent->now; 518 $self->{last_activity} = AE::now;
532 $self->{tw_cb}->(); 519 $self->{tw_cb}->();
533 } 520 }
534 521
535 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 522 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
536 523
537 unless ($self->{ww}) { 524 unless ($self->{ww}) {
538 my $len = syswrite $self->{fh}, $self->{wbuf}; 525 my $len = syswrite $self->{fh}, $self->{wbuf};
539 substr $self->{wbuf}, 0, $len, ""; 526 substr $self->{wbuf}, 0, $len, "";
540 527
541 # still any left? then install a write watcher 528 # still any left? then install a write watcher
542 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) 529 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
543 if length $self->{wbuf}; 530 if length $self->{wbuf};
544 } 531 }
545} 532}
546 533
547=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv)) 534=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
571if successful. 558if successful.
572 559
573If an error occurs and the C<on_error> callback returns, then only C<$dbh> 560If an error occurs and the C<on_error> callback returns, then only C<$dbh>
574will be passed and C<$@> contains the error message. 561will be passed and C<$@> contains the error message.
575 562
576=item $dbh->begin_work ($cb->($dbh[, $success])) 563=item $dbh->begin_work ($cb->($dbh[, $rc]))
577 564
578=item $dbh->commit ($cb->($dbh[, $success])) 565=item $dbh->commit ($cb->($dbh[, $rc]))
579 566
580=item $dbh->rollback ($cb->($dbh[, $success])) 567=item $dbh->rollback ($cb->($dbh[, $rc]))
581 568
582The begin_work, commit, and rollback methods expose the equivalent 569The begin_work, commit, and rollback methods expose the equivalent
583transaction control method of the DBI driver. On success, C<$success> 570transaction control method of the DBI driver. On success, C<$rc> is true.
584is true.
585 571
586If an error occurs and the C<on_error> callback returns, then only C<$dbh> 572If an error occurs and the C<on_error> callback returns, then only C<$dbh>
587will be passed and C<$@> contains the error message. 573will be passed and C<$@> contains the error message.
588 574
589=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $result, $handle_error)) 575=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
590 576
591This gives access to database driver private methods. Because they 577This gives access to database driver private methods. Because they
592are not standard you cannot always depend on the value of C<$result> 578are not standard you cannot always depend on the value of C<$rc> or
593or C<$handle_error>. Check the documentation for your specific 579C<$dbi_err>. Check the documentation for your specific driver/function
594driver/function combination to see what it returns. 580combination to see what it returns.
595 581
596Note that the first argument will be eval'ed to produce the argument list to 582Note that the first argument will be eval'ed to produce the argument list to
597the func() method. This must be done because the serialization protocol 583the func() method. This must be done because the serialization protocol
598between the AnyEvent::DBI server process and your program does not support the 584between the AnyEvent::DBI server process and your program does not support the
599passage of closures. 585passage of closures.
608 my ($string, $search) = @_; 594 my ($string, $search) = @_;
609 return index $string, $search; 595 return index $string, $search;
610 }, 596 },
611 }, 597 },
612 create_function => sub { 598 create_function => sub {
613 return $cv->send($@) 599 return $cv->send ($@)
614 unless $_[0]; 600 unless $#_;
615 $cv->send (undef, @_[1,2]); 601 $cv->send (undef, @_[1,2,3]);
616 } 602 }
617 ); 603 );
618 604
619 my ($err,$result,$handle_err) = $cv->recv; 605 my ($err,$rc,$errcode,$errstr) = $cv->recv;
620 606
607 die $err if defined $err;
621 die "EVAL failed: $err" 608 die "EVAL failed: $errstr"
622 if $err; 609 if $errcode;
623 610
624 # otherwise, we can ignore $result and $handle_err for this particular func 611 # otherwise, we can ignore $rc and $errcode for this particular func
625 612
626=cut 613=cut
627 614
628for my $cmd_name (qw(exec attr begin_work commit rollback func)) { 615for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
629 eval 'sub ' . $cmd_name . '{ 616 eval 'sub ' . $cmd_name . '{

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines