ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.10
Committed: Tue Jun 2 16:16:03 2009 UTC (15 years ago) by root
Branch: MAIN
Changes since 1.9: +372 -40 lines
Log Message:
big patch by adam

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::DBI - asynchronous DBI access
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::DBI;
8
9 my $cv = AnyEvent->condvar;
10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12
13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($dbh, $rows, $metadata) = @_;
15
16 print "@$_\n"
17 for @$rows;
18
19 $cv->send;
20 });
21
22 # asynchronously do sth. else here
23
24 $cv->recv;
25
26 =head1 DESCRIPTION
27
28 This module is an L<AnyEvent> user; you need to make sure that you use and
29 run a supported event loop.
30
31 This module implements asynchronous DBI access by forking or executing
32 separate "DBI-Server" processes and sending them requests.
33
34 It means that you can run DBI requests in parallel to other tasks.
35
36 The overhead for very simple statements ("select 0") is somewhere
37 around 120% to 200% (dual/single core CPU) compared to an explicit
38 prepare_cached/execute/fetchrow_arrayref/finish combination.
39
40 =cut
41
42 package AnyEvent::DBI;
43
44 use strict;
45 no warnings;
46
47 use Carp;
48 use Socket ();
49 use Scalar::Util ();
50 use Storable ();
51
52 use DBI ();
53
54 use AnyEvent ();
55 use AnyEvent::Util ();
56
57 use Errno qw(:POSIX);
58 use Fcntl qw(F_SETFD);
59 use POSIX qw(sysconf _SC_OPEN_MAX);
60
our $VERSION = '1.2';

# Highest file descriptor number that the forked (non-exec) server closes
# before it starts serving. Ask the OS for the open-files limit; keep the
# default of 1023 when sysconf is unavailable (the eval) or reports no
# usable limit (undef / indeterminate).
my $fd_max = 1023; # default
eval {
    # parenthesised on purpose: "sysconf _SC_OPEN_MAX - 1" would pass
    # (_SC_OPEN_MAX - 1) to sysconf and query an unrelated limit
    my $open_max = sysconf (_SC_OPEN_MAX);
    $fd_max = $open_max - 1
        if defined $open_max && $open_max > 0;
};

# this is the forked server code

# Server-side DBI database handle; assigned by req_open and used by the
# other req_* functions.
our $DBH;
68
# Server side of the initial connect request.
#
# The request is [req_name, $dsn, $user, $pass, %attr]; %attr becomes the
# attribute hash passed to DBI->connect. On failure this dies with the plain
# DBI error string, which serve() reports to the client as a fatal error.
sub req_open {
    my ($request) = @_;
    my (undef, $dsn, $username, $password, %attributes) = @$request;

    $DBH = DBI->connect ($dsn, $username, $password, \%attributes);
    die $DBI::errstr unless $DBH;

    return [1];
}
76
# Server side of $dbh->exec.
#
# The request is [req_name, $statement, @bind_values]. The statement is
# prepared through prepare_cached (with $if_active = 1) and executed.
# Returns [1, \@rows_or_undef, { rv => $rv }], where rows come from
# fetchall_arrayref only when the statement produced result columns.
# Prepare/execute failures die with an arrayref, which serve() reports as a
# non-fatal error.
sub req_exec {
    my ($request) = @_;
    my (undef, $statement, @bind) = @$request;

    my $sth = $DBH->prepare_cached ($statement, undef, 1);
    die [$DBI::errstr] unless $sth;

    my $rv = $sth->execute (@bind);
    die [$sth->errstr] unless $rv;

    return [
        1,
        ($sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef),
        { rv => $rv },
    ];
}
87
# Server side of $dbh->attr.
#
# The request is [req_name, $name, $value]. When $value is defined the
# handle attribute is set first; an undefined value means "read only".
# Returns [1, current value of the attribute].
sub req_attr {
    my ($request) = @_;
    my (undef, $name, $value) = @$request;

    $DBH->{$name} = $value
        if defined $value;

    return [1, $DBH->{$name}];
}
97
# Server side of $dbh->begin_work.
#
# A failure (e.g. begin_work while a transaction is already active) dies
# with an arrayref. serve() reports that as a NON-fatal error, as it does for
# statement errors in req_exec, so the connection stays usable instead of
# being torn down.
sub req_begin_work {
    $DBH->begin_work
        or die [$DBI::errstr];

    [1]
}
101
# Server side of $dbh->commit.
#
# A failed commit dies with an arrayref, which serve() reports as a
# NON-fatal error (consistent with req_exec), instead of a plain string that
# would shut the whole connection down.
sub req_commit {
    $DBH->commit
        or die [$DBI::errstr];

    [1]
}
105
# Server side of $dbh->rollback.
#
# A failed rollback dies with an arrayref, which serve() reports as a
# NON-fatal error (consistent with req_exec), instead of a plain string that
# would shut the whole connection down.
sub req_rollback {
    $DBH->rollback
        or die [$DBI::errstr];

    [1]
}
109
# Server side of $dbh->func.
#
# The request is [req_name, $arg_string, $function]. $arg_string is eval'ed
# to build the argument list, because closures cannot be serialized.
# NOTE(review): this is string eval of text supplied by the client's
# $dbh->func call. Only acceptable because that client is the process that
# created this server; never feed it untrusted input.
#
# Returns [1, $result, $handle_error]. The client replaces slot 0 with the
# handle, so the callback receives ($dbh, $result, $handle_error) as
# documented. A malformed argument string dies with a plain string, which
# serve() treats as fatal.
sub req_func {
    my (undef, $arg_string, $function) = @{+shift};

    my @args = eval $arg_string;

    die "Bad func() arg string: $@"
        if $@;

    my $rv = $DBH->func (@args, $function);

    # a list, not a concatenation: result and driver error are separate values
    [1, $rv, $DBH->err]
}
121
# Server main loop, run in the forked (or exec'd) child process.
#
# Reads length-prefixed (pack "L/a*") Storable-frozen requests of the form
# [req_name, args...] from the socket whose file descriptor is $fileno. For
# each one it calls the named req_* function in this package and writes back
# one frozen, length-prefixed response:
#   - the req_* return value on success;
#   - [undef, $message, $is_fatal] if it died. An arrayref exception
#     (die [$msg]) is non-fatal unless its second element is true; any
#     other exception is fatal.
#
# Dies if the descriptor cannot be opened. Otherwise it never returns: it
# leaves the process via POSIX::_exit once the socket hits EOF or an I/O
# error occurs.
sub serve {
    my ($fileno) = @_;

    # three-argument fdopen of the inherited descriptor; nothing is parsed
    # out of an interpolated mode/filename string
    open my $fh, ">>&=", $fileno
        or die "Couldn't open service socket: $!";

    # $req->[0]($req) below calls the request name as a symbolic sub ref
    no strict;

    eval {
        my $rbuf;

        while () {
            sysread $fh, $rbuf, 16384, length $rbuf
                or last;

            while () {
                my $len = unpack "L", $rbuf;

                # full request available?
                last unless $len && $len + 4 <= length $rbuf;

                # thaw exactly this request rather than a copy of the whole
                # remaining buffer
                my $req = Storable::thaw substr $rbuf, 4, $len;
                substr $rbuf, 0, $len + 4, ""; # remove length + request

                my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
                $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? "$@->[0]" : $@ , ref $@ ? $@->[1] : 1]
                    if $@;

                for (my $ofs = 0; $ofs < length $wbuf; ) {
                    $ofs += (syswrite $fh, substr $wbuf, $ofs
                        or die "unable to write results");
                }
            }
        }
    };

    if (AnyEvent::WIN32) {
        kill 9, $$; # no other way on the broken windows platform
        # and the above doesn't even work on windows, it seems the only
        # way to is to leak memory and kill 9 from the parent. yay.
    }

    require POSIX;
    POSIX::_exit 0;
    # and the above kills the parent process on windows
}
168
# Entry point of the exec'd server interpreter
# ("perl -MAnyEvent::DBI -e AnyEvent::DBI::start_server FD"): the first
# command-line argument is the file descriptor number of the service socket.
sub start_server {
    my $fileno = shift @ARGV;
    serve($fileno);
}
172
173 =head2 METHODS
174
175 =over 4
176
177 =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
178
179 Returns a database handle for the given database. Each database handle
180 has an associated server process that executes statements in order. If
181 you want to run more than one statement in parallel, you need to create
182 additional database handles.
183
184 The advantage of this approach is that transactions work, as state is
185 preserved between requests on the same handle.
186
187 Example:
188
189 $dbh = new AnyEvent::DBI
190 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
191
192 Additional key-value pairs can be used to adjust behaviour:
193
194 =over 4
195
196 =item on_error => $callback->($dbh, $filename, $line, $fatal)
197
198 When an error occurs, then this callback will be invoked. On entry, C<$@>
199 is set to the error message. C<$filename> and C<$line> are where the
200 original request was submitted.
201
202 If the fatal argument is true then the database connection shuts down and your
203 database handle becomes invalid. All of your queued request callbacks are
204 called without any arguments.
205
206 If omitted, then C<die> will be called on any errors, fatal or not.
207
208 The C<$dbh> argument is always a weak reference to the AnyEvent::DBI object.
209
210 =item on_connect => $callback->($dbh)
211
212 If you supply an on_connect callback, then this callback will be invoked after
213 the database connection is attempted. If the connection succeeds, C<$dbh>
214 contains a weak reference to the AnyEvent::DBI object. If the connection fails
215 for any reason, no arguments are passed to the callback and C<$@> contains
216 $DBI::errstr.
217
218 Regardless of whether on_connect is supplied, connect errors will result in
219 on_error being called. However, if no on_connect callback is supplied, then
220 connection errors are considered fatal. The client will die() and the on_error
221 callback will be called with C<$fatal> true. When on_connect is supplied,
222 connect errors are not fatal and AnyEvent::DBI will not die(). You still
223 cannot, however, use the $dbh object you received from new() to make requests.
224
225 =item exec_server => 1
226
227 If you supply an exec_server argument, then the DBI server process will call
228 something like:
229
230 exec "$^X -MAnyEvent::DBI -e AnyEvent::DBI::start_server"
231
232 after forking. This will provide the cleanest possible interpreter for your
233 database server. There are special provisions to include C<-Mblib> if the
234 current interpreter is running with blib.
235
236 If you do not supply the exec_server argument (or supply it with a false value)
237 then the traditional method of starting the server within the same forked
238 interpreter context is used. The forked interpreter will try to clean itself
239 up by calling POSIX::close on all file descriptors except STDIN, STDOUT, and
240 STDERR (and the socket it uses to communicate with the client, of course).
241
242 =item timeout => seconds
243
244 If you supply a timeout parameter (floating point number of seconds), then a
245 timer is started any time the DBI handle expects a response from the server.
246 This includes connection setup as well as requests made to the backend. The
247 timeout spans the duration from the moment the first data is written (or queued
248 to be written) until all expected responses are returned, but is postponed for
249 "timeout" seconds each time more data is returned from the server. If the
250 timer ever goes off then a fatal error is generated. If you have an on_error
251 handler installed, then it will be called, otherwise your program will die().
252
253 When altering your databases with timeouts it is wise to use transactions. If
254 you quit due to timeout while performing insert, update or schema-altering
255 commands you can end up not knowing if the action was submitted to the
256 database, complicating recovery.
257
258 Timeout errors are always fatal.
259
260 =back
261
262 Any additional key-value pairs will be rolled into a hash reference and passed
263 as the final argument to the DBI->connect(...) call. For example, to suppress
264 errors on STDERR and send them instead to an AnyEvent::Handle you could do:
265
266 $dbh = new AnyEvent::DBI
267 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
268 PrintError => 0,
269 on_error => sub { $log_handle->push_write("DBI Error: $@ at $_[1]:$_[2]\n"); }
270
271 =cut
272
# Exercise freeze/thaw once at load time so that Storable's code is fully
# loaded up front (older Storable versions autoloaded these functions).
# NOTE(review): presumably so the parent and the forked server already have
# it loaded before the first request - confirm.
Storable::thaw(Storable::freeze([]));
275
# Constructor: create a socketpair, fork the DBI server process (or exec a
# fresh interpreter running start_server when exec_server is set) and queue
# the initial connect request (req_open).
#
# Our own options (on_connect, on_error, timeout, exec_server) stay in the
# object. All other key/value pairs are passed to DBI->connect as its
# \%attr argument. Croaks if the socketpair or the fork fails.
sub new {
    my ($class, $dbi, $user, $pass, %arg) = @_;

    socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC
        or croak "unable to create dbi communication pipe: $!";

    my %dbi_args = %arg;
    delete @dbi_args{qw(on_connect on_error timeout exec_server)};

    my $self = bless \%arg, $class;
    $self->{fh} = $client;

    # watcher callbacks only hold a weak reference, so dropping the last
    # user reference still triggers DESTROY
    Scalar::Util::weaken (my $wself = $self);

    AnyEvent::Util::fh_nonblocking $client, 1;

    my $rbuf;
    my @caller = (caller)[1,2]; # the "default" caller

    $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
        return unless $wself;
        my $len = sysread $client, $rbuf, 65536, length $rbuf;
        my $err = $!;

        if ($len > 0) {
            # we received data, so reset the timer
            delete $wself->{timer};
            if ($wself->{timeout}) {
                $wself->{timer} = AnyEvent->timer (
                    after => $wself->{timeout},
                    cb    => sub { $wself && $wself->_timedout },
                );
            }

            # dispatch every complete length-prefixed response in the buffer
            while () {
                my $size = unpack "L", $rbuf;

                # full response available?
                last unless $size && $size + 4 <= length $rbuf;

                my $res = Storable::thaw substr $rbuf, 4, $size;
                substr $rbuf, 0, $size + 4, ""; # remove length + response

                last unless $wself;
                my $req = shift @{ $wself->{queue} };

                if (defined $res->[0]) {
                    # success: the status slot is replaced by the handle
                    $res->[0] = $wself;
                    $req->[0](@$res);
                } else {
                    # failure: $res is [undef, $message, $is_fatal]
                    my $cb = shift @$req;
                    $@ = $res->[1];
                    $cb->();
                    if ($wself) { # cb() could have deleted it
                        $wself->_error ($res->[1], @$req, $res->[2]); # error, request record, is_fatal
                    }
                }

                # no more queued requests, so cancel timeout
                if ($wself) {
                    delete $wself->{timer}
                        unless @{ $wself->{queue} };
                }
            }

        } elsif (defined $len) {
            $wself->_error ("unexpected eof", @caller, 1);
        } else {
            return if $err == EAGAIN || $err == EINTR;
            $wself->_error ("read error: $err", @caller, 1);
        }
    });

    # Write-watcher callback: flush as much of the pending request buffer as
    # the socket accepts, and stop watching once it is empty.
    $self->{ww_cb} = sub {
        return unless $wself;

        my $len = syswrite $client, $wself->{wbuf};

        unless (defined $len) {
            # transient: keep the watcher so the remaining data is retried
            return if $! == EAGAIN || $! == EINTR;

            # hard error: stop watching; the read watcher will see the
            # broken connection and report it
            return delete $wself->{ww};
        }

        substr $wself->{wbuf}, 0, $len, "";

        delete $wself->{ww}
            unless length $wself->{wbuf};
    };

    my $pid = fork;

    if ($pid) {
        # parent
        close $server;
    } elsif (defined $pid) {
        # child
        my $serv_fno = fileno $server;

        if ($self->{exec_server}) {
            fcntl $server, F_SETFD, 0; # don't close the server side

            # re-use blib when the current interpreter runs with it
            my @blib = $INC{"blib.pm"} ? ("-Mblib") : ();

            # list form: no shell, so an interpreter path containing spaces
            # or shell metacharacters still works
            exec $^X, @blib, "-MAnyEvent::DBI", "-e", "AnyEvent::DBI::start_server", $serv_fno;
            POSIX::_exit 124;
        } else {
            ($_ != $serv_fno) && POSIX::close $_
                for $^F+1..$fd_max;

            # serve() only comes back by dying (e.g. the socket cannot be
            # opened); never let that exception unwind into the parent's
            # code running in this child
            eval { serve($serv_fno) };
            POSIX::_exit 1;
        }
    } else {
        croak "fork: $!";
    }

    $self->{child_pid} = $pid;

    # set a connect timeout
    if ($self->{timeout}) {
        $self->{timer} = AnyEvent->timer (
            after => $self->{timeout},
            cb    => sub { $wself && $wself->_timedout },
        );
    }

    $self->_req (
        ($self->{on_connect} ? $self->{on_connect} : sub { }),
        (caller)[1,2],
        req_open => $dbi, $user, $pass, %dbi_args
    );

    $self
}
396
# Process id of the forked DBI server (undef once kill_child has run).
sub _server_pid {
    my ($self) = @_;
    return $self->{child_pid};
}
400
# Terminate the server process, at most once per handle.
#
# Sends SIGTERM right away and SIGKILL after two seconds unless the child
# has been reaped by then. The child watcher keeps itself (and the SIGKILL
# timer) alive through its own closure until the child exits.
sub kill_child {
    my $self = shift;

    my $pid = delete $self->{child_pid};
    return unless $pid;

    # escalate to SIGKILL in two seconds
    my $sigkill_timer = AnyEvent->timer (
        after => 2,
        cb    => sub { kill 9, $pid },
    );

    # reap the process; the self-reference keeps the watcher alive
    my $reaper;
    $reaper = AnyEvent->child (
        pid => $pid,
        cb  => sub {
            undef $reaper;
            undef $sigkill_timer; # exited in time: cancel SIGKILL
        },
    );

    # SIGTERM = the beginning of the end
    kill TERM => $pid;
}
429
# Destructor: shut down the server process.
#
# DESTROY can run at arbitrary points, e.g. while a callback is inspecting
# $@ for the error message. Localize the error variables so that the
# AnyEvent watchers created by kill_child cannot clobber them.
sub DESTROY {
    my ($self) = @_;

    local ($@, $!);

    $self->kill_child;
}
433
# Report an error for the request submitted at $filename:$line.
#
# Fatal errors first tear the handle down:
#   - drop the watchers, the socket and the timer;
#   - fail every still-queued request (its callback is called with no
#     arguments and $@ set to the error);
#   - stop the server process.
# Then, fatal or not, $@ is set to the error and on_error is called with
# ($self, $filename, $line, $fatal). Without an on_error handler this dies.
sub _error {
    my ($self, $error, $filename, $line, $fatal) = @_;

    if ($fatal) {
        delete $self->{$_} for qw(rw ww fh timer);

        while (my $pending = shift @{ $self->{queue} }) {
            $@ = $error;
            $pending->[0]->();
        }

        $self->kill_child;
    }

    $@ = $error;

    my $handler = $self->{on_error}
        or die "$error at $filename, line $line\n";

    $handler->($self, $filename, $line, $fatal);
}
459
460 =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
461
462 Sets (or clears, with C<undef>) the on_error handler.
463
464 =cut
465
# Install (or clear, with undef) the on_error callback; returns the new value.
sub on_error {
    my ($self, $callback) = @_;
    $self->{on_error} = $callback;
}
469
470 =item $dbh->on_connect ($cb->($dbh))
471
472 Sets (or clears, with C<undef>) the on_connect handler.
473
474 =cut
475
# Install (or clear, with undef) the on_connect callback; returns the new
# value. The constructor queues the connect request with whatever callback
# was set at construction time, so changing it afterwards does not affect
# that request.
sub on_connect {
    my ($self, $callback) = @_;
    $self->{on_connect} = $callback;
}
479
480 =item $dbh->timeout ($seconds)
481
482 Sets (or clears, with C<undef>) the database timeout. Useful to extend the
483 timeout when you are about to make a really long query.
484
485 =cut
486
# Set (or clear, with a false value) the request timeout in seconds.
#
# Setting a timeout while a timer is running restarts that timer with the
# new duration. Clearing removes both the timeout and any running timer.
sub timeout {
    my ($self, $timeout) = @_;

    if ($timeout) {
        $self->{timeout} = $timeout;

        # reschedule timer if one was running
        if ($self->{timer}) {
            Scalar::Util::weaken (my $wself = $self);
            $self->{timer} = AnyEvent->timer (
                after => $self->{timeout},
                cb    => sub { $wself && $wself->_timedout },
            );
        }
    } else {
        # hash slice on the object itself
        delete @{$self}{qw(timer timeout)};
    }
}
504
# Timer callback: the oldest pending request took too long.
#
# Its callback is called with no arguments and $@ = 'TIMEOUT', then a fatal
# error is raised for it. A timeout with nothing pending should not happen
# and is still reported as fatal.
sub _timedout {
    my ($self) = @_;

    my $req = shift @{ $self->{queue} }
        or return $self->_error ('TIMEOUT', 'NO_PENDING_WTF', 0, 1);

    my ($cb, $filename, $line) = @$req;

    $@ = 'TIMEOUT';
    $cb->();

    $self->_error ('TIMEOUT', $filename, $line, 1); # timeouts are always fatal
}
520
# Queue a request for the server.
#
# Arguments: ($self, $cb, $filename, $line, $req_name, @args).
# [$cb, $filename, $line] is appended to the pending queue, and
# [$req_name, @args] is frozen with Storable and added, length-prefixed, to
# the write buffer. The buffer is written immediately if no write watcher is
# active; a watcher is installed only for what could not be written.
# Without a socket (after a fatal error) the callback is called with no
# arguments and a fatal 'NO DATABASE CONNECTION' error is raised.
sub _req {
    my ($self, $cb, $filename, $line) = splice @_, 0, 4;
    # @_ is now just the request itself: ($req_name, @args)

    if (not $self->{fh}) {
        my $msg = 'NO DATABASE CONNECTION';
        $@ = $msg;
        $cb->();
        $self->_error ($msg, $filename, $line, 1);
        return;
    }

    push @{ $self->{queue} }, [$cb, $filename, $line];

    # start the response timer unless one is already running
    if ($self->{timeout} and not $self->{timer}) {
        Scalar::Util::weaken (my $wself = $self);
        $self->{timer} = AnyEvent->timer (
            after => $self->{timeout},
            cb    => sub { $wself && $wself->_timedout },
        );
    }

    $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;

    # an active write watcher will flush the buffer itself
    return if $self->{ww};

    my $written = syswrite $self->{fh}, $self->{wbuf};
    substr $self->{wbuf}, 0, $written, "";

    # still any left? then install a write watcher
    $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
        if length $self->{wbuf};
}
552
553 =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, \%metadata))
554
555 Executes the given SQL statement with placeholders replaced by
556 C<@args>. The statement will be prepared and cached on the server side, so
557 using placeholders is compulsory.
558
559 The callback will be called with a weakened AnyEvent::DBI object as the first
560 argument and the result of C<fetchall_arrayref> (or C<undef> if the
561 statement wasn't a select statement) as the second argument. The third argument is
562 a hash reference holding metadata about the request. Currently, the only key
563 defined is C<< $metadata->{rv} >>, holding the return value of
564 C<execute>. Additional metadata might be added.
565
566 If an error occurs and the C<on_error> callback returns, then no arguments
567 will be passed and C<$@> contains the error message.
568
569 =item $dbh->attr (attr_name, [ $attr_value ], $cb->($dbh, $new_value))
570
571 An accessor for the handle attributes, such as AutoCommit, RaiseError,
572 PrintError, etc. If you provide an $attr_value, then the given attribute will
573 be set to that value.
574
575 The callback will be passed the database handle and the
576 attribute's value if successful. If accessing the attribute fails, then no
577 arguments are passed to your callback, and $@ contains a description of the
578 problem instead.
579
580 =item $dbh->begin_work ($cb->($dbh))
581
582 =item $dbh->commit ($cb->($dbh))
583
584 =item $dbh->rollback ($cb->($dbh))
585
586 The begin_work, commit, and rollback methods expose the equivalent transaction
587 control methods of the DBI. If something goes wrong, you will get no $dbh in
588 your callback, and will instead have an error to examine in $@.
589
590 =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $result, $handle_error))
591
592 This gives access to database driver private methods. Because they are not
593 standard you cannot always depend on the value of $result or $handle_error.
594 Check the documentation for your specific driver/function combination to see
595 what it returns.
596
597 Note that the first argument will be eval'ed to produce the argument list to
598 the func() method. This must be done because the serialization protocol
599 between the AnyEvent::DBI server process and your program does not support the
600 passage of closures.
601
602 Here's an example to extend the query language in SQLite so it supports an
603 instr() function:
604
605 $cv = AnyEvent->condvar;
606 $dbh->func(
607 q{
608 'instr',
609 2,
610 sub {
611 my ($string, $search) = @_;
612 return index $string, $search;
613 },
614 },
615 'create_function',
616 sub {return $cv->send($@) unless $_[0];$cv->send(undef,@_[1,2]);}
617 );
618 my ($err,$result,$handle_err) = $cv->recv();
619 die "EVAL failed: $err" if $err;
620 # otherwise, we can ignore $result and $handle_err for this particular func
621
622 =cut
623
# Generate the public request methods exec, attr, begin_work, commit,
# rollback and func. Each one pops the trailing callback, records the
# caller's file and line for error reporting, and tail-calls _req with the
# matching server-side request name ("req_<method>").
#
# Closures installed through the symbol table replace the previous string
# eval. That eval never checked $@, so a compile error in the generated code
# would have gone unnoticed.
for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
    my $req_name = "req_$cmd_name";

    no strict 'refs';
    *{ __PACKAGE__ . "::$cmd_name" } = sub {
        my $cb = pop;
        splice @_, 1, 0, $cb, (caller)[1,2], $req_name;
        goto &_req;
    };
}
631
632 =back
633
634 =head1 SEE ALSO
635
636 L<AnyEvent>, L<DBI>.
637
638 =head1 AUTHOR
639
640 Marc Lehmann <schmorp@schmorp.de>
641 http://home.schmorp.de/
642
643 Adam Rosenstein <adam@redcondor.com>
644 http://www.redcondor.com/
645
646 =cut
647
# require/use demand that the module file evaluate to a true value.
1;
649