ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.10 by root, Tue Jun 2 16:16:03 2009 UTC vs.
Revision 1.18 by root, Sun Aug 27 09:54:25 2017 UTC

9 my $cv = AnyEvent->condvar; 9 my $cv = AnyEvent->condvar;
10 10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", ""; 11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12 12
13 $dbh->exec ("select * from test where num=?", 10, sub { 13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($rows, $rv) = @_; 14 my ($dbh, $rows, $rv) = @_;
15
16 $#_ or die "failure: $@";
15 17
16 print "@$_\n" 18 print "@$_\n"
17 for @$rows; 19 for @$rows;
18 20
19 $cv->broadcast; 21 $cv->broadcast;
31This module implements asynchronous DBI access by forking or executing 33This module implements asynchronous DBI access by forking or executing
32separate "DBI-Server" processes and sending them requests. 34separate "DBI-Server" processes and sending them requests.
33 35
34It means that you can run DBI requests in parallel to other tasks. 36It means that you can run DBI requests in parallel to other tasks.
35 37
36The overhead for very simple statements ("select 0") is somewhere 38With DBD::mysql, the overhead for very simple statements
37around 120% to 200% (dual/single core CPU) compared to an explicit 39("select 0") is somewhere around 50% compared to an explicit
38prepare_cached/execute/fetchrow_arrayref/finish combination. 40prepare_cached/execute/fetchrow_arrayref/finish combination. With
41DBD::SQlite3, it's more like a factor of 8 for this trivial statement.
42
43=head2 ERROR HANDLING
44
45This module defines a number of functions that accept a callback
46argument. All callbacks used by this module get their AnyEvent::DBI handle
47object passed as first argument.
48
49If the request was successful, then there will be more arguments,
50otherwise there will only be the C<$dbh> argument and C<$@> contains an
51error message.
52
53A convenient way to check whether an error occurred is to check C<$#_> -
54if that is true, then the function was successful, otherwise there was an
55error.
39 56
40=cut 57=cut
41 58
42package AnyEvent::DBI; 59package AnyEvent::DBI;
43 60
44use strict; 61use common::sense;
45no warnings;
46 62
47use Carp; 63use Carp;
48use Socket (); 64use Convert::Scalar ();
49use Scalar::Util (); 65use AnyEvent::Fork ();
50use Storable (); 66use CBOR::XS ();
51
52use DBI ();
53 67
54use AnyEvent (); 68use AnyEvent ();
55use AnyEvent::Util (); 69use AnyEvent::Util ();
56 70
57use Errno qw(:POSIX); 71use Errno ();
58use Fcntl qw(F_SETFD);
59use POSIX qw(sysconf _SC_OPEN_MAX);
60 72
61our $VERSION = '1.2'; 73our $VERSION = '3.01';
62my $fd_max = 1023; # default
63eval { $fd_max = sysconf _SC_OPEN_MAX - 1; };
64
65# this is the forked server code
66
67our $DBH;
68
69sub req_open {
70 my (undef, $dbi, $user, $pass, %attr) = @{+shift};
71
72 $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr;
73
74 [1]
75}
76
77sub req_exec {
78 my (undef, $st, @args) = @{+shift};
79 my $sth = $DBH->prepare_cached ($st, undef, 1)
80 or die [$DBI::errstr];
81
82 my $rv = $sth->execute (@args)
83 or die [$sth->errstr];
84
85 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }]
86}
87
88sub req_attr {
89 my (undef, $attr_name, $attr_val) = @{+shift};
90
91 if (defined $attr_val) {
92 $DBH->{$attr_name} = $attr_val;
93 }
94
95 [1, $DBH->{$attr_name}]
96}
97
98sub req_begin_work {
99 [scalar $DBH->begin_work or die $DBI::errstr]
100}
101
102sub req_commit {
103 [scalar $DBH->commit or die $DBI::errstr]
104}
105
106sub req_rollback {
107 [scalar $DBH->rollback or die $DBI::errstr]
108}
109
110sub req_func {
111 my (undef, $arg_string, $function) = @{+shift};
112 my @args = eval $arg_string;
113
114 if ($@) {
115 die "Bad func() arg string: $@";
116 }
117
118 my $rv = $DBH->func (@args, $function);
119 return [$rv . $DBH->err];
120}
121
122sub serve {
123 my ($fileno) = @_;
124
125 open my $fh, ">>&=$fileno"
126 or die "Couldn't open service socket: $!";
127
128 no strict;
129
130 eval {
131 my $rbuf;
132
133 while () {
134 sysread $fh, $rbuf, 16384, length $rbuf
135 or last;
136
137 while () {
138 my $len = unpack "L", $rbuf;
139
140 # full request available?
141 last unless $len && $len + 4 <= length $rbuf;
142
143 my $req = Storable::thaw substr $rbuf, 4;
144 substr $rbuf, 0, $len + 4, ""; # remove length + request
145
146 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
147 $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? "$@->[0]" : $@ , ref $@ ? $@->[1] : 1]
148 if $@;
149
150 for (my $ofs = 0; $ofs < length $wbuf; ) {
151 $ofs += (syswrite $fh, substr $wbuf, $ofs
152 or die "unable to write results");
153 }
154 }
155 }
156 };
157
158 if (AnyEvent::WIN32) {
159 kill 9, $$; # no other way on the broken windows platform
160 # and the above doesn't even work on windows, it seems the only
161 # way to is to leak memory and kill 9 from the parent. yay.
162 }
163
164 require POSIX;
165 POSIX::_exit 0;
166 # and the above kills the parent process on windows
167}
168
169sub start_server {
170 serve shift @ARGV;
171}
172 74
173=head2 METHODS 75=head2 METHODS
174 76
175=over 4 77=over 4
176 78
197 99
198When an error occurs, then this callback will be invoked. On entry, C<$@> 100When an error occurs, then this callback will be invoked. On entry, C<$@>
199is set to the error message. C<$filename> and C<$line> is where the 101is set to the error message. C<$filename> and C<$line> is where the
200original request was submitted. 102original request was submitted.
201 103
202If the fatal argument is true then the database connection shuts down and your 104If the fatal argument is true then the database connection is shut down
203database handle becomes invalid. All of your queued request callbacks are 105and your database handle became invalid. In addition to invoking the
204called without any arguments. 106C<on_error> callback, all of your queued request callbacks are called
107without only the C<$dbh> argument.
205 108
206If omitted, then C<die> will be called on any errors, fatal or not. 109If omitted, then C<die> will be called on any errors, fatal or not.
207 110
208The C<$dbh> argument is always a weak reference to the AnyEvent::DBI object.
209
210=item on_connect => $callback->($dbh) 111=item on_connect => $callback->($dbh[, $success])
211 112
212If you supply an on_connect callback, then this callback will be invoked after 113If you supply an C<on_connect> callback, then this callback will be
213the database connection is attempted. If the connection succeeds, C<$dbh> 114invoked after the database connect attempt. If the connection succeeds,
214contains a weak reference to the AnyEvent::DBI object. If the connection fails 115C<$success> is true, otherwise it is missing and C<$@> contains the
215for any reason, no arguments are passed to the callback and C<$@> contains
216$DBI::errstr. 116C<$DBI::errstr>.
217 117
218Regardless of whether on_connect is supplied, connect errors will result in 118Regardless of whether C<on_connect> is supplied, connect errors will result in
219on_error being called. However, if no on_connect callback is supplied, then 119C<on_error> being called. However, if no C<on_connect> callback is supplied, then
220connection errors are considered fatal. The client will die() and the on_error 120connection errors are considered fatal. The client will C<die> and the C<on_error>
221callback will be called with C<$fatal> true. When on_connect is supplied, 121callback will be called with C<$fatal> true.
222connect error are not fatal and AnyEvent::DBI will not die(). You still
223cannot, however, use the $dbh object you recived from new() to make requests.
224 122
225=item exec_server => 1 123When on_connect is supplied, connect error are not fatal and AnyEvent::DBI
124will not C<die>. You still cannot, however, use the $dbh object you
125received from C<new> to make requests.
226 126
227If you supply an exec_server argument, then the DBI server process will call 127=item fork_template => $AnyEvent::Fork-object
228something like:
229 128
230 exec "$^X -MAnyEvent::DBI -e AnyEvent::DBI::start_server" 129C<AnyEvent::DBI> uses C<< AnyEvent::Fork->new >> to create the database
130slave, which in turn either C<exec>'s a new process (similar to the old
131C<exec_server> constructor argument) or uses a process forked early (see
132L<AnyEvent::Fork::Early>).
231 133
232after forking. This will provide the cleanest possible interpreter for your 134With this argument you can provide your own fork template. This can be
233database server. There are special provisions to include C<-Mblib> if the 135useful if you create a lot of C<AnyEvent::DBI> handles and want to save
234current interpreter is running with blib. 136memory (And speed up startup) by not having to load C<AnyEvent::DBI> again
137and again into your child processes:
235 138
236If you do not supply the exec_server argument (or supply it with a false value) 139 my $template = AnyEvent::Fork
237then the traditional method of starting the server within the same forked 140 ->new # create new template
238interpreter context is used. The forked interpreter will try to clean itself 141 ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module
239up by calling POSIX::close on all filedescriptors except STDIN, STDOUT, and 142
240STDERR (and the socket it uses to communicate with the cilent, of course). 143 for (...) {
144 $dbh = new AnyEvent::DBI ...
145 fork_template => $template;
241 146
242=item timeout => seconds 147=item timeout => seconds
243 148
244If you supply a timeout parameter (floating point number of seconds), then a 149If you supply a timeout parameter (fractional values are supported), then
245timer is started any time the DBI handle expects a response from the server. 150a timer is started any time the DBI handle expects a response from the
246This includes connection setup as well as requests made to the backend. The 151server. This includes connection setup as well as requests made to the
247timeout spans the duration from the moment the first data is written (or queued 152backend. The timeout spans the duration from the moment the first data
248to be written) until all expected responses are returned, but is postponed for 153is written (or queued to be written) until all expected responses are
249"timeout" seconds each time more data is returned from the server. If the 154returned, but is postponed for "timeout" seconds each time more data is
250timer ever goes off then a fatal error is generated. If you have an on_error 155returned from the server. If the timer ever goes off then a fatal error is
251handler installed, then it will be called, otherwise your program will die(). 156generated. If you have an C<on_error> handler installed, then it will be
157called, otherwise your program will die().
252 158
253When altering your databases with timeouts it is wise to use transactions. If 159When altering your databases with timeouts it is wise to use
254you quit due to timeout while performing insert, update or schema-altering 160transactions. If you quit due to timeout while performing insert, update
255commands you can end up not knowing if the action was submitted to the 161or schema-altering commands you can end up not knowing if the action was
256database, complicating recovery. 162submitted to the database, complicating recovery.
257 163
258Timeout errors are always fatal. 164Timeout errors are always fatal.
259 165
260=back 166=back
261 167
262Any additional key-value pairs will be rolled into a hash reference and passed 168Any additional key-value pairs will be rolled into a hash reference
263as the final argument to the DBI->connect(...) call. For example, to supress 169and passed as the final argument to the C<< DBI->connect (...) >>
264errors on STDERR and send them instead to an AnyEvent::Handle you could do: 170call. For example, to suppress errors on STDERR and send them instead to an
171AnyEvent::Handle you could do:
265 172
266 $dbh = new AnyEvent::DBI 173 $dbh = new AnyEvent::DBI
267 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "", 174 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
268 PrintError => 0, 175 PrintError => 0,
176 on_error => sub {
269 on_error => sub { $log_handle->push_write("DBI Error: $@ at $_[1]:$_[2]\n"); } 177 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
178 };
270 179
271=cut 180=cut
272
273# stupid Storable autoloading, total loss-loss situation
274Storable::thaw Storable::freeze [];
275 181
276sub new { 182sub new {
277 my ($class, $dbi, $user, $pass, %arg) = @_; 183 my ($class, $dbi, $user, $pass, %arg) = @_;
278 184
279 socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC 185 # we use our own socketpair, so we always have a socket
186 # available, even before the forked process exsist.
187 # this is mostly done so this module is compatible
188 # to versions of itself older than 3.0.
189 my ($client, $server) = AnyEvent::Util::portable_socketpair
280 or croak "unable to create dbi communicaiton pipe: $!"; 190 or croak "unable to create AnyEvent::DBI communications pipe: $!";
281 191
192 my $fork = delete $arg{fork_template};
193
282 my %dbi_args = ( %arg ) ; 194 my %dbi_args = %arg;
283 delete @dbi_args{qw( on_connect on_error timeout exec_server )}; 195 delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)};
284 196
285 my $self = bless \%arg, $class; 197 my $self = bless \%arg, $class;
198
286 $self->{fh} = $client; 199 $self->{fh} = $client;
287
288 Scalar::Util::weaken (my $wself = $self);
289
290 AnyEvent::Util::fh_nonblocking $client, 1;
291 200
292 my $rbuf; 201 my $rbuf;
293 my @caller = (caller)[1,2]; # the "default" caller 202 my @caller = (caller)[1,2]; # the "default" caller
294 203
295 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { 204 $fork = $fork ? $fork->fork : AnyEvent::Fork->new
296 return unless $wself; 205 or croak "fork: $!";
297 my $len = sysread $client, $rbuf, 65536, length $rbuf;
298 my $err = $!;
299 206
207 $fork->require ("AnyEvent::DBI::Slave");
208 $fork->send_arg ($VERSION);
209 $fork->send_fh ($server);
210
211 # we don't rely on the callback, because we use our own
212 # socketpair, for better or worse.
213 $fork->run ("AnyEvent::DBI::Slave::serve", sub { });
214
215 {
216 Convert::Scalar::weaken (my $self = $self);
217
218 my $cbor = new CBOR::XS;
219
220 $self->{rw} = AE::io $client, 0, sub {
221 my $len = Convert::Scalar::extend_read $client, $rbuf, 65536;
222
300 if ($len > 0) { 223 if ($len > 0) {
301 # we received data, so reset the timer 224 # we received data, so reset the timer
302 delete $wself->{timer}; 225 $self->{last_activity} = AE::now;
303 if ($wself->{timeout}) {
304 $wself->{timer} = AnyEvent->timer (
305 after => $wself->{timeout},
306 cb => sub { $wself && $wself->_timedout },
307 );
308 }
309 226
310 while () { 227 for my $res ($cbor->incr_parse_multiple ($rbuf)) {
311 my $len = unpack "L", $rbuf;
312
313 # full response available?
314 last unless $len && $len + 4 <= length $rbuf;
315
316 my $res = Storable::thaw substr $rbuf, 4;
317 substr $rbuf, 0, $len + 4, ""; # remove length + request
318
319 last unless $wself; 228 last unless $self;
229
320 my $req = shift @{ $wself->{queue} }; 230 my $req = shift @{ $self->{queue} };
321 231
322 if (defined $res->[0]) { 232 if (defined $res->[0]) {
323 $res->[0] = $wself; 233 $res->[0] = $self;
324 $req->[0](@$res); 234 $req->[0](@$res);
325 } else { 235 } else {
326 my $cb = shift @$req; 236 my $cb = shift @$req;
327 $@=$res->[1]; 237 local $@ = $res->[1];
328 $cb->(); 238 $cb->($self);
329 if ($wself) { # cb() could have deleted it
330 $wself->_error ($res->[1], @$req, $res->[2]); # error, request record, is_fatal 239 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
240 if $self; # cb() could have deleted it
241 }
242
243 # no more queued requests, so become idle
244 if ($self && !@{ $self->{queue} }) {
245 undef $self->{last_activity};
246 $self->{tw_cb}->();
331 } 247 }
332 } 248 }
333 249
334 # no more queued requests, so cancel timeout 250 } elsif (defined $len) {
335 if ($wself) { 251 # todo, caller?
336 delete $wself->{timer} 252 $self->_error ("unexpected eof", @caller, 1);
337 unless @{ $wself->{queue} }; 253 } elsif ($! != Errno::EAGAIN) {
254 # todo, caller?
255 $self->_error ("read error: $!", @caller, 1);
256 }
257 };
258
259 $self->{tw_cb} = sub {
260 if ($self->{timeout} && $self->{last_activity}) {
261 if (AE::now > $self->{last_activity} + $self->{timeout}) {
262 # we did time out
263 my $req = $self->{queue}[0];
264 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
265 } else {
266 # we need to re-set the timeout watcher
267 $self->{tw} = AE::timer
268 $self->{last_activity} + $self->{timeout} - AE::now,
269 0,
270 $self->{tw_cb},
271 ;
338 } 272 }
273 } else {
274 # no timeout check wanted, or idle
275 undef $self->{tw};
339 } 276 }
340
341 } elsif (defined $len) {
342 $wself->_error ("unexpected eof", @caller, 1);
343 } else {
344 return if $err == EAGAIN;
345 $wself->_error ("read error: $err", @caller, 1);
346 } 277 };
347 });
348 278
349 $self->{ww_cb} = sub { 279 $self->{ww_cb} = sub {
350 return unless $wself; 280 $self->{last_activity} = AE::now;
281
351 my $len = syswrite $client, $wself->{wbuf} 282 my $len = syswrite $client, $self->{wbuf}
352 or return delete $wself->{ww}; 283 or return delete $self->{ww};
353 284
354 substr $wself->{wbuf}, 0, $len, ""; 285 substr $self->{wbuf}, 0, $len, "";
355 };
356
357 my $pid = fork;
358
359 if ($pid) {
360 # parent
361 close $server;
362 } elsif (defined $pid) {
363 # child
364 my $serv_fno = fileno $server;
365
366 if ($self->{exec_server}) {
367 fcntl $server, F_SETFD, 0; # don't close the server side
368 exec "$^X -MAnyEvent::DBI -e AnyEvent::DBI::start_server $serv_fno";
369 POSIX::_exit 124;
370 } else {
371 ($_ != $serv_fno) && POSIX::close $_
372 for $^F+1..$fd_max;
373 serve $serv_fno;
374 POSIX::_exit 0; # not reachable
375 } 286 };
376 } else {
377 croak "fork: $!";
378 } 287 }
379 288
380 $self->{child_pid} = $pid;
381 # set a connect timeout
382 if ($self->{timeout}) {
383 $self->{timer} = AnyEvent->timer (
384 after => $self->{timeout},
385 cb => sub { $wself && $wself->_timedout },
386 );
387 }
388 $self->_req ( 289 $self->_req (
290 sub {
291 return unless $self;
292 $self->{child_pid} = $_[1];
293 },
294 (caller)[1,2],
295 "req_pid"
296 );
297
298 $self->_req (
299 sub {
300 return unless $self;
389 ($self->{on_connect} ? $self->{on_connect} : sub { }), 301 &{ $self->{on_connect} } if $self->{on_connect};
302 },
390 (caller)[1,2], 303 (caller)[1,2],
391 req_open => $dbi, $user, $pass, %dbi_args 304 req_open => $dbi, $user, $pass, %dbi_args
392 ); 305 );
393 306
394 $self 307 $self
397sub _server_pid { 310sub _server_pid {
398 shift->{child_pid} 311 shift->{child_pid}
399} 312}
400 313
401sub kill_child { 314sub kill_child {
402 my $self = shift; 315 my $self = shift;
316
403 my $child_pid = delete $self->{child_pid}; 317 if (my $pid = delete $self->{child_pid}) {
404 if ($child_pid) {
405 # send SIGKILL in two seconds
406 my $murder_timer = AnyEvent->timer (
407 after => 2,
408 cb => sub {
409 kill 9, $child_pid;
410 },
411 );
412
413 # reap process 318 # kill and reap process
414 my $kid_watcher; 319 my $kid_watcher; $kid_watcher = AE::child $pid, sub {
415 $kid_watcher = AnyEvent->child (
416 pid => $child_pid ,
417 cb => sub {
418 # just hold on to this so it won't go away
419 undef $kid_watcher; 320 undef $kid_watcher;
420 # cancel SIGKILL
421 undef $murder_timer;
422 },
423 ); 321 };
424
425 # SIGTERM = the beginning of the end
426 kill TERM => $child_pid; 322 kill TERM => $pid;
427 } 323 }
324
325 delete $self->{rw};
326 delete $self->{ww};
327 delete $self->{tw};
328 close delete $self->{fh};
428} 329}
429 330
430sub DESTROY { 331sub DESTROY {
431 shift->kill_child; 332 shift->kill_child;
432} 333}
433 334
434sub _error { 335sub _error {
435 my ($self, $error, $filename, $line, $fatal) = @_; 336 my ($self, $error, $filename, $line, $fatal) = @_;
436 337
437 if ($fatal) { 338 if ($fatal) {
339 delete $self->{tw};
438 delete $self->{rw}; 340 delete $self->{rw};
439 delete $self->{ww}; 341 delete $self->{ww};
440 delete $self->{fh}; 342 delete $self->{fh};
441 delete $self->{timer};
442 343
443 # for fatal errors call all enqueued callbacks with error 344 # for fatal errors call all enqueued callbacks with error
444 while (my $req = shift @{$self->{queue}}) { 345 while (my $req = shift @{$self->{queue}}) {
445 $@ = $error; 346 local $@ = $error;
446 $req->[0]->(); 347 $req->[0]->($self);
447 } 348 }
448 $self->kill_child; 349 $self->kill_child;
449 } 350 }
450 351
451 $@ = $error; 352 local $@ = $error;
452 353
453 if ($self->{on_error}) { 354 if ($self->{on_error}) {
454 $self->{on_error}($self, $filename, $line, $fatal) 355 $self->{on_error}($self, $filename, $line, $fatal)
455 } else { 356 } else {
456 die "$error at $filename, line $line\n"; 357 die "$error at $filename, line $line\n";
457 } 358 }
458} 359}
459 360
460=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal)) 361=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
461 362
462Sets (or clears, with C<undef>) the on_error handler. 363Sets (or clears, with C<undef>) the C<on_error> handler.
463 364
464=cut 365=cut
465 366
466sub on_error { 367sub on_error {
467 $_[0]{on_error} = $_[1]; 368 $_[0]{on_error} = $_[1];
468} 369}
469 370
470=item $dbh->on_connect ($cb->($dbh))
471
472Sets (or clears, with C<undef>) the on_connect handler.
473
474=cut
475
476sub on_connect {
477 $_[0]{on_connect} = $_[1];
478}
479
480=item $dbh->timeout ($seconds) 371=item $dbh->timeout ($seconds)
481 372
482Sets (or clears, with C<undef>) the database timeout. Useful to extend the 373Sets (or clears, with C<undef>) the database timeout. Useful to extend the
483timeout when you are about to make a really long query. 374timeout when you are about to make a really long query.
484 375
485=cut 376=cut
486 377
487sub timeout { 378sub timeout {
488 my ($self, $timeout) = @_; 379 my ($self, $timeout) = @_;
489 380
490 if ($timeout) {
491 $self->{timeout} = $timeout; 381 $self->{timeout} = $timeout;
382
492 # reschedule timer if one was running 383 # reschedule timer if one was running
493 if ($self->{timer}) { 384 $self->{tw_cb}->();
494 Scalar::Util::weaken (my $wself = $self);
495 $self->{timer} = AnyEvent->timer (
496 after => $self->{timeout},
497 cb => sub { $wself && $wself->_timedout },
498 );
499 }
500 } else {
501 delete @{%$self}[qw(timer timeout)];
502 }
503}
504
505sub _timedout {
506 my ($self) = @_;
507
508 my $req = shift @{ $self->{queue} };
509
510 if ($req) {
511 my $cb = shift @$req;
512 $@ = 'TIMEOUT';
513 $cb->();
514 $self->_error ('TIMEOUT', @$req, 1); # timeouts are always fatal
515 } else {
516 # shouldn't be possible to timeout without a pending request
517 $self->_error ('TIMEOUT', 'NO_PENDING_WTF', 0, 1);
518 }
519} 385}
520 386
521sub _req { 387sub _req {
522 my ($self, $cb, $filename, $line) = splice @_, 0, 4, (); 388 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
523 389
524 if (!$self->{fh}) { 390 unless ($self->{fh}) {
525 my $err = $@ = 'NO DATABASE CONNECTION'; 391 local $@ = my $err = 'no database connection';
526 $cb->(); 392 $cb->($self);
527 $self->_error ($err, $filename, $line, 1); 393 $self->_error ($err, $filename, $line, 1);
528 return; 394 return;
529 } 395 }
530 396
531 push @{ $self->{queue} }, [$cb, $filename, $line ]; 397 push @{ $self->{queue} }, [$cb, $filename, $line];
532 398
399 # re-start timeout if necessary
533 if ($self->{timeout} && !$self->{timer}) { 400 if ($self->{timeout} && !$self->{tw}) {
534 Scalar::Util::weaken (my $wself = $self); 401 $self->{last_activity} = AE::now;
535 $self->{timer} = AnyEvent->timer ( 402 $self->{tw_cb}->();
536 after => $self->{timeout},
537 cb => sub { $wself && $wself->_timedout },
538 );
539 } 403 }
540 404
541 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 405 $self->{wbuf} .= CBOR::XS::encode_cbor \@_;
542 406
543 unless ($self->{ww}) { 407 unless ($self->{ww}) {
544 my $len = syswrite $self->{fh}, $self->{wbuf}; 408 my $len = syswrite $self->{fh}, $self->{wbuf};
545 substr $self->{wbuf}, 0, $len, ""; 409 substr $self->{wbuf}, 0, $len, "";
546 410
547 # still any left? then install a write watcher 411 # still any left? then install a write watcher
548 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) 412 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
549 if length $self->{wbuf}; 413 if length $self->{wbuf};
550 } 414 }
551} 415}
552 416
417=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
418
419An accessor for the database handle attributes, such as C<AutoCommit>,
420C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
421(which might be C<undef>), then the given attribute will be set to that
422value.
423
424The callback will be passed the database handle and the attribute's value
425if successful.
426
427If an error occurs and the C<on_error> callback returns, then only C<$dbh>
428will be passed and C<$@> contains the error message.
429
553=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, \%metadata)) 430=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
554 431
555Executes the given SQL statement with placeholders replaced by 432Executes the given SQL statement with placeholders replaced by
556C<@args>. The statement will be prepared and cached on the server side, so 433C<@args>. The statement will be prepared and cached on the server side, so
557using placeholders is compulsory. 434using placeholders is extremely important.
558 435
559The callback will be called with a weakened AnyEvent::DBI object as the first 436The callback will be called with a weakened AnyEvent::DBI object as the
560argument and the result of C<fetchall_arrayref> as (or C<undef> if the 437first argument and the result of C<fetchall_arrayref> as (or C<undef>
561statement wasn't a select statement) as the second argument. Third argument is 438if the statement wasn't a select statement) as the second argument.
562a hash reference holding metadata about the request. Currently, the only key
563defined is C<$metadata->{rv}> holding the return value of
564C<execute>. Additional metadata might be added.
565 439
440Third argument is the return value from the C<< DBI->execute >> method
441call.
442
566If an error occurs and the C<on_error> callback returns, then no arguments 443If an error occurs and the C<on_error> callback returns, then only C<$dbh>
567will be passed and C<$@> contains the error message. 444will be passed and C<$@> contains the error message.
568 445
569=item $dbh->attr (attr_name, [ $attr_value ], $cb->($dbh, $new_value)) 446=item $dbh->stattr ($attr_name, $cb->($dbh, $value))
570 447
571An accessor for the handle attributes, such as AutoCommit, RaiseError, 448An accessor for the statement attributes of the most recently executed
572PrintError, etc. If you provide an $attr_value, then the given attribute will 449statement, such as C<NAME> or C<TYPE>.
573be set to that value.
574 450
575The callback will be passed the database handle and the 451The callback will be passed the database handle and the attribute's value
576attribute's value if successful. If accessing the attribute fails, then no 452if successful.
577arguments are passed to your callback, and $@ contains a description of the
578problem instead.
579 453
454If an error occurs and the C<on_error> callback returns, then only C<$dbh>
455will be passed and C<$@> contains the error message.
456
580=item $dbh->begin_work ($cb->($dbh)) 457=item $dbh->begin_work ($cb->($dbh[, $rc]))
581 458
582=item $dbh->commit ($cb->($dbh)) 459=item $dbh->commit ($cb->($dbh[, $rc]))
583 460
584=item $dbh->rollback ($cb->($dbh)) 461=item $dbh->rollback ($cb->($dbh[, $rc]))
585 462
586The begin_work, commit, and rollback methods exopose the equivelant transaction 463The begin_work, commit, and rollback methods expose the equivalent
587control methods of the DBI. If something goes wrong, you will get no $dbh in 464transaction control method of the DBI driver. On success, C<$rc> is true.
588your callaback, and will instead have an error to examine in $@.
589 465
466If an error occurs and the C<on_error> callback returns, then only C<$dbh>
467will be passed and C<$@> contains the error message.
468
590=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $result, $handle_error)) 469=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
591 470
592This gives access to database driver private methods. Because they are not 471This gives access to database driver private methods. Because they
593standard you cannot always depend on the value of $result or $handle_error. 472are not standard you cannot always depend on the value of C<$rc> or
594Check the documentation for your specific driver/function combination to see 473C<$dbi_err>. Check the documentation for your specific driver/function
595what it returns. 474combination to see what it returns.
596 475
597Note that the first argument will be eval'ed to produce the argument list to 476Note that the first argument will be eval'ed to produce the argument list to
598the func() method. This must be done because the searialization protocol 477the func() method. This must be done because the serialization protocol
599between the AnyEvent::DBI server process and your program does not support the 478between the AnyEvent::DBI server process and your program does not support the
600passage of closures. 479passage of closures.
601 480
602Here's an example to extend the query language in SQLite so it supports an 481Here's an example to extend the query language in SQLite so it supports an
603intstr() function: 482intstr() function:
604 483
605 $cv = AnyEvent->condvar; 484 $cv = AnyEvent->condvar;
606 $dbh->func( 485 $dbh->func (
607 q{ 486 q{
608 'instr', 487 instr => 2, sub {
609 2,
610 sub {
611 my ($string, $search) = @_; 488 my ($string, $search) = @_;
612 return index $string, $search; 489 return index $string, $search;
613 }, 490 },
614 }, 491 },
615 'create_function', 492 create_function => sub {
616 sub {return $cv->send($@) unless $_[0];$cv->send(undef,@_[1,2]);} 493 return $cv->send ($@)
494 unless $#_;
495 $cv->send (undef, @_[1,2,3]);
496 }
617 ); 497 );
618 my ($err,$result,$handle_err) = $cv->recv(); 498
499 my ($err,$rc,$errcode,$errstr) = $cv->recv;
500
501 die $err if defined $err;
619 die "EVAL failed: $err" if $err; 502 die "EVAL failed: $errstr"
503 if $errcode;
504
620 # otherwise, we can ignore $result and $handle_err for this particular func 505 # otherwise, we can ignore $rc and $errcode for this particular func
621 506
622=cut 507=cut
623 508
624for my $cmd_name (qw(exec attr begin_work commit rollback func)) { 509for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) {
625 eval 'sub ' . $cmd_name . '{ 510 eval 'sub ' . $cmd_name . '{
626 my $cb = pop; 511 my $cb = pop;
627 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '"; 512 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
628 goto &_req; 513 &_req
629 }'; 514 }';
630} 515}
631 516
632=back 517=back
633 518
634=head1 SEE ALSO 519=head1 SEE ALSO
635 520
636L<AnyEvent>, L<DBI>. 521L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
637 522
638=head1 AUTHOR 523=head1 AUTHOR AND CONTACT
639 524
640 Marc Lehmann <schmorp@schmorp.de> 525 Marc Lehmann <schmorp@schmorp.de> (current maintainer)
641 http://home.schmorp.de/ 526 http://home.schmorp.de/
642 527
643 Adam Rosenstein <adam@redcondor.com> 528 Adam Rosenstein <adam@redcondor.com>
644 http://www.redcondor.com/ 529 http://www.redcondor.com/
645 530
646=cut 531=cut
647 532
6481; 5331
649

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines