ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.11
Committed: Sun Jun 28 14:59:51 2009 UTC (14 years, 11 months ago) by root
Branch: MAIN
Changes since 1.10: +261 -257 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::DBI - asynchronous DBI access
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::DBI;
8
9 my $cv = AnyEvent->condvar;
10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12
13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($dbh, $rows, $rv) = @_;
15
16 $rows or die "failure: $@";
17
18 print "@$_\n"
19 for @$rows;
20
21 $cv->broadcast;
22 });
23
24 # asynchronously do sth. else here
25
26 $cv->wait;
27
28 =head1 DESCRIPTION
29
30 This module is an L<AnyEvent> user, so you need to make sure that you use and
31 run a supported event loop.
32
33 This module implements asynchronous DBI access by forking or executing
34 separate "DBI-Server" processes and sending them requests.
35
36 It means that you can run DBI requests in parallel to other tasks.
37
38 The overhead for very simple statements ("select 0") is somewhere
39 around 120% to 200% (dual/single core CPU) compared to an explicit
40 prepare_cached/execute/fetchrow_arrayref/finish combination.
41
42 =head2 ERROR HANDLING
43
44 This module defines a number of functions that accept a callback
45 argument. All callbacks used by this module get their AnyEvent::DBI handle
46 object passed as first argument.
47
48 If the request was successful, then there will be more arguments,
49 otherwise there will only be the C<$dbh> argument and C<$@> contains an
50 error message.
51
52 A convenient way to check whether an error occurred is to check C<$#_> -
53 if that is true, then the function was successful, otherwise there was an
54 error.
55
56 =cut
57
58 package AnyEvent::DBI;
59
60 use strict qw(vars subs);
61 no warnings;
62
63 use Carp;
64 use Socket ();
65 use Scalar::Util ();
66 use Storable ();
67
68 use DBI ();
69
70 use AnyEvent ();
71 use AnyEvent::Util ();
72
73 use Errno ();
74 use Fcntl ();
75 use POSIX ();
76
77 our $VERSION = '1.19';
78
79 our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
80
81 # this is the forked server code, could/should be bundled as its own file
82
83 our $DBH;
84
# Server side: handle a "req_open" request.
#
# The request is an array ref of the form
#    [request name, dsn, user, password, attribute => value, ...]
# The trailing key/value pairs are handed to DBI->connect as its attribute
# hash. The resulting handle is stored in the global $DBH.
#
# Returns [1, 1] on success and dies with $DBI::errstr (a plain string) when
# the connect fails.
sub req_open {
   my ($request) = @_;
   my (undef, $dsn, $username, $password, %connect_attr) = @$request;

   $DBH = DBI->connect ($dsn, $username, $password, \%connect_attr);

   die $DBI::errstr
      unless $DBH;

   return [1, 1];
}
92
# Server side: handle a "req_exec" request.
#
# The request is [request name, SQL statement, bind values...]. The statement
# is prepared with prepare_cached on the global $DBH and executed with the
# bind values.
#
# Returns [1, $rows, $rv]: $rows is the fetchall_arrayref result when the
# statement handle reports result columns (NUM_OF_FIELDS), undef otherwise;
# $rv is the return value of execute.
#
# Dies with an array ref holding the error string when prepare or execute
# fails.
sub req_exec {
   my ($request) = @_;
   my (undef, $sql, @bind_values) = @$request;

   my $sth = $DBH->prepare_cached ($sql, undef, 1);
   die [$DBI::errstr]
      unless $sth;

   my $rv = $sth->execute (@bind_values);
   die [$sth->errstr]
      unless $rv;

   my $rows = $sth->{NUM_OF_FIELDS}
      ? $sth->fetchall_arrayref
      : undef;

   return [1, $rows, $rv];
}
103
# Server side: handle a "req_attr" request.
#
# The request is [request name, attribute name] to read an attribute of the
# global $DBH, or [request name, attribute name, value] to set it first. The
# value may be undef; only the presence of a third element matters.
#
# Returns [1, current attribute value].
sub req_attr {
   my ($request) = @_;
   my (undef, $name, @new_value) = @$request;

   if (@new_value) {
      $DBH->{$name} = $new_value[0];
   }

   return [1, $DBH->{$name}];
}
112
# Server side: handle a "req_begin_work" request by calling begin_work on the
# global $DBH.
#
# Returns [1, $rv], where $rv is the (true) return value of begin_work.
# Dies with an array ref holding $DBI::errstr when begin_work returns false.
sub req_begin_work {
   # Do the "or die" outside of the reply constructor: "or" binds more
   # loosely than the comma, so "[1, X or die ...]" is "[(1, X) or die ...]",
   # which collapses to the one-element reply [X] and loses the status slot.
   my $rv = $DBH->begin_work
      or die [$DBI::errstr];

   [1, $rv]
}
116
# Server side: handle a "req_commit" request by calling commit on the global
# $DBH.
#
# Returns [1, $rv], where $rv is the (true) return value of commit.
# Dies with an array ref holding $DBI::errstr when commit returns false.
sub req_commit {
   # Do the "or die" outside of the reply constructor: "or" binds more
   # loosely than the comma, so "[1, X or die ...]" is "[(1, X) or die ...]",
   # which collapses to the one-element reply [X] and loses the status slot.
   my $rv = $DBH->commit
      or die [$DBI::errstr];

   [1, $rv]
}
120
# Server side: handle a "req_rollback" request by calling rollback on the
# global $DBH.
#
# Returns [1, $rv], where $rv is the (true) return value of rollback.
# Dies with an array ref holding $DBI::errstr when rollback returns false.
sub req_rollback {
   # Do the "or die" outside of the reply constructor: "or" binds more
   # loosely than the comma, so "[1, X or die ...]" is "[(1, X) or die ...]",
   # which collapses to the one-element reply [X] and loses the status slot.
   my $rv = $DBH->rollback
      or die [$DBI::errstr];

   [1, $rv]
}
124
# Server side: handle a "req_func" request, i.e. a call to $DBH->func.
#
# The request is [request name, argument string, function name]. The
# argument string is Perl source that is evaluated in list context to
# produce the argument list for func ().
#
# Returns [1, $rv, $DBH->err]: a constant true status slot, followed by the
# return value of func () and the handle's error value.
#
# Dies with a plain string when the argument string fails to evaluate.
sub req_func {
   my (undef, $arg_string, $function) = @{+shift};

   # NOTE(review): this string-evals text received from the client. That is
   # how the request is defined (the argument list may contain code), but it
   # means whoever can write to the request socket can run arbitrary code in
   # the server process.
   my @args = eval $arg_string;

   die "Bad func () arg string: $@"
      if $@;

   my $rv = $DBH->func (@args, $function);

   # The first reply element is the status slot: a defined value means
   # success and is replaced by the handle object before the callback is
   # invoked. $rv must therefore not be stored there - it would never reach
   # the callback, and an undefined $rv would be mistaken for a failure.
   return [1, $rv, $DBH->err];
}
135
# Server main loop: read requests from $fh, execute them, write back replies.
#
# Arguments:
#    $fh      - file handle used for both reading requests and writing replies
#    $version - version announced by the client; when it differs numerically
#               from $VERSION, a single error reply is written and the
#               function returns without serving any request
#
# Wire format, in both directions: a native unsigned long length ("L")
# followed by that many bytes of Storable-frozen data.
#
# A request is an array ref whose first element is the name of the handler
# sub (req_open, req_exec, ...). The handler is called with the whole request
# and its return value is sent back as the reply. When a handler dies, the
# reply is [undef, message, fatal]: an array ref exception [message, fatal]
# supplies both values itself, any other exception is stringified and
# reported with fatal set to 1.
#
# The loop ends when sysread returns a false value (end of file or read
# error). Exceptions thrown outside of a handler (for example a failed
# write) are caught by the outer eval and simply end the loop as well.
sub serve_fh($$) {
   my ($fh, $version) = @_;

   if ($VERSION != $version) {
      syswrite $fh,
         pack "L/a*",
            Storable::freeze
               [undef, "AnyEvent::DBI version mismatch ($VERSION vs. $version)"];
      return;
   }

   eval {
      my $rbuf;

      while () {
         sysread $fh, $rbuf, 16384, length $rbuf
            or last;

         # handle every complete request that is in the buffer by now
         while () {
            my $len = unpack "L", $rbuf;

            # full request available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly the $len bytes of this request, not everything
            # that follows the length header - the buffer may already
            # contain further pipelined requests
            my $req = Storable::thaw (substr $rbuf, 4, $len);
            substr $rbuf, 0, $len + 4, ""; # remove length + request

            my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
            $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? ("$@->[0]", $@->[1]) : ("$@", 1)]
               if $@;

            # syswrite may write less than requested, so loop until done
            for (my $ofs = 0; $ofs < length $wbuf; ) {
               $ofs += (syswrite $fh, substr $wbuf, $ofs
                        or die "unable to write results");
            }
         }
      }
   };
}
175
# Entry point for a server that only knows a file descriptor number: wraps
# the descriptor in a perl file handle and hands it to serve_fh.
#
# Arguments:
#    $fileno  - numeric file descriptor of the server socket
#    $version - client version, passed through to serve_fh unchanged
#
# Dies when the descriptor cannot be opened.
sub serve_fd($$) {
   my ($fileno, $version) = @_;

   # ">>&=" reuses the existing descriptor instead of duplicating it
   open my $fh, ">>&=", $fileno
      or die "Couldn't open server file descriptor: $!";

   serve_fh ($fh, $version);
}
182
183 =head2 METHODS
184
185 =over 4
186
187 =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
188
189 Returns a database handle for the given database. Each database handle
190 has an associated server process that executes statements in order. If
191 you want to run more than one statement in parallel, you need to create
192 additional database handles.
193
194 The advantage of this approach is that transactions work as state is
195 preserved.
196
197 Example:
198
199 $dbh = new AnyEvent::DBI
200 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
201
202 Additional key-value pairs can be used to adjust behaviour:
203
204 =over 4
205
206 =item on_error => $callback->($dbh, $filename, $line, $fatal)
207
208 When an error occurs, then this callback will be invoked. On entry, C<$@>
209 is set to the error message. C<$filename> and C<$line> is where the
210 original request was submitted.
211
212 If the fatal argument is true then the database connection is shut down
213 and your database handle becomes invalid. In addition to invoking the
214 C<on_error> callback, all of your queued request callbacks are called
215 with only the C<$dbh> argument.
216
217 If omitted, then C<die> will be called on any errors, fatal or not.
218
219 =item on_connect => $callback->($dbh[, $success])
220
221 If you supply an C<on_connect> callback, then this callback will be
222 invoked after the database connect attempt. If the connection succeeds,
223 C<$success> is true, otherwise it is missing and C<$@> contains the
224 C<$DBI::errstr>.
225
226 Regardless of whether C<on_connect> is supplied, connect errors will result in
227 C<on_error> being called. However, if no C<on_connect> callback is supplied, then
228 connection errors are considered fatal. The client will C<die> and the C<on_error>
229 callback will be called with C<$fatal> true.
230
231 When C<on_connect> is supplied, connect errors are not fatal and AnyEvent::DBI
232 will not C<die>. You still cannot, however, use the $dbh object you
233 received from C<new> to make requests.
234
235 =item exec_server => 1
236
237 If you supply an C<exec_server> argument, then the DBI server process will
238 fork and exec another perl interpreter (using C<$^X>) with just the
239 AnyEvent::DBI proxy running. This will provide the cleanest possible proxy
240 for your database server.
241
242 If you do not supply the C<exec_server> argument (or supply it with a
243 false value) then the traditional method of starting the server by forking
244 the current process is used. The forked interpreter will try to clean
245 itself up by calling POSIX::close on all file descriptors except STDIN,
246 STDOUT, and STDERR (and the socket it uses to communicate with the client,
247 of course).
248
249 =item timeout => seconds
250
251 If you supply a timeout parameter (fractional values are supported), then
252 a timer is started any time the DBI handle expects a response from the
253 server. This includes connection setup as well as requests made to the
254 backend. The timeout spans the duration from the moment the first data
255 is written (or queued to be written) until all expected responses are
256 returned, but is postponed for "timeout" seconds each time more data is
257 returned from the server. If the timer ever goes off then a fatal error is
258 generated. If you have an C<on_error> handler installed, then it will be
259 called, otherwise your program will die().
260
261 When altering your databases with timeouts it is wise to use
262 transactions. If you quit due to timeout while performing insert, update
263 or schema-altering commands you can end up not knowing if the action was
264 submitted to the database, complicating recovery.
265
266 Timeout errors are always fatal.
267
268 =back
269
270 Any additional key-value pairs will be rolled into a hash reference
271 and passed as the final argument to the C<< DBI->connect (...) >>
272 call. For example, to suppress errors on STDERR and send them instead to an
273 AnyEvent::Handle you could do:
274
275 $dbh = new AnyEvent::DBI
276 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
277 PrintError => 0,
278 on_error => sub {
279 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
280 };
281
282 =cut
283
284 # stupid Storable autoloading, total loss-loss situation
285 Storable::thaw Storable::freeze [];
286
# Constructor: creates the client object, starts the server process and
# queues the initial "req_open" request.
#
# Arguments:
#    $class              - class name
#    $dbi, $user, $pass  - passed on to the server's req_open handler
#    %arg                - key/value pairs; on_connect, on_error, timeout and
#                          exec_server are used by this module, everything
#                          else is sent along with the req_open request
#
# Returns the new object. Croaks when the socket pair cannot be created or
# when fork fails.
#
# Object members set up here:
#    fh        - client side of the socket pair (non-blocking)
#    rw        - read watcher, parses replies and invokes request callbacks
#    tw_cb     - callback that checks for / re-arms the timeout (watcher: tw)
#    ww_cb     - callback used by the write watcher (ww) to flush wbuf
#    child_pid - pid of the server process
sub new {
   my ($class, $dbi, $user, $pass, %arg) = @_;

   my ($client, $server) = AnyEvent::Util::portable_socketpair
      or croak "unable to create Anyevent::DBI communications pipe: $!";

   # everything that is not one of our own options goes to the server
   my %dbi_args = %arg;
   delete @dbi_args{qw(on_connect on_error timeout exec_server)};

   my $self = bless \%arg, $class;
   $self->{fh} = $client;

   AnyEvent::Util::fh_nonblocking $client, 1;

   my $rbuf;
   my @caller = (caller)[1,2]; # the "default" caller

   {
      # the callbacks below only see this weakened copy of $self, so they
      # have to cope with it having become undef
      Scalar::Util::weaken (my $self = $self);

      $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
         return unless $self;

         $self->{last_activity} = AnyEvent->now;

         my $len = sysread $client, $rbuf, 65536, length $rbuf;

         if ($len > 0) {
            # we received data, so reset the timer

            while () {
               my $len = unpack "L", $rbuf;

               # full response available?
               last unless $len && $len + 4 <= length $rbuf;

               # thaw exactly the $len bytes of this response, not everything
               # that follows the length header - the buffer may already
               # contain further responses
               my $res = Storable::thaw (substr $rbuf, 4, $len);
               substr $rbuf, 0, $len + 4, ""; # remove length + request

               last unless $self;
               my $req = shift @{ $self->{queue} };

               if (defined $res->[0]) {
                  # success: the status slot is replaced by the handle, so
                  # the callback receives ($dbh, results...)
                  $res->[0] = $self;
                  $req->[0](@$res);
               } else {
                  # failure: callback gets only $dbh, with $@ set to the error
                  my $cb = shift @$req;
                  local $@ = $res->[1];
                  $cb->($self);
                  $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
                     if $self; # cb() could have deleted it
               }

               # no more queued requests, so become idle
               undef $self->{last_activity}
                  if $self && !@{ $self->{queue} };
            }

         } elsif (defined $len) {
            # sysread returned 0
            # todo, caller?
            $self->_error ("unexpected eof", @caller, 1);
         } elsif ($! != Errno::EAGAIN) {
            # todo, caller?
            $self->_error ("read error: $!", @caller, 1);
         }
      });

      $self->{tw_cb} = sub {
         if ($self->{timeout} && $self->{last_activity}) {
            if (AnyEvent->now > $self->{last_activity} + $self->{timeout}) {
               # we did time out
               my $req = $self->{queue}[0];
               $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
            } else {
               # we need to re-set the timeout watcher
               $self->{tw} = AnyEvent->timer (
                  after => $self->{last_activity} + $self->{timeout} - AnyEvent->now,
                  cb    => $self->{tw_cb},
               );
               Scalar::Util::weaken $self;
            }
         } else {
            # no timeout check wanted, or idle
            undef $self->{tw};
         }
      };

      $self->{ww_cb} = sub {
         return unless $self;

         $self->{last_activity} = AnyEvent->now;

         # a false result (nothing written) removes the write watcher
         my $len = syswrite $client, $self->{wbuf}
            or return delete $self->{ww};

         substr $self->{wbuf}, 0, $len, "";
      };
   }

   my $pid = fork;

   if ($pid) {
      # parent
      close $server;
   } elsif (defined $pid) {
      # child
      my $serv_fno = fileno $server;

      if ($self->{exec_server}) {
         fcntl $server, &Fcntl::F_SETFD, 0; # don't close the server side
         exec {$^X}
              "$0 dbi slave",
              -e => "require shift; AnyEvent::DBI::serve_fd ($serv_fno, $VERSION)",
              $INC{"AnyEvent/DBI.pm"};
         # only reached when exec failed
         POSIX::_exit 124;
      } else {
         # close all descriptors above $^F except the server socket
         ($_ != $serv_fno) && POSIX::close $_
            for $^F+1..$FD_MAX;
         serve_fh $server, $VERSION;

         # no other way on the broken windows platform, even this leaks
         # memory and might fail.
         kill 9, $$
            if AnyEvent::WIN32;

         # and this kills the parent process on windows
         POSIX::_exit 0;
      }
   } else {
      croak "fork: $!";
   }

   $self->{child_pid} = $pid;

   # the connect request is queued like any other request; without an
   # on_connect callback a no-op callback is used
   $self->_req (
      ($self->{on_connect} ? $self->{on_connect} : sub { }),
      (caller)[1,2],
      req_open => $dbi, $user, $pass, %dbi_args
   );

   $self
}
429
# Returns the value stored in the object's child_pid member (the pid of the
# server process while one is recorded, undef otherwise).
sub _server_pid {
   my ($self) = @_;

   return $self->{child_pid};
}
433
# Shuts down the server process, if one is still recorded in the object.
#
# The pid is removed from the object first, so a second call does nothing.
# Then a timer is set up that sends SIGKILL to the pid after two seconds, a
# child watcher is registered for the pid, and finally the client file
# handle is closed.
sub kill_child {
   my ($self) = @_;

   my $pid = delete $self->{child_pid};

   if ($pid) {
      # send SIGKILL in two seconds
      my $sigkill_timer = AnyEvent->timer (
         after => 2,
         cb    => sub { kill 9, $pid },
      );

      # reap process; the callback refers to both watchers, which keeps
      # them alive until it runs and drops them
      my $reaper;
      $reaper = AnyEvent->child (
         pid => $pid,
         cb  => sub {
            # release the child watcher itself
            undef $reaper;
            # cancel SIGKILL
            undef $sigkill_timer;
         },
      );

      close $self->{fh};
   }
}
460
# Destructor: shuts down the server process belonging to this handle.
sub DESTROY {
   my ($self) = @_;

   $self->kill_child;
}
464
# Central error reporting.
#
# Arguments:
#    $error           - error message
#    $filename, $line - where the failed request was submitted
#    $fatal           - true when the connection is unusable afterwards
#
# For fatal errors the watchers and the file handle are dropped, every
# queued request callback is invoked with only the handle (and $@ set to the
# error), and the server process is shut down.
#
# In all cases $@ is then set to the error and the on_error callback is
# invoked with ($dbh, $filename, $line, $fatal). Without an on_error
# callback this dies with "$error at $filename, line $line\n".
sub _error {
   my ($self, $error, $filename, $line, $fatal) = @_;

   if ($fatal) {
      delete $self->{$_}
         for qw(tw rw ww fh);

      # for fatal errors call all enqueued callbacks with error
      while (my $pending = shift @{$self->{queue}}) {
         local $@ = $error;
         $pending->[0]->($self);
      }

      $self->kill_child;
   }

   local $@ = $error;

   my $handler = $self->{on_error};

   die "$error at $filename, line $line\n"
      unless $handler;

   $handler->($self, $filename, $line, $fatal)
}
490
491 =item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
492
493 Sets (or clears, with C<undef>) the C<on_error> handler.
494
495 =cut
496
# Stores the given callback (or undef) as the object's on_error handler and
# returns it.
sub on_error {
   my ($self, $cb) = @_;

   $self->{on_error} = $cb;
}
500
501 =item $dbh->timeout ($seconds)
502
503 Sets (or clears, with C<undef>) the database timeout. Useful to extend the
504 timeout when you are about to make a really long query.
505
506 =cut
507
# Stores the new timeout value (undef clears it) and then runs the object's
# tw_cb callback so that a running timer is rescheduled.
sub timeout {
   my $self = shift;

   $self->{timeout} = shift;

   # reschedule timer if one was running
   $self->{tw_cb}->();
}
516
# Queues a request and starts sending it to the server.
#
# Arguments:
#    $self            - the handle
#    $cb              - callback to invoke with the reply
#    $filename, $line - where the request was submitted (for error reports)
#    remaining        - the request itself: handler name plus its arguments
#
# Without a file handle the callback is invoked immediately with only the
# handle and $@ set to 'no database connection', followed by a fatal _error.
sub _req {
   my $self     = shift;
   my $cb       = shift;
   my $filename = shift;
   my $line     = shift;
   # @_ now holds only the request that goes over the wire

   unless ($self->{fh}) {
      my $message = 'no database connection';
      local $@ = $message;
      $cb->($self);
      $self->_error ($message, $filename, $line, 1);
      return;
   }

   push @{ $self->{queue} }, [$cb, $filename, $line];

   # re-start timeout if necessary
   if ($self->{timeout} && !$self->{tw}) {
      $self->{last_activity} = AnyEvent->now;
      $self->{tw_cb}->();
   }

   # length-prefixed, Storable-frozen request is appended to the write buffer
   $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;

   unless ($self->{ww}) {
      # no write watcher active, so try to write right away
      my $written = syswrite $self->{fh}, $self->{wbuf};
      substr $self->{wbuf}, 0, $written, "";

      # still any left? then install a write watcher
      $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
         if length $self->{wbuf};
   }
}
546
547 =item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
548
549 Executes the given SQL statement with placeholders replaced by
550 C<@args>. The statement will be prepared and cached on the server side, so
551 using placeholders is extremely important.
552
553 The callback will be called with a weakened AnyEvent::DBI object as the
554 first argument and the result of C<fetchall_arrayref> (or C<undef>
555 if the statement wasn't a select statement) as the second argument.
556
557 Third argument is the return value from the C<< DBI->execute >> method
558 call.
559
560 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
561 will be passed and C<$@> contains the error message.
562
563 =item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
564
565 An accessor for the handle attributes, such as C<AutoCommit>,
566 C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
567 (which might be C<undef>), then the given attribute will be set to that
568 value.
569
570 The callback will be passed the database handle and the attribute's value
571 if successful.
572
573 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
574 will be passed and C<$@> contains the error message.
575
576 =item $dbh->begin_work ($cb->($dbh[, $success]))
577
578 =item $dbh->commit ($cb->($dbh[, $success]))
579
580 =item $dbh->rollback ($cb->($dbh[, $success]))
581
582 The begin_work, commit, and rollback methods expose the equivalent
583 transaction control method of the DBI driver. On success, C<$success>
584 is true.
585
586 If an error occurs and the C<on_error> callback returns, then only C<$dbh>
587 will be passed and C<$@> contains the error message.
588
589 =item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $result, $handle_error))
590
591 This gives access to database driver private methods. Because they
592 are not standard you cannot always depend on the value of C<$result>
593 or C<$handle_error>. Check the documentation for your specific
594 driver/function combination to see what it returns.
595
596 Note that the first argument will be eval'ed to produce the argument list to
597 the func() method. This must be done because the serialization protocol
598 between the AnyEvent::DBI server process and your program does not support the
599 passage of closures.
600
601 Here's an example to extend the query language in SQLite so it supports an
602 instr() function:
603
604 $cv = AnyEvent->condvar;
605 $dbh->func (
606 q{
607 instr => 2, sub {
608 my ($string, $search) = @_;
609 return index $string, $search;
610 },
611 },
612 create_function => sub {
613 return $cv->send($@)
614 unless $_[0];
615 $cv->send (undef, @_[1,2]);
616 }
617 );
618
619 my ($err,$result,$handle_err) = $cv->recv;
620
621 die "EVAL failed: $err"
622 if $err;
623
624 # otherwise, we can ignore $result and $handle_err for this particular func
625
626 =cut
627
# Generate the public request methods exec, attr, begin_work, commit,
# rollback and func.
#
# Each method takes the callback as its last argument, moves it to the
# position right after the handle, adds the caller's file name and line and
# the name of the server-side handler ("req_" . method name), and passes the
# resulting argument list on to _req.
for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
   my $handler = "req_$cmd_name";

   no strict 'refs';

   *{ __PACKAGE__ . "::$cmd_name" } = sub {
      my $cb = pop;
      splice @_, 1, 0, $cb, (caller)[1,2], $handler;
      &_req
   };
}
635
636 =back
637
638 =head1 SEE ALSO
639
640 L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
641
642 =head1 AUTHOR
643
644 Marc Lehmann <schmorp@schmorp.de>
645 http://home.schmorp.de/
646
647 Adam Rosenstein <adam@redcondor.com>
648 http://www.redcondor.com/
649
650 =cut
651
652 1;
653