… | |
… | |
3 | AnyEvent::DBI - asynchronous DBI access |
3 | AnyEvent::DBI - asynchronous DBI access |
4 | |
4 | |
5 | =head1 SYNOPSIS |
5 | =head1 SYNOPSIS |
6 | |
6 | |
7 | use AnyEvent::DBI; |
7 | use AnyEvent::DBI; |
|
|
8 | |
|
|
9 | my $cv = AnyEvent->condvar; |
|
|
10 | |
|
|
11 | my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", ""; |
|
|
12 | |
|
|
13 | $dbh->exec ("select * from test where num=?", 10, sub { |
|
|
14 | my ($rows, $rv) = @_; |
|
|
15 | |
|
|
16 | print "@$_\n" |
|
|
17 | for @$rows; |
|
|
18 | |
|
|
19 | $cv->broadcast; |
|
|
20 | }); |
|
|
21 | |
|
|
22 | # asynchronously do sth. else here |
|
|
23 | |
|
|
24 | $cv->wait; |
8 | |
25 | |
9 | =head1 DESCRIPTION |
26 | =head1 DESCRIPTION |
10 | |
27 | |
11 | This module is an L<AnyEvent> user, you need to make sure that you use and |
28 | This module is an L<AnyEvent> user, you need to make sure that you use and |
12 | run a supported event loop. |
29 | run a supported event loop. |
13 | |
30 | |
14 | This module implements asynchronous DBI access my forking or executing |
31 | This module implements asynchronous DBI access by forking or executing |
15 | separate "DBI-Server" processes and sending them requests. |
32 | separate "DBI-Server" processes and sending them requests. |
16 | |
33 | |
17 | It means that you can run DBI requests in parallel to other tasks. |
34 | It means that you can run DBI requests in parallel to other tasks. |
|
|
35 | |
|
|
36 | The overhead for very simple statements ("select 0") is somewhere |
|
|
37 | around 120% to 200% (dual/single core CPU) compared to an explicit |
|
|
38 | prepare_cached/execute/fetchrow_arrayref/finish combination. |
18 | |
39 | |
19 | =cut |
40 | =cut |
20 | |
41 | |
21 | package AnyEvent::DBI; |
42 | package AnyEvent::DBI; |
22 | |
43 | |
… | |
… | |
31 | use DBI (); |
52 | use DBI (); |
32 | |
53 | |
33 | use AnyEvent (); |
54 | use AnyEvent (); |
34 | use AnyEvent::Util (); |
55 | use AnyEvent::Util (); |
35 | |
56 | |
36 | our $VERSION = '1.0'; |
57 | our $VERSION = '1.1'; |
37 | |
58 | |
38 | # this is the forked server code |
59 | # this is the forked server code |
39 | |
60 | |
40 | our $DBH; |
61 | our $DBH; |
41 | |
62 | |
… | |
… | |
43 | my (undef, $dbi, $user, $pass, %attr) = @{+shift}; |
64 | my (undef, $dbi, $user, $pass, %attr) = @{+shift}; |
44 | |
65 | |
45 | $DBH = DBI->connect ($dbi, $user, $pass, \%attr); |
66 | $DBH = DBI->connect ($dbi, $user, $pass, \%attr); |
46 | |
67 | |
47 | [1] |
68 | [1] |
|
|
69 | } |
|
|
70 | |
|
|
71 | sub req_exec { |
|
|
72 | my (undef, $st, @args) = @{+shift}; |
|
|
73 | |
|
|
74 | my $sth = $DBH->prepare_cached ($st, undef, 1); |
|
|
75 | |
|
|
76 | my $rv = $sth->execute (@args) |
|
|
77 | or die $sth->errstr; |
|
|
78 | |
|
|
79 | [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }] |
48 | } |
80 | } |
49 | |
81 | |
50 | sub serve { |
82 | sub serve { |
51 | my ($fh) = @_; |
83 | my ($fh) = @_; |
52 | |
84 | |
… | |
… | |
79 | } |
111 | } |
80 | } |
112 | } |
81 | } |
113 | } |
82 | }; |
114 | }; |
83 | |
115 | |
84 | warn $@;#d# |
116 | if (AnyEvent::WIN32) { |
85 | |
|
|
86 | kill 9, $$; # no other way on the broken windows platform |
117 | kill 9, $$; # no other way on the broken windows platform |
|
|
118 | # and the above doesn't even work on windows, it seems the only |
|
|
119 | # way to is to leak memory and kill 9 from the parent. yay. |
|
|
120 | } |
|
|
121 | |
|
|
122 | require POSIX; |
|
|
123 | POSIX::_exit (0); |
|
|
124 | # and the above kills the parent process on windows |
87 | } |
125 | } |
88 | |
126 | |
89 | =head2 METHODS |
127 | =head2 METHODS |
90 | |
128 | |
91 | =over 4 |
129 | =over 4 |
… | |
… | |
117 | |
155 | |
118 | If this callback returns and this was a fatal error (C<$fatal> is true) |
156 | If this callback returns and this was a fatal error (C<$fatal> is true) |
119 | then AnyEvent::DBI die's, otherwise it calls the original request callback |
157 | then AnyEvent::DBI die's, otherwise it calls the original request callback |
120 | without any arguments. |
158 | without any arguments. |
121 | |
159 | |
122 | If omitted, then C<die> will be called on any fatal errors, others will be ignored. |
160 | If omitted, then C<die> will be called on any errors, fatal or not. |
123 | |
161 | |
124 | =back |
162 | =back |
125 | |
163 | |
126 | =cut |
164 | =cut |
127 | |
165 | |
… | |
… | |
164 | if (defined $res->[0]) { |
202 | if (defined $res->[0]) { |
165 | $req->[0](@$res); |
203 | $req->[0](@$res); |
166 | } else { |
204 | } else { |
167 | my $cb = shift @$req; |
205 | my $cb = shift @$req; |
168 | $wself->_error ($res->[1], @$req); |
206 | $wself->_error ($res->[1], @$req); |
169 | $cb->[0](); |
207 | $cb->(); |
170 | } |
208 | } |
171 | } |
209 | } |
172 | |
210 | |
173 | } elsif (defined $len) { |
211 | } elsif (defined $len) { |
174 | $wself->_error ("unexpected eof", @caller, 1); |
212 | $wself->_error ("unexpected eof", @caller, 1); |
175 | } else { |
213 | } else { |
176 | $wself->_error ("read error: $!", @caller, 1); |
214 | $wself->_error ("read error: $!", @caller, 1); |
177 | } |
215 | } |
178 | }); |
216 | }); |
179 | |
217 | |
|
|
218 | $self->{ww_cb} = sub { |
|
|
219 | my $len = syswrite $client, $wself->{wbuf} |
|
|
220 | or return delete $wself->{ww}; |
|
|
221 | |
|
|
222 | substr $wself->{wbuf}, 0, $len, ""; |
|
|
223 | }; |
|
|
224 | |
180 | my $pid = fork; |
225 | my $pid = fork; |
181 | |
226 | |
182 | if ($pid) { |
227 | if ($pid) { |
183 | # parent |
228 | # parent |
184 | close $server; |
229 | close $server; |
… | |
… | |
205 | delete $self->{ww}; |
250 | delete $self->{ww}; |
206 | delete $self->{fh}; |
251 | delete $self->{fh}; |
207 | |
252 | |
208 | $@ = $error; |
253 | $@ = $error; |
209 | |
254 | |
|
|
255 | if ($self->{on_error}) { |
210 | $self->{on_error}($self, $filename, $line, $fatal) |
256 | $self->{on_error}($self, $filename, $line, $fatal); |
211 | if $self->{on_error}; |
257 | return unless $fatal; |
|
|
258 | } |
212 | |
259 | |
213 | die "$error at $filename, line $line\n" |
260 | die "$error at $filename, line $line\n"; |
214 | if $fatal; |
|
|
215 | } |
261 | } |
216 | |
262 | |
217 | sub _req { |
263 | sub _req { |
218 | warn "<req(@_>\n";#d# |
|
|
219 | my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, (); |
264 | my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, (); |
220 | |
265 | |
221 | push @{ $self->{queue} }, [$cb, $filename, $line, $fatal]; |
266 | push @{ $self->{queue} }, [$cb, $filename, $line, $fatal]; |
222 | |
267 | |
223 | $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; |
268 | $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; |
224 | |
269 | |
225 | unless ($self->{ww}) { |
270 | unless ($self->{ww}) { |
226 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
271 | my $len = syswrite $self->{fh}, $self->{wbuf}; |
227 | substr $self->{wbuf}, 0, $len, ""; |
272 | substr $self->{wbuf}, 0, $len, ""; |
228 | |
273 | |
229 | #TODO, ww_cb |
|
|
230 | # still any left? then install a write watcher |
274 | # still any left? then install a write watcher |
231 | $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) |
275 | $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) |
232 | if length $self->{wbuf}; |
276 | if length $self->{wbuf}; |
233 | } |
277 | } |
234 | } |
278 | } |
235 | |
279 | |
236 | =item $dbh->exec ("statement", @args, $cb->($rows, %extra)) |
280 | =item $dbh->exec ("statement", @args, $cb->($rows, $rv, ...)) |
237 | |
281 | |
238 | Executes the given SQL statement with placeholders replaced by |
282 | Executes the given SQL statement with placeholders replaced by |
239 | C<@args>. The statement will be prepared and cached on the |
283 | C<@args>. The statement will be prepared and cached on the server side, so |
240 | server side, so using placeholders is compulsory. |
284 | using placeholders is compulsory. |
241 | |
285 | |
242 | The callback will be called with the result of C<fetchall_arrayref> as |
286 | The callback will be called with the result of C<fetchall_arrayref> as |
243 | first argument and possibly a hash reference with additional information. |
287 | first argument (or C<undef> if the statement wasn't a select statement) |
|
|
288 | and the return value of C<execute> as second argument. Additional |
|
|
289 | arguments might get passed as well. |
|
|
290 | |
|
|
291 | If an error occurs and the C<on_error> callback returns, then no arguments |
|
|
292 | will be passed and C<$@> contains the error message. |
244 | |
293 | |
245 | =cut |
294 | =cut |
246 | |
295 | |
247 | sub exec { |
296 | sub exec { |
248 | my $cb = pop; |
297 | my $cb = pop; |
249 | splice @_, 1, 0, $cb, (caller)[1,2], 0, "exec"; |
298 | splice @_, 1, 0, $cb, (caller)[1,2], 0, "req_exec"; |
250 | |
299 | |
251 | goto &_req; |
300 | goto &_req; |
252 | } |
301 | } |
253 | |
302 | |
254 | =back |
303 | =back |
… | |
… | |
257 | |
306 | |
258 | L<AnyEvent>, L<DBI>. |
307 | L<AnyEvent>, L<DBI>. |
259 | |
308 | |
260 | =head1 AUTHOR |
309 | =head1 AUTHOR |
261 | |
310 | |
262 | Marc Lehmann <schmorp@schmorp.de> |
311 | Marc Lehmann <schmorp@schmorp.de> |
263 | http://home.schmorp.de/ |
312 | http://home.schmorp.de/ |
264 | |
313 | |
265 | =cut |
314 | =cut |
266 | |
315 | |
267 | 1 |
316 | 1 |
268 | |
317 | |