ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.7 by root, Mon Jul 21 02:34:40 2008 UTC vs.
Revision 1.18 by root, Sun Aug 27 09:54:25 2017 UTC

9 my $cv = AnyEvent->condvar; 9 my $cv = AnyEvent->condvar;
10 10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", ""; 11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12 12
13 $dbh->exec ("select * from test where num=?", 10, sub { 13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($rows, $rv) = @_; 14 my ($dbh, $rows, $rv) = @_;
15
16 $#_ or die "failure: $@";
15 17
16 print "@$_\n" 18 print "@$_\n"
17 for @$rows; 19 for @$rows;
18 20
19 $cv->broadcast; 21 $cv->broadcast;
26=head1 DESCRIPTION 28=head1 DESCRIPTION
27 29
28This module is an L<AnyEvent> user, you need to make sure that you use and 30This module is an L<AnyEvent> user, you need to make sure that you use and
29run a supported event loop. 31run a supported event loop.
30 32
31This module implements asynchronous DBI access my forking or executing 33This module implements asynchronous DBI access by forking or executing
32separate "DBI-Server" processes and sending them requests. 34separate "DBI-Server" processes and sending them requests.
33 35
34It means that you can run DBI requests in parallel to other tasks. 36It means that you can run DBI requests in parallel to other tasks.
35 37
36The overhead for very simple statements ("select 0") is somewhere 38With DBD::mysql, the overhead for very simple statements
37around 120% to 200% (dual/single core CPU) compared to an explicit 39("select 0") is somewhere around 50% compared to an explicit
38prepare_cached/execute/fetchrow_arrayref/finish combination. 40prepare_cached/execute/fetchrow_arrayref/finish combination. With
41DBD::SQlite3, it's more like a factor of 8 for this trivial statement.
42
43=head2 ERROR HANDLING
44
45This module defines a number of functions that accept a callback
46argument. All callbacks used by this module get their AnyEvent::DBI handle
47object passed as first argument.
48
49If the request was successful, then there will be more arguments,
50otherwise there will only be the C<$dbh> argument and C<$@> contains an
51error message.
52
53A convenient way to check whether an error occurred is to check C<$#_> -
54if that is true, then the function was successful, otherwise there was an
55error.
39 56
40=cut 57=cut
41 58
42package AnyEvent::DBI; 59package AnyEvent::DBI;
43 60
44use strict; 61use common::sense;
45no warnings;
46 62
47use Carp; 63use Carp;
48use Socket (); 64use Convert::Scalar ();
49use Scalar::Util (); 65use AnyEvent::Fork ();
50use Storable (); 66use CBOR::XS ();
51
52use DBI ();
53 67
54use AnyEvent (); 68use AnyEvent ();
55use AnyEvent::Util (); 69use AnyEvent::Util ();
56 70
71use Errno ();
72
57our $VERSION = '1.1'; 73our $VERSION = '3.01';
58
59# this is the forked server code
60
61our $DBH;
62
63sub req_open {
64 my (undef, $dbi, $user, $pass, %attr) = @{+shift};
65
66 $DBH = DBI->connect ($dbi, $user, $pass, \%attr);
67
68 [1]
69}
70
71sub req_exec {
72 my (undef, $st, @args) = @{+shift};
73
74 my $sth = $DBH->prepare_cached ($st, undef, 1);
75
76 my $rv = $sth->execute (@args)
77 or die $sth->errstr;
78
79 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }]
80}
81
82sub serve {
83 my ($fh) = @_;
84
85 no strict;
86
87 eval {
88 my $rbuf;
89
90 while () {
91 sysread $fh, $rbuf, 16384, length $rbuf
92 or last;
93
94 while () {
95 my $len = unpack "L", $rbuf;
96
97 # full request available?
98 last unless $len && $len + 4 <= length $rbuf;
99
100 my $req = Storable::thaw substr $rbuf, 4;
101 substr $rbuf, 0, $len + 4, ""; # remove length + request
102
103 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
104
105 $wbuf = pack "L/a*", Storable::freeze [undef, "$@"]
106 if $@;
107
108 for (my $ofs = 0; $ofs < length $wbuf; ) {
109 $ofs += (syswrite $fh, substr $wbuf, $ofs
110 or die "unable to write results");
111 }
112 }
113 }
114 };
115
116 if (AnyEvent::WIN32) {
117 kill 9, $$; # no other way on the broken windows platform
118 # and the above doesn't even work on windows, it seems the only
119 # way to is to leak memory and kill 9 from the parent. yay.
120 }
121
122 require POSIX;
123 POSIX::_exit (0);
124 # and the above kills the parent process on windows
125}
126 74
127=head2 METHODS 75=head2 METHODS
128 76
129=over 4 77=over 4
130 78
151 99
152When an error occurs, then this callback will be invoked. On entry, C<$@> 100When an error occurs, then this callback will be invoked. On entry, C<$@>
153is set to the error message. C<$filename> and C<$line> is where the 101is set to the error message. C<$filename> and C<$line> is where the
154original request was submitted. 102original request was submitted.
155 103
156If this callback returns and this was a fatal error (C<$fatal> is true) 104If the fatal argument is true then the database connection is shut down
157then AnyEvent::DBI die's, otherwise it calls the original request callback 105and your database handle became invalid. In addition to invoking the
158without any arguments. 106C<on_error> callback, all of your queued request callbacks are called
107without only the C<$dbh> argument.
159 108
160If omitted, then C<die> will be called on any errors, fatal or not. 109If omitted, then C<die> will be called on any errors, fatal or not.
161 110
111=item on_connect => $callback->($dbh[, $success])
112
113If you supply an C<on_connect> callback, then this callback will be
114invoked after the database connect attempt. If the connection succeeds,
115C<$success> is true, otherwise it is missing and C<$@> contains the
116C<$DBI::errstr>.
117
118Regardless of whether C<on_connect> is supplied, connect errors will result in
119C<on_error> being called. However, if no C<on_connect> callback is supplied, then
120connection errors are considered fatal. The client will C<die> and the C<on_error>
121callback will be called with C<$fatal> true.
122
123When on_connect is supplied, connect error are not fatal and AnyEvent::DBI
124will not C<die>. You still cannot, however, use the $dbh object you
125received from C<new> to make requests.
126
127=item fork_template => $AnyEvent::Fork-object
128
129C<AnyEvent::DBI> uses C<< AnyEvent::Fork->new >> to create the database
130slave, which in turn either C<exec>'s a new process (similar to the old
131C<exec_server> constructor argument) or uses a process forked early (see
132L<AnyEvent::Fork::Early>).
133
134With this argument you can provide your own fork template. This can be
135useful if you create a lot of C<AnyEvent::DBI> handles and want to save
136memory (And speed up startup) by not having to load C<AnyEvent::DBI> again
137and again into your child processes:
138
139 my $template = AnyEvent::Fork
140 ->new # create new template
141 ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module
142
143 for (...) {
144 $dbh = new AnyEvent::DBI ...
145 fork_template => $template;
146
147=item timeout => seconds
148
149If you supply a timeout parameter (fractional values are supported), then
150a timer is started any time the DBI handle expects a response from the
151server. This includes connection setup as well as requests made to the
152backend. The timeout spans the duration from the moment the first data
153is written (or queued to be written) until all expected responses are
154returned, but is postponed for "timeout" seconds each time more data is
155returned from the server. If the timer ever goes off then a fatal error is
156generated. If you have an C<on_error> handler installed, then it will be
157called, otherwise your program will die().
158
159When altering your databases with timeouts it is wise to use
160transactions. If you quit due to timeout while performing insert, update
161or schema-altering commands you can end up not knowing if the action was
162submitted to the database, complicating recovery.
163
164Timeout errors are always fatal.
165
162=back 166=back
163 167
168Any additional key-value pairs will be rolled into a hash reference
169and passed as the final argument to the C<< DBI->connect (...) >>
170call. For example, to suppress errors on STDERR and send them instead to an
171AnyEvent::Handle you could do:
172
173 $dbh = new AnyEvent::DBI
174 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
175 PrintError => 0,
176 on_error => sub {
177 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
178 };
179
164=cut 180=cut
165
166# stupid Storable autoloading, total loss-loss situation
167Storable::thaw Storable::freeze [];
168 181
169sub new { 182sub new {
170 my ($class, $dbi, $user, $pass, %arg) = @_; 183 my ($class, $dbi, $user, $pass, %arg) = @_;
171 184
172 socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC 185 # we use our own socketpair, so we always have a socket
186 # available, even before the forked process exsist.
187 # this is mostly done so this module is compatible
188 # to versions of itself older than 3.0.
189 my ($client, $server) = AnyEvent::Util::portable_socketpair
173 or croak "unable to create dbi communicaiton pipe: $!"; 190 or croak "unable to create AnyEvent::DBI communications pipe: $!";
191
192 my $fork = delete $arg{fork_template};
193
194 my %dbi_args = %arg;
195 delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)};
174 196
175 my $self = bless \%arg, $class; 197 my $self = bless \%arg, $class;
176 198
177 $self->{fh} = $client; 199 $self->{fh} = $client;
178
179 Scalar::Util::weaken (my $wself = $self);
180
181 AnyEvent::Util::fh_nonblocking $client, 1;
182 200
183 my $rbuf; 201 my $rbuf;
184 my @caller = (caller)[1,2]; # the "default" caller 202 my @caller = (caller)[1,2]; # the "default" caller
185 203
186 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { 204 $fork = $fork ? $fork->fork : AnyEvent::Fork->new
187 my $len = sysread $client, $rbuf, 65536, length $rbuf; 205 or croak "fork: $!";
188 206
207 $fork->require ("AnyEvent::DBI::Slave");
208 $fork->send_arg ($VERSION);
209 $fork->send_fh ($server);
210
211 # we don't rely on the callback, because we use our own
212 # socketpair, for better or worse.
213 $fork->run ("AnyEvent::DBI::Slave::serve", sub { });
214
215 {
216 Convert::Scalar::weaken (my $self = $self);
217
218 my $cbor = new CBOR::XS;
219
220 $self->{rw} = AE::io $client, 0, sub {
221 my $len = Convert::Scalar::extend_read $client, $rbuf, 65536;
222
189 if ($len > 0) { 223 if ($len > 0) {
224 # we received data, so reset the timer
225 $self->{last_activity} = AE::now;
190 226
191 while () { 227 for my $res ($cbor->incr_parse_multiple ($rbuf)) {
192 my $len = unpack "L", $rbuf; 228 last unless $self;
193 229
194 # full request available?
195 last unless $len && $len + 4 <= length $rbuf;
196
197 my $res = Storable::thaw substr $rbuf, 4;
198 substr $rbuf, 0, $len + 4, ""; # remove length + request
199
200 my $req = shift @{ $wself->{queue} }; 230 my $req = shift @{ $self->{queue} };
201 231
202 if (defined $res->[0]) { 232 if (defined $res->[0]) {
233 $res->[0] = $self;
203 $req->[0](@$res); 234 $req->[0](@$res);
235 } else {
236 my $cb = shift @$req;
237 local $@ = $res->[1];
238 $cb->($self);
239 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
240 if $self; # cb() could have deleted it
241 }
242
243 # no more queued requests, so become idle
244 if ($self && !@{ $self->{queue} }) {
245 undef $self->{last_activity};
246 $self->{tw_cb}->();
247 }
248 }
249
250 } elsif (defined $len) {
251 # todo, caller?
252 $self->_error ("unexpected eof", @caller, 1);
253 } elsif ($! != Errno::EAGAIN) {
254 # todo, caller?
255 $self->_error ("read error: $!", @caller, 1);
256 }
257 };
258
259 $self->{tw_cb} = sub {
260 if ($self->{timeout} && $self->{last_activity}) {
261 if (AE::now > $self->{last_activity} + $self->{timeout}) {
262 # we did time out
263 my $req = $self->{queue}[0];
264 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
204 } else { 265 } else {
205 my $cb = shift @$req; 266 # we need to re-set the timeout watcher
206 $wself->_error ($res->[1], @$req); 267 $self->{tw} = AE::timer
268 $self->{last_activity} + $self->{timeout} - AE::now,
269 0,
270 $self->{tw_cb},
207 $cb->(); 271 ;
208 } 272 }
273 } else {
274 # no timeout check wanted, or idle
275 undef $self->{tw};
209 } 276 }
210
211 } elsif (defined $len) {
212 $wself->_error ("unexpected eof", @caller, 1);
213 } else {
214 $wself->_error ("read error: $!", @caller, 1);
215 } 277 };
216 });
217 278
218 $self->{ww_cb} = sub { 279 $self->{ww_cb} = sub {
280 $self->{last_activity} = AE::now;
281
219 my $len = syswrite $client, $wself->{wbuf} 282 my $len = syswrite $client, $self->{wbuf}
220 or return delete $wself->{ww}; 283 or return delete $self->{ww};
221 284
222 substr $wself->{wbuf}, 0, $len, ""; 285 substr $self->{wbuf}, 0, $len, "";
286 };
223 }; 287 }
224 288
225 my $pid = fork; 289 $self->_req (
290 sub {
291 return unless $self;
292 $self->{child_pid} = $_[1];
293 },
294 (caller)[1,2],
295 "req_pid"
296 );
226 297
227 if ($pid) { 298 $self->_req (
228 # parent 299 sub {
229 close $server; 300 return unless $self;
230 301 &{ $self->{on_connect} } if $self->{on_connect};
231 } elsif (defined $pid) { 302 },
232 # child 303 (caller)[1,2],
233 close $client; 304 req_open => $dbi, $user, $pass, %dbi_args
234 @_ = $server; 305 );
235 goto &serve;
236
237 } else {
238 croak "fork: $!";
239 }
240
241 $self->_req (sub { }, (caller)[1,2], 1, req_open => $dbi, $user, $pass);
242 306
243 $self 307 $self
308}
309
310sub _server_pid {
311 shift->{child_pid}
312}
313
314sub kill_child {
315 my $self = shift;
316
317 if (my $pid = delete $self->{child_pid}) {
318 # kill and reap process
319 my $kid_watcher; $kid_watcher = AE::child $pid, sub {
320 undef $kid_watcher;
321 };
322 kill TERM => $pid;
323 }
324
325 delete $self->{rw};
326 delete $self->{ww};
327 delete $self->{tw};
328 close delete $self->{fh};
329}
330
331sub DESTROY {
332 shift->kill_child;
244} 333}
245 334
246sub _error { 335sub _error {
247 my ($self, $error, $filename, $line, $fatal) = @_; 336 my ($self, $error, $filename, $line, $fatal) = @_;
248 337
338 if ($fatal) {
339 delete $self->{tw};
249 delete $self->{rw}; 340 delete $self->{rw};
250 delete $self->{ww}; 341 delete $self->{ww};
251 delete $self->{fh}; 342 delete $self->{fh};
252 343
344 # for fatal errors call all enqueued callbacks with error
345 while (my $req = shift @{$self->{queue}}) {
346 local $@ = $error;
347 $req->[0]->($self);
348 }
349 $self->kill_child;
350 }
351
253 $@ = $error; 352 local $@ = $error;
254 353
354 if ($self->{on_error}) {
255 $self->{on_error}($self, $filename, $line, $fatal) 355 $self->{on_error}($self, $filename, $line, $fatal)
256 if $self->{on_error}; 356 } else {
257
258 die "$error at $filename, line $line\n"; 357 die "$error at $filename, line $line\n";
358 }
359}
360
361=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
362
363Sets (or clears, with C<undef>) the C<on_error> handler.
364
365=cut
366
367sub on_error {
368 $_[0]{on_error} = $_[1];
369}
370
371=item $dbh->timeout ($seconds)
372
373Sets (or clears, with C<undef>) the database timeout. Useful to extend the
374timeout when you are about to make a really long query.
375
376=cut
377
378sub timeout {
379 my ($self, $timeout) = @_;
380
381 $self->{timeout} = $timeout;
382
383 # reschedule timer if one was running
384 $self->{tw_cb}->();
259} 385}
260 386
261sub _req { 387sub _req {
262 my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, (); 388 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
263 389
390 unless ($self->{fh}) {
391 local $@ = my $err = 'no database connection';
392 $cb->($self);
393 $self->_error ($err, $filename, $line, 1);
394 return;
395 }
396
264 push @{ $self->{queue} }, [$cb, $filename, $line, $fatal]; 397 push @{ $self->{queue} }, [$cb, $filename, $line];
265 398
266 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 399 # re-start timeout if necessary
400 if ($self->{timeout} && !$self->{tw}) {
401 $self->{last_activity} = AE::now;
402 $self->{tw_cb}->();
403 }
404
405 $self->{wbuf} .= CBOR::XS::encode_cbor \@_;
267 406
268 unless ($self->{ww}) { 407 unless ($self->{ww}) {
269 my $len = syswrite $self->{fh}, $self->{wbuf}; 408 my $len = syswrite $self->{fh}, $self->{wbuf};
270 substr $self->{wbuf}, 0, $len, ""; 409 substr $self->{wbuf}, 0, $len, "";
271 410
272 # still any left? then install a write watcher 411 # still any left? then install a write watcher
273 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) 412 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
274 if length $self->{wbuf}; 413 if length $self->{wbuf};
275 } 414 }
276} 415}
277 416
417=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
418
419An accessor for the database handle attributes, such as C<AutoCommit>,
420C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
421(which might be C<undef>), then the given attribute will be set to that
422value.
423
424The callback will be passed the database handle and the attribute's value
425if successful.
426
427If an error occurs and the C<on_error> callback returns, then only C<$dbh>
428will be passed and C<$@> contains the error message.
429
278=item $dbh->exec ("statement", @args, $cb->($rows, $rv, ...)) 430=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
279 431
280Executes the given SQL statement with placeholders replaced by 432Executes the given SQL statement with placeholders replaced by
281C<@args>. The statement will be prepared and cached on the server side, so 433C<@args>. The statement will be prepared and cached on the server side, so
282using placeholders is compulsory. 434using placeholders is extremely important.
283 435
284The callback will be called with the result of C<fetchall_arrayref> as 436The callback will be called with a weakened AnyEvent::DBI object as the
285first argument (or C<undef> if the statement wasn't a select statement) 437first argument and the result of C<fetchall_arrayref> as (or C<undef>
286and the return value of C<execute> as second argument. Additional 438if the statement wasn't a select statement) as the second argument.
287arguments might get passed as well.
288 439
440Third argument is the return value from the C<< DBI->execute >> method
441call.
442
289If an error occurs and the C<on_error> callback returns, then no arguments 443If an error occurs and the C<on_error> callback returns, then only C<$dbh>
290will be passed and C<$@> contains the error message. 444will be passed and C<$@> contains the error message.
291 445
446=item $dbh->stattr ($attr_name, $cb->($dbh, $value))
447
448An accessor for the statement attributes of the most recently executed
449statement, such as C<NAME> or C<TYPE>.
450
451The callback will be passed the database handle and the attribute's value
452if successful.
453
454If an error occurs and the C<on_error> callback returns, then only C<$dbh>
455will be passed and C<$@> contains the error message.
456
457=item $dbh->begin_work ($cb->($dbh[, $rc]))
458
459=item $dbh->commit ($cb->($dbh[, $rc]))
460
461=item $dbh->rollback ($cb->($dbh[, $rc]))
462
463The begin_work, commit, and rollback methods expose the equivalent
464transaction control method of the DBI driver. On success, C<$rc> is true.
465
466If an error occurs and the C<on_error> callback returns, then only C<$dbh>
467will be passed and C<$@> contains the error message.
468
469=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
470
471This gives access to database driver private methods. Because they
472are not standard you cannot always depend on the value of C<$rc> or
473C<$dbi_err>. Check the documentation for your specific driver/function
474combination to see what it returns.
475
476Note that the first argument will be eval'ed to produce the argument list to
477the func() method. This must be done because the serialization protocol
478between the AnyEvent::DBI server process and your program does not support the
479passage of closures.
480
481Here's an example to extend the query language in SQLite so it supports an
482intstr() function:
483
484 $cv = AnyEvent->condvar;
485 $dbh->func (
486 q{
487 instr => 2, sub {
488 my ($string, $search) = @_;
489 return index $string, $search;
490 },
491 },
492 create_function => sub {
493 return $cv->send ($@)
494 unless $#_;
495 $cv->send (undef, @_[1,2,3]);
496 }
497 );
498
499 my ($err,$rc,$errcode,$errstr) = $cv->recv;
500
501 die $err if defined $err;
502 die "EVAL failed: $errstr"
503 if $errcode;
504
505 # otherwise, we can ignore $rc and $errcode for this particular func
506
292=cut 507=cut
293 508
294sub exec { 509for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) {
510 eval 'sub ' . $cmd_name . '{
295 my $cb = pop; 511 my $cb = pop;
296 splice @_, 1, 0, $cb, (caller)[1,2], 0, "req_exec"; 512 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
297 513 &_req
298 goto &_req; 514 }';
299} 515}
300 516
301=back 517=back
302 518
303=head1 SEE ALSO 519=head1 SEE ALSO
304 520
305L<AnyEvent>, L<DBI>. 521L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
306 522
307=head1 AUTHOR 523=head1 AUTHOR AND CONTACT
308 524
309 Marc Lehmann <schmorp@schmorp.de> 525 Marc Lehmann <schmorp@schmorp.de> (current maintainer)
310 http://home.schmorp.de/ 526 http://home.schmorp.de/
311 527
528 Adam Rosenstein <adam@redcondor.com>
529 http://www.redcondor.com/
530
312=cut 531=cut
313 532
3141 5331
315

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines