ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.9 by root, Thu Nov 6 13:56:58 2008 UTC vs.
Revision 1.16 by root, Tue Apr 2 02:13:13 2013 UTC

9 my $cv = AnyEvent->condvar; 9 my $cv = AnyEvent->condvar;
10 10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", ""; 11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12 12
13 $dbh->exec ("select * from test where num=?", 10, sub { 13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($rows, $rv) = @_; 14 my ($dbh, $rows, $rv) = @_;
15
16 $#_ or die "failure: $@";
15 17
16 print "@$_\n" 18 print "@$_\n"
17 for @$rows; 19 for @$rows;
18 20
19 $cv->broadcast; 21 $cv->broadcast;
32separate "DBI-Server" processes and sending them requests. 34separate "DBI-Server" processes and sending them requests.
33 35
34It means that you can run DBI requests in parallel to other tasks. 36It means that you can run DBI requests in parallel to other tasks.
35 37
36The overhead for very simple statements ("select 0") is somewhere 38The overhead for very simple statements ("select 0") is somewhere
37around 120% to 200% (dual/single core CPU) compared to an explicit 39around 100% to 120% (dual/single core CPU) compared to an explicit
38prepare_cached/execute/fetchrow_arrayref/finish combination. 40prepare_cached/execute/fetchrow_arrayref/finish combination.
39 41
42=head2 ERROR HANDLING
43
44This module defines a number of functions that accept a callback
45argument. All callbacks used by this module get their AnyEvent::DBI handle
46object passed as first argument.
47
48If the request was successful, then there will be more arguments,
49otherwise there will only be the C<$dbh> argument and C<$@> contains an
50error message.
51
52A convinient way to check whether an error occured is to check C<$#_> -
53if that is true, then the function was successful, otherwise there was an
54error.
55
40=cut 56=cut
41 57
42package AnyEvent::DBI; 58package AnyEvent::DBI;
43 59
44use strict; 60use common::sense;
45no warnings;
46 61
47use Carp; 62use Carp;
48use Socket (); 63use Socket ();
49use Scalar::Util (); 64use Scalar::Util ();
50use Storable (); 65use Storable ();
51 66
52use DBI (); 67use DBI (); # only needed in child actually - do it before fork & !exec?
53 68
54use AnyEvent (); 69use AnyEvent ();
55use AnyEvent::Util (); 70use AnyEvent::Util ();
56 71
72use Errno ();
73use Fcntl ();
74use POSIX ();
75
57our $VERSION = '1.1'; 76our $VERSION = '2.3';
58 77
59# this is the forked server code 78our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
79
80# this is the forked server code, could/should be bundled as it's own file
60 81
61our $DBH; 82our $DBH;
62 83
63sub req_open { 84sub req_open {
64 my (undef, $dbi, $user, $pass, %attr) = @{+shift}; 85 my (undef, $dbi, $user, $pass, %attr) = @{+shift};
65 86
66 $DBH = DBI->connect ($dbi, $user, $pass, \%attr); 87 $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr;
67 88
68 [1] 89 [1, 1]
69} 90}
70 91
71sub req_exec { 92sub req_exec {
72 my (undef, $st, @args) = @{+shift}; 93 my (undef, $st, @args) = @{+shift};
73
74 my $sth = $DBH->prepare_cached ($st, undef, 1); 94 my $sth = $DBH->prepare_cached ($st, undef, 1)
95 or die [$DBI::errstr];
75 96
76 my $rv = $sth->execute (@args) 97 my $rv = $sth->execute (@args)
77 or die $sth->errstr; 98 or die [$sth->errstr];
78 99
79 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }] 100 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, $rv]
80} 101}
81 102
103sub req_attr {
104 my (undef, $attr_name, @attr_val) = @{+shift};
105
106 $DBH->{$attr_name} = $attr_val[0]
107 if @attr_val;
108
109 [1, $DBH->{$attr_name}]
110}
111
112sub req_begin_work {
113 [1, $DBH->begin_work || die [$DBI::errstr]]
114}
115
116sub req_commit {
117 [1, $DBH->commit || die [$DBI::errstr]]
118}
119
120sub req_rollback {
121 [1, $DBH->rollback || die [$DBI::errstr]]
122}
123
124sub req_func {
125 my (undef, $arg_string, $function) = @{+shift};
126 my @args = eval $arg_string;
127
128 die "error evaling \$dbh->func() arg_string: $@"
129 if $@;
130
131 my $rc = $DBH->func (@args, $function);
132 return [1, $rc, $DBI::err, $DBI::errstr];
133}
134
82sub serve { 135sub serve_fh($$) {
83 my ($fh) = @_; 136 my ($fh, $version) = @_;
84 137
85 no strict; 138 if ($VERSION != $version) {
139 syswrite $fh,
140 pack "L/a*",
141 Storable::freeze
142 [undef, "AnyEvent::DBI version mismatch ($VERSION vs. $version)"];
143 return;
144 }
86 145
87 eval { 146 eval {
88 my $rbuf; 147 my $rbuf;
89 148
90 while () { 149 while () {
99 158
100 my $req = Storable::thaw substr $rbuf, 4; 159 my $req = Storable::thaw substr $rbuf, 4;
101 substr $rbuf, 0, $len + 4, ""; # remove length + request 160 substr $rbuf, 0, $len + 4, ""; # remove length + request
102 161
103 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) }; 162 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
104
105 $wbuf = pack "L/a*", Storable::freeze [undef, "$@"] 163 $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? ("$@->[0]", $@->[1]) : ("$@", 1)]
106 if $@; 164 if $@;
107 165
108 for (my $ofs = 0; $ofs < length $wbuf; ) { 166 for (my $ofs = 0; $ofs < length $wbuf; ) {
109 $ofs += (syswrite $fh, substr $wbuf, $ofs 167 $ofs += (syswrite $fh, substr $wbuf, $ofs
110 or die "unable to write results"); 168 or die "unable to write results");
111 } 169 }
112 } 170 }
113 } 171 }
114 }; 172 };
173}
115 174
116 if (AnyEvent::WIN32) { 175sub serve_fd($$) {
117 kill 9, $$; # no other way on the broken windows platform 176 open my $fh, ">>&=$_[0]"
118 # and the above doesn't even work on windows, it seems the only 177 or die "Couldn't open server file descriptor: $!";
119 # way to is to leak memory and kill 9 from the parent. yay.
120 }
121 178
122 require POSIX; 179 serve_fh $fh, $_[1];
123 POSIX::_exit (0);
124 # and the above kills the parent process on windows
125} 180}
126 181
127=head2 METHODS 182=head2 METHODS
128 183
129=over 4 184=over 4
151 206
152When an error occurs, then this callback will be invoked. On entry, C<$@> 207When an error occurs, then this callback will be invoked. On entry, C<$@>
153is set to the error message. C<$filename> and C<$line> is where the 208is set to the error message. C<$filename> and C<$line> is where the
154original request was submitted. 209original request was submitted.
155 210
156If this callback returns and this was a fatal error (C<$fatal> is true) 211If the fatal argument is true then the database connection is shut down
157then AnyEvent::DBI die's, otherwise it calls the original request callback 212and your database handle became invalid. In addition to invoking the
158without any arguments. 213C<on_error> callback, all of your queued request callbacks are called
214without only the C<$dbh> argument.
159 215
160If omitted, then C<die> will be called on any errors, fatal or not. 216If omitted, then C<die> will be called on any errors, fatal or not.
161 217
218=item on_connect => $callback->($dbh[, $success])
219
220If you supply an C<on_connect> callback, then this callback will be
221invoked after the database connect attempt. If the connection succeeds,
222C<$success> is true, otherwise it is missing and C<$@> contains the
223C<$DBI::errstr>.
224
225Regardless of whether C<on_connect> is supplied, connect errors will result in
226C<on_error> being called. However, if no C<on_connect> callback is supplied, then
227connection errors are considered fatal. The client will C<die> and the C<on_error>
228callback will be called with C<$fatal> true.
229
230When on_connect is supplied, connect error are not fatal and AnyEvent::DBI
231will not C<die>. You still cannot, however, use the $dbh object you
232received from C<new> to make requests.
233
234=item exec_server => 1
235
236If you supply an C<exec_server> argument, then the DBI server process will
237fork and exec another perl interpreter (using C<$^X>) with just the
238AnyEvent::DBI proxy running. This will provide the cleanest possible proxy
239for your database server.
240
241If you do not supply the C<exec_server> argument (or supply it with a
242false value) then the traditional method of starting the server by forking
243the current process is used. The forked interpreter will try to clean
244itself up by calling POSIX::close on all file descriptors except STDIN,
245STDOUT, and STDERR (and the socket it uses to communicate with the cilent,
246of course).
247
248=item timeout => seconds
249
250If you supply a timeout parameter (fractional values are supported), then
251a timer is started any time the DBI handle expects a response from the
252server. This includes connection setup as well as requests made to the
253backend. The timeout spans the duration from the moment the first data
254is written (or queued to be written) until all expected responses are
255returned, but is postponed for "timeout" seconds each time more data is
256returned from the server. If the timer ever goes off then a fatal error is
257generated. If you have an C<on_error> handler installed, then it will be
258called, otherwise your program will die().
259
260When altering your databases with timeouts it is wise to use
261transactions. If you quit due to timeout while performing insert, update
262or schema-altering commands you can end up not knowing if the action was
263submitted to the database, complicating recovery.
264
265Timeout errors are always fatal.
266
162=back 267=back
268
269Any additional key-value pairs will be rolled into a hash reference
270and passed as the final argument to the C<< DBI->connect (...) >>
271call. For example, to supress errors on STDERR and send them instead to an
272AnyEvent::Handle you could do:
273
274 $dbh = new AnyEvent::DBI
275 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
276 PrintError => 0,
277 on_error => sub {
278 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
279 };
163 280
164=cut 281=cut
165 282
166# stupid Storable autoloading, total loss-loss situation 283# stupid Storable autoloading, total loss-loss situation
167Storable::thaw Storable::freeze []; 284Storable::thaw Storable::freeze [];
168 285
169sub new { 286sub new {
170 my ($class, $dbi, $user, $pass, %arg) = @_; 287 my ($class, $dbi, $user, $pass, %arg) = @_;
171 288
172 socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC 289 my ($client, $server) = AnyEvent::Util::portable_socketpair
173 or croak "unable to create dbi communicaiton pipe: $!"; 290 or croak "unable to create AnyEvent::DBI communications pipe: $!";
291
292 my %dbi_args = %arg;
293 delete @dbi_args{qw(on_connect on_error timeout exec_server)};
174 294
175 my $self = bless \%arg, $class; 295 my $self = bless \%arg, $class;
176
177 $self->{fh} = $client; 296 $self->{fh} = $client;
178
179 Scalar::Util::weaken (my $wself = $self);
180 297
181 AnyEvent::Util::fh_nonblocking $client, 1; 298 AnyEvent::Util::fh_nonblocking $client, 1;
182 299
183 my $rbuf; 300 my $rbuf;
184 my @caller = (caller)[1,2]; # the "default" caller 301 my @caller = (caller)[1,2]; # the "default" caller
185 302
186 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { 303 {
304 Scalar::Util::weaken (my $self = $self);
305
306 $self->{rw} = AE::io $client, 0, sub {
307 return unless $self;
308
187 my $len = sysread $client, $rbuf, 65536, length $rbuf; 309 my $len = sysread $client, $rbuf, 65536, length $rbuf;
188 310
189 if ($len > 0) { 311 if ($len > 0) {
312 # we received data, so reset the timer
313 $self->{last_activity} = AE::now;
190 314
191 while () { 315 while () {
192 my $len = unpack "L", $rbuf; 316 my $len = unpack "L", $rbuf;
193 317
194 # full request available? 318 # full response available?
195 last unless $len && $len + 4 <= length $rbuf; 319 last unless $len && $len + 4 <= length $rbuf;
196 320
197 my $res = Storable::thaw substr $rbuf, 4; 321 my $res = Storable::thaw substr $rbuf, 4;
198 substr $rbuf, 0, $len + 4, ""; # remove length + request 322 substr $rbuf, 0, $len + 4, ""; # remove length + request
199 323
324 last unless $self;
200 my $req = shift @{ $wself->{queue} }; 325 my $req = shift @{ $self->{queue} };
201 326
202 if (defined $res->[0]) { 327 if (defined $res->[0]) {
328 $res->[0] = $self;
203 $req->[0](@$res); 329 $req->[0](@$res);
330 } else {
331 my $cb = shift @$req;
332 local $@ = $res->[1];
333 $cb->($self);
334 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
335 if $self; # cb() could have deleted it
336 }
337
338 # no more queued requests, so become idle
339 if ($self && !@{ $self->{queue} }) {
340 undef $self->{last_activity};
341 $self->{tw_cb}->();
342 }
343 }
344
345 } elsif (defined $len) {
346 # todo, caller?
347 $self->_error ("unexpected eof", @caller, 1);
348 } elsif ($! != Errno::EAGAIN) {
349 # todo, caller?
350 $self->_error ("read error: $!", @caller, 1);
351 }
352 };
353
354 $self->{tw_cb} = sub {
355 if ($self->{timeout} && $self->{last_activity}) {
356 if (AE::now > $self->{last_activity} + $self->{timeout}) {
357 # we did time out
358 my $req = $self->{queue}[0];
359 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
204 } else { 360 } else {
205 my $cb = shift @$req; 361 # we need to re-set the timeout watcher
206 $wself->_error ($res->[1], @$req); 362 $self->{tw} = AE::timer
363 $self->{last_activity} + $self->{timeout} - AE::now,
364 0,
365 $self->{tw_cb},
207 $cb->(); 366 ;
208 } 367 }
368 } else {
369 # no timeout check wanted, or idle
370 undef $self->{tw};
209 } 371 }
210
211 } elsif (defined $len) {
212 $wself->_error ("unexpected eof", @caller, 1);
213 } else {
214 $wself->_error ("read error: $!", @caller, 1);
215 } 372 };
216 });
217 373
218 $self->{ww_cb} = sub { 374 $self->{ww_cb} = sub {
375 return unless $self;
376
377 $self->{last_activity} = AE::now;
378
219 my $len = syswrite $client, $wself->{wbuf} 379 my $len = syswrite $client, $self->{wbuf}
220 or return delete $wself->{ww}; 380 or return delete $self->{ww};
221 381
222 substr $wself->{wbuf}, 0, $len, ""; 382 substr $self->{wbuf}, 0, $len, "";
383 };
223 }; 384 }
224 385
225 my $pid = fork; 386 my $pid = fork;
226 387
227 if ($pid) { 388 if ($pid) {
228 # parent 389 # parent
229 close $server; 390 close $server;
230
231 } elsif (defined $pid) { 391 } elsif (defined $pid) {
232 # child 392 # child
233 close $client; 393 my $serv_fno = fileno $server;
234 @_ = $server;
235 goto &serve;
236 394
395 if ($self->{exec_server}) {
396 fcntl $server, &Fcntl::F_SETFD, 0; # don't close the server side
397 exec {$^X}
398 "$0 dbi slave",
399 -e => "require shift; AnyEvent::DBI::serve_fd ($serv_fno, $VERSION)",
400 $INC{"AnyEvent/DBI.pm"};
401 POSIX::_exit 124;
402 } else {
403 ($_ != $serv_fno) && POSIX::close $_
404 for $^F+1..$FD_MAX;
405 serve_fh $server, $VERSION;
406
407 # no other way on the broken windows platform, even this leaks
408 # memory and might fail.
409 kill 9, $$
410 if AnyEvent::WIN32;
411
412 # and this kills the parent process on windows
413 POSIX::_exit 0;
414 }
237 } else { 415 } else {
238 croak "fork: $!"; 416 croak "fork: $!";
239 } 417 }
240 418
241 $self->_req (sub { }, (caller)[1,2], 1, req_open => $dbi, $user, $pass); 419 $self->{child_pid} = $pid;
420
421 $self->_req (
422 ($self->{on_connect} ? $self->{on_connect} : sub { }),
423 (caller)[1,2],
424 req_open => $dbi, $user, $pass, %dbi_args
425 );
242 426
243 $self 427 $self
428}
429
430sub _server_pid {
431 shift->{child_pid}
432}
433
434sub kill_child {
435 my $self = shift;
436
437 if (my $pid = delete $self->{child_pid}) {
438 # kill and reap process
439 my $kid_watcher; $kid_watcher = AE::child $pid, sub {
440 undef $kid_watcher;
441 };
442 kill TERM => $pid;
443 }
444
445 close delete $self->{fh};
446}
447
448sub DESTROY {
449 shift->kill_child;
244} 450}
245 451
246sub _error { 452sub _error {
247 my ($self, $error, $filename, $line, $fatal) = @_; 453 my ($self, $error, $filename, $line, $fatal) = @_;
248 454
455 if ($fatal) {
456 delete $self->{tw};
249 delete $self->{rw}; 457 delete $self->{rw};
250 delete $self->{ww}; 458 delete $self->{ww};
251 delete $self->{fh}; 459 delete $self->{fh};
252 460
461 # for fatal errors call all enqueued callbacks with error
462 while (my $req = shift @{$self->{queue}}) {
463 local $@ = $error;
464 $req->[0]->($self);
465 }
466 $self->kill_child;
467 }
468
253 $@ = $error; 469 local $@ = $error;
254 470
255 if ($self->{on_error}) { 471 if ($self->{on_error}) {
256 $self->{on_error}($self, $filename, $line, $fatal); 472 $self->{on_error}($self, $filename, $line, $fatal)
257 return unless $fatal; 473 } else {
258 }
259
260 die "$error at $filename, line $line\n"; 474 die "$error at $filename, line $line\n";
475 }
476}
477
478=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
479
480Sets (or clears, with C<undef>) the C<on_error> handler.
481
482=cut
483
484sub on_error {
485 $_[0]{on_error} = $_[1];
486}
487
488=item $dbh->timeout ($seconds)
489
490Sets (or clears, with C<undef>) the database timeout. Useful to extend the
491timeout when you are about to make a really long query.
492
493=cut
494
495sub timeout {
496 my ($self, $timeout) = @_;
497
498 $self->{timeout} = $timeout;
499
500 # reschedule timer if one was running
501 $self->{tw_cb}->();
261} 502}
262 503
263sub _req { 504sub _req {
264 my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, (); 505 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
265 506
507 unless ($self->{fh}) {
508 local $@ = my $err = 'no database connection';
509 $cb->($self);
510 $self->_error ($err, $filename, $line, 1);
511 return;
512 }
513
266 push @{ $self->{queue} }, [$cb, $filename, $line, $fatal]; 514 push @{ $self->{queue} }, [$cb, $filename, $line];
515
516 # re-start timeout if necessary
517 if ($self->{timeout} && !$self->{tw}) {
518 $self->{last_activity} = AE::now;
519 $self->{tw_cb}->();
520 }
267 521
268 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 522 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
269 523
270 unless ($self->{ww}) { 524 unless ($self->{ww}) {
271 my $len = syswrite $self->{fh}, $self->{wbuf}; 525 my $len = syswrite $self->{fh}, $self->{wbuf};
272 substr $self->{wbuf}, 0, $len, ""; 526 substr $self->{wbuf}, 0, $len, "";
273 527
274 # still any left? then install a write watcher 528 # still any left? then install a write watcher
275 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) 529 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
276 if length $self->{wbuf}; 530 if length $self->{wbuf};
277 } 531 }
278} 532}
279 533
280=item $dbh->exec ("statement", @args, $cb->($rows, $rv, ...)) 534=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
281 535
282Executes the given SQL statement with placeholders replaced by 536Executes the given SQL statement with placeholders replaced by
283C<@args>. The statement will be prepared and cached on the server side, so 537C<@args>. The statement will be prepared and cached on the server side, so
284using placeholders is compulsory. 538using placeholders is extremely important.
285 539
286The callback will be called with the result of C<fetchall_arrayref> as 540The callback will be called with a weakened AnyEvent::DBI object as the
287first argument (or C<undef> if the statement wasn't a select statement) 541first argument and the result of C<fetchall_arrayref> as (or C<undef>
288and the return value of C<execute> as second argument. Additional 542if the statement wasn't a select statement) as the second argument.
289arguments might get passed as well.
290 543
544Third argument is the return value from the C<< DBI->execute >> method
545call.
546
291If an error occurs and the C<on_error> callback returns, then no arguments 547If an error occurs and the C<on_error> callback returns, then only C<$dbh>
292will be passed and C<$@> contains the error message. 548will be passed and C<$@> contains the error message.
293 549
550=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
551
552An accessor for the handle attributes, such as C<AutoCommit>,
553C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
554(which might be C<undef>), then the given attribute will be set to that
555value.
556
557The callback will be passed the database handle and the attribute's value
558if successful.
559
560If an error occurs and the C<on_error> callback returns, then only C<$dbh>
561will be passed and C<$@> contains the error message.
562
563=item $dbh->begin_work ($cb->($dbh[, $rc]))
564
565=item $dbh->commit ($cb->($dbh[, $rc]))
566
567=item $dbh->rollback ($cb->($dbh[, $rc]))
568
569The begin_work, commit, and rollback methods expose the equivalent
570transaction control method of the DBI driver. On success, C<$rc> is true.
571
572If an error occurs and the C<on_error> callback returns, then only C<$dbh>
573will be passed and C<$@> contains the error message.
574
575=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
576
577This gives access to database driver private methods. Because they
578are not standard you cannot always depend on the value of C<$rc> or
579C<$dbi_err>. Check the documentation for your specific driver/function
580combination to see what it returns.
581
582Note that the first argument will be eval'ed to produce the argument list to
583the func() method. This must be done because the serialization protocol
584between the AnyEvent::DBI server process and your program does not support the
585passage of closures.
586
587Here's an example to extend the query language in SQLite so it supports an
588intstr() function:
589
590 $cv = AnyEvent->condvar;
591 $dbh->func (
592 q{
593 instr => 2, sub {
594 my ($string, $search) = @_;
595 return index $string, $search;
596 },
597 },
598 create_function => sub {
599 return $cv->send ($@)
600 unless $#_;
601 $cv->send (undef, @_[1,2,3]);
602 }
603 );
604
605 my ($err,$rc,$errcode,$errstr) = $cv->recv;
606
607 die $err if defined $err;
608 die "EVAL failed: $errstr"
609 if $errcode;
610
611 # otherwise, we can ignore $rc and $errcode for this particular func
612
294=cut 613=cut
295 614
296sub exec { 615for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
616 eval 'sub ' . $cmd_name . '{
297 my $cb = pop; 617 my $cb = pop;
298 splice @_, 1, 0, $cb, (caller)[1,2], 0, "req_exec"; 618 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
299 619 &_req
300 goto &_req; 620 }';
301} 621}
302 622
303=back 623=back
304 624
305=head1 SEE ALSO 625=head1 SEE ALSO
306 626
307L<AnyEvent>, L<DBI>. 627L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
308 628
309=head1 AUTHOR 629=head1 AUTHOR
310 630
311 Marc Lehmann <schmorp@schmorp.de> 631 Marc Lehmann <schmorp@schmorp.de>
312 http://home.schmorp.de/ 632 http://home.schmorp.de/
313 633
634 Adam Rosenstein <adam@redcondor.com>
635 http://www.redcondor.com/
636
314=cut 637=cut
315 638
3161 6391;
317 640

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines