ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.3 by root, Fri Jun 6 16:13:07 2008 UTC vs.
Revision 1.14 by root, Sat Oct 30 20:23:44 2010 UTC

3AnyEvent::DBI - asynchronous DBI access 3AnyEvent::DBI - asynchronous DBI access
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::DBI; 7 use AnyEvent::DBI;
8
9 my $cv = AnyEvent->condvar;
10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12
13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($dbh, $rows, $rv) = @_;
15
16 $#_ or die "failure: $@";
17
18 print "@$_\n"
19 for @$rows;
20
21 $cv->broadcast;
22 });
23
24 # asynchronously do sth. else here
25
26 $cv->wait;
8 27
9=head1 DESCRIPTION 28=head1 DESCRIPTION
10 29
11This module is an L<AnyEvent> user, you need to make sure that you use and 30This module is an L<AnyEvent> user, you need to make sure that you use and
12run a supported event loop. 31run a supported event loop.
13 32
14This module implements asynchronous DBI access my forking or executing 33This module implements asynchronous DBI access by forking or executing
15separate "DBI-Server" processes and sending them requests. 34separate "DBI-Server" processes and sending them requests.
16 35
17It means that you can run DBI requests in parallel to other tasks. 36It means that you can run DBI requests in parallel to other tasks.
18 37
19The overhead for very simple statements ("select 0") is somewhere 38The overhead for very simple statements ("select 0") is somewhere
20around 120% to 200% (single/dual core CPU) compared to an explicit 39around 100% to 120% (dual/single core CPU) compared to an explicit
21prepare_cached/execute/fetchrow_arrayref/finish combination. 40prepare_cached/execute/fetchrow_arrayref/finish combination.
22 41
42=head2 ERROR HANDLING
43
44This module defines a number of functions that accept a callback
45argument. All callbacks used by this module get their AnyEvent::DBI handle
46object passed as first argument.
47
48If the request was successful, then there will be more arguments,
49otherwise there will only be the C<$dbh> argument and C<$@> contains an
50error message.
51
52A convinient way to check whether an error occured is to check C<$#_> -
53if that is true, then the function was successful, otherwise there was an
54error.
55
23=cut 56=cut
24 57
25package AnyEvent::DBI; 58package AnyEvent::DBI;
26 59
27use strict; 60use common::sense;
28no warnings;
29 61
30use Carp; 62use Carp;
31use Socket (); 63use Socket ();
32use Scalar::Util (); 64use Scalar::Util ();
33use Storable (); 65use Storable ();
34 66
35use DBI (); 67use DBI (); # only needed in child actually - do it before fork & !exec?
36 68
37use AnyEvent (); 69use AnyEvent ();
38use AnyEvent::Util (); 70use AnyEvent::Util ();
39 71
72use Errno ();
73use Fcntl ();
74use POSIX ();
75
40our $VERSION = '1.0'; 76our $VERSION = '2.1';
41 77
42# this is the forked server code 78our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
79
80# this is the forked server code, could/should be bundled as it's own file
43 81
44our $DBH; 82our $DBH;
45 83
46sub req_open { 84sub req_open {
47 my (undef, $dbi, $user, $pass, %attr) = @{+shift}; 85 my (undef, $dbi, $user, $pass, %attr) = @{+shift};
48 86
49 $DBH = DBI->connect ($dbi, $user, $pass, \%attr); 87 $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr;
50 88
51 [1] 89 [1, 1]
52} 90}
53 91
54sub req_exec { 92sub req_exec {
55 my (undef, $st, @args) = @{+shift}; 93 my (undef, $st, @args) = @{+shift};
56
57 my $sth = $DBH->prepare_cached ($st, undef, 1); 94 my $sth = $DBH->prepare_cached ($st, undef, 1)
95 or die [$DBI::errstr];
58 96
59 $sth->execute (@args) 97 my $rv = $sth->execute (@args)
60 or die $sth->errstr; 98 or die [$sth->errstr];
61 99
62 [$sth->fetchall_arrayref] 100 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, $rv]
63} 101}
64 102
103sub req_attr {
104 my (undef, $attr_name, @attr_val) = @{+shift};
105
106 $DBH->{$attr_name} = $attr_val[0]
107 if @attr_val;
108
109 [1, $DBH->{$attr_name}]
110}
111
112sub req_begin_work {
113 [1, $DBH->begin_work or die [$DBI::errstr]]
114}
115
116sub req_commit {
117 [1, $DBH->commit or die [$DBI::errstr]]
118}
119
120sub req_rollback {
121 [1, $DBH->rollback or die [$DBI::errstr]]
122}
123
124sub req_func {
125 my (undef, $arg_string, $function) = @{+shift};
126 my @args = eval $arg_string;
127
128 die "error evaling \$dbh->func() arg_string: $@"
129 if $@;
130
131 my $rc = $DBH->func (@args, $function);
132 return [1, $rc, $DBI::err, $DBI::errstr];
133}
134
65sub serve { 135sub serve_fh($$) {
66 my ($fh) = @_; 136 my ($fh, $version) = @_;
67 137
68 no strict; 138 if ($VERSION != $version) {
139 syswrite $fh,
140 pack "L/a*",
141 Storable::freeze
142 [undef, "AnyEvent::DBI version mismatch ($VERSION vs. $version)"];
143 return;
144 }
69 145
70 eval { 146 eval {
71 my $rbuf; 147 my $rbuf;
72 148
73 while () { 149 while () {
82 158
83 my $req = Storable::thaw substr $rbuf, 4; 159 my $req = Storable::thaw substr $rbuf, 4;
84 substr $rbuf, 0, $len + 4, ""; # remove length + request 160 substr $rbuf, 0, $len + 4, ""; # remove length + request
85 161
86 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) }; 162 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
87
88 $wbuf = pack "L/a*", Storable::freeze [undef, "$@"] 163 $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? ("$@->[0]", $@->[1]) : ("$@", 1)]
89 if $@; 164 if $@;
90 165
91 for (my $ofs = 0; $ofs < length $wbuf; ) { 166 for (my $ofs = 0; $ofs < length $wbuf; ) {
92 $ofs += (syswrite $fh, substr $wbuf, $ofs 167 $ofs += (syswrite $fh, substr $wbuf, $ofs
93 or die "unable to write results"); 168 or die "unable to write results");
94 } 169 }
95 } 170 }
96 } 171 }
97 }; 172 };
173}
98 174
99 kill 9, $$; # no other way on the broken windows platform 175sub serve_fd($$) {
176 open my $fh, ">>&=$_[0]"
177 or die "Couldn't open server file descriptor: $!";
178
179 serve_fh $fh, $_[1];
100} 180}
101 181
102=head2 METHODS 182=head2 METHODS
103 183
104=over 4 184=over 4
126 206
127When an error occurs, then this callback will be invoked. On entry, C<$@> 207When an error occurs, then this callback will be invoked. On entry, C<$@>
128is set to the error message. C<$filename> and C<$line> is where the 208is set to the error message. C<$filename> and C<$line> is where the
129original request was submitted. 209original request was submitted.
130 210
131If this callback returns and this was a fatal error (C<$fatal> is true) 211If the fatal argument is true then the database connection is shut down
132then AnyEvent::DBI die's, otherwise it calls the original request callback 212and your database handle became invalid. In addition to invoking the
133without any arguments. 213C<on_error> callback, all of your queued request callbacks are called
214without only the C<$dbh> argument.
134 215
135If omitted, then C<die> will be called on any errors, fatal or not. 216If omitted, then C<die> will be called on any errors, fatal or not.
136 217
218=item on_connect => $callback->($dbh[, $success])
219
220If you supply an C<on_connect> callback, then this callback will be
221invoked after the database connect attempt. If the connection succeeds,
222C<$success> is true, otherwise it is missing and C<$@> contains the
223C<$DBI::errstr>.
224
225Regardless of whether C<on_connect> is supplied, connect errors will result in
226C<on_error> being called. However, if no C<on_connect> callback is supplied, then
227connection errors are considered fatal. The client will C<die> and the C<on_error>
228callback will be called with C<$fatal> true.
229
230When on_connect is supplied, connect error are not fatal and AnyEvent::DBI
231will not C<die>. You still cannot, however, use the $dbh object you
232received from C<new> to make requests.
233
234=item exec_server => 1
235
236If you supply an C<exec_server> argument, then the DBI server process will
237fork and exec another perl interpreter (using C<$^X>) with just the
238AnyEvent::DBI proxy running. This will provide the cleanest possible proxy
239for your database server.
240
241If you do not supply the C<exec_server> argument (or supply it with a
242false value) then the traditional method of starting the server by forking
243the current process is used. The forked interpreter will try to clean
244itself up by calling POSIX::close on all file descriptors except STDIN,
245STDOUT, and STDERR (and the socket it uses to communicate with the cilent,
246of course).
247
248=item timeout => seconds
249
250If you supply a timeout parameter (fractional values are supported), then
251a timer is started any time the DBI handle expects a response from the
252server. This includes connection setup as well as requests made to the
253backend. The timeout spans the duration from the moment the first data
254is written (or queued to be written) until all expected responses are
255returned, but is postponed for "timeout" seconds each time more data is
256returned from the server. If the timer ever goes off then a fatal error is
257generated. If you have an C<on_error> handler installed, then it will be
258called, otherwise your program will die().
259
260When altering your databases with timeouts it is wise to use
261transactions. If you quit due to timeout while performing insert, update
262or schema-altering commands you can end up not knowing if the action was
263submitted to the database, complicating recovery.
264
265Timeout errors are always fatal.
266
137=back 267=back
268
269Any additional key-value pairs will be rolled into a hash reference
270and passed as the final argument to the C<< DBI->connect (...) >>
271call. For example, to supress errors on STDERR and send them instead to an
272AnyEvent::Handle you could do:
273
274 $dbh = new AnyEvent::DBI
275 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
276 PrintError => 0,
277 on_error => sub {
278 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
279 };
138 280
139=cut 281=cut
140 282
141# stupid Storable autoloading, total loss-loss situation 283# stupid Storable autoloading, total loss-loss situation
142Storable::thaw Storable::freeze []; 284Storable::thaw Storable::freeze [];
143 285
144sub new { 286sub new {
145 my ($class, $dbi, $user, $pass, %arg) = @_; 287 my ($class, $dbi, $user, $pass, %arg) = @_;
146 288
147 socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC 289 my ($client, $server) = AnyEvent::Util::portable_socketpair
148 or croak "unable to create dbi communicaiton pipe: $!"; 290 or croak "unable to create AnyEvent::DBI communications pipe: $!";
291
292 my %dbi_args = %arg;
293 delete @dbi_args{qw(on_connect on_error timeout exec_server)};
149 294
150 my $self = bless \%arg, $class; 295 my $self = bless \%arg, $class;
151
152 $self->{fh} = $client; 296 $self->{fh} = $client;
153
154 Scalar::Util::weaken (my $wself = $self);
155 297
156 AnyEvent::Util::fh_nonblocking $client, 1; 298 AnyEvent::Util::fh_nonblocking $client, 1;
157 299
158 my $rbuf; 300 my $rbuf;
159 my @caller = (caller)[1,2]; # the "default" caller 301 my @caller = (caller)[1,2]; # the "default" caller
160 302
161 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { 303 {
304 Scalar::Util::weaken (my $self = $self);
305
306 $self->{rw} = AE::io $client, 0, sub {
307 return unless $self;
308
309 $self->{last_activity} = AE::now;
310
162 my $len = sysread $client, $rbuf, 65536, length $rbuf; 311 my $len = sysread $client, $rbuf, 65536, length $rbuf;
163 312
164 if ($len > 0) { 313 if ($len > 0) {
314 # we received data, so reset the timer
165 315
166 while () { 316 while () {
167 my $len = unpack "L", $rbuf; 317 my $len = unpack "L", $rbuf;
168 318
169 # full request available? 319 # full response available?
170 last unless $len && $len + 4 <= length $rbuf; 320 last unless $len && $len + 4 <= length $rbuf;
171 321
172 my $res = Storable::thaw substr $rbuf, 4; 322 my $res = Storable::thaw substr $rbuf, 4;
173 substr $rbuf, 0, $len + 4, ""; # remove length + request 323 substr $rbuf, 0, $len + 4, ""; # remove length + request
174 324
325 last unless $self;
175 my $req = shift @{ $wself->{queue} }; 326 my $req = shift @{ $self->{queue} };
176 327
177 if (defined $res->[0]) { 328 if (defined $res->[0]) {
329 $res->[0] = $self;
178 $req->[0](@$res); 330 $req->[0](@$res);
331 } else {
332 my $cb = shift @$req;
333 local $@ = $res->[1];
334 $cb->($self);
335 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
336 if $self; # cb() could have deleted it
337 }
338
339 # no more queued requests, so become idle
340 undef $self->{last_activity}
341 if $self && !@{ $self->{queue} };
342 }
343
344 } elsif (defined $len) {
345 # todo, caller?
346 $self->_error ("unexpected eof", @caller, 1);
347 } elsif ($! != Errno::EAGAIN) {
348 # todo, caller?
349 $self->_error ("read error: $!", @caller, 1);
350 }
351 };
352
353 $self->{tw_cb} = sub {
354 if ($self->{timeout} && $self->{last_activity}) {
355 if (AE::now > $self->{last_activity} + $self->{timeout}) {
356 # we did time out
357 my $req = $self->{queue}[0];
358 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
179 } else { 359 } else {
180 my $cb = shift @$req; 360 # we need to re-set the timeout watcher
181 $wself->_error ($res->[1], @$req); 361 $self->{tw} = AE::timer
362 $self->{last_activity} + $self->{timeout} - AE::now,
363 0,
364 $self->{tw_cb},
182 $cb->(); 365 ;
183 } 366 }
367 } else {
368 # no timeout check wanted, or idle
369 undef $self->{tw};
184 } 370 }
185
186 } elsif (defined $len) {
187 $wself->_error ("unexpected eof", @caller, 1);
188 } else {
189 $wself->_error ("read error: $!", @caller, 1);
190 } 371 };
191 });
192 372
193 $self->{ww_cb} = sub { 373 $self->{ww_cb} = sub {
374 return unless $self;
375
376 $self->{last_activity} = AE::now;
377
194 my $len = syswrite $client, $wself->{wbuf} 378 my $len = syswrite $client, $self->{wbuf}
195 or return delete $wself->{ww}; 379 or return delete $self->{ww};
196 380
197 substr $wself->{wbuf}, 0, $len, ""; 381 substr $self->{wbuf}, 0, $len, "";
382 };
198 }; 383 }
199 384
200 my $pid = fork; 385 my $pid = fork;
201 386
202 if ($pid) { 387 if ($pid) {
203 # parent 388 # parent
204 close $server; 389 close $server;
205
206 } elsif (defined $pid) { 390 } elsif (defined $pid) {
207 # child 391 # child
208 close $client; 392 my $serv_fno = fileno $server;
209 @_ = $server;
210 goto &serve;
211 393
394 if ($self->{exec_server}) {
395 fcntl $server, &Fcntl::F_SETFD, 0; # don't close the server side
396 exec {$^X}
397 "$0 dbi slave",
398 -e => "require shift; AnyEvent::DBI::serve_fd ($serv_fno, $VERSION)",
399 $INC{"AnyEvent/DBI.pm"};
400 POSIX::_exit 124;
401 } else {
402 ($_ != $serv_fno) && POSIX::close $_
403 for $^F+1..$FD_MAX;
404 serve_fh $server, $VERSION;
405
406 # no other way on the broken windows platform, even this leaks
407 # memory and might fail.
408 kill 9, $$
409 if AnyEvent::WIN32;
410
411 # and this kills the parent process on windows
412 POSIX::_exit 0;
413 }
212 } else { 414 } else {
213 croak "fork: $!"; 415 croak "fork: $!";
214 } 416 }
215 417
216 $self->_req (sub { }, (caller)[1,2], 1, req_open => $dbi, $user, $pass); 418 $self->{child_pid} = $pid;
419
420 $self->_req (
421 ($self->{on_connect} ? $self->{on_connect} : sub { }),
422 (caller)[1,2],
423 req_open => $dbi, $user, $pass, %dbi_args
424 );
217 425
218 $self 426 $self
427}
428
429sub _server_pid {
430 shift->{child_pid}
431}
432
433sub kill_child {
434 my $self = shift;
435
436 if (my $pid = delete $self->{child_pid}) {
437 kill TERM => $pid;
438 }
439 close delete $self->{fh};
440}
441
442sub DESTROY {
443 shift->kill_child;
219} 444}
220 445
221sub _error { 446sub _error {
222 my ($self, $error, $filename, $line, $fatal) = @_; 447 my ($self, $error, $filename, $line, $fatal) = @_;
223 448
449 if ($fatal) {
450 delete $self->{tw};
224 delete $self->{rw}; 451 delete $self->{rw};
225 delete $self->{ww}; 452 delete $self->{ww};
226 delete $self->{fh}; 453 delete $self->{fh};
227 454
455 # for fatal errors call all enqueued callbacks with error
456 while (my $req = shift @{$self->{queue}}) {
457 local $@ = $error;
458 $req->[0]->($self);
459 }
460 $self->kill_child;
461 }
462
228 $@ = $error; 463 local $@ = $error;
229 464
465 if ($self->{on_error}) {
230 $self->{on_error}($self, $filename, $line, $fatal) 466 $self->{on_error}($self, $filename, $line, $fatal)
231 if $self->{on_error}; 467 } else {
232
233 die "$error at $filename, line $line\n"; 468 die "$error at $filename, line $line\n";
469 }
470}
471
472=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
473
474Sets (or clears, with C<undef>) the C<on_error> handler.
475
476=cut
477
478sub on_error {
479 $_[0]{on_error} = $_[1];
480}
481
482=item $dbh->timeout ($seconds)
483
484Sets (or clears, with C<undef>) the database timeout. Useful to extend the
485timeout when you are about to make a really long query.
486
487=cut
488
489sub timeout {
490 my ($self, $timeout) = @_;
491
492 $self->{timeout} = $timeout;
493
494 # reschedule timer if one was running
495 $self->{tw_cb}->();
234} 496}
235 497
236sub _req { 498sub _req {
237 my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, (); 499 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
238 500
501 unless ($self->{fh}) {
502 local $@ = my $err = 'no database connection';
503 $cb->($self);
504 $self->_error ($err, $filename, $line, 1);
505 return;
506 }
507
239 push @{ $self->{queue} }, [$cb, $filename, $line, $fatal]; 508 push @{ $self->{queue} }, [$cb, $filename, $line];
509
510 # re-start timeout if necessary
511 if ($self->{timeout} && !$self->{tw}) {
512 $self->{last_activity} = AE::now;
513 $self->{tw_cb}->();
514 }
240 515
241 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 516 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
242 517
243 unless ($self->{ww}) { 518 unless ($self->{ww}) {
244 my $len = syswrite $self->{fh}, $self->{wbuf}; 519 my $len = syswrite $self->{fh}, $self->{wbuf};
245 substr $self->{wbuf}, 0, $len, ""; 520 substr $self->{wbuf}, 0, $len, "";
246 521
247 # still any left? then install a write watcher 522 # still any left? then install a write watcher
248 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) 523 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
249 if length $self->{wbuf}; 524 if length $self->{wbuf};
250 } 525 }
251} 526}
252 527
253=item $dbh->exec ("statement", @args, $cb->($rows, %extra)) 528=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
254 529
255Executes the given SQL statement with placeholders replaced by 530Executes the given SQL statement with placeholders replaced by
256C<@args>. The statement will be prepared and cached on the server side, so 531C<@args>. The statement will be prepared and cached on the server side, so
257using placeholders is compulsory. 532using placeholders is extremely important.
258 533
259The callback will be called with the result of C<fetchall_arrayref> as 534The callback will be called with a weakened AnyEvent::DBI object as the
260first argument and possibly a hash reference with additional information. 535first argument and the result of C<fetchall_arrayref> as (or C<undef>
536if the statement wasn't a select statement) as the second argument.
261 537
538Third argument is the return value from the C<< DBI->execute >> method
539call.
540
262If an error occurs and the C<on_error> callback returns, then no arguments 541If an error occurs and the C<on_error> callback returns, then only C<$dbh>
263will be passed and C<$@> contains the error message. 542will be passed and C<$@> contains the error message.
264 543
544=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
545
546An accessor for the handle attributes, such as C<AutoCommit>,
547C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
548(which might be C<undef>), then the given attribute will be set to that
549value.
550
551The callback will be passed the database handle and the attribute's value
552if successful.
553
554If an error occurs and the C<on_error> callback returns, then only C<$dbh>
555will be passed and C<$@> contains the error message.
556
557=item $dbh->begin_work ($cb->($dbh[, $rc]))
558
559=item $dbh->commit ($cb->($dbh[, $rc]))
560
561=item $dbh->rollback ($cb->($dbh[, $rc]))
562
563The begin_work, commit, and rollback methods expose the equivalent
564transaction control method of the DBI driver. On success, C<$rc> is true.
565
566If an error occurs and the C<on_error> callback returns, then only C<$dbh>
567will be passed and C<$@> contains the error message.
568
569=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
570
571This gives access to database driver private methods. Because they
572are not standard you cannot always depend on the value of C<$rc> or
573C<$dbi_err>. Check the documentation for your specific driver/function
574combination to see what it returns.
575
576Note that the first argument will be eval'ed to produce the argument list to
577the func() method. This must be done because the serialization protocol
578between the AnyEvent::DBI server process and your program does not support the
579passage of closures.
580
581Here's an example to extend the query language in SQLite so it supports an
582intstr() function:
583
584 $cv = AnyEvent->condvar;
585 $dbh->func (
586 q{
587 instr => 2, sub {
588 my ($string, $search) = @_;
589 return index $string, $search;
590 },
591 },
592 create_function => sub {
593 return $cv->send ($@)
594 unless $#_;
595 $cv->send (undef, @_[1,2,3]);
596 }
597 );
598
599 my ($err,$rc,$errcode,$errstr) = $cv->recv;
600
601 die $err if defined $err;
602 die "EVAL failed: $errstr"
603 if $errcode;
604
605 # otherwise, we can ignore $rc and $errcode for this particular func
606
265=cut 607=cut
266 608
267sub exec { 609for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
610 eval 'sub ' . $cmd_name . '{
268 my $cb = pop; 611 my $cb = pop;
269 splice @_, 1, 0, $cb, (caller)[1,2], 0, "req_exec"; 612 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
270 613 &_req
271 goto &_req; 614 }';
272} 615}
273 616
274=back 617=back
275 618
276=head1 SEE ALSO 619=head1 SEE ALSO
277 620
278L<AnyEvent>, L<DBI>. 621L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
279 622
280=head1 AUTHOR 623=head1 AUTHOR
281 624
282 Marc Lehmann <schmorp@schmorp.de> 625 Marc Lehmann <schmorp@schmorp.de>
283 http://home.schmorp.de/ 626 http://home.schmorp.de/
627
628 Adam Rosenstein <adam@redcondor.com>
629 http://www.redcondor.com/
284 630
285=cut 631=cut
286 632
2871 6331;
288 634

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines