ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.10 by root, Tue Jun 2 16:16:03 2009 UTC vs.
Revision 1.11 by root, Sun Jun 28 14:59:51 2009 UTC

9 my $cv = AnyEvent->condvar; 9 my $cv = AnyEvent->condvar;
10 10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", ""; 11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12 12
13 $dbh->exec ("select * from test where num=?", 10, sub { 13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($rows, $rv) = @_; 14 my ($dbh, $rows, $rv) = @_;
15
16 $rows or die "failure: $@";
15 17
16 print "@$_\n" 18 print "@$_\n"
17 for @$rows; 19 for @$rows;
18 20
19 $cv->broadcast; 21 $cv->broadcast;
35 37
36The overhead for very simple statements ("select 0") is somewhere 38The overhead for very simple statements ("select 0") is somewhere
37around 120% to 200% (dual/single core CPU) compared to an explicit 39around 120% to 200% (dual/single core CPU) compared to an explicit
38prepare_cached/execute/fetchrow_arrayref/finish combination. 40prepare_cached/execute/fetchrow_arrayref/finish combination.
39 41
42=head2 ERROR HANDLING
43
44This module defines a number of functions that accept a callback
45argument. All callbacks used by this module get their AnyEvent::DBI handle
46object passed as first argument.
47
48If the request was successful, then there will be more arguments,
49otherwise there will only be the C<$dbh> argument and C<$@> contains an
50error message.
51
52A convinient way to check whether an error occured is to check C<$#_> -
53if that is true, then the function was successful, otherwise there was an
54error.
55
40=cut 56=cut
41 57
42package AnyEvent::DBI; 58package AnyEvent::DBI;
43 59
44use strict; 60use strict qw(vars subs);
45no warnings; 61no warnings;
46 62
47use Carp; 63use Carp;
48use Socket (); 64use Socket ();
49use Scalar::Util (); 65use Scalar::Util ();
52use DBI (); 68use DBI ();
53 69
54use AnyEvent (); 70use AnyEvent ();
55use AnyEvent::Util (); 71use AnyEvent::Util ();
56 72
57use Errno qw(:POSIX); 73use Errno ();
58use Fcntl qw(F_SETFD); 74use Fcntl ();
59use POSIX qw(sysconf _SC_OPEN_MAX); 75use POSIX ();
60 76
61our $VERSION = '1.2'; 77our $VERSION = '1.19';
62my $fd_max = 1023; # default
63eval { $fd_max = sysconf _SC_OPEN_MAX - 1; };
64 78
65# this is the forked server code 79our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
80
81# this is the forked server code, could/should be bundled as it's own file
66 82
67our $DBH; 83our $DBH;
68 84
69sub req_open { 85sub req_open {
70 my (undef, $dbi, $user, $pass, %attr) = @{+shift}; 86 my (undef, $dbi, $user, $pass, %attr) = @{+shift};
71 87
72 $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr; 88 $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr;
73 89
74 [1] 90 [1, 1]
75} 91}
76 92
77sub req_exec { 93sub req_exec {
78 my (undef, $st, @args) = @{+shift}; 94 my (undef, $st, @args) = @{+shift};
79 my $sth = $DBH->prepare_cached ($st, undef, 1) 95 my $sth = $DBH->prepare_cached ($st, undef, 1)
80 or die [$DBI::errstr]; 96 or die [$DBI::errstr];
81 97
82 my $rv = $sth->execute (@args) 98 my $rv = $sth->execute (@args)
83 or die [$sth->errstr]; 99 or die [$sth->errstr];
84 100
85 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }] 101 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, $rv]
86} 102}
87 103
88sub req_attr { 104sub req_attr {
89 my (undef, $attr_name, $attr_val) = @{+shift}; 105 my (undef, $attr_name, @attr_val) = @{+shift};
90 106
91 if (defined $attr_val) {
92 $DBH->{$attr_name} = $attr_val; 107 $DBH->{$attr_name} = $attr_val[0]
93 } 108 if @attr_val;
94 109
95 [1, $DBH->{$attr_name}] 110 [1, $DBH->{$attr_name}]
96} 111}
97 112
98sub req_begin_work { 113sub req_begin_work {
99 [scalar $DBH->begin_work or die $DBI::errstr] 114 [1, $DBH->begin_work or die [$DBI::errstr]]
100} 115}
101 116
102sub req_commit { 117sub req_commit {
103 [scalar $DBH->commit or die $DBI::errstr] 118 [1, $DBH->commit or die [$DBI::errstr]]
104} 119}
105 120
106sub req_rollback { 121sub req_rollback {
107 [scalar $DBH->rollback or die $DBI::errstr] 122 [1, $DBH->rollback or die [$DBI::errstr]]
108} 123}
109 124
110sub req_func { 125sub req_func {
111 my (undef, $arg_string, $function) = @{+shift}; 126 my (undef, $arg_string, $function) = @{+shift};
112 my @args = eval $arg_string; 127 my @args = eval $arg_string;
113 128
114 if ($@) {
115 die "Bad func() arg string: $@"; 129 die "Bad func () arg string: $@"
116 } 130 if $@;
117 131
118 my $rv = $DBH->func (@args, $function); 132 my $rv = $DBH->func (@args, $function);
119 return [$rv . $DBH->err]; 133 return [$rv, $DBH->err];
120} 134}
121 135
122sub serve { 136sub serve_fh($$) {
123 my ($fileno) = @_; 137 my ($fh, $version) = @_;
124 138
125 open my $fh, ">>&=$fileno" 139 if ($VERSION != $version) {
126 or die "Couldn't open service socket: $!"; 140 syswrite $fh,
127 141 pack "L/a*",
128 no strict; 142 Storable::freeze
143 [undef, "AnyEvent::DBI version mismatch ($VERSION vs. $version)"];
144 return;
145 }
129 146
130 eval { 147 eval {
131 my $rbuf; 148 my $rbuf;
132 149
133 while () { 150 while () {
142 159
143 my $req = Storable::thaw substr $rbuf, 4; 160 my $req = Storable::thaw substr $rbuf, 4;
144 substr $rbuf, 0, $len + 4, ""; # remove length + request 161 substr $rbuf, 0, $len + 4, ""; # remove length + request
145 162
146 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) }; 163 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
147 $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? "$@->[0]" : $@ , ref $@ ? $@->[1] : 1] 164 $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? ("$@->[0]", $@->[1]) : ("$@", 1)]
148 if $@; 165 if $@;
149 166
150 for (my $ofs = 0; $ofs < length $wbuf; ) { 167 for (my $ofs = 0; $ofs < length $wbuf; ) {
151 $ofs += (syswrite $fh, substr $wbuf, $ofs 168 $ofs += (syswrite $fh, substr $wbuf, $ofs
152 or die "unable to write results"); 169 or die "unable to write results");
153 } 170 }
154 } 171 }
155 } 172 }
156 }; 173 };
157
158 if (AnyEvent::WIN32) {
159 kill 9, $$; # no other way on the broken windows platform
160 # and the above doesn't even work on windows, it seems the only
161 # way to is to leak memory and kill 9 from the parent. yay.
162 }
163
164 require POSIX;
165 POSIX::_exit 0;
166 # and the above kills the parent process on windows
167} 174}
168 175
169sub start_server { 176sub serve_fd($$) {
170 serve shift @ARGV; 177 open my $fh, ">>&=$_[0]"
178 or die "Couldn't open server file descriptor: $!";
179
180 serve_fh $fh, $_[1];
171} 181}
172 182
173=head2 METHODS 183=head2 METHODS
174 184
175=over 4 185=over 4
197 207
198When an error occurs, then this callback will be invoked. On entry, C<$@> 208When an error occurs, then this callback will be invoked. On entry, C<$@>
199is set to the error message. C<$filename> and C<$line> is where the 209is set to the error message. C<$filename> and C<$line> is where the
200original request was submitted. 210original request was submitted.
201 211
202If the fatal argument is true then the database connection shuts down and your 212If the fatal argument is true then the database connection is shut down
203database handle becomes invalid. All of your queued request callbacks are 213and your database handle became invalid. In addition to invoking the
204called without any arguments. 214C<on_error> callback, all of your queued request callbacks are called
215without only the C<$dbh> argument.
205 216
206If omitted, then C<die> will be called on any errors, fatal or not. 217If omitted, then C<die> will be called on any errors, fatal or not.
207 218
208The C<$dbh> argument is always a weak reference to the AnyEvent::DBI object.
209
210=item on_connect => $callback->($dbh) 219=item on_connect => $callback->($dbh[, $success])
211 220
212If you supply an on_connect callback, then this callback will be invoked after 221If you supply an C<on_connect> callback, then this callback will be
213the database connection is attempted. If the connection succeeds, C<$dbh> 222invoked after the database connect attempt. If the connection succeeds,
214contains a weak reference to the AnyEvent::DBI object. If the connection fails 223C<$success> is true, otherwise it is missing and C<$@> contains the
215for any reason, no arguments are passed to the callback and C<$@> contains
216$DBI::errstr. 224C<$DBI::errstr>.
217 225
218Regardless of whether on_connect is supplied, connect errors will result in 226Regardless of whether C<on_connect> is supplied, connect errors will result in
219on_error being called. However, if no on_connect callback is supplied, then 227C<on_error> being called. However, if no C<on_connect> callback is supplied, then
220connection errors are considered fatal. The client will die() and the on_error 228connection errors are considered fatal. The client will C<die> and the C<on_error>
221callback will be called with C<$fatal> true. When on_connect is supplied, 229callback will be called with C<$fatal> true.
222connect error are not fatal and AnyEvent::DBI will not die(). You still 230
223cannot, however, use the $dbh object you recived from new() to make requests. 231When on_connect is supplied, connect error are not fatal and AnyEvent::DBI
232will not C<die>. You still cannot, however, use the $dbh object you
233received from C<new> to make requests.
224 234
225=item exec_server => 1 235=item exec_server => 1
226 236
227If you supply an exec_server argument, then the DBI server process will call 237If you supply an C<exec_server> argument, then the DBI server process will
228something like: 238fork and exec another perl interpreter (using C<$^X>) with just the
239AnyEvent::DBI proxy running. This will provide the cleanest possible porxy
240for your database server.
229 241
230 exec "$^X -MAnyEvent::DBI -e AnyEvent::DBI::start_server"
231
232after forking. This will provide the cleanest possible interpreter for your
233database server. There are special provisions to include C<-Mblib> if the
234current interpreter is running with blib.
235
236If you do not supply the exec_server argument (or supply it with a false value) 242If you do not supply the C<exec_server> argument (or supply it with a
237then the traditional method of starting the server within the same forked 243false value) then the traditional method of starting the server by forking
238interpreter context is used. The forked interpreter will try to clean itself 244the current process is used. The forked interpreter will try to clean
239up by calling POSIX::close on all filedescriptors except STDIN, STDOUT, and 245itself up by calling POSIX::close on all file descriptors except STDIN,
240STDERR (and the socket it uses to communicate with the cilent, of course). 246STDOUT, and STDERR (and the socket it uses to communicate with the cilent,
247of course).
241 248
242=item timeout => seconds 249=item timeout => seconds
243 250
244If you supply a timeout parameter (floating point number of seconds), then a 251If you supply a timeout parameter (fractional values are supported), then
245timer is started any time the DBI handle expects a response from the server. 252a timer is started any time the DBI handle expects a response from the
246This includes connection setup as well as requests made to the backend. The 253server. This includes connection setup as well as requests made to the
247timeout spans the duration from the moment the first data is written (or queued 254backend. The timeout spans the duration from the moment the first data
248to be written) until all expected responses are returned, but is postponed for 255is written (or queued to be written) until all expected responses are
249"timeout" seconds each time more data is returned from the server. If the 256returned, but is postponed for "timeout" seconds each time more data is
250timer ever goes off then a fatal error is generated. If you have an on_error 257returned from the server. If the timer ever goes off then a fatal error is
251handler installed, then it will be called, otherwise your program will die(). 258generated. If you have an C<on_error> handler installed, then it will be
259called, otherwise your program will die().
252 260
253When altering your databases with timeouts it is wise to use transactions. If 261When altering your databases with timeouts it is wise to use
254you quit due to timeout while performing insert, update or schema-altering 262transactions. If you quit due to timeout while performing insert, update
255commands you can end up not knowing if the action was submitted to the 263or schema-altering commands you can end up not knowing if the action was
256database, complicating recovery. 264submitted to the database, complicating recovery.
257 265
258Timeout errors are always fatal. 266Timeout errors are always fatal.
259 267
260=back 268=back
261 269
262Any additional key-value pairs will be rolled into a hash reference and passed 270Any additional key-value pairs will be rolled into a hash reference
263as the final argument to the DBI->connect(...) call. For example, to supress 271and passed as the final argument to the C<< DBI->connect (...) >>
264errors on STDERR and send them instead to an AnyEvent::Handle you could do: 272call. For example, to supress errors on STDERR and send them instead to an
273AnyEvent::Handle you could do:
265 274
266 $dbh = new AnyEvent::DBI 275 $dbh = new AnyEvent::DBI
267 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "", 276 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
268 PrintError => 0, 277 PrintError => 0,
278 on_error => sub {
269 on_error => sub { $log_handle->push_write("DBI Error: $@ at $_[1]:$_[2]\n"); } 279 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
280 };
270 281
271=cut 282=cut
272 283
273# stupid Storable autoloading, total loss-loss situation 284# stupid Storable autoloading, total loss-loss situation
274Storable::thaw Storable::freeze []; 285Storable::thaw Storable::freeze [];
275 286
276sub new { 287sub new {
277 my ($class, $dbi, $user, $pass, %arg) = @_; 288 my ($class, $dbi, $user, $pass, %arg) = @_;
278 289
279 socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC 290 my ($client, $server) = AnyEvent::Util::portable_socketpair
280 or croak "unable to create dbi communicaiton pipe: $!"; 291 or croak "unable to create Anyevent::DBI communications pipe: $!";
281 292
282 my %dbi_args = ( %arg ) ; 293 my %dbi_args = %arg;
283 delete @dbi_args{qw( on_connect on_error timeout exec_server )}; 294 delete @dbi_args{qw(on_connect on_error timeout exec_server)};
284 295
285 my $self = bless \%arg, $class; 296 my $self = bless \%arg, $class;
286 $self->{fh} = $client; 297 $self->{fh} = $client;
287 298
288 Scalar::Util::weaken (my $wself = $self);
289
290 AnyEvent::Util::fh_nonblocking $client, 1; 299 AnyEvent::Util::fh_nonblocking $client, 1;
291 300
292 my $rbuf; 301 my $rbuf;
293 my @caller = (caller)[1,2]; # the "default" caller 302 my @caller = (caller)[1,2]; # the "default" caller
294 303
304 {
305 Scalar::Util::weaken (my $self = $self);
306
295 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { 307 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
296 return unless $wself; 308 return unless $self;
309
310 $self->{last_activity} = AnyEvent->now;
311
297 my $len = sysread $client, $rbuf, 65536, length $rbuf; 312 my $len = sysread $client, $rbuf, 65536, length $rbuf;
298 my $err = $!;
299 313
300 if ($len > 0) { 314 if ($len > 0) {
301 # we received data, so reset the timer 315 # we received data, so reset the timer
302 delete $wself->{timer}; 316
303 if ($wself->{timeout}) { 317 while () {
304 $wself->{timer} = AnyEvent->timer ( 318 my $len = unpack "L", $rbuf;
305 after => $wself->{timeout}, 319
306 cb => sub { $wself && $wself->_timedout }, 320 # full response available?
321 last unless $len && $len + 4 <= length $rbuf;
322
323 my $res = Storable::thaw substr $rbuf, 4;
324 substr $rbuf, 0, $len + 4, ""; # remove length + request
325
326 last unless $self;
327 my $req = shift @{ $self->{queue} };
328
329 if (defined $res->[0]) {
330 $res->[0] = $self;
331 $req->[0](@$res);
332 } else {
333 my $cb = shift @$req;
334 local $@ = $res->[1];
335 $cb->($self);
336 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
337 if $self; # cb() could have deleted it
338 }
339
340 # no more queued requests, so become idle
341 undef $self->{last_activity}
342 if $self && !@{ $self->{queue} };
307 ); 343 }
344
345 } elsif (defined $len) {
346 # todo, caller?
347 $self->_error ("unexpected eof", @caller, 1);
348 } elsif ($! != Errno::EAGAIN) {
349 # todo, caller?
350 $self->_error ("read error: $!", @caller, 1);
308 } 351 }
352 });
309 353
310 while () { 354 $self->{tw_cb} = sub {
311 my $len = unpack "L", $rbuf; 355 if ($self->{timeout} && $self->{last_activity}) {
312 356 if (AnyEvent->now > $self->{last_activity} + $self->{timeout}) {
313 # full response available? 357 # we did time out
314 last unless $len && $len + 4 <= length $rbuf;
315
316 my $res = Storable::thaw substr $rbuf, 4;
317 substr $rbuf, 0, $len + 4, ""; # remove length + request
318
319 last unless $wself;
320 my $req = shift @{ $wself->{queue} }; 358 my $req = $self->{queue}[0];
321 359 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
322 if (defined $res->[0]) {
323 $res->[0] = $wself;
324 $req->[0](@$res);
325 } else { 360 } else {
326 my $cb = shift @$req; 361 # we need to re-set the timeout watcher
327 $@=$res->[1]; 362 $self->{tw} = AnyEvent->timer (
328 $cb->(); 363 after => $self->{last_activity} + $self->{timeout} - AnyEvent->now,
329 if ($wself) { # cb() could have deleted it 364 cb => $self->{tw_cb},
330 $wself->_error ($res->[1], @$req, $res->[2]); # error, request record, is_fatal
331 } 365 );
366 Scalar::Util::weaken $self;
332 } 367 }
333 368 } else {
334 # no more queued requests, so cancel timeout 369 # no timeout check wanted, or idle
335 if ($wself) { 370 undef $self->{tw};
336 delete $wself->{timer}
337 unless @{ $wself->{queue} };
338 }
339 } 371 }
340
341 } elsif (defined $len) {
342 $wself->_error ("unexpected eof", @caller, 1);
343 } else {
344 return if $err == EAGAIN;
345 $wself->_error ("read error: $err", @caller, 1);
346 } 372 };
347 });
348 373
349 $self->{ww_cb} = sub { 374 $self->{ww_cb} = sub {
350 return unless $wself; 375 return unless $self;
376
377 $self->{last_activity} = AnyEvent->now;
378
351 my $len = syswrite $client, $wself->{wbuf} 379 my $len = syswrite $client, $self->{wbuf}
352 or return delete $wself->{ww}; 380 or return delete $self->{ww};
353 381
354 substr $wself->{wbuf}, 0, $len, ""; 382 substr $self->{wbuf}, 0, $len, "";
383 };
355 }; 384 }
356 385
357 my $pid = fork; 386 my $pid = fork;
358 387
359 if ($pid) { 388 if ($pid) {
360 # parent 389 # parent
362 } elsif (defined $pid) { 391 } elsif (defined $pid) {
363 # child 392 # child
364 my $serv_fno = fileno $server; 393 my $serv_fno = fileno $server;
365 394
366 if ($self->{exec_server}) { 395 if ($self->{exec_server}) {
367 fcntl $server, F_SETFD, 0; # don't close the server side 396 fcntl $server, &Fcntl::F_SETFD, 0; # don't close the server side
368 exec "$^X -MAnyEvent::DBI -e AnyEvent::DBI::start_server $serv_fno"; 397 exec {$^X}
398 "$0 dbi slave",
399 -e => "require shift; AnyEvent::DBI::serve_fd ($serv_fno, $VERSION)",
400 $INC{"AnyEvent/DBI.pm"};
369 POSIX::_exit 124; 401 POSIX::_exit 124;
370 } else { 402 } else {
371 ($_ != $serv_fno) && POSIX::close $_ 403 ($_ != $serv_fno) && POSIX::close $_
372 for $^F+1..$fd_max; 404 for $^F+1..$FD_MAX;
373 serve $serv_fno; 405 serve_fh $server, $VERSION;
406
407 # no other way on the broken windows platform, even this leaks
408 # memory and might fail.
409 kill 9, $$
410 if AnyEvent::WIN32;
411
412 # and this kills the parent process on windows
374 POSIX::_exit 0; # not reachable 413 POSIX::_exit 0;
375 } 414 }
376 } else { 415 } else {
377 croak "fork: $!"; 416 croak "fork: $!";
378 } 417 }
379 418
380 $self->{child_pid} = $pid; 419 $self->{child_pid} = $pid;
381 # set a connect timeout 420
382 if ($self->{timeout}) {
383 $self->{timer} = AnyEvent->timer (
384 after => $self->{timeout},
385 cb => sub { $wself && $wself->_timedout },
386 );
387 }
388 $self->_req ( 421 $self->_req (
389 ($self->{on_connect} ? $self->{on_connect} : sub { }), 422 ($self->{on_connect} ? $self->{on_connect} : sub { }),
390 (caller)[1,2], 423 (caller)[1,2],
391 req_open => $dbi, $user, $pass, %dbi_args 424 req_open => $dbi, $user, $pass, %dbi_args
392 ); 425 );
409 kill 9, $child_pid; 442 kill 9, $child_pid;
410 }, 443 },
411 ); 444 );
412 445
413 # reap process 446 # reap process
414 my $kid_watcher;
415 $kid_watcher = AnyEvent->child ( 447 my $kid_watcher; $kid_watcher = AnyEvent->child (
416 pid => $child_pid , 448 pid => $child_pid,
417 cb => sub { 449 cb => sub {
418 # just hold on to this so it won't go away 450 # just hold on to this so it won't go away
419 undef $kid_watcher; 451 undef $kid_watcher;
420 # cancel SIGKILL 452 # cancel SIGKILL
421 undef $murder_timer; 453 undef $murder_timer;
422 }, 454 },
423 ); 455 );
424 456
425 # SIGTERM = the beginning of the end 457 close $self->{fh};
426 kill TERM => $child_pid;
427 } 458 }
428} 459}
429 460
430sub DESTROY { 461sub DESTROY {
431 shift->kill_child; 462 shift->kill_child;
433 464
434sub _error { 465sub _error {
435 my ($self, $error, $filename, $line, $fatal) = @_; 466 my ($self, $error, $filename, $line, $fatal) = @_;
436 467
437 if ($fatal) { 468 if ($fatal) {
469 delete $self->{tw};
438 delete $self->{rw}; 470 delete $self->{rw};
439 delete $self->{ww}; 471 delete $self->{ww};
440 delete $self->{fh}; 472 delete $self->{fh};
441 delete $self->{timer};
442 473
443 # for fatal errors call all enqueued callbacks with error 474 # for fatal errors call all enqueued callbacks with error
444 while (my $req = shift @{$self->{queue}}) { 475 while (my $req = shift @{$self->{queue}}) {
445 $@ = $error; 476 local $@ = $error;
446 $req->[0]->(); 477 $req->[0]->($self);
447 } 478 }
448 $self->kill_child; 479 $self->kill_child;
449 } 480 }
450 481
451 $@ = $error; 482 local $@ = $error;
452 483
453 if ($self->{on_error}) { 484 if ($self->{on_error}) {
454 $self->{on_error}($self, $filename, $line, $fatal) 485 $self->{on_error}($self, $filename, $line, $fatal)
455 } else { 486 } else {
456 die "$error at $filename, line $line\n"; 487 die "$error at $filename, line $line\n";
457 } 488 }
458} 489}
459 490
460=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal)) 491=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
461 492
462Sets (or clears, with C<undef>) the on_error handler. 493Sets (or clears, with C<undef>) the C<on_error> handler.
463 494
464=cut 495=cut
465 496
466sub on_error { 497sub on_error {
467 $_[0]{on_error} = $_[1]; 498 $_[0]{on_error} = $_[1];
468} 499}
469 500
470=item $dbh->on_connect ($cb->($dbh))
471
472Sets (or clears, with C<undef>) the on_connect handler.
473
474=cut
475
476sub on_connect {
477 $_[0]{on_connect} = $_[1];
478}
479
480=item $dbh->timeout ($seconds) 501=item $dbh->timeout ($seconds)
481 502
482Sets (or clears, with C<undef>) the database timeout. Useful to extend the 503Sets (or clears, with C<undef>) the database timeout. Useful to extend the
483timeout when you are about to make a really long query. 504timeout when you are about to make a really long query.
484 505
485=cut 506=cut
486 507
487sub timeout { 508sub timeout {
488 my ($self, $timeout) = @_; 509 my ($self, $timeout) = @_;
489 510
490 if ($timeout) {
491 $self->{timeout} = $timeout; 511 $self->{timeout} = $timeout;
512
492 # reschedule timer if one was running 513 # reschedule timer if one was running
493 if ($self->{timer}) { 514 $self->{tw_cb}->();
494 Scalar::Util::weaken (my $wself = $self);
495 $self->{timer} = AnyEvent->timer (
496 after => $self->{timeout},
497 cb => sub { $wself && $wself->_timedout },
498 );
499 }
500 } else {
501 delete @{%$self}[qw(timer timeout)];
502 }
503}
504
505sub _timedout {
506 my ($self) = @_;
507
508 my $req = shift @{ $self->{queue} };
509
510 if ($req) {
511 my $cb = shift @$req;
512 $@ = 'TIMEOUT';
513 $cb->();
514 $self->_error ('TIMEOUT', @$req, 1); # timeouts are always fatal
515 } else {
516 # shouldn't be possible to timeout without a pending request
517 $self->_error ('TIMEOUT', 'NO_PENDING_WTF', 0, 1);
518 }
519} 515}
520 516
521sub _req { 517sub _req {
522 my ($self, $cb, $filename, $line) = splice @_, 0, 4, (); 518 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
523 519
524 if (!$self->{fh}) { 520 unless ($self->{fh}) {
525 my $err = $@ = 'NO DATABASE CONNECTION'; 521 local $@ = my $err = 'no database connection';
526 $cb->(); 522 $cb->($self);
527 $self->_error ($err, $filename, $line, 1); 523 $self->_error ($err, $filename, $line, 1);
528 return; 524 return;
529 } 525 }
530 526
531 push @{ $self->{queue} }, [$cb, $filename, $line ]; 527 push @{ $self->{queue} }, [$cb, $filename, $line];
532 528
529 # re-start timeout if necessary
533 if ($self->{timeout} && !$self->{timer}) { 530 if ($self->{timeout} && !$self->{tw}) {
534 Scalar::Util::weaken (my $wself = $self); 531 $self->{last_activity} = AnyEvent->now;
535 $self->{timer} = AnyEvent->timer ( 532 $self->{tw_cb}->();
536 after => $self->{timeout},
537 cb => sub { $wself && $wself->_timedout },
538 );
539 } 533 }
540 534
541 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 535 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
542 536
543 unless ($self->{ww}) { 537 unless ($self->{ww}) {
548 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) 542 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
549 if length $self->{wbuf}; 543 if length $self->{wbuf};
550 } 544 }
551} 545}
552 546
553=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, \%metadata)) 547=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
554 548
555Executes the given SQL statement with placeholders replaced by 549Executes the given SQL statement with placeholders replaced by
556C<@args>. The statement will be prepared and cached on the server side, so 550C<@args>. The statement will be prepared and cached on the server side, so
557using placeholders is compulsory. 551using placeholders is extremely important.
558 552
559The callback will be called with a weakened AnyEvent::DBI object as the first 553The callback will be called with a weakened AnyEvent::DBI object as the
560argument and the result of C<fetchall_arrayref> as (or C<undef> if the 554first argument and the result of C<fetchall_arrayref> as (or C<undef>
561statement wasn't a select statement) as the second argument. Third argument is 555if the statement wasn't a select statement) as the second argument.
562a hash reference holding metadata about the request. Currently, the only key
563defined is C<$metadata->{rv}> holding the return value of
564C<execute>. Additional metadata might be added.
565 556
557Third argument is the return value from the C<< DBI->execute >> method
558call.
559
566If an error occurs and the C<on_error> callback returns, then no arguments 560If an error occurs and the C<on_error> callback returns, then only C<$dbh>
567will be passed and C<$@> contains the error message. 561will be passed and C<$@> contains the error message.
568 562
569=item $dbh->attr (attr_name, [ $attr_value ], $cb->($dbh, $new_value)) 563=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
570 564
571An accessor for the handle attributes, such as AutoCommit, RaiseError, 565An accessor for the handle attributes, such as C<AutoCommit>,
572PrintError, etc. If you provide an $attr_value, then the given attribute will 566C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
573be set to that value. 567(which might be C<undef>), then the given attribute will be set to that
568value.
574 569
575The callback will be passed the database handle and the 570The callback will be passed the database handle and the attribute's value
576attribute's value if successful. If accessing the attribute fails, then no 571if successful.
577arguments are passed to your callback, and $@ contains a description of the
578problem instead.
579 572
573If an error occurs and the C<on_error> callback returns, then only C<$dbh>
574will be passed and C<$@> contains the error message.
575
580=item $dbh->begin_work ($cb->($dbh)) 576=item $dbh->begin_work ($cb->($dbh[, $success]))
581 577
582=item $dbh->commit ($cb->($dbh)) 578=item $dbh->commit ($cb->($dbh[, $success]))
583 579
584=item $dbh->rollback ($cb->($dbh)) 580=item $dbh->rollback ($cb->($dbh[, $success]))
585 581
586The begin_work, commit, and rollback methods exopose the equivelant transaction 582The begin_work, commit, and rollback methods expose the equivalent
587control methods of the DBI. If something goes wrong, you will get no $dbh in 583transaction control method of the DBI driver. On success, C<$success>
588your callaback, and will instead have an error to examine in $@. 584is true.
585
586If an error occurs and the C<on_error> callback returns, then only C<$dbh>
587will be passed and C<$@> contains the error message.
589 588
590=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $result, $handle_error)) 589=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $result, $handle_error))
591 590
592This gives access to database driver private methods. Because they are not 591This gives access to database driver private methods. Because they
593standard you cannot always depend on the value of $result or $handle_error. 592are not standard you cannot always depend on the value of C<$result>
594Check the documentation for your specific driver/function combination to see 593or C<$handle_error>. Check the documentation for your specific
595what it returns. 594driver/function combination to see what it returns.
596 595
597Note that the first argument will be eval'ed to produce the argument list to 596Note that the first argument will be eval'ed to produce the argument list to
598the func() method. This must be done because the searialization protocol 597the func() method. This must be done because the serialization protocol
599between the AnyEvent::DBI server process and your program does not support the 598between the AnyEvent::DBI server process and your program does not support the
600passage of closures. 599passage of closures.
601 600
602Here's an example to extend the query language in SQLite so it supports an 601Here's an example to extend the query language in SQLite so it supports an
603intstr() function: 602intstr() function:
604 603
605 $cv = AnyEvent->condvar; 604 $cv = AnyEvent->condvar;
606 $dbh->func( 605 $dbh->func (
607 q{ 606 q{
608 'instr', 607 instr => 2, sub {
609 2,
610 sub {
611 my ($string, $search) = @_; 608 my ($string, $search) = @_;
612 return index $string, $search; 609 return index $string, $search;
613 }, 610 },
614 }, 611 },
615 'create_function', 612 create_function => sub {
616 sub {return $cv->send($@) unless $_[0];$cv->send(undef,@_[1,2]);} 613 return $cv->send($@)
614 unless $_[0];
615 $cv->send (undef, @_[1,2]);
616 }
617 ); 617 );
618
618 my ($err,$result,$handle_err) = $cv->recv(); 619 my ($err,$result,$handle_err) = $cv->recv;
620
619 die "EVAL failed: $err" if $err; 621 die "EVAL failed: $err"
622 if $err;
623
620 # otherwise, we can ignore $result and $handle_err for this particular func 624 # otherwise, we can ignore $result and $handle_err for this particular func
621 625
622=cut 626=cut
623 627
624for my $cmd_name (qw(exec attr begin_work commit rollback func)) { 628for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
625 eval 'sub ' . $cmd_name . '{ 629 eval 'sub ' . $cmd_name . '{
626 my $cb = pop; 630 my $cb = pop;
627 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '"; 631 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
628 goto &_req; 632 &_req
629 }'; 633 }';
630} 634}
631 635
632=back 636=back
633 637
634=head1 SEE ALSO 638=head1 SEE ALSO
635 639
636L<AnyEvent>, L<DBI>. 640L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
637 641
638=head1 AUTHOR 642=head1 AUTHOR
639 643
640 Marc Lehmann <schmorp@schmorp.de> 644 Marc Lehmann <schmorp@schmorp.de>
641 http://home.schmorp.de/ 645 http://home.schmorp.de/

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines