ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.9 by root, Thu Nov 6 13:56:58 2008 UTC vs.
Revision 1.10 by root, Tue Jun 2 16:16:03 2009 UTC

52use DBI (); 52use DBI ();
53 53
54use AnyEvent (); 54use AnyEvent ();
55use AnyEvent::Util (); 55use AnyEvent::Util ();
56 56
57use Errno qw(:POSIX);
58use Fcntl qw(F_SETFD);
59use POSIX qw(sysconf _SC_OPEN_MAX);
60
57our $VERSION = '1.1'; 61our $VERSION = '1.2';
62my $fd_max = 1023; # default
63eval { $fd_max = sysconf _SC_OPEN_MAX - 1; };
58 64
59# this is the forked server code 65# this is the forked server code
60 66
61our $DBH; 67our $DBH;
62 68
63sub req_open { 69sub req_open {
64 my (undef, $dbi, $user, $pass, %attr) = @{+shift}; 70 my (undef, $dbi, $user, $pass, %attr) = @{+shift};
65 71
66 $DBH = DBI->connect ($dbi, $user, $pass, \%attr); 72 $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr;
67 73
68 [1] 74 [1]
69} 75}
70 76
71sub req_exec { 77sub req_exec {
72 my (undef, $st, @args) = @{+shift}; 78 my (undef, $st, @args) = @{+shift};
73
74 my $sth = $DBH->prepare_cached ($st, undef, 1); 79 my $sth = $DBH->prepare_cached ($st, undef, 1)
80 or die [$DBI::errstr];
75 81
76 my $rv = $sth->execute (@args) 82 my $rv = $sth->execute (@args)
77 or die $sth->errstr; 83 or die [$sth->errstr];
78 84
79 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }] 85 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }]
80} 86}
81 87
88sub req_attr {
89 my (undef, $attr_name, $attr_val) = @{+shift};
90
91 if (defined $attr_val) {
92 $DBH->{$attr_name} = $attr_val;
93 }
94
95 [1, $DBH->{$attr_name}]
96}
97
98sub req_begin_work {
99 [scalar $DBH->begin_work or die $DBI::errstr]
100}
101
102sub req_commit {
103 [scalar $DBH->commit or die $DBI::errstr]
104}
105
106sub req_rollback {
107 [scalar $DBH->rollback or die $DBI::errstr]
108}
109
110sub req_func {
111 my (undef, $arg_string, $function) = @{+shift};
112 my @args = eval $arg_string;
113
114 if ($@) {
115 die "Bad func() arg string: $@";
116 }
117
118 my $rv = $DBH->func (@args, $function);
119 return [$rv . $DBH->err];
120}
121
82sub serve { 122sub serve {
83 my ($fh) = @_; 123 my ($fileno) = @_;
124
125 open my $fh, ">>&=$fileno"
126 or die "Couldn't open service socket: $!";
84 127
85 no strict; 128 no strict;
86 129
87 eval { 130 eval {
88 my $rbuf; 131 my $rbuf;
99 142
100 my $req = Storable::thaw substr $rbuf, 4; 143 my $req = Storable::thaw substr $rbuf, 4;
101 substr $rbuf, 0, $len + 4, ""; # remove length + request 144 substr $rbuf, 0, $len + 4, ""; # remove length + request
102 145
103 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) }; 146 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
104
105 $wbuf = pack "L/a*", Storable::freeze [undef, "$@"] 147 $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? "$@->[0]" : $@ , ref $@ ? $@->[1] : 1]
106 if $@; 148 if $@;
107 149
108 for (my $ofs = 0; $ofs < length $wbuf; ) { 150 for (my $ofs = 0; $ofs < length $wbuf; ) {
109 $ofs += (syswrite $fh, substr $wbuf, $ofs 151 $ofs += (syswrite $fh, substr $wbuf, $ofs
110 or die "unable to write results"); 152 or die "unable to write results");
118 # and the above doesn't even work on windows, it seems the only 160 # and the above doesn't even work on windows, it seems the only
119 # way to is to leak memory and kill 9 from the parent. yay. 161 # way to is to leak memory and kill 9 from the parent. yay.
120 } 162 }
121 163
122 require POSIX; 164 require POSIX;
123 POSIX::_exit (0); 165 POSIX::_exit 0;
124 # and the above kills the parent process on windows 166 # and the above kills the parent process on windows
167}
168
169sub start_server {
170 serve shift @ARGV;
125} 171}
126 172
127=head2 METHODS 173=head2 METHODS
128 174
129=over 4 175=over 4
151 197
152When an error occurs, then this callback will be invoked. On entry, C<$@> 198When an error occurs, then this callback will be invoked. On entry, C<$@>
153is set to the error message. C<$filename> and C<$line> is where the 199is set to the error message. C<$filename> and C<$line> is where the
154original request was submitted. 200original request was submitted.
155 201
156If this callback returns and this was a fatal error (C<$fatal> is true) 202If the fatal argument is true then the database connection shuts down and your
157then AnyEvent::DBI die's, otherwise it calls the original request callback 203database handle becomes invalid. All of your queued request callbacks are
158without any arguments. 204called without any arguments.
159 205
160If omitted, then C<die> will be called on any errors, fatal or not. 206If omitted, then C<die> will be called on any errors, fatal or not.
161 207
208The C<$dbh> argument is always a weak reference to the AnyEvent::DBI object.
209
210=item on_connect => $callback->($dbh)
211
212If you supply an on_connect callback, then this callback will be invoked after
213the database connection is attempted. If the connection succeeds, C<$dbh>
214contains a weak reference to the AnyEvent::DBI object. If the connection fails
215for any reason, no arguments are passed to the callback and C<$@> contains
216$DBI::errstr.
217
218Regardless of whether on_connect is supplied, connect errors will result in
219on_error being called. However, if no on_connect callback is supplied, then
220connection errors are considered fatal. The client will die() and the on_error
221callback will be called with C<$fatal> true. When on_connect is supplied,
222connect error are not fatal and AnyEvent::DBI will not die(). You still
223cannot, however, use the $dbh object you recived from new() to make requests.
224
225=item exec_server => 1
226
227If you supply an exec_server argument, then the DBI server process will call
228something like:
229
230 exec "$^X -MAnyEvent::DBI -e AnyEvent::DBI::start_server"
231
232after forking. This will provide the cleanest possible interpreter for your
233database server. There are special provisions to include C<-Mblib> if the
234current interpreter is running with blib.
235
236If you do not supply the exec_server argument (or supply it with a false value)
237then the traditional method of starting the server within the same forked
238interpreter context is used. The forked interpreter will try to clean itself
239up by calling POSIX::close on all filedescriptors except STDIN, STDOUT, and
240STDERR (and the socket it uses to communicate with the cilent, of course).
241
242=item timeout => seconds
243
244If you supply a timeout parameter (floating point number of seconds), then a
245timer is started any time the DBI handle expects a response from the server.
246This includes connection setup as well as requests made to the backend. The
247timeout spans the duration from the moment the first data is written (or queued
248to be written) until all expected responses are returned, but is postponed for
249"timeout" seconds each time more data is returned from the server. If the
250timer ever goes off then a fatal error is generated. If you have an on_error
251handler installed, then it will be called, otherwise your program will die().
252
253When altering your databases with timeouts it is wise to use transactions. If
254you quit due to timeout while performing insert, update or schema-altering
255commands you can end up not knowing if the action was submitted to the
256database, complicating recovery.
257
258Timeout errors are always fatal.
259
162=back 260=back
261
262Any additional key-value pairs will be rolled into a hash reference and passed
263as the final argument to the DBI->connect(...) call. For example, to supress
264errors on STDERR and send them instead to an AnyEvent::Handle you could do:
265
266 $dbh = new AnyEvent::DBI
267 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
268 PrintError => 0,
269 on_error => sub { $log_handle->push_write("DBI Error: $@ at $_[1]:$_[2]\n"); }
163 270
164=cut 271=cut
165 272
166# stupid Storable autoloading, total loss-loss situation 273# stupid Storable autoloading, total loss-loss situation
167Storable::thaw Storable::freeze []; 274Storable::thaw Storable::freeze [];
170 my ($class, $dbi, $user, $pass, %arg) = @_; 277 my ($class, $dbi, $user, $pass, %arg) = @_;
171 278
172 socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC 279 socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC
173 or croak "unable to create dbi communicaiton pipe: $!"; 280 or croak "unable to create dbi communicaiton pipe: $!";
174 281
282 my %dbi_args = ( %arg ) ;
283 delete @dbi_args{qw( on_connect on_error timeout exec_server )};
284
175 my $self = bless \%arg, $class; 285 my $self = bless \%arg, $class;
176
177 $self->{fh} = $client; 286 $self->{fh} = $client;
178 287
179 Scalar::Util::weaken (my $wself = $self); 288 Scalar::Util::weaken (my $wself = $self);
180 289
181 AnyEvent::Util::fh_nonblocking $client, 1; 290 AnyEvent::Util::fh_nonblocking $client, 1;
182 291
183 my $rbuf; 292 my $rbuf;
184 my @caller = (caller)[1,2]; # the "default" caller 293 my @caller = (caller)[1,2]; # the "default" caller
185 294
186 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { 295 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
296 return unless $wself;
187 my $len = sysread $client, $rbuf, 65536, length $rbuf; 297 my $len = sysread $client, $rbuf, 65536, length $rbuf;
298 my $err = $!;
188 299
189 if ($len > 0) { 300 if ($len > 0) {
301 # we received data, so reset the timer
302 delete $wself->{timer};
303 if ($wself->{timeout}) {
304 $wself->{timer} = AnyEvent->timer (
305 after => $wself->{timeout},
306 cb => sub { $wself && $wself->_timedout },
307 );
308 }
190 309
191 while () { 310 while () {
192 my $len = unpack "L", $rbuf; 311 my $len = unpack "L", $rbuf;
193 312
194 # full request available? 313 # full response available?
195 last unless $len && $len + 4 <= length $rbuf; 314 last unless $len && $len + 4 <= length $rbuf;
196 315
197 my $res = Storable::thaw substr $rbuf, 4; 316 my $res = Storable::thaw substr $rbuf, 4;
198 substr $rbuf, 0, $len + 4, ""; # remove length + request 317 substr $rbuf, 0, $len + 4, ""; # remove length + request
199 318
319 last unless $wself;
200 my $req = shift @{ $wself->{queue} }; 320 my $req = shift @{ $wself->{queue} };
201 321
202 if (defined $res->[0]) { 322 if (defined $res->[0]) {
323 $res->[0] = $wself;
203 $req->[0](@$res); 324 $req->[0](@$res);
204 } else { 325 } else {
205 my $cb = shift @$req; 326 my $cb = shift @$req;
206 $wself->_error ($res->[1], @$req); 327 $@=$res->[1];
207 $cb->(); 328 $cb->();
329 if ($wself) { # cb() could have deleted it
330 $wself->_error ($res->[1], @$req, $res->[2]); # error, request record, is_fatal
331 }
332 }
333
334 # no more queued requests, so cancel timeout
335 if ($wself) {
336 delete $wself->{timer}
337 unless @{ $wself->{queue} };
208 } 338 }
209 } 339 }
210 340
211 } elsif (defined $len) { 341 } elsif (defined $len) {
212 $wself->_error ("unexpected eof", @caller, 1); 342 $wself->_error ("unexpected eof", @caller, 1);
213 } else { 343 } else {
344 return if $err == EAGAIN;
214 $wself->_error ("read error: $!", @caller, 1); 345 $wself->_error ("read error: $err", @caller, 1);
215 } 346 }
216 }); 347 });
217 348
218 $self->{ww_cb} = sub { 349 $self->{ww_cb} = sub {
350 return unless $wself;
219 my $len = syswrite $client, $wself->{wbuf} 351 my $len = syswrite $client, $wself->{wbuf}
220 or return delete $wself->{ww}; 352 or return delete $wself->{ww};
221 353
222 substr $wself->{wbuf}, 0, $len, ""; 354 substr $wself->{wbuf}, 0, $len, "";
223 }; 355 };
225 my $pid = fork; 357 my $pid = fork;
226 358
227 if ($pid) { 359 if ($pid) {
228 # parent 360 # parent
229 close $server; 361 close $server;
230
231 } elsif (defined $pid) { 362 } elsif (defined $pid) {
232 # child 363 # child
233 close $client; 364 my $serv_fno = fileno $server;
234 @_ = $server;
235 goto &serve;
236 365
366 if ($self->{exec_server}) {
367 fcntl $server, F_SETFD, 0; # don't close the server side
368 exec "$^X -MAnyEvent::DBI -e AnyEvent::DBI::start_server $serv_fno";
369 POSIX::_exit 124;
370 } else {
371 ($_ != $serv_fno) && POSIX::close $_
372 for $^F+1..$fd_max;
373 serve $serv_fno;
374 POSIX::_exit 0; # not reachable
375 }
237 } else { 376 } else {
238 croak "fork: $!"; 377 croak "fork: $!";
239 } 378 }
240 379
241 $self->_req (sub { }, (caller)[1,2], 1, req_open => $dbi, $user, $pass); 380 $self->{child_pid} = $pid;
381 # set a connect timeout
382 if ($self->{timeout}) {
383 $self->{timer} = AnyEvent->timer (
384 after => $self->{timeout},
385 cb => sub { $wself && $wself->_timedout },
386 );
387 }
388 $self->_req (
389 ($self->{on_connect} ? $self->{on_connect} : sub { }),
390 (caller)[1,2],
391 req_open => $dbi, $user, $pass, %dbi_args
392 );
242 393
243 $self 394 $self
395}
396
397sub _server_pid {
398 shift->{child_pid}
399}
400
401sub kill_child {
402 my $self = shift;
403 my $child_pid = delete $self->{child_pid};
404 if ($child_pid) {
405 # send SIGKILL in two seconds
406 my $murder_timer = AnyEvent->timer (
407 after => 2,
408 cb => sub {
409 kill 9, $child_pid;
410 },
411 );
412
413 # reap process
414 my $kid_watcher;
415 $kid_watcher = AnyEvent->child (
416 pid => $child_pid ,
417 cb => sub {
418 # just hold on to this so it won't go away
419 undef $kid_watcher;
420 # cancel SIGKILL
421 undef $murder_timer;
422 },
423 );
424
425 # SIGTERM = the beginning of the end
426 kill TERM => $child_pid;
427 }
428}
429
430sub DESTROY {
431 shift->kill_child;
244} 432}
245 433
246sub _error { 434sub _error {
247 my ($self, $error, $filename, $line, $fatal) = @_; 435 my ($self, $error, $filename, $line, $fatal) = @_;
248 436
437 if ($fatal) {
249 delete $self->{rw}; 438 delete $self->{rw};
250 delete $self->{ww}; 439 delete $self->{ww};
251 delete $self->{fh}; 440 delete $self->{fh};
441 delete $self->{timer};
442
443 # for fatal errors call all enqueued callbacks with error
444 while (my $req = shift @{$self->{queue}}) {
445 $@ = $error;
446 $req->[0]->();
447 }
448 $self->kill_child;
449 }
252 450
253 $@ = $error; 451 $@ = $error;
254 452
255 if ($self->{on_error}) { 453 if ($self->{on_error}) {
256 $self->{on_error}($self, $filename, $line, $fatal); 454 $self->{on_error}($self, $filename, $line, $fatal)
257 return unless $fatal; 455 } else {
258 }
259
260 die "$error at $filename, line $line\n"; 456 die "$error at $filename, line $line\n";
457 }
458}
459
460=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
461
462Sets (or clears, with C<undef>) the on_error handler.
463
464=cut
465
466sub on_error {
467 $_[0]{on_error} = $_[1];
468}
469
470=item $dbh->on_connect ($cb->($dbh))
471
472Sets (or clears, with C<undef>) the on_connect handler.
473
474=cut
475
476sub on_connect {
477 $_[0]{on_connect} = $_[1];
478}
479
480=item $dbh->timeout ($seconds)
481
482Sets (or clears, with C<undef>) the database timeout. Useful to extend the
483timeout when you are about to make a really long query.
484
485=cut
486
487sub timeout {
488 my ($self, $timeout) = @_;
489
490 if ($timeout) {
491 $self->{timeout} = $timeout;
492 # reschedule timer if one was running
493 if ($self->{timer}) {
494 Scalar::Util::weaken (my $wself = $self);
495 $self->{timer} = AnyEvent->timer (
496 after => $self->{timeout},
497 cb => sub { $wself && $wself->_timedout },
498 );
499 }
500 } else {
501 delete @{%$self}[qw(timer timeout)];
502 }
503}
504
505sub _timedout {
506 my ($self) = @_;
507
508 my $req = shift @{ $self->{queue} };
509
510 if ($req) {
511 my $cb = shift @$req;
512 $@ = 'TIMEOUT';
513 $cb->();
514 $self->_error ('TIMEOUT', @$req, 1); # timeouts are always fatal
515 } else {
516 # shouldn't be possible to timeout without a pending request
517 $self->_error ('TIMEOUT', 'NO_PENDING_WTF', 0, 1);
518 }
261} 519}
262 520
263sub _req { 521sub _req {
264 my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, (); 522 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
265 523
524 if (!$self->{fh}) {
525 my $err = $@ = 'NO DATABASE CONNECTION';
526 $cb->();
527 $self->_error ($err, $filename, $line, 1);
528 return;
529 }
530
266 push @{ $self->{queue} }, [$cb, $filename, $line, $fatal]; 531 push @{ $self->{queue} }, [$cb, $filename, $line ];
532
533 if ($self->{timeout} && !$self->{timer}) {
534 Scalar::Util::weaken (my $wself = $self);
535 $self->{timer} = AnyEvent->timer (
536 after => $self->{timeout},
537 cb => sub { $wself && $wself->_timedout },
538 );
539 }
267 540
268 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 541 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
269 542
270 unless ($self->{ww}) { 543 unless ($self->{ww}) {
271 my $len = syswrite $self->{fh}, $self->{wbuf}; 544 my $len = syswrite $self->{fh}, $self->{wbuf};
275 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) 548 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
276 if length $self->{wbuf}; 549 if length $self->{wbuf};
277 } 550 }
278} 551}
279 552
280=item $dbh->exec ("statement", @args, $cb->($rows, $rv, ...)) 553=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, \%metadata))
281 554
282Executes the given SQL statement with placeholders replaced by 555Executes the given SQL statement with placeholders replaced by
283C<@args>. The statement will be prepared and cached on the server side, so 556C<@args>. The statement will be prepared and cached on the server side, so
284using placeholders is compulsory. 557using placeholders is compulsory.
285 558
286The callback will be called with the result of C<fetchall_arrayref> as 559The callback will be called with a weakened AnyEvent::DBI object as the first
287first argument (or C<undef> if the statement wasn't a select statement) 560argument and the result of C<fetchall_arrayref> as (or C<undef> if the
288and the return value of C<execute> as second argument. Additional 561statement wasn't a select statement) as the second argument. Third argument is
289arguments might get passed as well. 562a hash reference holding metadata about the request. Currently, the only key
563defined is C<$metadata->{rv}> holding the return value of
564C<execute>. Additional metadata might be added.
290 565
291If an error occurs and the C<on_error> callback returns, then no arguments 566If an error occurs and the C<on_error> callback returns, then no arguments
292will be passed and C<$@> contains the error message. 567will be passed and C<$@> contains the error message.
293 568
569=item $dbh->attr (attr_name, [ $attr_value ], $cb->($dbh, $new_value))
570
571An accessor for the handle attributes, such as AutoCommit, RaiseError,
572PrintError, etc. If you provide an $attr_value, then the given attribute will
573be set to that value.
574
575The callback will be passed the database handle and the
576attribute's value if successful. If accessing the attribute fails, then no
577arguments are passed to your callback, and $@ contains a description of the
578problem instead.
579
580=item $dbh->begin_work ($cb->($dbh))
581
582=item $dbh->commit ($cb->($dbh))
583
584=item $dbh->rollback ($cb->($dbh))
585
586The begin_work, commit, and rollback methods exopose the equivelant transaction
587control methods of the DBI. If something goes wrong, you will get no $dbh in
588your callaback, and will instead have an error to examine in $@.
589
590=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $result, $handle_error))
591
592This gives access to database driver private methods. Because they are not
593standard you cannot always depend on the value of $result or $handle_error.
594Check the documentation for your specific driver/function combination to see
595what it returns.
596
597Note that the first argument will be eval'ed to produce the argument list to
598the func() method. This must be done because the searialization protocol
599between the AnyEvent::DBI server process and your program does not support the
600passage of closures.
601
602Here's an example to extend the query language in SQLite so it supports an
603intstr() function:
604
605 $cv = AnyEvent->condvar;
606 $dbh->func(
607 q{
608 'instr',
609 2,
610 sub {
611 my ($string, $search) = @_;
612 return index $string, $search;
613 },
614 },
615 'create_function',
616 sub {return $cv->send($@) unless $_[0];$cv->send(undef,@_[1,2]);}
617 );
618 my ($err,$result,$handle_err) = $cv->recv();
619 die "EVAL failed: $err" if $err;
620 # otherwise, we can ignore $result and $handle_err for this particular func
621
294=cut 622=cut
295 623
296sub exec { 624for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
625 eval 'sub ' . $cmd_name . '{
297 my $cb = pop; 626 my $cb = pop;
298 splice @_, 1, 0, $cb, (caller)[1,2], 0, "req_exec"; 627 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
299
300 goto &_req; 628 goto &_req;
629 }';
301} 630}
302 631
303=back 632=back
304 633
305=head1 SEE ALSO 634=head1 SEE ALSO
309=head1 AUTHOR 638=head1 AUTHOR
310 639
311 Marc Lehmann <schmorp@schmorp.de> 640 Marc Lehmann <schmorp@schmorp.de>
312 http://home.schmorp.de/ 641 http://home.schmorp.de/
313 642
643 Adam Rosenstein <adam@redcondor.com>
644 http://www.redcondor.com/
645
314=cut 646=cut
315 647
3161 6481;
317 649

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines