ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.14 by root, Sat Oct 30 20:23:44 2010 UTC vs.
Revision 1.20 by root, Sat Mar 24 23:22:25 2018 UTC

33This module implements asynchronous DBI access by forking or executing 33This module implements asynchronous DBI access by forking or executing
34separate "DBI-Server" processes and sending them requests. 34separate "DBI-Server" processes and sending them requests.
35 35
36It means that you can run DBI requests in parallel to other tasks. 36It means that you can run DBI requests in parallel to other tasks.
37 37
38The overhead for very simple statements ("select 0") is somewhere 38With DBD::mysql, the overhead for very simple statements
39around 100% to 120% (dual/single core CPU) compared to an explicit 39("select 0") is somewhere around 50% compared to an explicit
40prepare_cached/execute/fetchrow_arrayref/finish combination. 40prepare_cached/execute/fetchrow_arrayref/finish combination. With
41DBD::SQlite3, it's more like a factor of 8 for this trivial statement.
41 42
42=head2 ERROR HANDLING 43=head2 ERROR HANDLING
43 44
44This module defines a number of functions that accept a callback 45This module defines a number of functions that accept a callback
45argument. All callbacks used by this module get their AnyEvent::DBI handle 46argument. All callbacks used by this module get their AnyEvent::DBI handle
47 48
48If the request was successful, then there will be more arguments, 49If the request was successful, then there will be more arguments,
49otherwise there will only be the C<$dbh> argument and C<$@> contains an 50otherwise there will only be the C<$dbh> argument and C<$@> contains an
50error message. 51error message.
51 52
52A convinient way to check whether an error occured is to check C<$#_> - 53A convenient way to check whether an error occurred is to check C<$#_> -
53if that is true, then the function was successful, otherwise there was an 54if that is true, then the function was successful, otherwise there was an
54error. 55error.
55 56
56=cut 57=cut
57 58
58package AnyEvent::DBI; 59package AnyEvent::DBI;
59 60
60use common::sense; 61use common::sense;
61 62
62use Carp; 63use Carp;
63use Socket (); 64use Convert::Scalar ();
64use Scalar::Util (); 65use AnyEvent::Fork ();
65use Storable (); 66use CBOR::XS ();
66
67use DBI (); # only needed in child actually - do it before fork & !exec?
68 67
69use AnyEvent (); 68use AnyEvent ();
70use AnyEvent::Util (); 69use AnyEvent::Util ();
71 70
72use Errno (); 71use Errno ();
73use Fcntl ();
74use POSIX ();
75 72
76our $VERSION = '2.1'; 73our $VERSION = '3.02';
77
78our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
79
80# this is the forked server code, could/should be bundled as it's own file
81
82our $DBH;
83
84sub req_open {
85 my (undef, $dbi, $user, $pass, %attr) = @{+shift};
86
87 $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr;
88
89 [1, 1]
90}
91
92sub req_exec {
93 my (undef, $st, @args) = @{+shift};
94 my $sth = $DBH->prepare_cached ($st, undef, 1)
95 or die [$DBI::errstr];
96
97 my $rv = $sth->execute (@args)
98 or die [$sth->errstr];
99
100 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, $rv]
101}
102
103sub req_attr {
104 my (undef, $attr_name, @attr_val) = @{+shift};
105
106 $DBH->{$attr_name} = $attr_val[0]
107 if @attr_val;
108
109 [1, $DBH->{$attr_name}]
110}
111
112sub req_begin_work {
113 [1, $DBH->begin_work or die [$DBI::errstr]]
114}
115
116sub req_commit {
117 [1, $DBH->commit or die [$DBI::errstr]]
118}
119
120sub req_rollback {
121 [1, $DBH->rollback or die [$DBI::errstr]]
122}
123
124sub req_func {
125 my (undef, $arg_string, $function) = @{+shift};
126 my @args = eval $arg_string;
127
128 die "error evaling \$dbh->func() arg_string: $@"
129 if $@;
130
131 my $rc = $DBH->func (@args, $function);
132 return [1, $rc, $DBI::err, $DBI::errstr];
133}
134
135sub serve_fh($$) {
136 my ($fh, $version) = @_;
137
138 if ($VERSION != $version) {
139 syswrite $fh,
140 pack "L/a*",
141 Storable::freeze
142 [undef, "AnyEvent::DBI version mismatch ($VERSION vs. $version)"];
143 return;
144 }
145
146 eval {
147 my $rbuf;
148
149 while () {
150 sysread $fh, $rbuf, 16384, length $rbuf
151 or last;
152
153 while () {
154 my $len = unpack "L", $rbuf;
155
156 # full request available?
157 last unless $len && $len + 4 <= length $rbuf;
158
159 my $req = Storable::thaw substr $rbuf, 4;
160 substr $rbuf, 0, $len + 4, ""; # remove length + request
161
162 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
163 $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? ("$@->[0]", $@->[1]) : ("$@", 1)]
164 if $@;
165
166 for (my $ofs = 0; $ofs < length $wbuf; ) {
167 $ofs += (syswrite $fh, substr $wbuf, $ofs
168 or die "unable to write results");
169 }
170 }
171 }
172 };
173}
174
175sub serve_fd($$) {
176 open my $fh, ">>&=$_[0]"
177 or die "Couldn't open server file descriptor: $!";
178
179 serve_fh $fh, $_[1];
180}
181 74
182=head2 METHODS 75=head2 METHODS
183 76
184=over 4 77=over 4
185 78
229 122
230When on_connect is supplied, connect error are not fatal and AnyEvent::DBI 123When on_connect is supplied, connect error are not fatal and AnyEvent::DBI
231will not C<die>. You still cannot, however, use the $dbh object you 124will not C<die>. You still cannot, however, use the $dbh object you
232received from C<new> to make requests. 125received from C<new> to make requests.
233 126
234=item exec_server => 1 127=item fork_template => $AnyEvent::Fork-object
235 128
236If you supply an C<exec_server> argument, then the DBI server process will 129C<AnyEvent::DBI> uses C<< AnyEvent::Fork->new >> to create the database
237fork and exec another perl interpreter (using C<$^X>) with just the 130slave, which in turn either C<exec>'s a new process (similar to the old
238AnyEvent::DBI proxy running. This will provide the cleanest possible proxy 131C<exec_server> constructor argument) or uses a process forked early (see
239for your database server. 132L<AnyEvent::Fork::Early>).
240 133
241If you do not supply the C<exec_server> argument (or supply it with a 134With this argument you can provide your own fork template. This can be
242false value) then the traditional method of starting the server by forking 135useful if you create a lot of C<AnyEvent::DBI> handles and want to save
243the current process is used. The forked interpreter will try to clean 136memory (And speed up startup) by not having to load C<AnyEvent::DBI> again
244itself up by calling POSIX::close on all file descriptors except STDIN, 137and again into your child processes:
245STDOUT, and STDERR (and the socket it uses to communicate with the cilent, 138
246of course). 139 my $template = AnyEvent::Fork
140 ->new # create new template
141 ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module
142
143 for (...) {
144 $dbh = new AnyEvent::DBI ...
145 fork_template => $template;
247 146
248=item timeout => seconds 147=item timeout => seconds
249 148
250If you supply a timeout parameter (fractional values are supported), then 149If you supply a timeout parameter (fractional values are supported), then
251a timer is started any time the DBI handle expects a response from the 150a timer is started any time the DBI handle expects a response from the
266 165
267=back 166=back
268 167
269Any additional key-value pairs will be rolled into a hash reference 168Any additional key-value pairs will be rolled into a hash reference
270and passed as the final argument to the C<< DBI->connect (...) >> 169and passed as the final argument to the C<< DBI->connect (...) >>
271call. For example, to supress errors on STDERR and send them instead to an 170call. For example, to suppress errors on STDERR and send them instead to an
272AnyEvent::Handle you could do: 171AnyEvent::Handle you could do:
273 172
274 $dbh = new AnyEvent::DBI 173 $dbh = new AnyEvent::DBI
275 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "", 174 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
276 PrintError => 0, 175 PrintError => 0,
278 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n"); 177 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
279 }; 178 };
280 179
281=cut 180=cut
282 181
283# stupid Storable autoloading, total loss-loss situation
284Storable::thaw Storable::freeze [];
285
286sub new { 182sub new {
287 my ($class, $dbi, $user, $pass, %arg) = @_; 183 my ($class, $dbi, $user, $pass, %arg) = @_;
288 184
185 # we use our own socketpair, so we always have a socket
186 # available, even before the forked process exsist.
187 # this is mostly done so this module is compatible
188 # to versions of itself older than 3.0.
289 my ($client, $server) = AnyEvent::Util::portable_socketpair 189 my ($client, $server) = AnyEvent::Util::portable_socketpair
290 or croak "unable to create AnyEvent::DBI communications pipe: $!"; 190 or croak "unable to create AnyEvent::DBI communications pipe: $!";
291 191
192 AnyEvent::fh_unblock $client;
193
194 my $fork = delete $arg{fork_template};
195
292 my %dbi_args = %arg; 196 my %dbi_args = %arg;
293 delete @dbi_args{qw(on_connect on_error timeout exec_server)}; 197 delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)};
294 198
295 my $self = bless \%arg, $class; 199 my $self = bless \%arg, $class;
200
296 $self->{fh} = $client; 201 $self->{fh} = $client;
297
298 AnyEvent::Util::fh_nonblocking $client, 1;
299 202
300 my $rbuf; 203 my $rbuf;
301 my @caller = (caller)[1,2]; # the "default" caller 204 my @caller = (caller)[1,2]; # the "default" caller
302 205
206 $fork = $fork ? $fork->fork : AnyEvent::Fork->new
207 or croak "fork: $!";
208
209 $fork->require ("AnyEvent::DBI::Slave");
210 $fork->send_arg ($VERSION);
211 $fork->send_fh ($server);
212
213 # we don't rely on the callback, because we use our own
214 # socketpair, for better or worse.
215 $fork->run ("AnyEvent::DBI::Slave::serve", sub { });
216
303 { 217 {
304 Scalar::Util::weaken (my $self = $self); 218 Convert::Scalar::weaken (my $self = $self);
219
220 my $cbor = new CBOR::XS;
305 221
306 $self->{rw} = AE::io $client, 0, sub { 222 $self->{rw} = AE::io $client, 0, sub {
307 return unless $self; 223 my $len = Convert::Scalar::extend_read $client, $rbuf, 65536;
308
309 $self->{last_activity} = AE::now;
310
311 my $len = sysread $client, $rbuf, 65536, length $rbuf;
312 224
313 if ($len > 0) { 225 if ($len > 0) {
314 # we received data, so reset the timer 226 # we received data, so reset the timer
227 $self->{last_activity} = AE::now;
315 228
316 while () { 229 for my $res ($cbor->incr_parse_multiple ($rbuf)) {
317 my $len = unpack "L", $rbuf;
318
319 # full response available?
320 last unless $len && $len + 4 <= length $rbuf;
321
322 my $res = Storable::thaw substr $rbuf, 4;
323 substr $rbuf, 0, $len + 4, ""; # remove length + request
324
325 last unless $self; 230 last unless $self;
231
326 my $req = shift @{ $self->{queue} }; 232 my $req = shift @{ $self->{queue} };
327 233
328 if (defined $res->[0]) { 234 if (defined $res->[0]) {
329 $res->[0] = $self; 235 $res->[0] = $self;
330 $req->[0](@$res); 236 $req->[0](@$res);
335 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal 241 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
336 if $self; # cb() could have deleted it 242 if $self; # cb() could have deleted it
337 } 243 }
338 244
339 # no more queued requests, so become idle 245 # no more queued requests, so become idle
246 if ($self && !@{ $self->{queue} }) {
340 undef $self->{last_activity} 247 undef $self->{last_activity};
341 if $self && !@{ $self->{queue} }; 248 $self->{tw_cb}->();
249 }
342 } 250 }
343 251
344 } elsif (defined $len) { 252 } elsif (defined $len) {
345 # todo, caller? 253 # todo, caller?
346 $self->_error ("unexpected eof", @caller, 1); 254 $self->_error ("unexpected eof", @caller, 1);
369 undef $self->{tw}; 277 undef $self->{tw};
370 } 278 }
371 }; 279 };
372 280
373 $self->{ww_cb} = sub { 281 $self->{ww_cb} = sub {
374 return unless $self;
375
376 $self->{last_activity} = AE::now; 282 $self->{last_activity} = AE::now;
377 283
378 my $len = syswrite $client, $self->{wbuf} 284 my $len = syswrite $client, $self->{wbuf}
379 or return delete $self->{ww}; 285 or return delete $self->{ww};
380 286
381 substr $self->{wbuf}, 0, $len, ""; 287 substr $self->{wbuf}, 0, $len, "";
382 }; 288 };
383 } 289 }
384 290
385 my $pid = fork;
386
387 if ($pid) {
388 # parent
389 close $server;
390 } elsif (defined $pid) {
391 # child
392 my $serv_fno = fileno $server;
393
394 if ($self->{exec_server}) {
395 fcntl $server, &Fcntl::F_SETFD, 0; # don't close the server side
396 exec {$^X}
397 "$0 dbi slave",
398 -e => "require shift; AnyEvent::DBI::serve_fd ($serv_fno, $VERSION)",
399 $INC{"AnyEvent/DBI.pm"};
400 POSIX::_exit 124;
401 } else {
402 ($_ != $serv_fno) && POSIX::close $_
403 for $^F+1..$FD_MAX;
404 serve_fh $server, $VERSION;
405
406 # no other way on the broken windows platform, even this leaks
407 # memory and might fail.
408 kill 9, $$
409 if AnyEvent::WIN32;
410
411 # and this kills the parent process on windows
412 POSIX::_exit 0;
413 }
414 } else {
415 croak "fork: $!";
416 }
417
418 $self->{child_pid} = $pid;
419
420 $self->_req ( 291 $self->_req (
292 sub {
293 return unless $self;
294 $self->{child_pid} = $_[1];
295 },
296 (caller)[1,2],
297 "req_pid"
298 );
299
300 $self->_req (
301 sub {
302 return unless $self;
421 ($self->{on_connect} ? $self->{on_connect} : sub { }), 303 &{ $self->{on_connect} } if $self->{on_connect};
304 },
422 (caller)[1,2], 305 (caller)[1,2],
423 req_open => $dbi, $user, $pass, %dbi_args 306 req_open => $dbi, $user, $pass, %dbi_args
424 ); 307 );
425 308
426 $self 309 $self
432 315
433sub kill_child { 316sub kill_child {
434 my $self = shift; 317 my $self = shift;
435 318
436 if (my $pid = delete $self->{child_pid}) { 319 if (my $pid = delete $self->{child_pid}) {
320 # kill and reap process
321 my $kid_watcher; $kid_watcher = AE::child $pid, sub {
322 undef $kid_watcher;
323 };
437 kill TERM => $pid; 324 kill TERM => $pid;
438 } 325 }
326
327 delete $self->{rw};
328 delete $self->{ww};
329 delete $self->{tw};
439 close delete $self->{fh}; 330 close delete $self->{fh};
440} 331}
441 332
442sub DESTROY { 333sub DESTROY {
443 shift->kill_child; 334 shift->kill_child;
511 if ($self->{timeout} && !$self->{tw}) { 402 if ($self->{timeout} && !$self->{tw}) {
512 $self->{last_activity} = AE::now; 403 $self->{last_activity} = AE::now;
513 $self->{tw_cb}->(); 404 $self->{tw_cb}->();
514 } 405 }
515 406
516 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 407 $self->{wbuf} .= CBOR::XS::encode_cbor \@_;
517 408
518 unless ($self->{ww}) { 409 unless ($self->{ww}) {
519 my $len = syswrite $self->{fh}, $self->{wbuf}; 410 my $len = syswrite $self->{fh}, $self->{wbuf};
520 substr $self->{wbuf}, 0, $len, ""; 411 substr $self->{wbuf}, 0, $len, "";
521 412
523 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb} 414 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
524 if length $self->{wbuf}; 415 if length $self->{wbuf};
525 } 416 }
526} 417}
527 418
419=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
420
421An accessor for the database handle attributes, such as C<AutoCommit>,
422C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
423(which might be C<undef>), then the given attribute will be set to that
424value.
425
426The callback will be passed the database handle and the attribute's value
427if successful.
428
429If an error occurs and the C<on_error> callback returns, then only C<$dbh>
430will be passed and C<$@> contains the error message.
431
528=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv)) 432=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
529 433
530Executes the given SQL statement with placeholders replaced by 434Executes the given SQL statement with placeholders replaced by
531C<@args>. The statement will be prepared and cached on the server side, so 435C<@args>. The statement will be prepared and cached on the server side, so
532using placeholders is extremely important. 436using placeholders is extremely important.
539call. 443call.
540 444
541If an error occurs and the C<on_error> callback returns, then only C<$dbh> 445If an error occurs and the C<on_error> callback returns, then only C<$dbh>
542will be passed and C<$@> contains the error message. 446will be passed and C<$@> contains the error message.
543 447
544=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value)) 448=item $dbh->stattr ($attr_name, $cb->($dbh, $value))
545 449
546An accessor for the handle attributes, such as C<AutoCommit>, 450An accessor for the statement attributes of the most recently executed
547C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value> 451statement, such as C<NAME> or C<TYPE>.
548(which might be C<undef>), then the given attribute will be set to that
549value.
550 452
551The callback will be passed the database handle and the attribute's value 453The callback will be passed the database handle and the attribute's value
552if successful. 454if successful.
553 455
554If an error occurs and the C<on_error> callback returns, then only C<$dbh> 456If an error occurs and the C<on_error> callback returns, then only C<$dbh>
604 506
605 # otherwise, we can ignore $rc and $errcode for this particular func 507 # otherwise, we can ignore $rc and $errcode for this particular func
606 508
607=cut 509=cut
608 510
609for my $cmd_name (qw(exec attr begin_work commit rollback func)) { 511for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) {
610 eval 'sub ' . $cmd_name . '{ 512 eval 'sub ' . $cmd_name . '{
611 my $cb = pop; 513 my $cb = pop;
612 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '"; 514 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
613 &_req 515 &_req
614 }'; 516 }';
618 520
619=head1 SEE ALSO 521=head1 SEE ALSO
620 522
621L<AnyEvent>, L<DBI>, L<Coro::Mysql>. 523L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
622 524
623=head1 AUTHOR 525=head1 AUTHOR AND CONTACT
624 526
625 Marc Lehmann <schmorp@schmorp.de> 527 Marc Lehmann <schmorp@schmorp.de> (current maintainer)
626 http://home.schmorp.de/ 528 http://home.schmorp.de/
627 529
628 Adam Rosenstein <adam@redcondor.com> 530 Adam Rosenstein <adam@redcondor.com>
629 http://www.redcondor.com/ 531 http://www.redcondor.com/
630 532
631=cut 533=cut
632 534
6331; 5351
634

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines