ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.10 by root, Tue Jun 2 16:16:03 2009 UTC vs.
Revision 1.20 by root, Sat Mar 24 23:22:25 2018 UTC

9 my $cv = AnyEvent->condvar; 9 my $cv = AnyEvent->condvar;
10 10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", ""; 11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12 12
13 $dbh->exec ("select * from test where num=?", 10, sub { 13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($rows, $rv) = @_; 14 my ($dbh, $rows, $rv) = @_;
15
16 $#_ or die "failure: $@";
15 17
16 print "@$_\n" 18 print "@$_\n"
17 for @$rows; 19 for @$rows;
18 20
19 $cv->broadcast; 21 $cv->broadcast;
31This module implements asynchronous DBI access by forking or executing 33This module implements asynchronous DBI access by forking or executing
32separate "DBI-Server" processes and sending them requests. 34separate "DBI-Server" processes and sending them requests.
33 35
34It means that you can run DBI requests in parallel to other tasks. 36It means that you can run DBI requests in parallel to other tasks.
35 37
36The overhead for very simple statements ("select 0") is somewhere 38With DBD::mysql, the overhead for very simple statements
37around 120% to 200% (dual/single core CPU) compared to an explicit 39("select 0") is somewhere around 50% compared to an explicit
38prepare_cached/execute/fetchrow_arrayref/finish combination. 40prepare_cached/execute/fetchrow_arrayref/finish combination. With
41DBD::SQlite3, it's more like a factor of 8 for this trivial statement.
42
43=head2 ERROR HANDLING
44
45This module defines a number of functions that accept a callback
46argument. All callbacks used by this module get their AnyEvent::DBI handle
47object passed as first argument.
48
49If the request was successful, then there will be more arguments,
50otherwise there will only be the C<$dbh> argument and C<$@> contains an
51error message.
52
53A convenient way to check whether an error occurred is to check C<$#_> -
54if that is true, then the function was successful, otherwise there was an
55error.
39 56
40=cut 57=cut
41 58
42package AnyEvent::DBI; 59package AnyEvent::DBI;
43 60
44use strict; 61use common::sense;
45no warnings;
46 62
47use Carp; 63use Carp;
48use Socket (); 64use Convert::Scalar ();
49use Scalar::Util (); 65use AnyEvent::Fork ();
50use Storable (); 66use CBOR::XS ();
51
52use DBI ();
53 67
54use AnyEvent (); 68use AnyEvent ();
55use AnyEvent::Util (); 69use AnyEvent::Util ();
56 70
57use Errno qw(:POSIX); 71use Errno ();
58use Fcntl qw(F_SETFD);
59use POSIX qw(sysconf _SC_OPEN_MAX);
60 72
61our $VERSION = '1.2'; 73our $VERSION = '3.02';
62my $fd_max = 1023; # default
63eval { $fd_max = sysconf _SC_OPEN_MAX - 1; };
64
65# this is the forked server code
66
67our $DBH;
68
69sub req_open {
70 my (undef, $dbi, $user, $pass, %attr) = @{+shift};
71
72 $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr;
73
74 [1]
75}
76
77sub req_exec {
78 my (undef, $st, @args) = @{+shift};
79 my $sth = $DBH->prepare_cached ($st, undef, 1)
80 or die [$DBI::errstr];
81
82 my $rv = $sth->execute (@args)
83 or die [$sth->errstr];
84
85 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }]
86}
87
88sub req_attr {
89 my (undef, $attr_name, $attr_val) = @{+shift};
90
91 if (defined $attr_val) {
92 $DBH->{$attr_name} = $attr_val;
93 }
94
95 [1, $DBH->{$attr_name}]
96}
97
98sub req_begin_work {
99 [scalar $DBH->begin_work or die $DBI::errstr]
100}
101
102sub req_commit {
103 [scalar $DBH->commit or die $DBI::errstr]
104}
105
106sub req_rollback {
107 [scalar $DBH->rollback or die $DBI::errstr]
108}
109
110sub req_func {
111 my (undef, $arg_string, $function) = @{+shift};
112 my @args = eval $arg_string;
113
114 if ($@) {
115 die "Bad func() arg string: $@";
116 }
117
118 my $rv = $DBH->func (@args, $function);
119 return [$rv . $DBH->err];
120}
121
122sub serve {
123 my ($fileno) = @_;
124
125 open my $fh, ">>&=$fileno"
126 or die "Couldn't open service socket: $!";
127
128 no strict;
129
130 eval {
131 my $rbuf;
132
133 while () {
134 sysread $fh, $rbuf, 16384, length $rbuf
135 or last;
136
137 while () {
138 my $len = unpack "L", $rbuf;
139
140 # full request available?
141 last unless $len && $len + 4 <= length $rbuf;
142
143 my $req = Storable::thaw substr $rbuf, 4;
144 substr $rbuf, 0, $len + 4, ""; # remove length + request
145
146 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
147 $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? "$@->[0]" : $@ , ref $@ ? $@->[1] : 1]
148 if $@;
149
150 for (my $ofs = 0; $ofs < length $wbuf; ) {
151 $ofs += (syswrite $fh, substr $wbuf, $ofs
152 or die "unable to write results");
153 }
154 }
155 }
156 };
157
158 if (AnyEvent::WIN32) {
159 kill 9, $$; # no other way on the broken windows platform
160 # and the above doesn't even work on windows, it seems the only
161 # way to is to leak memory and kill 9 from the parent. yay.
162 }
163
164 require POSIX;
165 POSIX::_exit 0;
166 # and the above kills the parent process on windows
167}
168
169sub start_server {
170 serve shift @ARGV;
171}
172 74
173=head2 METHODS 75=head2 METHODS
174 76
175=over 4 77=over 4
176 78
197 99
198When an error occurs, then this callback will be invoked. On entry, C<$@> 100When an error occurs, then this callback will be invoked. On entry, C<$@>
199is set to the error message. C<$filename> and C<$line> is where the 101is set to the error message. C<$filename> and C<$line> is where the
200original request was submitted. 102original request was submitted.
201 103
202If the fatal argument is true then the database connection shuts down and your 104If the fatal argument is true then the database connection is shut down
203database handle becomes invalid. All of your queued request callbacks are 105and your database handle became invalid. In addition to invoking the
204called without any arguments. 106C<on_error> callback, all of your queued request callbacks are called
107without only the C<$dbh> argument.
205 108
206If omitted, then C<die> will be called on any errors, fatal or not. 109If omitted, then C<die> will be called on any errors, fatal or not.
207 110
208The C<$dbh> argument is always a weak reference to the AnyEvent::DBI object.
209
210=item on_connect => $callback->($dbh) 111=item on_connect => $callback->($dbh[, $success])
211 112
212If you supply an on_connect callback, then this callback will be invoked after 113If you supply an C<on_connect> callback, then this callback will be
213the database connection is attempted. If the connection succeeds, C<$dbh> 114invoked after the database connect attempt. If the connection succeeds,
214contains a weak reference to the AnyEvent::DBI object. If the connection fails 115C<$success> is true, otherwise it is missing and C<$@> contains the
215for any reason, no arguments are passed to the callback and C<$@> contains
216$DBI::errstr. 116C<$DBI::errstr>.
217 117
218Regardless of whether on_connect is supplied, connect errors will result in 118Regardless of whether C<on_connect> is supplied, connect errors will result in
219on_error being called. However, if no on_connect callback is supplied, then 119C<on_error> being called. However, if no C<on_connect> callback is supplied, then
220connection errors are considered fatal. The client will die() and the on_error 120connection errors are considered fatal. The client will C<die> and the C<on_error>
221callback will be called with C<$fatal> true. When on_connect is supplied, 121callback will be called with C<$fatal> true.
222connect error are not fatal and AnyEvent::DBI will not die(). You still
223cannot, however, use the $dbh object you recived from new() to make requests.
224 122
225=item exec_server => 1 123When on_connect is supplied, connect error are not fatal and AnyEvent::DBI
124will not C<die>. You still cannot, however, use the $dbh object you
125received from C<new> to make requests.
226 126
227If you supply an exec_server argument, then the DBI server process will call 127=item fork_template => $AnyEvent::Fork-object
228something like:
229 128
230 exec "$^X -MAnyEvent::DBI -e AnyEvent::DBI::start_server" 129C<AnyEvent::DBI> uses C<< AnyEvent::Fork->new >> to create the database
130slave, which in turn either C<exec>'s a new process (similar to the old
131C<exec_server> constructor argument) or uses a process forked early (see
132L<AnyEvent::Fork::Early>).
231 133
232after forking. This will provide the cleanest possible interpreter for your 134With this argument you can provide your own fork template. This can be
233database server. There are special provisions to include C<-Mblib> if the 135useful if you create a lot of C<AnyEvent::DBI> handles and want to save
234current interpreter is running with blib. 136memory (And speed up startup) by not having to load C<AnyEvent::DBI> again
137and again into your child processes:
235 138
236If you do not supply the exec_server argument (or supply it with a false value) 139 my $template = AnyEvent::Fork
237then the traditional method of starting the server within the same forked 140 ->new # create new template
238interpreter context is used. The forked interpreter will try to clean itself 141 ->require ("AnyEvent::DBI::Slave"); # preload AnyEvent::DBI::Slave module
239up by calling POSIX::close on all filedescriptors except STDIN, STDOUT, and 142
240STDERR (and the socket it uses to communicate with the cilent, of course). 143 for (...) {
144 $dbh = new AnyEvent::DBI ...
145 fork_template => $template;
241 146
242=item timeout => seconds 147=item timeout => seconds
243 148
244If you supply a timeout parameter (floating point number of seconds), then a 149If you supply a timeout parameter (fractional values are supported), then
245timer is started any time the DBI handle expects a response from the server. 150a timer is started any time the DBI handle expects a response from the
246This includes connection setup as well as requests made to the backend. The 151server. This includes connection setup as well as requests made to the
247timeout spans the duration from the moment the first data is written (or queued 152backend. The timeout spans the duration from the moment the first data
248to be written) until all expected responses are returned, but is postponed for 153is written (or queued to be written) until all expected responses are
249"timeout" seconds each time more data is returned from the server. If the 154returned, but is postponed for "timeout" seconds each time more data is
250timer ever goes off then a fatal error is generated. If you have an on_error 155returned from the server. If the timer ever goes off then a fatal error is
251handler installed, then it will be called, otherwise your program will die(). 156generated. If you have an C<on_error> handler installed, then it will be
157called, otherwise your program will die().
252 158
253When altering your databases with timeouts it is wise to use transactions. If 159When altering your databases with timeouts it is wise to use
254you quit due to timeout while performing insert, update or schema-altering 160transactions. If you quit due to timeout while performing insert, update
255commands you can end up not knowing if the action was submitted to the 161or schema-altering commands you can end up not knowing if the action was
256database, complicating recovery. 162submitted to the database, complicating recovery.
257 163
258Timeout errors are always fatal. 164Timeout errors are always fatal.
259 165
260=back 166=back
261 167
262Any additional key-value pairs will be rolled into a hash reference and passed 168Any additional key-value pairs will be rolled into a hash reference
263as the final argument to the DBI->connect(...) call. For example, to supress 169and passed as the final argument to the C<< DBI->connect (...) >>
264errors on STDERR and send them instead to an AnyEvent::Handle you could do: 170call. For example, to suppress errors on STDERR and send them instead to an
171AnyEvent::Handle you could do:
265 172
266 $dbh = new AnyEvent::DBI 173 $dbh = new AnyEvent::DBI
267 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "", 174 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
268 PrintError => 0, 175 PrintError => 0,
176 on_error => sub {
269 on_error => sub { $log_handle->push_write("DBI Error: $@ at $_[1]:$_[2]\n"); } 177 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
178 };
270 179
271=cut 180=cut
272
273# stupid Storable autoloading, total loss-loss situation
274Storable::thaw Storable::freeze [];
275 181
276sub new { 182sub new {
277 my ($class, $dbi, $user, $pass, %arg) = @_; 183 my ($class, $dbi, $user, $pass, %arg) = @_;
278 184
279 socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC 185 # we use our own socketpair, so we always have a socket
186 # available, even before the forked process exsist.
187 # this is mostly done so this module is compatible
188 # to versions of itself older than 3.0.
189 my ($client, $server) = AnyEvent::Util::portable_socketpair
280 or croak "unable to create dbi communicaiton pipe: $!"; 190 or croak "unable to create AnyEvent::DBI communications pipe: $!";
281 191
192 AnyEvent::fh_unblock $client;
193
194 my $fork = delete $arg{fork_template};
195
282 my %dbi_args = ( %arg ) ; 196 my %dbi_args = %arg;
283 delete @dbi_args{qw( on_connect on_error timeout exec_server )}; 197 delete @dbi_args{qw(on_connect on_error timeout fork_template exec_server)};
284 198
285 my $self = bless \%arg, $class; 199 my $self = bless \%arg, $class;
200
286 $self->{fh} = $client; 201 $self->{fh} = $client;
287
288 Scalar::Util::weaken (my $wself = $self);
289
290 AnyEvent::Util::fh_nonblocking $client, 1;
291 202
292 my $rbuf; 203 my $rbuf;
293 my @caller = (caller)[1,2]; # the "default" caller 204 my @caller = (caller)[1,2]; # the "default" caller
294 205
295 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { 206 $fork = $fork ? $fork->fork : AnyEvent::Fork->new
296 return unless $wself; 207 or croak "fork: $!";
297 my $len = sysread $client, $rbuf, 65536, length $rbuf;
298 my $err = $!;
299 208
209 $fork->require ("AnyEvent::DBI::Slave");
210 $fork->send_arg ($VERSION);
211 $fork->send_fh ($server);
212
213 # we don't rely on the callback, because we use our own
214 # socketpair, for better or worse.
215 $fork->run ("AnyEvent::DBI::Slave::serve", sub { });
216
217 {
218 Convert::Scalar::weaken (my $self = $self);
219
220 my $cbor = new CBOR::XS;
221
222 $self->{rw} = AE::io $client, 0, sub {
223 my $len = Convert::Scalar::extend_read $client, $rbuf, 65536;
224
300 if ($len > 0) { 225 if ($len > 0) {
301 # we received data, so reset the timer 226 # we received data, so reset the timer
302 delete $wself->{timer}; 227 $self->{last_activity} = AE::now;
303 if ($wself->{timeout}) {
304 $wself->{timer} = AnyEvent->timer (
305 after => $wself->{timeout},
306 cb => sub { $wself && $wself->_timedout },
307 );
308 }
309 228
310 while () { 229 for my $res ($cbor->incr_parse_multiple ($rbuf)) {
311 my $len = unpack "L", $rbuf;
312
313 # full response available?
314 last unless $len && $len + 4 <= length $rbuf;
315
316 my $res = Storable::thaw substr $rbuf, 4;
317 substr $rbuf, 0, $len + 4, ""; # remove length + request
318
319 last unless $wself; 230 last unless $self;
231
320 my $req = shift @{ $wself->{queue} }; 232 my $req = shift @{ $self->{queue} };
321 233
322 if (defined $res->[0]) { 234 if (defined $res->[0]) {
323 $res->[0] = $wself; 235 $res->[0] = $self;
324 $req->[0](@$res); 236 $req->[0](@$res);
325 } else { 237 } else {
326 my $cb = shift @$req; 238 my $cb = shift @$req;
327 $@=$res->[1]; 239 local $@ = $res->[1];
328 $cb->(); 240 $cb->($self);
329 if ($wself) { # cb() could have deleted it
330 $wself->_error ($res->[1], @$req, $res->[2]); # error, request record, is_fatal 241 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
242 if $self; # cb() could have deleted it
243 }
244
245 # no more queued requests, so become idle
246 if ($self && !@{ $self->{queue} }) {
247 undef $self->{last_activity};
248 $self->{tw_cb}->();
331 } 249 }
332 } 250 }
333 251
334 # no more queued requests, so cancel timeout 252 } elsif (defined $len) {
335 if ($wself) { 253 # todo, caller?
336 delete $wself->{timer} 254 $self->_error ("unexpected eof", @caller, 1);
337 unless @{ $wself->{queue} }; 255 } elsif ($! != Errno::EAGAIN) {
256 # todo, caller?
257 $self->_error ("read error: $!", @caller, 1);
258 }
259 };
260
261 $self->{tw_cb} = sub {
262 if ($self->{timeout} && $self->{last_activity}) {
263 if (AE::now > $self->{last_activity} + $self->{timeout}) {
264 # we did time out
265 my $req = $self->{queue}[0];
266 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
267 } else {
268 # we need to re-set the timeout watcher
269 $self->{tw} = AE::timer
270 $self->{last_activity} + $self->{timeout} - AE::now,
271 0,
272 $self->{tw_cb},
273 ;
338 } 274 }
275 } else {
276 # no timeout check wanted, or idle
277 undef $self->{tw};
339 } 278 }
340
341 } elsif (defined $len) {
342 $wself->_error ("unexpected eof", @caller, 1);
343 } else {
344 return if $err == EAGAIN;
345 $wself->_error ("read error: $err", @caller, 1);
346 } 279 };
347 });
348 280
349 $self->{ww_cb} = sub { 281 $self->{ww_cb} = sub {
350 return unless $wself; 282 $self->{last_activity} = AE::now;
283
351 my $len = syswrite $client, $wself->{wbuf} 284 my $len = syswrite $client, $self->{wbuf}
352 or return delete $wself->{ww}; 285 or return delete $self->{ww};
353 286
354 substr $wself->{wbuf}, 0, $len, ""; 287 substr $self->{wbuf}, 0, $len, "";
355 };
356
357 my $pid = fork;
358
359 if ($pid) {
360 # parent
361 close $server;
362 } elsif (defined $pid) {
363 # child
364 my $serv_fno = fileno $server;
365
366 if ($self->{exec_server}) {
367 fcntl $server, F_SETFD, 0; # don't close the server side
368 exec "$^X -MAnyEvent::DBI -e AnyEvent::DBI::start_server $serv_fno";
369 POSIX::_exit 124;
370 } else {
371 ($_ != $serv_fno) && POSIX::close $_
372 for $^F+1..$fd_max;
373 serve $serv_fno;
374 POSIX::_exit 0; # not reachable
375 } 288 };
376 } else {
377 croak "fork: $!";
378 } 289 }
379 290
380 $self->{child_pid} = $pid;
381 # set a connect timeout
382 if ($self->{timeout}) {
383 $self->{timer} = AnyEvent->timer (
384 after => $self->{timeout},
385 cb => sub { $wself && $wself->_timedout },
386 );
387 }
388 $self->_req ( 291 $self->_req (
292 sub {
293 return unless $self;
294 $self->{child_pid} = $_[1];
295 },
296 (caller)[1,2],
297 "req_pid"
298 );
299
300 $self->_req (
301 sub {
302 return unless $self;
389 ($self->{on_connect} ? $self->{on_connect} : sub { }), 303 &{ $self->{on_connect} } if $self->{on_connect};
304 },
390 (caller)[1,2], 305 (caller)[1,2],
391 req_open => $dbi, $user, $pass, %dbi_args 306 req_open => $dbi, $user, $pass, %dbi_args
392 ); 307 );
393 308
394 $self 309 $self
397sub _server_pid { 312sub _server_pid {
398 shift->{child_pid} 313 shift->{child_pid}
399} 314}
400 315
401sub kill_child { 316sub kill_child {
402 my $self = shift; 317 my $self = shift;
318
403 my $child_pid = delete $self->{child_pid}; 319 if (my $pid = delete $self->{child_pid}) {
404 if ($child_pid) {
405 # send SIGKILL in two seconds
406 my $murder_timer = AnyEvent->timer (
407 after => 2,
408 cb => sub {
409 kill 9, $child_pid;
410 },
411 );
412
413 # reap process 320 # kill and reap process
414 my $kid_watcher; 321 my $kid_watcher; $kid_watcher = AE::child $pid, sub {
415 $kid_watcher = AnyEvent->child (
416 pid => $child_pid ,
417 cb => sub {
418 # just hold on to this so it won't go away
419 undef $kid_watcher; 322 undef $kid_watcher;
420 # cancel SIGKILL
421 undef $murder_timer;
422 },
423 ); 323 };
424
425 # SIGTERM = the beginning of the end
426 kill TERM => $child_pid; 324 kill TERM => $pid;
427 } 325 }
326
327 delete $self->{rw};
328 delete $self->{ww};
329 delete $self->{tw};
330 close delete $self->{fh};
428} 331}
429 332
430sub DESTROY { 333sub DESTROY {
431 shift->kill_child; 334 shift->kill_child;
432} 335}
433 336
434sub _error { 337sub _error {
435 my ($self, $error, $filename, $line, $fatal) = @_; 338 my ($self, $error, $filename, $line, $fatal) = @_;
436 339
437 if ($fatal) { 340 if ($fatal) {
341 delete $self->{tw};
438 delete $self->{rw}; 342 delete $self->{rw};
439 delete $self->{ww}; 343 delete $self->{ww};
440 delete $self->{fh}; 344 delete $self->{fh};
441 delete $self->{timer};
442 345
443 # for fatal errors call all enqueued callbacks with error 346 # for fatal errors call all enqueued callbacks with error
444 while (my $req = shift @{$self->{queue}}) { 347 while (my $req = shift @{$self->{queue}}) {
445 $@ = $error; 348 local $@ = $error;
446 $req->[0]->(); 349 $req->[0]->($self);
447 } 350 }
448 $self->kill_child; 351 $self->kill_child;
449 } 352 }
450 353
451 $@ = $error; 354 local $@ = $error;
452 355
453 if ($self->{on_error}) { 356 if ($self->{on_error}) {
454 $self->{on_error}($self, $filename, $line, $fatal) 357 $self->{on_error}($self, $filename, $line, $fatal)
455 } else { 358 } else {
456 die "$error at $filename, line $line\n"; 359 die "$error at $filename, line $line\n";
457 } 360 }
458} 361}
459 362
460=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal)) 363=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
461 364
462Sets (or clears, with C<undef>) the on_error handler. 365Sets (or clears, with C<undef>) the C<on_error> handler.
463 366
464=cut 367=cut
465 368
466sub on_error { 369sub on_error {
467 $_[0]{on_error} = $_[1]; 370 $_[0]{on_error} = $_[1];
468} 371}
469 372
470=item $dbh->on_connect ($cb->($dbh))
471
472Sets (or clears, with C<undef>) the on_connect handler.
473
474=cut
475
476sub on_connect {
477 $_[0]{on_connect} = $_[1];
478}
479
480=item $dbh->timeout ($seconds) 373=item $dbh->timeout ($seconds)
481 374
482Sets (or clears, with C<undef>) the database timeout. Useful to extend the 375Sets (or clears, with C<undef>) the database timeout. Useful to extend the
483timeout when you are about to make a really long query. 376timeout when you are about to make a really long query.
484 377
485=cut 378=cut
486 379
487sub timeout { 380sub timeout {
488 my ($self, $timeout) = @_; 381 my ($self, $timeout) = @_;
489 382
490 if ($timeout) {
491 $self->{timeout} = $timeout; 383 $self->{timeout} = $timeout;
384
492 # reschedule timer if one was running 385 # reschedule timer if one was running
493 if ($self->{timer}) { 386 $self->{tw_cb}->();
494 Scalar::Util::weaken (my $wself = $self);
495 $self->{timer} = AnyEvent->timer (
496 after => $self->{timeout},
497 cb => sub { $wself && $wself->_timedout },
498 );
499 }
500 } else {
501 delete @{%$self}[qw(timer timeout)];
502 }
503}
504
505sub _timedout {
506 my ($self) = @_;
507
508 my $req = shift @{ $self->{queue} };
509
510 if ($req) {
511 my $cb = shift @$req;
512 $@ = 'TIMEOUT';
513 $cb->();
514 $self->_error ('TIMEOUT', @$req, 1); # timeouts are always fatal
515 } else {
516 # shouldn't be possible to timeout without a pending request
517 $self->_error ('TIMEOUT', 'NO_PENDING_WTF', 0, 1);
518 }
519} 387}
520 388
521sub _req { 389sub _req {
522 my ($self, $cb, $filename, $line) = splice @_, 0, 4, (); 390 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
523 391
524 if (!$self->{fh}) { 392 unless ($self->{fh}) {
525 my $err = $@ = 'NO DATABASE CONNECTION'; 393 local $@ = my $err = 'no database connection';
526 $cb->(); 394 $cb->($self);
527 $self->_error ($err, $filename, $line, 1); 395 $self->_error ($err, $filename, $line, 1);
528 return; 396 return;
529 } 397 }
530 398
531 push @{ $self->{queue} }, [$cb, $filename, $line ]; 399 push @{ $self->{queue} }, [$cb, $filename, $line];
532 400
401 # re-start timeout if necessary
533 if ($self->{timeout} && !$self->{timer}) { 402 if ($self->{timeout} && !$self->{tw}) {
534 Scalar::Util::weaken (my $wself = $self); 403 $self->{last_activity} = AE::now;
535 $self->{timer} = AnyEvent->timer ( 404 $self->{tw_cb}->();
536 after => $self->{timeout},
537 cb => sub { $wself && $wself->_timedout },
538 );
539 } 405 }
540 406
541 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 407 $self->{wbuf} .= CBOR::XS::encode_cbor \@_;
542 408
543 unless ($self->{ww}) { 409 unless ($self->{ww}) {
544 my $len = syswrite $self->{fh}, $self->{wbuf}; 410 my $len = syswrite $self->{fh}, $self->{wbuf};
545 substr $self->{wbuf}, 0, $len, ""; 411 substr $self->{wbuf}, 0, $len, "";
546 412
547 # still any left? then install a write watcher 413 # still any left? then install a write watcher
548 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) 414 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
549 if length $self->{wbuf}; 415 if length $self->{wbuf};
550 } 416 }
551} 417}
552 418
419=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
420
421An accessor for the database handle attributes, such as C<AutoCommit>,
422C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
423(which might be C<undef>), then the given attribute will be set to that
424value.
425
426The callback will be passed the database handle and the attribute's value
427if successful.
428
429If an error occurs and the C<on_error> callback returns, then only C<$dbh>
430will be passed and C<$@> contains the error message.
431
553=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, \%metadata)) 432=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
554 433
555Executes the given SQL statement with placeholders replaced by 434Executes the given SQL statement with placeholders replaced by
556C<@args>. The statement will be prepared and cached on the server side, so 435C<@args>. The statement will be prepared and cached on the server side, so
557using placeholders is compulsory. 436using placeholders is extremely important.
558 437
559The callback will be called with a weakened AnyEvent::DBI object as the first 438The callback will be called with a weakened AnyEvent::DBI object as the
560argument and the result of C<fetchall_arrayref> as (or C<undef> if the 439first argument and the result of C<fetchall_arrayref> as (or C<undef>
561statement wasn't a select statement) as the second argument. Third argument is 440if the statement wasn't a select statement) as the second argument.
562a hash reference holding metadata about the request. Currently, the only key
563defined is C<$metadata->{rv}> holding the return value of
564C<execute>. Additional metadata might be added.
565 441
442Third argument is the return value from the C<< DBI->execute >> method
443call.
444
566If an error occurs and the C<on_error> callback returns, then no arguments 445If an error occurs and the C<on_error> callback returns, then only C<$dbh>
567will be passed and C<$@> contains the error message. 446will be passed and C<$@> contains the error message.
568 447
569=item $dbh->attr (attr_name, [ $attr_value ], $cb->($dbh, $new_value)) 448=item $dbh->stattr ($attr_name, $cb->($dbh, $value))
570 449
571An accessor for the handle attributes, such as AutoCommit, RaiseError, 450An accessor for the statement attributes of the most recently executed
572PrintError, etc. If you provide an $attr_value, then the given attribute will 451statement, such as C<NAME> or C<TYPE>.
573be set to that value.
574 452
575The callback will be passed the database handle and the 453The callback will be passed the database handle and the attribute's value
576attribute's value if successful. If accessing the attribute fails, then no 454if successful.
577arguments are passed to your callback, and $@ contains a description of the
578problem instead.
579 455
456If an error occurs and the C<on_error> callback returns, then only C<$dbh>
457will be passed and C<$@> contains the error message.
458
580=item $dbh->begin_work ($cb->($dbh)) 459=item $dbh->begin_work ($cb->($dbh[, $rc]))
581 460
582=item $dbh->commit ($cb->($dbh)) 461=item $dbh->commit ($cb->($dbh[, $rc]))
583 462
584=item $dbh->rollback ($cb->($dbh)) 463=item $dbh->rollback ($cb->($dbh[, $rc]))
585 464
586The begin_work, commit, and rollback methods exopose the equivelant transaction 465The begin_work, commit, and rollback methods expose the equivalent
587control methods of the DBI. If something goes wrong, you will get no $dbh in 466transaction control method of the DBI driver. On success, C<$rc> is true.
588your callaback, and will instead have an error to examine in $@.
589 467
468If an error occurs and the C<on_error> callback returns, then only C<$dbh>
469will be passed and C<$@> contains the error message.
470
590=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $result, $handle_error)) 471=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
591 472
592This gives access to database driver private methods. Because they are not 473This gives access to database driver private methods. Because they
593standard you cannot always depend on the value of $result or $handle_error. 474are not standard you cannot always depend on the value of C<$rc> or
594Check the documentation for your specific driver/function combination to see 475C<$dbi_err>. Check the documentation for your specific driver/function
595what it returns. 476combination to see what it returns.
596 477
597Note that the first argument will be eval'ed to produce the argument list to 478Note that the first argument will be eval'ed to produce the argument list to
598the func() method. This must be done because the searialization protocol 479the func() method. This must be done because the serialization protocol
599between the AnyEvent::DBI server process and your program does not support the 480between the AnyEvent::DBI server process and your program does not support the
600passage of closures. 481passage of closures.
601 482
602Here's an example to extend the query language in SQLite so it supports an 483Here's an example to extend the query language in SQLite so it supports an
603intstr() function: 484intstr() function:
604 485
605 $cv = AnyEvent->condvar; 486 $cv = AnyEvent->condvar;
606 $dbh->func( 487 $dbh->func (
607 q{ 488 q{
608 'instr', 489 instr => 2, sub {
609 2,
610 sub {
611 my ($string, $search) = @_; 490 my ($string, $search) = @_;
612 return index $string, $search; 491 return index $string, $search;
613 }, 492 },
614 }, 493 },
615 'create_function', 494 create_function => sub {
616 sub {return $cv->send($@) unless $_[0];$cv->send(undef,@_[1,2]);} 495 return $cv->send ($@)
496 unless $#_;
497 $cv->send (undef, @_[1,2,3]);
498 }
617 ); 499 );
618 my ($err,$result,$handle_err) = $cv->recv(); 500
501 my ($err,$rc,$errcode,$errstr) = $cv->recv;
502
503 die $err if defined $err;
619 die "EVAL failed: $err" if $err; 504 die "EVAL failed: $errstr"
505 if $errcode;
506
620 # otherwise, we can ignore $result and $handle_err for this particular func 507 # otherwise, we can ignore $rc and $errcode for this particular func
621 508
622=cut 509=cut
623 510
624for my $cmd_name (qw(exec attr begin_work commit rollback func)) { 511for my $cmd_name (qw(attr exec stattr begin_work commit rollback func)) {
625 eval 'sub ' . $cmd_name . '{ 512 eval 'sub ' . $cmd_name . '{
626 my $cb = pop; 513 my $cb = pop;
627 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '"; 514 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
628 goto &_req; 515 &_req
629 }'; 516 }';
630} 517}
631 518
632=back 519=back
633 520
634=head1 SEE ALSO 521=head1 SEE ALSO
635 522
636L<AnyEvent>, L<DBI>. 523L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
637 524
638=head1 AUTHOR 525=head1 AUTHOR AND CONTACT
639 526
640 Marc Lehmann <schmorp@schmorp.de> 527 Marc Lehmann <schmorp@schmorp.de> (current maintainer)
641 http://home.schmorp.de/ 528 http://home.schmorp.de/
642 529
643 Adam Rosenstein <adam@redcondor.com> 530 Adam Rosenstein <adam@redcondor.com>
644 http://www.redcondor.com/ 531 http://www.redcondor.com/
645 532
646=cut 533=cut
647 534
6481; 5351
649

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines