ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.7 by root, Mon Jul 21 02:34:40 2008 UTC vs.
Revision 1.14 by root, Sat Oct 30 20:23:44 2010 UTC

9 my $cv = AnyEvent->condvar; 9 my $cv = AnyEvent->condvar;
10 10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", ""; 11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12 12
13 $dbh->exec ("select * from test where num=?", 10, sub { 13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($rows, $rv) = @_; 14 my ($dbh, $rows, $rv) = @_;
15
16 $#_ or die "failure: $@";
15 17
16 print "@$_\n" 18 print "@$_\n"
17 for @$rows; 19 for @$rows;
18 20
19 $cv->broadcast; 21 $cv->broadcast;
26=head1 DESCRIPTION 28=head1 DESCRIPTION
27 29
28This module is an L<AnyEvent> user, you need to make sure that you use and 30This module is an L<AnyEvent> user, you need to make sure that you use and
29run a supported event loop. 31run a supported event loop.
30 32
31This module implements asynchronous DBI access my forking or executing 33This module implements asynchronous DBI access by forking or executing
32separate "DBI-Server" processes and sending them requests. 34separate "DBI-Server" processes and sending them requests.
33 35
34It means that you can run DBI requests in parallel to other tasks. 36It means that you can run DBI requests in parallel to other tasks.
35 37
36The overhead for very simple statements ("select 0") is somewhere 38The overhead for very simple statements ("select 0") is somewhere
37around 120% to 200% (dual/single core CPU) compared to an explicit 39around 100% to 120% (dual/single core CPU) compared to an explicit
38prepare_cached/execute/fetchrow_arrayref/finish combination. 40prepare_cached/execute/fetchrow_arrayref/finish combination.
39 41
42=head2 ERROR HANDLING
43
44This module defines a number of functions that accept a callback
45argument. All callbacks used by this module get their AnyEvent::DBI handle
46object passed as first argument.
47
48If the request was successful, then there will be more arguments,
49otherwise there will only be the C<$dbh> argument and C<$@> contains an
50error message.
51
52A convinient way to check whether an error occured is to check C<$#_> -
53if that is true, then the function was successful, otherwise there was an
54error.
55
40=cut 56=cut
41 57
42package AnyEvent::DBI; 58package AnyEvent::DBI;
43 59
44use strict; 60use common::sense;
45no warnings;
46 61
47use Carp; 62use Carp;
48use Socket (); 63use Socket ();
49use Scalar::Util (); 64use Scalar::Util ();
50use Storable (); 65use Storable ();
51 66
52use DBI (); 67use DBI (); # only needed in child actually - do it before fork & !exec?
53 68
54use AnyEvent (); 69use AnyEvent ();
55use AnyEvent::Util (); 70use AnyEvent::Util ();
56 71
72use Errno ();
73use Fcntl ();
74use POSIX ();
75
57our $VERSION = '1.1'; 76our $VERSION = '2.1';
58 77
59# this is the forked server code 78our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
79
80# this is the forked server code, could/should be bundled as it's own file
60 81
61our $DBH; 82our $DBH;
62 83
63sub req_open { 84sub req_open {
64 my (undef, $dbi, $user, $pass, %attr) = @{+shift}; 85 my (undef, $dbi, $user, $pass, %attr) = @{+shift};
65 86
66 $DBH = DBI->connect ($dbi, $user, $pass, \%attr); 87 $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr;
67 88
68 [1] 89 [1, 1]
69} 90}
70 91
71sub req_exec { 92sub req_exec {
72 my (undef, $st, @args) = @{+shift}; 93 my (undef, $st, @args) = @{+shift};
73
74 my $sth = $DBH->prepare_cached ($st, undef, 1); 94 my $sth = $DBH->prepare_cached ($st, undef, 1)
95 or die [$DBI::errstr];
75 96
76 my $rv = $sth->execute (@args) 97 my $rv = $sth->execute (@args)
77 or die $sth->errstr; 98 or die [$sth->errstr];
78 99
79 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }] 100 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, $rv]
80} 101}
81 102
103sub req_attr {
104 my (undef, $attr_name, @attr_val) = @{+shift};
105
106 $DBH->{$attr_name} = $attr_val[0]
107 if @attr_val;
108
109 [1, $DBH->{$attr_name}]
110}
111
112sub req_begin_work {
113 [1, $DBH->begin_work or die [$DBI::errstr]]
114}
115
116sub req_commit {
117 [1, $DBH->commit or die [$DBI::errstr]]
118}
119
120sub req_rollback {
121 [1, $DBH->rollback or die [$DBI::errstr]]
122}
123
124sub req_func {
125 my (undef, $arg_string, $function) = @{+shift};
126 my @args = eval $arg_string;
127
128 die "error evaling \$dbh->func() arg_string: $@"
129 if $@;
130
131 my $rc = $DBH->func (@args, $function);
132 return [1, $rc, $DBI::err, $DBI::errstr];
133}
134
82sub serve { 135sub serve_fh($$) {
83 my ($fh) = @_; 136 my ($fh, $version) = @_;
84 137
85 no strict; 138 if ($VERSION != $version) {
139 syswrite $fh,
140 pack "L/a*",
141 Storable::freeze
142 [undef, "AnyEvent::DBI version mismatch ($VERSION vs. $version)"];
143 return;
144 }
86 145
87 eval { 146 eval {
88 my $rbuf; 147 my $rbuf;
89 148
90 while () { 149 while () {
99 158
100 my $req = Storable::thaw substr $rbuf, 4; 159 my $req = Storable::thaw substr $rbuf, 4;
101 substr $rbuf, 0, $len + 4, ""; # remove length + request 160 substr $rbuf, 0, $len + 4, ""; # remove length + request
102 161
103 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) }; 162 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
104
105 $wbuf = pack "L/a*", Storable::freeze [undef, "$@"] 163 $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? ("$@->[0]", $@->[1]) : ("$@", 1)]
106 if $@; 164 if $@;
107 165
108 for (my $ofs = 0; $ofs < length $wbuf; ) { 166 for (my $ofs = 0; $ofs < length $wbuf; ) {
109 $ofs += (syswrite $fh, substr $wbuf, $ofs 167 $ofs += (syswrite $fh, substr $wbuf, $ofs
110 or die "unable to write results"); 168 or die "unable to write results");
111 } 169 }
112 } 170 }
113 } 171 }
114 }; 172 };
173}
115 174
116 if (AnyEvent::WIN32) { 175sub serve_fd($$) {
117 kill 9, $$; # no other way on the broken windows platform 176 open my $fh, ">>&=$_[0]"
118 # and the above doesn't even work on windows, it seems the only 177 or die "Couldn't open server file descriptor: $!";
119 # way to is to leak memory and kill 9 from the parent. yay.
120 }
121 178
122 require POSIX; 179 serve_fh $fh, $_[1];
123 POSIX::_exit (0);
124 # and the above kills the parent process on windows
125} 180}
126 181
127=head2 METHODS 182=head2 METHODS
128 183
129=over 4 184=over 4
151 206
152When an error occurs, then this callback will be invoked. On entry, C<$@> 207When an error occurs, then this callback will be invoked. On entry, C<$@>
153is set to the error message. C<$filename> and C<$line> is where the 208is set to the error message. C<$filename> and C<$line> is where the
154original request was submitted. 209original request was submitted.
155 210
156If this callback returns and this was a fatal error (C<$fatal> is true) 211If the fatal argument is true then the database connection is shut down
157then AnyEvent::DBI die's, otherwise it calls the original request callback 212and your database handle became invalid. In addition to invoking the
158without any arguments. 213C<on_error> callback, all of your queued request callbacks are called
214without only the C<$dbh> argument.
159 215
160If omitted, then C<die> will be called on any errors, fatal or not. 216If omitted, then C<die> will be called on any errors, fatal or not.
161 217
218=item on_connect => $callback->($dbh[, $success])
219
220If you supply an C<on_connect> callback, then this callback will be
221invoked after the database connect attempt. If the connection succeeds,
222C<$success> is true, otherwise it is missing and C<$@> contains the
223C<$DBI::errstr>.
224
225Regardless of whether C<on_connect> is supplied, connect errors will result in
226C<on_error> being called. However, if no C<on_connect> callback is supplied, then
227connection errors are considered fatal. The client will C<die> and the C<on_error>
228callback will be called with C<$fatal> true.
229
230When on_connect is supplied, connect error are not fatal and AnyEvent::DBI
231will not C<die>. You still cannot, however, use the $dbh object you
232received from C<new> to make requests.
233
234=item exec_server => 1
235
236If you supply an C<exec_server> argument, then the DBI server process will
237fork and exec another perl interpreter (using C<$^X>) with just the
238AnyEvent::DBI proxy running. This will provide the cleanest possible proxy
239for your database server.
240
241If you do not supply the C<exec_server> argument (or supply it with a
242false value) then the traditional method of starting the server by forking
243the current process is used. The forked interpreter will try to clean
244itself up by calling POSIX::close on all file descriptors except STDIN,
245STDOUT, and STDERR (and the socket it uses to communicate with the cilent,
246of course).
247
248=item timeout => seconds
249
250If you supply a timeout parameter (fractional values are supported), then
251a timer is started any time the DBI handle expects a response from the
252server. This includes connection setup as well as requests made to the
253backend. The timeout spans the duration from the moment the first data
254is written (or queued to be written) until all expected responses are
255returned, but is postponed for "timeout" seconds each time more data is
256returned from the server. If the timer ever goes off then a fatal error is
257generated. If you have an C<on_error> handler installed, then it will be
258called, otherwise your program will die().
259
260When altering your databases with timeouts it is wise to use
261transactions. If you quit due to timeout while performing insert, update
262or schema-altering commands you can end up not knowing if the action was
263submitted to the database, complicating recovery.
264
265Timeout errors are always fatal.
266
162=back 267=back
268
269Any additional key-value pairs will be rolled into a hash reference
270and passed as the final argument to the C<< DBI->connect (...) >>
271call. For example, to supress errors on STDERR and send them instead to an
272AnyEvent::Handle you could do:
273
274 $dbh = new AnyEvent::DBI
275 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
276 PrintError => 0,
277 on_error => sub {
278 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
279 };
163 280
164=cut 281=cut
165 282
166# stupid Storable autoloading, total loss-loss situation 283# stupid Storable autoloading, total loss-loss situation
167Storable::thaw Storable::freeze []; 284Storable::thaw Storable::freeze [];
168 285
169sub new { 286sub new {
170 my ($class, $dbi, $user, $pass, %arg) = @_; 287 my ($class, $dbi, $user, $pass, %arg) = @_;
171 288
172 socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC 289 my ($client, $server) = AnyEvent::Util::portable_socketpair
173 or croak "unable to create dbi communicaiton pipe: $!"; 290 or croak "unable to create AnyEvent::DBI communications pipe: $!";
291
292 my %dbi_args = %arg;
293 delete @dbi_args{qw(on_connect on_error timeout exec_server)};
174 294
175 my $self = bless \%arg, $class; 295 my $self = bless \%arg, $class;
176
177 $self->{fh} = $client; 296 $self->{fh} = $client;
178
179 Scalar::Util::weaken (my $wself = $self);
180 297
181 AnyEvent::Util::fh_nonblocking $client, 1; 298 AnyEvent::Util::fh_nonblocking $client, 1;
182 299
183 my $rbuf; 300 my $rbuf;
184 my @caller = (caller)[1,2]; # the "default" caller 301 my @caller = (caller)[1,2]; # the "default" caller
185 302
186 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { 303 {
304 Scalar::Util::weaken (my $self = $self);
305
306 $self->{rw} = AE::io $client, 0, sub {
307 return unless $self;
308
309 $self->{last_activity} = AE::now;
310
187 my $len = sysread $client, $rbuf, 65536, length $rbuf; 311 my $len = sysread $client, $rbuf, 65536, length $rbuf;
188 312
189 if ($len > 0) { 313 if ($len > 0) {
314 # we received data, so reset the timer
190 315
191 while () { 316 while () {
192 my $len = unpack "L", $rbuf; 317 my $len = unpack "L", $rbuf;
193 318
194 # full request available? 319 # full response available?
195 last unless $len && $len + 4 <= length $rbuf; 320 last unless $len && $len + 4 <= length $rbuf;
196 321
197 my $res = Storable::thaw substr $rbuf, 4; 322 my $res = Storable::thaw substr $rbuf, 4;
198 substr $rbuf, 0, $len + 4, ""; # remove length + request 323 substr $rbuf, 0, $len + 4, ""; # remove length + request
199 324
325 last unless $self;
200 my $req = shift @{ $wself->{queue} }; 326 my $req = shift @{ $self->{queue} };
201 327
202 if (defined $res->[0]) { 328 if (defined $res->[0]) {
329 $res->[0] = $self;
203 $req->[0](@$res); 330 $req->[0](@$res);
331 } else {
332 my $cb = shift @$req;
333 local $@ = $res->[1];
334 $cb->($self);
335 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
336 if $self; # cb() could have deleted it
337 }
338
339 # no more queued requests, so become idle
340 undef $self->{last_activity}
341 if $self && !@{ $self->{queue} };
342 }
343
344 } elsif (defined $len) {
345 # todo, caller?
346 $self->_error ("unexpected eof", @caller, 1);
347 } elsif ($! != Errno::EAGAIN) {
348 # todo, caller?
349 $self->_error ("read error: $!", @caller, 1);
350 }
351 };
352
353 $self->{tw_cb} = sub {
354 if ($self->{timeout} && $self->{last_activity}) {
355 if (AE::now > $self->{last_activity} + $self->{timeout}) {
356 # we did time out
357 my $req = $self->{queue}[0];
358 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
204 } else { 359 } else {
205 my $cb = shift @$req; 360 # we need to re-set the timeout watcher
206 $wself->_error ($res->[1], @$req); 361 $self->{tw} = AE::timer
362 $self->{last_activity} + $self->{timeout} - AE::now,
363 0,
364 $self->{tw_cb},
207 $cb->(); 365 ;
208 } 366 }
367 } else {
368 # no timeout check wanted, or idle
369 undef $self->{tw};
209 } 370 }
210
211 } elsif (defined $len) {
212 $wself->_error ("unexpected eof", @caller, 1);
213 } else {
214 $wself->_error ("read error: $!", @caller, 1);
215 } 371 };
216 });
217 372
218 $self->{ww_cb} = sub { 373 $self->{ww_cb} = sub {
374 return unless $self;
375
376 $self->{last_activity} = AE::now;
377
219 my $len = syswrite $client, $wself->{wbuf} 378 my $len = syswrite $client, $self->{wbuf}
220 or return delete $wself->{ww}; 379 or return delete $self->{ww};
221 380
222 substr $wself->{wbuf}, 0, $len, ""; 381 substr $self->{wbuf}, 0, $len, "";
382 };
223 }; 383 }
224 384
225 my $pid = fork; 385 my $pid = fork;
226 386
227 if ($pid) { 387 if ($pid) {
228 # parent 388 # parent
229 close $server; 389 close $server;
230
231 } elsif (defined $pid) { 390 } elsif (defined $pid) {
232 # child 391 # child
233 close $client; 392 my $serv_fno = fileno $server;
234 @_ = $server;
235 goto &serve;
236 393
394 if ($self->{exec_server}) {
395 fcntl $server, &Fcntl::F_SETFD, 0; # don't close the server side
396 exec {$^X}
397 "$0 dbi slave",
398 -e => "require shift; AnyEvent::DBI::serve_fd ($serv_fno, $VERSION)",
399 $INC{"AnyEvent/DBI.pm"};
400 POSIX::_exit 124;
401 } else {
402 ($_ != $serv_fno) && POSIX::close $_
403 for $^F+1..$FD_MAX;
404 serve_fh $server, $VERSION;
405
406 # no other way on the broken windows platform, even this leaks
407 # memory and might fail.
408 kill 9, $$
409 if AnyEvent::WIN32;
410
411 # and this kills the parent process on windows
412 POSIX::_exit 0;
413 }
237 } else { 414 } else {
238 croak "fork: $!"; 415 croak "fork: $!";
239 } 416 }
240 417
241 $self->_req (sub { }, (caller)[1,2], 1, req_open => $dbi, $user, $pass); 418 $self->{child_pid} = $pid;
419
420 $self->_req (
421 ($self->{on_connect} ? $self->{on_connect} : sub { }),
422 (caller)[1,2],
423 req_open => $dbi, $user, $pass, %dbi_args
424 );
242 425
243 $self 426 $self
427}
428
429sub _server_pid {
430 shift->{child_pid}
431}
432
433sub kill_child {
434 my $self = shift;
435
436 if (my $pid = delete $self->{child_pid}) {
437 kill TERM => $pid;
438 }
439 close delete $self->{fh};
440}
441
442sub DESTROY {
443 shift->kill_child;
244} 444}
245 445
246sub _error { 446sub _error {
247 my ($self, $error, $filename, $line, $fatal) = @_; 447 my ($self, $error, $filename, $line, $fatal) = @_;
248 448
449 if ($fatal) {
450 delete $self->{tw};
249 delete $self->{rw}; 451 delete $self->{rw};
250 delete $self->{ww}; 452 delete $self->{ww};
251 delete $self->{fh}; 453 delete $self->{fh};
252 454
455 # for fatal errors call all enqueued callbacks with error
456 while (my $req = shift @{$self->{queue}}) {
457 local $@ = $error;
458 $req->[0]->($self);
459 }
460 $self->kill_child;
461 }
462
253 $@ = $error; 463 local $@ = $error;
254 464
465 if ($self->{on_error}) {
255 $self->{on_error}($self, $filename, $line, $fatal) 466 $self->{on_error}($self, $filename, $line, $fatal)
256 if $self->{on_error}; 467 } else {
257
258 die "$error at $filename, line $line\n"; 468 die "$error at $filename, line $line\n";
469 }
470}
471
472=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
473
474Sets (or clears, with C<undef>) the C<on_error> handler.
475
476=cut
477
478sub on_error {
479 $_[0]{on_error} = $_[1];
480}
481
482=item $dbh->timeout ($seconds)
483
484Sets (or clears, with C<undef>) the database timeout. Useful to extend the
485timeout when you are about to make a really long query.
486
487=cut
488
489sub timeout {
490 my ($self, $timeout) = @_;
491
492 $self->{timeout} = $timeout;
493
494 # reschedule timer if one was running
495 $self->{tw_cb}->();
259} 496}
260 497
261sub _req { 498sub _req {
262 my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, (); 499 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
263 500
501 unless ($self->{fh}) {
502 local $@ = my $err = 'no database connection';
503 $cb->($self);
504 $self->_error ($err, $filename, $line, 1);
505 return;
506 }
507
264 push @{ $self->{queue} }, [$cb, $filename, $line, $fatal]; 508 push @{ $self->{queue} }, [$cb, $filename, $line];
509
510 # re-start timeout if necessary
511 if ($self->{timeout} && !$self->{tw}) {
512 $self->{last_activity} = AE::now;
513 $self->{tw_cb}->();
514 }
265 515
266 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 516 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
267 517
268 unless ($self->{ww}) { 518 unless ($self->{ww}) {
269 my $len = syswrite $self->{fh}, $self->{wbuf}; 519 my $len = syswrite $self->{fh}, $self->{wbuf};
270 substr $self->{wbuf}, 0, $len, ""; 520 substr $self->{wbuf}, 0, $len, "";
271 521
272 # still any left? then install a write watcher 522 # still any left? then install a write watcher
273 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) 523 $self->{ww} = AE::io $self->{fh}, 1, $self->{ww_cb}
274 if length $self->{wbuf}; 524 if length $self->{wbuf};
275 } 525 }
276} 526}
277 527
278=item $dbh->exec ("statement", @args, $cb->($rows, $rv, ...)) 528=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
279 529
280Executes the given SQL statement with placeholders replaced by 530Executes the given SQL statement with placeholders replaced by
281C<@args>. The statement will be prepared and cached on the server side, so 531C<@args>. The statement will be prepared and cached on the server side, so
282using placeholders is compulsory. 532using placeholders is extremely important.
283 533
284The callback will be called with the result of C<fetchall_arrayref> as 534The callback will be called with a weakened AnyEvent::DBI object as the
285first argument (or C<undef> if the statement wasn't a select statement) 535first argument and the result of C<fetchall_arrayref> as (or C<undef>
286and the return value of C<execute> as second argument. Additional 536if the statement wasn't a select statement) as the second argument.
287arguments might get passed as well.
288 537
538Third argument is the return value from the C<< DBI->execute >> method
539call.
540
289If an error occurs and the C<on_error> callback returns, then no arguments 541If an error occurs and the C<on_error> callback returns, then only C<$dbh>
290will be passed and C<$@> contains the error message. 542will be passed and C<$@> contains the error message.
291 543
544=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
545
546An accessor for the handle attributes, such as C<AutoCommit>,
547C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
548(which might be C<undef>), then the given attribute will be set to that
549value.
550
551The callback will be passed the database handle and the attribute's value
552if successful.
553
554If an error occurs and the C<on_error> callback returns, then only C<$dbh>
555will be passed and C<$@> contains the error message.
556
557=item $dbh->begin_work ($cb->($dbh[, $rc]))
558
559=item $dbh->commit ($cb->($dbh[, $rc]))
560
561=item $dbh->rollback ($cb->($dbh[, $rc]))
562
563The begin_work, commit, and rollback methods expose the equivalent
564transaction control method of the DBI driver. On success, C<$rc> is true.
565
566If an error occurs and the C<on_error> callback returns, then only C<$dbh>
567will be passed and C<$@> contains the error message.
568
569=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
570
571This gives access to database driver private methods. Because they
572are not standard you cannot always depend on the value of C<$rc> or
573C<$dbi_err>. Check the documentation for your specific driver/function
574combination to see what it returns.
575
576Note that the first argument will be eval'ed to produce the argument list to
577the func() method. This must be done because the serialization protocol
578between the AnyEvent::DBI server process and your program does not support the
579passage of closures.
580
581Here's an example to extend the query language in SQLite so it supports an
582intstr() function:
583
584 $cv = AnyEvent->condvar;
585 $dbh->func (
586 q{
587 instr => 2, sub {
588 my ($string, $search) = @_;
589 return index $string, $search;
590 },
591 },
592 create_function => sub {
593 return $cv->send ($@)
594 unless $#_;
595 $cv->send (undef, @_[1,2,3]);
596 }
597 );
598
599 my ($err,$rc,$errcode,$errstr) = $cv->recv;
600
601 die $err if defined $err;
602 die "EVAL failed: $errstr"
603 if $errcode;
604
605 # otherwise, we can ignore $rc and $errcode for this particular func
606
292=cut 607=cut
293 608
294sub exec { 609for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
610 eval 'sub ' . $cmd_name . '{
295 my $cb = pop; 611 my $cb = pop;
296 splice @_, 1, 0, $cb, (caller)[1,2], 0, "req_exec"; 612 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
297 613 &_req
298 goto &_req; 614 }';
299} 615}
300 616
301=back 617=back
302 618
303=head1 SEE ALSO 619=head1 SEE ALSO
304 620
305L<AnyEvent>, L<DBI>. 621L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
306 622
307=head1 AUTHOR 623=head1 AUTHOR
308 624
309 Marc Lehmann <schmorp@schmorp.de> 625 Marc Lehmann <schmorp@schmorp.de>
310 http://home.schmorp.de/ 626 http://home.schmorp.de/
311 627
628 Adam Rosenstein <adam@redcondor.com>
629 http://www.redcondor.com/
630
312=cut 631=cut
313 632
3141 6331;
315 634

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines