ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.6 by root, Thu Jun 12 11:56:59 2008 UTC vs.
Revision 1.20 by root, Sat Mar 24 23:22:25 2018 UTC

9 my $cv = AnyEvent->condvar; 9 my $cv = AnyEvent->condvar;
10 10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", ""; 11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12 12
13 $dbh->exec ("select * from test where num=?", 10, sub { 13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($rows) = @_; 14 my ($dbh, $rows, $rv) = @_;
15
16 $#_ or die "failure: $@";
15 17
16 print "@$_\n" 18 print "@$_\n"
17 for @$rows; 19 for @$rows;
18 20
19 $cv->broadcast; 21 $cv->broadcast;
26=head1 DESCRIPTION 28=head1 DESCRIPTION
27 29
28This module is an L<AnyEvent> user, you need to make sure that you use and 30This module is an L<AnyEvent> user, you need to make sure that you use and
29run a supported event loop. 31run a supported event loop.
30 32
31This module implements asynchronous DBI access my forking or executing 33This module implements asynchronous DBI access by forking or executing
32separate "DBI-Server" processes and sending them requests. 34separate "DBI-Server" processes and sending them requests.
33 35
34It means that you can run DBI requests in parallel to other tasks. 36It means that you can run DBI requests in parallel to other tasks.
35 37
36The overhead for very simple statements ("select 0") is somewhere 38With DBD::mysql, the overhead for very simple statements
37around 120% to 200% (dual/single core CPU) compared to an explicit 39("select 0") is somewhere around 50% compared to an explicit
38prepare_cached/execute/fetchrow_arrayref/finish combination. 40prepare_cached/execute/fetchrow_arrayref/finish combination. With
41DBD::SQlite3, it's more like a factor of 8 for this trivial statement.
42
43=head2 ERROR HANDLING
44
45This module defines a number of functions that accept a callback
46argument. All callbacks used by this module get their AnyEvent::DBI handle
47object passed as first argument.
48
49If the request was successful, then there will be more arguments,
50otherwise there will only be the C<$dbh> argument and C<$@> contains an
51error message.
52
53A convenient way to check whether an error occurred is to check C<$#_> -
54if that is true, then the function was successful, otherwise there was an
55error.
39 56
40=cut 57=cut
41 58
42package AnyEvent::DBI; 59package AnyEvent::DBI;
43 60
44use strict; 61use common::sense;
45no warnings;
46 62
47use Carp; 63use Carp;
48use Socket (); 64use Convert::Scalar ();
49use Scalar::Util (); 65use AnyEvent::Fork ();
50use Storable (); 66use CBOR::XS ();
51
52use DBI ();
53 67
54use AnyEvent (); 68use AnyEvent ();
55use AnyEvent::Util (); 69use AnyEvent::Util ();
56 70
71use Errno ();
72
57our $VERSION = '1.0'; 73our $VERSION = '3.02';
58
59# this is the forked server code
60
61our $DBH;
62
63sub req_open {
64 my (undef, $dbi, $user, $pass, %attr) = @{+shift};
65
66 $DBH = DBI->connect ($dbi, $user, $pass, \%attr);
67
68 [1]
69}
70
71sub req_exec {
72 my (undef, $st, @args) = @{+shift};
73
74 my $sth = $DBH->prepare_cached ($st, undef, 1);
75
76 $sth->execute (@args)
77 or die $sth->errstr;
78
79 [$sth->fetchall_arrayref]
80}
81
82sub serve {
83 my ($fh) = @_;
84
85 no strict;
86
87 eval {
88 my $rbuf;
89
90 while () {
91 sysread $fh, $rbuf, 16384, length $rbuf
92 or last;
93
94 while () {
95 my $len = unpack "L", $rbuf;
96
97 # full request available?
98 last unless $len && $len + 4 <= length $rbuf;
99
100 my $req = Storable::thaw substr $rbuf, 4;
101 substr $rbuf, 0, $len + 4, ""; # remove length + request
102
103 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
104
105 $wbuf = pack "L/a*", Storable::freeze [undef, "$@"]
106 if $@;
107
108 for (my $ofs = 0; $ofs < length $wbuf; ) {
109 $ofs += (syswrite $fh, substr $wbuf, $ofs
110 or die "unable to write results");
111 }
112 }
113 }
114 };
115
116 kill 9, $$; # no other way on the broken windows platform
117}
118 74
119=head2 METHODS 75=head2 METHODS
120 76
121=over 4 77=over 4
122 78
143 99
144When an error occurs, then this callback will be invoked. On entry, C<$@> 100When an error occurs, then this callback will be invoked. On entry, C<$@>
145is set to the error message. C<$filename> and C<$line> is where the 101is set to the error message. C<$filename> and C<$line> is where the
146original request was submitted. 102original request was submitted.
147 103
148If this callback returns and this was a fatal error (C<$fatal> is true) 104If the fatal argument is true then the database connection is shut down
149then AnyEvent::DBI die's, otherwise it calls the original request callback 105and your database handle became invalid. In addition to invoking the
150without any arguments. 106C<on_error> callback, all of your queued request callbacks are called
107without only the C<$dbh> argument.
151 108
152If omitted, then C<die> will be called on any errors, fatal or not. 109If omitted, then C<die> will be called on any errors, fatal or not.
153 110
111=item on_connect => $callback->($dbh[, $success])
112
113If you supply an C<on_connect> callback, then this callback will be
114invoked after the database connect attempt. If the connection succeeds,
115C<$success> is true, otherwise it is missing and C<$@> contains the
116C<$DBI::errstr>.
117
118Regardless of whether C<on_connect> is supplied, connect errors will result in
119C<on_error> being called. However, if no C<on_connect> callback is supplied, then
120connection errors are considered fatal. The client will C<die> and the C<on_error>
121callback will be called with C<$fatal> true.
122
123When on_connect is supplied, connect error are not fatal and AnyEvent::DBI
124will not C<die>. You still cannot, however, use the $dbh object you
125received from C<new> to make requests.
126
127=item fork_template => $AnyEvent::Fork-object
128
129C<AnyEvent::DBI> uses C<< AnyEvent::Fork->new >> to create the database
130slave, which in turn either C<exec>'s a new process (similar to the old
131C<exec_server> constructor argument) or uses a process forked early (see
132L<AnyEvent::Fork::Early>).
133
134With this argument you can provide your own fork template. This can be
135useful if you create a lot of C<AnyEvent::DBI> handles and want to save
136memory (And speed up startup) by not having to load C<AnyEvent::DBI> again
137and again into your child processes:
138
139 my $template = AnyEvent::Fork
140 ->new # create new template
141 ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module
142
143 for (...) {
144 $dbh = new AnyEvent::DBI ...
145 fork_template => $template;
146
147=item timeout => seconds
148
149If you supply a timeout parameter (fractional values are supported), then
150a timer is started any time the DBI handle expects a response from the
151server. This includes connection setup as well as requests made to the
152backend. The timeout spans the duration from the moment the first data
153is written (or queued to be written) until all expected responses are
154returned, but is postponed for "timeout" seconds each time more data is
155returned from the server. If the timer ever goes off then a fatal error is
156generated. If you have an C<on_error> handler installed, then it will be
157called, otherwise your program will die().
158
159When altering your databases with timeouts it is wise to use
160transactions. If you quit due to timeout while performing insert, update
161or schema-altering commands you can end up not knowing if the action was
162submitted to the database, complicating recovery.
163
164Timeout errors are always fatal.
165
154=back 166=back
155 167
168Any additional key-value pairs will be rolled into a hash reference
169and passed as the final argument to the C<< DBI->connect (...) >>
170call. For example, to suppress errors on STDERR and send them instead to an
171AnyEvent::Handle you could do:
172
173 $dbh = new AnyEvent::DBI
174 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
175 PrintError => 0,
176 on_error => sub {
177 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
178 };
179
156=cut 180=cut
157
158# stupid Storable autoloading, total loss-loss situation
159Storable::thaw Storable::freeze [];
160 181
161sub new { 182sub new {
162 my ($class, $dbi, $user, $pass, %arg) = @_; 183 my ($class, $dbi, $user, $pass, %arg) = @_;
163 184
164 socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC 185 # we use our own socketpair, so we always have a socket
186 # available, even before the forked process exsist.
187 # this is mostly done so this module is compatible
188 # to versions of itself older than 3.0.
189 my ($client, $server) = AnyEvent::Util::portable_socketpair
165 or croak "unable to create dbi communicaiton pipe: $!"; 190 or croak "unable to create AnyEvent::DBI communications pipe: $!";
191
192 AnyEvent::fh_unblock $client;
193
194 my $fork = delete $arg{fork_template};
195
196 my %dbi_args = %arg;
197 delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)};
166 198
167 my $self = bless \%arg, $class; 199 my $self = bless \%arg, $class;
168 200
169 $self->{fh} = $client; 201 $self->{fh} = $client;
170
171 Scalar::Util::weaken (my $wself = $self);
172
173 AnyEvent::Util::fh_nonblocking $client, 1;
174 202
175 my $rbuf; 203 my $rbuf;
176 my @caller = (caller)[1,2]; # the "default" caller 204 my @caller = (caller)[1,2]; # the "default" caller
177 205
178 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { 206 $fork = $fork ? $fork->fork : AnyEvent::Fork->new
179 my $len = sysread $client, $rbuf, 65536, length $rbuf; 207 or croak "fork: $!";
180 208
209 $fork->require ("AnyEvent::DBI::Slave");
210 $fork->send_arg ($VERSION);
211 $fork->send_fh ($server);
212
213 # we don't rely on the callback, because we use our own
214 # socketpair, for better or worse.
215 $fork->run ("AnyEvent::DBI::Slave::serve", sub { });
216
217 {
218 Convert::Scalar::weaken (my $self = $self);
219
220 my $cbor = new CBOR::XS;
221
222 $self->{rw} = AE::io $client, 0, sub {
223 my $len = Convert::Scalar::extend_read $client, $rbuf, 65536;
224
181 if ($len > 0) { 225 if ($len > 0) {
226 # we received data, so reset the timer
227 $self->{last_activity} = AE::now;
182 228
183 while () { 229 for my $res ($cbor->incr_parse_multiple ($rbuf)) {
184 my $len = unpack "L", $rbuf; 230 last unless $self;
185 231
186 # full request available?
187 last unless $len && $len + 4 <= length $rbuf;
188
189 my $res = Storable::thaw substr $rbuf, 4;
190 substr $rbuf, 0, $len + 4, ""; # remove length + request
191
192 my $req = shift @{ $wself->{queue} }; 232 my $req = shift @{ $self->{queue} };
193 233
194 if (defined $res->[0]) { 234 if (defined $res->[0]) {
235 $res->[0] = $self;
195 $req->[0](@$res); 236 $req->[0](@$res);
237 } else {
238 my $cb = shift @$req;
239 local $@ = $res->[1];
240 $cb->($self);
241 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
242 if $self; # cb() could have deleted it
243 }
244
245 # no more queued requests, so become idle
246 if ($self && !@{ $self->{queue} }) {
247 undef $self->{last_activity};
248 $self->{tw_cb}->();
249 }
250 }
251
252 } elsif (defined $len) {
253 # todo, caller?
254 $self->_error ("unexpected eof", @caller, 1);
255 } elsif ($! != Errno::EAGAIN) {
256 # todo, caller?
257 $self->_error ("read error: $!", @caller, 1);
258 }
259 };
260
261 $self->{tw_cb} = sub {
262 if ($self->{timeout} && $self->{last_activity}) {
263 if (AE::now > $self->{last_activity} + $self->{timeout}) {
264 # we did time out
265 my $req = $self->{queue}[0];
266 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
196 } else { 267 } else {
197 my $cb = shift @$req; 268 # we need to re-set the timeout watcher
198 $wself->_error ($res->[1], @$req); 269 $self->{tw} = AE::timer
270 $self->{last_activity} + $self->{timeout} - AE::now,
271 0,
272 $self->{tw_cb},
199 $cb->(); 273 ;
200 } 274 }
275 } else {
276 # no timeout check wanted, or idle
277 undef $self->{tw};
201 } 278 }
202
203 } elsif (defined $len) {
204 $wself->_error ("unexpected eof", @caller, 1);
205 } else {
206 $wself->_error ("read error: $!", @caller, 1);
207 } 279 };
208 });
209 280
210 $self->{ww_cb} = sub { 281 $self->{ww_cb} = sub {
282 $self->{last_activity} = AE::now;
283
211 my $len = syswrite $client, $wself->{wbuf} 284 my $len = syswrite $client, $self->{wbuf}
212 or return delete $wself->{ww}; 285 or return delete $self->{ww};
213 286
214 substr $wself->{wbuf}, 0, $len, ""; 287 substr $self->{wbuf}, 0, $len, "";
288 };
215 }; 289 }
216 290
217 my $pid = fork; 291 $self->_req (
292 sub {
293 return unless $self;
294 $self->{child_pid} = $_[1];
295 },
296 (caller)[1,2],
297 "req_pid"
298 );
218 299
219 if ($pid) { 300 $self->_req (
220 # parent 301 sub {
221 close $server; 302 return unless $self;
222 303 &{ $self->{on_connect} } if $self->{on_connect};
223 } elsif (defined $pid) { 304 },
224 # child 305 (caller)[1,2],
225 close $client; 306 req_open => $dbi, $user, $pass, %dbi_args
226 @_ = $server; 307 );
227 goto &serve;
228
229 } else {
230 croak "fork: $!";
231 }
232
233 $self->_req (sub { }, (caller)[1,2], 1, req_open => $dbi, $user, $pass);
234 308
235 $self 309 $self
310}
311
312sub _server_pid {
313 shift->{child_pid}
314}
315
316sub kill_child {
317 my $self = shift;
318
319 if (my $pid = delete $self->{child_pid}) {
320 # kill and reap process
321 my $kid_watcher; $kid_watcher = AE::child $pid, sub {
322 undef $kid_watcher;
323 };
324 kill TERM => $pid;
325 }
326
327 delete $self->{rw};
328 delete $self->{ww};
329 delete $self->{tw};
330 close delete $self->{fh};
331}
332
333sub DESTROY {
334 shift->kill_child;
236} 335}
237 336
238sub _error { 337sub _error {
239 my ($self, $error, $filename, $line, $fatal) = @_; 338 my ($self, $error, $filename, $line, $fatal) = @_;
240 339
340 if ($fatal) {
341 delete $self->{tw};
241 delete $self->{rw}; 342 delete $self->{rw};
242 delete $self->{ww}; 343 delete $self->{ww};
243 delete $self->{fh}; 344 delete $self->{fh};
244 345
346 # for fatal errors call all enqueued callbacks with error
347 while (my $req = shift @{$self->{queue}}) {
348 local $@ = $error;
349 $req->[0]->($self);
350 }
351 $self->kill_child;
352 }
353
245 $@ = $error; 354 local $@ = $error;
246 355
356 if ($self->{on_error}) {
247 $self->{on_error}($self, $filename, $line, $fatal) 357 $self->{on_error}($self, $filename, $line, $fatal)
248 if $self->{on_error}; 358 } else {
249
250 die "$error at $filename, line $line\n"; 359 die "$error at $filename, line $line\n";
360 }
361}
362
363=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
364
365Sets (or clears, with C<undef>) the C<on_error> handler.
366
367=cut
368
369sub on_error {
370 $_[0]{on_error} = $_[1];
371}
372
373=item $dbh->timeout ($seconds)
374
375Sets (or clears, with C<undef>) the database timeout. Useful to extend the
376timeout when you are about to make a really long query.
377
378=cut
379
380sub timeout {
381 my ($self, $timeout) = @_;
382
383 $self->{timeout} = $timeout;
384
385 # reschedule timer if one was running
386 $self->{tw_cb}->();
251} 387}
252 388
253sub _req { 389sub _req {
254 my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, (); 390 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
255 391
392 unless ($self->{fh}) {
393 local $@ = my $err = 'no database connection';
394 $cb->($self);
395 $self->_error ($err, $filename, $line, 1);
396 return;
397 }
398
256 push @{ $self->{queue} }, [$cb, $filename, $line, $fatal]; 399 push @{ $self->{queue} }, [$cb, $filename, $line];
257 400
258 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 401 # re-start timeout if necessary
402 if ($self->{timeout} && !$self->{tw}) {
403 $self->{last_activity} = AE::now;
404 $self->{tw_cb}->();
405 }
406
407 $self->{wbuf} .= CBOR::XS::encode_cbor \@_;
259 408
260 unless ($self->{ww}) { 409 unless ($self->{ww}) {
261 my $len = syswrite $self->{fh}, $self->{wbuf}; 410 my $len = syswrite $self->{fh}, $self->{wbuf};
262 substr $self->{wbuf}, 0, $len, ""; 411 substr $self->{wbuf}, 0, $len, "";
263 412
264 # still any left? then install a write watcher 413 # still any left? then install a write watcher
265 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) 414 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
266 if length $self->{wbuf}; 415 if length $self->{wbuf};
267 } 416 }
268} 417}
269 418
419=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
420
421An accessor for the database handle attributes, such as C<AutoCommit>,
422C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
423(which might be C<undef>), then the given attribute will be set to that
424value.
425
426The callback will be passed the database handle and the attribute's value
427if successful.
428
429If an error occurs and the C<on_error> callback returns, then only C<$dbh>
430will be passed and C<$@> contains the error message.
431
270=item $dbh->exec ("statement", @args, $cb->($rows, %extra)) 432=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
271 433
272Executes the given SQL statement with placeholders replaced by 434Executes the given SQL statement with placeholders replaced by
273C<@args>. The statement will be prepared and cached on the server side, so 435C<@args>. The statement will be prepared and cached on the server side, so
274using placeholders is compulsory. 436using placeholders is extremely important.
275 437
276The callback will be called with the result of C<fetchall_arrayref> as 438The callback will be called with a weakened AnyEvent::DBI object as the
277first argument and possibly a hash reference with additional information. 439first argument and the result of C<fetchall_arrayref> as (or C<undef>
440if the statement wasn't a select statement) as the second argument.
278 441
442Third argument is the return value from the C<< DBI->execute >> method
443call.
444
279If an error occurs and the C<on_error> callback returns, then no arguments 445If an error occurs and the C<on_error> callback returns, then only C<$dbh>
280will be passed and C<$@> contains the error message. 446will be passed and C<$@> contains the error message.
281 447
448=item $dbh->stattr ($attr_name, $cb->($dbh, $value))
449
450An accessor for the statement attributes of the most recently executed
451statement, such as C<NAME> or C<TYPE>.
452
453The callback will be passed the database handle and the attribute's value
454if successful.
455
456If an error occurs and the C<on_error> callback returns, then only C<$dbh>
457will be passed and C<$@> contains the error message.
458
459=item $dbh->begin_work ($cb->($dbh[, $rc]))
460
461=item $dbh->commit ($cb->($dbh[, $rc]))
462
463=item $dbh->rollback ($cb->($dbh[, $rc]))
464
465The begin_work, commit, and rollback methods expose the equivalent
466transaction control method of the DBI driver. On success, C<$rc> is true.
467
468If an error occurs and the C<on_error> callback returns, then only C<$dbh>
469will be passed and C<$@> contains the error message.
470
471=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
472
473This gives access to database driver private methods. Because they
474are not standard you cannot always depend on the value of C<$rc> or
475C<$dbi_err>. Check the documentation for your specific driver/function
476combination to see what it returns.
477
478Note that the first argument will be eval'ed to produce the argument list to
479the func() method. This must be done because the serialization protocol
480between the AnyEvent::DBI server process and your program does not support the
481passage of closures.
482
483Here's an example to extend the query language in SQLite so it supports an
484intstr() function:
485
486 $cv = AnyEvent->condvar;
487 $dbh->func (
488 q{
489 instr => 2, sub {
490 my ($string, $search) = @_;
491 return index $string, $search;
492 },
493 },
494 create_function => sub {
495 return $cv->send ($@)
496 unless $#_;
497 $cv->send (undef, @_[1,2,3]);
498 }
499 );
500
501 my ($err,$rc,$errcode,$errstr) = $cv->recv;
502
503 die $err if defined $err;
504 die "EVAL failed: $errstr"
505 if $errcode;
506
507 # otherwise, we can ignore $rc and $errcode for this particular func
508
282=cut 509=cut
283 510
284sub exec { 511for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) {
512 eval 'sub ' . $cmd_name . '{
285 my $cb = pop; 513 my $cb = pop;
286 splice @_, 1, 0, $cb, (caller)[1,2], 0, "req_exec"; 514 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
287 515 &_req
288 goto &_req; 516 }';
289} 517}
290 518
291=back 519=back
292 520
293=head1 SEE ALSO 521=head1 SEE ALSO
294 522
295L<AnyEvent>, L<DBI>. 523L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
296 524
297=head1 AUTHOR 525=head1 AUTHOR AND CONTACT
298 526
299 Marc Lehmann <schmorp@schmorp.de> 527 Marc Lehmann <schmorp@schmorp.de> (current maintainer)
300 http://home.schmorp.de/ 528 http://home.schmorp.de/
301 529
530 Adam Rosenstein <adam@redcondor.com>
531 http://www.redcondor.com/
532
302=cut 533=cut
303 534
3041 5351
305

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines