ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.4 by root, Fri Jun 6 20:06:34 2008 UTC vs.
Revision 1.13 by root, Sat Oct 23 21:47:13 2010 UTC

3AnyEvent::DBI - asynchronous DBI access 3AnyEvent::DBI - asynchronous DBI access
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::DBI; 7 use AnyEvent::DBI;
8
9 my $cv = AnyEvent->condvar;
10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12
13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($dbh, $rows, $rv) = @_;
15
16 $#_ or die "failure: $@";
17
18 print "@$_\n"
19 for @$rows;
20
21 $cv->broadcast;
22 });
23
24 # asynchronously do sth. else here
25
26 $cv->wait;
8 27
9=head1 DESCRIPTION 28=head1 DESCRIPTION
10 29
11This module is an L<AnyEvent> user, you need to make sure that you use and 30This module is an L<AnyEvent> user, you need to make sure that you use and
12run a supported event loop. 31run a supported event loop.
13 32
14This module implements asynchronous DBI access my forking or executing 33This module implements asynchronous DBI access by forking or executing
15separate "DBI-Server" processes and sending them requests. 34separate "DBI-Server" processes and sending them requests.
16 35
17It means that you can run DBI requests in parallel to other tasks. 36It means that you can run DBI requests in parallel to other tasks.
18 37
19The overhead for very simple statements ("select 0") is somewhere 38The overhead for very simple statements ("select 0") is somewhere
20around 120% to 200% (single/dual core CPU) compared to an explicit 39around 120% to 200% (dual/single core CPU) compared to an explicit
21prepare_cached/execute/fetchrow_arrayref/finish combination. 40prepare_cached/execute/fetchrow_arrayref/finish combination.
22 41
42=head2 ERROR HANDLING
43
44This module defines a number of functions that accept a callback
45argument. All callbacks used by this module get their AnyEvent::DBI handle
46object passed as first argument.
47
48If the request was successful, then there will be more arguments,
49otherwise there will only be the C<$dbh> argument and C<$@> contains an
50error message.
51
52A convinient way to check whether an error occured is to check C<$#_> -
53if that is true, then the function was successful, otherwise there was an
54error.
55
23=cut 56=cut
24 57
25package AnyEvent::DBI; 58package AnyEvent::DBI;
26 59
27use strict; 60use strict qw(vars subs);
28no warnings; 61no warnings;
29 62
30use Carp; 63use Carp;
31use Socket (); 64use Socket ();
32use Scalar::Util (); 65use Scalar::Util ();
35use DBI (); 68use DBI ();
36 69
37use AnyEvent (); 70use AnyEvent ();
38use AnyEvent::Util (); 71use AnyEvent::Util ();
39 72
73use Errno ();
74use Fcntl ();
75use POSIX ();
76
40our $VERSION = '1.0'; 77our $VERSION = '2.0';
41 78
42# this is the forked server code 79our $FD_MAX = eval { POSIX::sysconf (&POSIX::_SC_OPEN_MAX) - 1 } || 1023;
80
81# this is the forked server code, could/should be bundled as it's own file
43 82
44our $DBH; 83our $DBH;
45 84
46sub req_open { 85sub req_open {
47 my (undef, $dbi, $user, $pass, %attr) = @{+shift}; 86 my (undef, $dbi, $user, $pass, %attr) = @{+shift};
48 87
49 $DBH = DBI->connect ($dbi, $user, $pass, \%attr); 88 $DBH = DBI->connect ($dbi, $user, $pass, \%attr) or die $DBI::errstr;
50 89
51 [1] 90 [1, 1]
52} 91}
53 92
54sub req_exec { 93sub req_exec {
55 my (undef, $st, @args) = @{+shift}; 94 my (undef, $st, @args) = @{+shift};
56
57 my $sth = $DBH->prepare_cached ($st, undef, 1); 95 my $sth = $DBH->prepare_cached ($st, undef, 1)
96 or die [$DBI::errstr];
58 97
59 $sth->execute (@args) 98 my $rv = $sth->execute (@args)
60 or die $sth->errstr; 99 or die [$sth->errstr];
61 100
62 [$sth->fetchall_arrayref] 101 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, $rv]
63} 102}
64 103
104sub req_attr {
105 my (undef, $attr_name, @attr_val) = @{+shift};
106
107 $DBH->{$attr_name} = $attr_val[0]
108 if @attr_val;
109
110 [1, $DBH->{$attr_name}]
111}
112
113sub req_begin_work {
114 [1, $DBH->begin_work or die [$DBI::errstr]]
115}
116
117sub req_commit {
118 [1, $DBH->commit or die [$DBI::errstr]]
119}
120
121sub req_rollback {
122 [1, $DBH->rollback or die [$DBI::errstr]]
123}
124
125sub req_func {
126 my (undef, $arg_string, $function) = @{+shift};
127 my @args = eval $arg_string;
128
129 die "error evaling \$dbh->func() arg_string: $@"
130 if $@;
131
132 my $rc = $DBH->func (@args, $function);
133 return [1, $rc, $DBI::err, $DBI::errstr];
134}
135
65sub serve { 136sub serve_fh($$) {
66 my ($fh) = @_; 137 my ($fh, $version) = @_;
67 138
68 no strict; 139 if ($VERSION != $version) {
140 syswrite $fh,
141 pack "L/a*",
142 Storable::freeze
143 [undef, "AnyEvent::DBI version mismatch ($VERSION vs. $version)"];
144 return;
145 }
69 146
70 eval { 147 eval {
71 my $rbuf; 148 my $rbuf;
72 149
73 while () { 150 while () {
82 159
83 my $req = Storable::thaw substr $rbuf, 4; 160 my $req = Storable::thaw substr $rbuf, 4;
84 substr $rbuf, 0, $len + 4, ""; # remove length + request 161 substr $rbuf, 0, $len + 4, ""; # remove length + request
85 162
86 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) }; 163 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
87
88 $wbuf = pack "L/a*", Storable::freeze [undef, "$@"] 164 $wbuf = pack "L/a*", Storable::freeze [undef, ref $@ ? ("$@->[0]", $@->[1]) : ("$@", 1)]
89 if $@; 165 if $@;
90 166
91 for (my $ofs = 0; $ofs < length $wbuf; ) { 167 for (my $ofs = 0; $ofs < length $wbuf; ) {
92 $ofs += (syswrite $fh, substr $wbuf, $ofs 168 $ofs += (syswrite $fh, substr $wbuf, $ofs
93 or die "unable to write results"); 169 or die "unable to write results");
94 } 170 }
95 } 171 }
96 } 172 }
97 }; 173 };
174}
98 175
99 kill 9, $$; # no other way on the broken windows platform 176sub serve_fd($$) {
177 open my $fh, ">>&=$_[0]"
178 or die "Couldn't open server file descriptor: $!";
179
180 serve_fh $fh, $_[1];
100} 181}
101 182
102=head2 METHODS 183=head2 METHODS
103 184
104=over 4 185=over 4
126 207
127When an error occurs, then this callback will be invoked. On entry, C<$@> 208When an error occurs, then this callback will be invoked. On entry, C<$@>
128is set to the error message. C<$filename> and C<$line> is where the 209is set to the error message. C<$filename> and C<$line> is where the
129original request was submitted. 210original request was submitted.
130 211
131If this callback returns and this was a fatal error (C<$fatal> is true) 212If the fatal argument is true then the database connection is shut down
132then AnyEvent::DBI die's, otherwise it calls the original request callback 213and your database handle became invalid. In addition to invoking the
133without any arguments. 214C<on_error> callback, all of your queued request callbacks are called
215without only the C<$dbh> argument.
134 216
135If omitted, then C<die> will be called on any errors, fatal or not. 217If omitted, then C<die> will be called on any errors, fatal or not.
136 218
219=item on_connect => $callback->($dbh[, $success])
220
221If you supply an C<on_connect> callback, then this callback will be
222invoked after the database connect attempt. If the connection succeeds,
223C<$success> is true, otherwise it is missing and C<$@> contains the
224C<$DBI::errstr>.
225
226Regardless of whether C<on_connect> is supplied, connect errors will result in
227C<on_error> being called. However, if no C<on_connect> callback is supplied, then
228connection errors are considered fatal. The client will C<die> and the C<on_error>
229callback will be called with C<$fatal> true.
230
231When on_connect is supplied, connect error are not fatal and AnyEvent::DBI
232will not C<die>. You still cannot, however, use the $dbh object you
233received from C<new> to make requests.
234
235=item exec_server => 1
236
237If you supply an C<exec_server> argument, then the DBI server process will
238fork and exec another perl interpreter (using C<$^X>) with just the
239AnyEvent::DBI proxy running. This will provide the cleanest possible proxy
240for your database server.
241
242If you do not supply the C<exec_server> argument (or supply it with a
243false value) then the traditional method of starting the server by forking
244the current process is used. The forked interpreter will try to clean
245itself up by calling POSIX::close on all file descriptors except STDIN,
246STDOUT, and STDERR (and the socket it uses to communicate with the cilent,
247of course).
248
249=item timeout => seconds
250
251If you supply a timeout parameter (fractional values are supported), then
252a timer is started any time the DBI handle expects a response from the
253server. This includes connection setup as well as requests made to the
254backend. The timeout spans the duration from the moment the first data
255is written (or queued to be written) until all expected responses are
256returned, but is postponed for "timeout" seconds each time more data is
257returned from the server. If the timer ever goes off then a fatal error is
258generated. If you have an C<on_error> handler installed, then it will be
259called, otherwise your program will die().
260
261When altering your databases with timeouts it is wise to use
262transactions. If you quit due to timeout while performing insert, update
263or schema-altering commands you can end up not knowing if the action was
264submitted to the database, complicating recovery.
265
266Timeout errors are always fatal.
267
137=back 268=back
269
270Any additional key-value pairs will be rolled into a hash reference
271and passed as the final argument to the C<< DBI->connect (...) >>
272call. For example, to supress errors on STDERR and send them instead to an
273AnyEvent::Handle you could do:
274
275 $dbh = new AnyEvent::DBI
276 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "",
277 PrintError => 0,
278 on_error => sub {
279 $log_handle->push_write ("DBI Error: $@ at $_[1]:$_[2]\n");
280 };
138 281
139=cut 282=cut
140 283
141# stupid Storable autoloading, total loss-loss situation 284# stupid Storable autoloading, total loss-loss situation
142Storable::thaw Storable::freeze []; 285Storable::thaw Storable::freeze [];
143 286
144sub new { 287sub new {
145 my ($class, $dbi, $user, $pass, %arg) = @_; 288 my ($class, $dbi, $user, $pass, %arg) = @_;
146 289
147 socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC 290 my ($client, $server) = AnyEvent::Util::portable_socketpair
148 or croak "unable to create dbi communicaiton pipe: $!"; 291 or croak "unable to create Anyevent::DBI communications pipe: $!";
292
293 my %dbi_args = %arg;
294 delete @dbi_args{qw(on_connect on_error timeout exec_server)};
149 295
150 my $self = bless \%arg, $class; 296 my $self = bless \%arg, $class;
151
152 $self->{fh} = $client; 297 $self->{fh} = $client;
153
154 Scalar::Util::weaken (my $wself = $self);
155 298
156 AnyEvent::Util::fh_nonblocking $client, 1; 299 AnyEvent::Util::fh_nonblocking $client, 1;
157 300
158 my $rbuf; 301 my $rbuf;
159 my @caller = (caller)[1,2]; # the "default" caller 302 my @caller = (caller)[1,2]; # the "default" caller
160 303
304 {
305 Scalar::Util::weaken (my $self = $self);
306
161 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub { 307 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
308 return unless $self;
309
310 $self->{last_activity} = AnyEvent->now;
311
162 my $len = sysread $client, $rbuf, 65536, length $rbuf; 312 my $len = sysread $client, $rbuf, 65536, length $rbuf;
163 313
164 if ($len > 0) { 314 if ($len > 0) {
315 # we received data, so reset the timer
165 316
166 while () { 317 while () {
167 my $len = unpack "L", $rbuf; 318 my $len = unpack "L", $rbuf;
168 319
169 # full request available? 320 # full response available?
170 last unless $len && $len + 4 <= length $rbuf; 321 last unless $len && $len + 4 <= length $rbuf;
171 322
172 my $res = Storable::thaw substr $rbuf, 4; 323 my $res = Storable::thaw substr $rbuf, 4;
173 substr $rbuf, 0, $len + 4, ""; # remove length + request 324 substr $rbuf, 0, $len + 4, ""; # remove length + request
174 325
326 last unless $self;
175 my $req = shift @{ $wself->{queue} }; 327 my $req = shift @{ $self->{queue} };
176 328
177 if (defined $res->[0]) { 329 if (defined $res->[0]) {
330 $res->[0] = $self;
178 $req->[0](@$res); 331 $req->[0](@$res);
332 } else {
333 my $cb = shift @$req;
334 local $@ = $res->[1];
335 $cb->($self);
336 $self->_error ($res->[1], @$req, $res->[2]) # error, request record, is_fatal
337 if $self; # cb() could have deleted it
338 }
339
340 # no more queued requests, so become idle
341 undef $self->{last_activity}
342 if $self && !@{ $self->{queue} };
343 }
344
345 } elsif (defined $len) {
346 # todo, caller?
347 $self->_error ("unexpected eof", @caller, 1);
348 } elsif ($! != Errno::EAGAIN) {
349 # todo, caller?
350 $self->_error ("read error: $!", @caller, 1);
351 }
352 });
353
354 $self->{tw_cb} = sub {
355 if ($self->{timeout} && $self->{last_activity}) {
356 if (AnyEvent->now > $self->{last_activity} + $self->{timeout}) {
357 # we did time out
358 my $req = $self->{queue}[0];
359 $self->_error (timeout => $req->[1], $req->[2], 1); # timeouts are always fatal
179 } else { 360 } else {
180 my $cb = shift @$req; 361 # we need to re-set the timeout watcher
181 $wself->_error ($res->[1], @$req); 362 $self->{tw} = AnyEvent->timer (
363 after => $self->{last_activity} + $self->{timeout} - AnyEvent->now,
364 cb => $self->{tw_cb},
182 $cb->(); 365 );
366 Scalar::Util::weaken $self;
183 } 367 }
368 } else {
369 # no timeout check wanted, or idle
370 undef $self->{tw};
184 } 371 }
185
186 } elsif (defined $len) {
187 $wself->_error ("unexpected eof", @caller, 1);
188 } else {
189 $wself->_error ("read error: $!", @caller, 1);
190 } 372 };
191 });
192 373
193 $self->{ww_cb} = sub { 374 $self->{ww_cb} = sub {
375 return unless $self;
376
377 $self->{last_activity} = AnyEvent->now;
378
194 my $len = syswrite $client, $wself->{wbuf} 379 my $len = syswrite $client, $self->{wbuf}
195 or return delete $wself->{ww}; 380 or return delete $self->{ww};
196 381
197 substr $wself->{wbuf}, 0, $len, ""; 382 substr $self->{wbuf}, 0, $len, "";
383 };
198 }; 384 }
199 385
200 my $pid = fork; 386 my $pid = fork;
201 387
202 if ($pid) { 388 if ($pid) {
203 # parent 389 # parent
204 close $server; 390 close $server;
205
206 } elsif (defined $pid) { 391 } elsif (defined $pid) {
207 # child 392 # child
208 close $client; 393 my $serv_fno = fileno $server;
209 @_ = $server;
210 goto &serve;
211 394
395 if ($self->{exec_server}) {
396 fcntl $server, &Fcntl::F_SETFD, 0; # don't close the server side
397 exec {$^X}
398 "$0 dbi slave",
399 -e => "require shift; AnyEvent::DBI::serve_fd ($serv_fno, $VERSION)",
400 $INC{"AnyEvent/DBI.pm"};
401 POSIX::_exit 124;
402 } else {
403 ($_ != $serv_fno) && POSIX::close $_
404 for $^F+1..$FD_MAX;
405 serve_fh $server, $VERSION;
406
407 # no other way on the broken windows platform, even this leaks
408 # memory and might fail.
409 kill 9, $$
410 if AnyEvent::WIN32;
411
412 # and this kills the parent process on windows
413 POSIX::_exit 0;
414 }
212 } else { 415 } else {
213 croak "fork: $!"; 416 croak "fork: $!";
214 } 417 }
215 418
216 $self->_req (sub { }, (caller)[1,2], 1, req_open => $dbi, $user, $pass); 419 $self->{child_pid} = $pid;
420
421 $self->_req (
422 ($self->{on_connect} ? $self->{on_connect} : sub { }),
423 (caller)[1,2],
424 req_open => $dbi, $user, $pass, %dbi_args
425 );
217 426
218 $self 427 $self
428}
429
430sub _server_pid {
431 shift->{child_pid}
432}
433
434sub kill_child {
435 my $self = shift;
436 my $child_pid = delete $self->{child_pid};
437 if ($child_pid) {
438 # send SIGKILL in two seconds
439 my $murder_timer = AnyEvent->timer (
440 after => 2,
441 cb => sub {
442 kill 9, $child_pid;
443 },
444 );
445
446 # reap process
447 my $kid_watcher; $kid_watcher = AnyEvent->child (
448 pid => $child_pid,
449 cb => sub {
450 # just hold on to this so it won't go away
451 undef $kid_watcher;
452 # cancel SIGKILL
453 undef $murder_timer;
454 },
455 );
456
457 close $self->{fh};
458 }
459}
460
461sub DESTROY {
462 shift->kill_child;
219} 463}
220 464
221sub _error { 465sub _error {
222 my ($self, $error, $filename, $line, $fatal) = @_; 466 my ($self, $error, $filename, $line, $fatal) = @_;
223 467
468 if ($fatal) {
469 delete $self->{tw};
224 delete $self->{rw}; 470 delete $self->{rw};
225 delete $self->{ww}; 471 delete $self->{ww};
226 delete $self->{fh}; 472 delete $self->{fh};
227 473
474 # for fatal errors call all enqueued callbacks with error
475 while (my $req = shift @{$self->{queue}}) {
476 local $@ = $error;
477 $req->[0]->($self);
478 }
479 $self->kill_child;
480 }
481
228 $@ = $error; 482 local $@ = $error;
229 483
484 if ($self->{on_error}) {
230 $self->{on_error}($self, $filename, $line, $fatal) 485 $self->{on_error}($self, $filename, $line, $fatal)
231 if $self->{on_error}; 486 } else {
232
233 die "$error at $filename, line $line\n"; 487 die "$error at $filename, line $line\n";
488 }
489}
490
491=item $dbh->on_error ($cb->($dbh, $filename, $line, $fatal))
492
493Sets (or clears, with C<undef>) the C<on_error> handler.
494
495=cut
496
497sub on_error {
498 $_[0]{on_error} = $_[1];
499}
500
501=item $dbh->timeout ($seconds)
502
503Sets (or clears, with C<undef>) the database timeout. Useful to extend the
504timeout when you are about to make a really long query.
505
506=cut
507
508sub timeout {
509 my ($self, $timeout) = @_;
510
511 $self->{timeout} = $timeout;
512
513 # reschedule timer if one was running
514 $self->{tw_cb}->();
234} 515}
235 516
236sub _req { 517sub _req {
237 my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, (); 518 my ($self, $cb, $filename, $line) = splice @_, 0, 4, ();
238 519
520 unless ($self->{fh}) {
521 local $@ = my $err = 'no database connection';
522 $cb->($self);
523 $self->_error ($err, $filename, $line, 1);
524 return;
525 }
526
239 push @{ $self->{queue} }, [$cb, $filename, $line, $fatal]; 527 push @{ $self->{queue} }, [$cb, $filename, $line];
528
529 # re-start timeout if necessary
530 if ($self->{timeout} && !$self->{tw}) {
531 $self->{last_activity} = AnyEvent->now;
532 $self->{tw_cb}->();
533 }
240 534
241 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 535 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
242 536
243 unless ($self->{ww}) { 537 unless ($self->{ww}) {
244 my $len = syswrite $self->{fh}, $self->{wbuf}; 538 my $len = syswrite $self->{fh}, $self->{wbuf};
248 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) 542 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
249 if length $self->{wbuf}; 543 if length $self->{wbuf};
250 } 544 }
251} 545}
252 546
253=item $dbh->exec ("statement", @args, $cb->($rows, %extra)) 547=item $dbh->exec ("statement", @args, $cb->($dbh, \@rows, $rv))
254 548
255Executes the given SQL statement with placeholders replaced by 549Executes the given SQL statement with placeholders replaced by
256C<@args>. The statement will be prepared and cached on the server side, so 550C<@args>. The statement will be prepared and cached on the server side, so
257using placeholders is compulsory. 551using placeholders is extremely important.
258 552
259The callback will be called with the result of C<fetchall_arrayref> as 553The callback will be called with a weakened AnyEvent::DBI object as the
260first argument and possibly a hash reference with additional information. 554first argument and the result of C<fetchall_arrayref> as (or C<undef>
555if the statement wasn't a select statement) as the second argument.
261 556
557Third argument is the return value from the C<< DBI->execute >> method
558call.
559
262If an error occurs and the C<on_error> callback returns, then no arguments 560If an error occurs and the C<on_error> callback returns, then only C<$dbh>
263will be passed and C<$@> contains the error message. 561will be passed and C<$@> contains the error message.
264 562
563=item $dbh->attr ($attr_name[, $attr_value], $cb->($dbh, $new_value))
564
565An accessor for the handle attributes, such as C<AutoCommit>,
566C<RaiseError>, C<PrintError> and so on. If you provide an C<$attr_value>
567(which might be C<undef>), then the given attribute will be set to that
568value.
569
570The callback will be passed the database handle and the attribute's value
571if successful.
572
573If an error occurs and the C<on_error> callback returns, then only C<$dbh>
574will be passed and C<$@> contains the error message.
575
576=item $dbh->begin_work ($cb->($dbh[, $rc]))
577
578=item $dbh->commit ($cb->($dbh[, $rc]))
579
580=item $dbh->rollback ($cb->($dbh[, $rc]))
581
582The begin_work, commit, and rollback methods expose the equivalent
583transaction control method of the DBI driver. On success, C<$rc> is true.
584
585If an error occurs and the C<on_error> callback returns, then only C<$dbh>
586will be passed and C<$@> contains the error message.
587
588=item $dbh->func ('string_which_yields_args_when_evaled', $func_name, $cb->($dbh, $rc, $dbi_err, $dbi_errstr))
589
590This gives access to database driver private methods. Because they
591are not standard you cannot always depend on the value of C<$rc> or
592C<$dbi_err>. Check the documentation for your specific driver/function
593combination to see what it returns.
594
595Note that the first argument will be eval'ed to produce the argument list to
596the func() method. This must be done because the serialization protocol
597between the AnyEvent::DBI server process and your program does not support the
598passage of closures.
599
600Here's an example to extend the query language in SQLite so it supports an
601intstr() function:
602
603 $cv = AnyEvent->condvar;
604 $dbh->func (
605 q{
606 instr => 2, sub {
607 my ($string, $search) = @_;
608 return index $string, $search;
609 },
610 },
611 create_function => sub {
612 return $cv->send ($@)
613 unless $#_;
614 $cv->send (undef, @_[1,2,3]);
615 }
616 );
617
618 my ($err,$rc,$errcode,$errstr) = $cv->recv;
619
620 die $err if defined $err;
621 die "EVAL failed: $errstr"
622 if $errcode;
623
624 # otherwise, we can ignore $rc and $errcode for this particular func
625
265=cut 626=cut
266 627
267sub exec { 628for my $cmd_name (qw(exec attr begin_work commit rollback func)) {
629 eval 'sub ' . $cmd_name . '{
268 my $cb = pop; 630 my $cb = pop;
269 splice @_, 1, 0, $cb, (caller)[1,2], 0, "req_exec"; 631 splice @_, 1, 0, $cb, (caller)[1,2], "req_' . $cmd_name . '";
270 632 &_req
271 goto &_req; 633 }';
272} 634}
273 635
274=back 636=back
275 637
276=head1 SEE ALSO 638=head1 SEE ALSO
277 639
278L<AnyEvent>, L<DBI>. 640L<AnyEvent>, L<DBI>, L<Coro::Mysql>.
279 641
280=head1 AUTHOR 642=head1 AUTHOR
281 643
282 Marc Lehmann <schmorp@schmorp.de> 644 Marc Lehmann <schmorp@schmorp.de>
283 http://home.schmorp.de/ 645 http://home.schmorp.de/
284 646
647 Adam Rosenstein <adam@redcondor.com>
648 http://www.redcondor.com/
649
285=cut 650=cut
286 651
2871 6521;
288 653

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines