ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
(Generate patch)

Comparing AnyEvent-DBI/DBI.pm (file contents):
Revision 1.1 by root, Fri Jun 6 15:35:46 2008 UTC vs.
Revision 1.9 by root, Thu Nov 6 13:56:58 2008 UTC

3AnyEvent::DBI - asynchronous DBI access 3AnyEvent::DBI - asynchronous DBI access
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use AnyEvent::DBI; 7 use AnyEvent::DBI;
8
9 my $cv = AnyEvent->condvar;
10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12
13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($rows, $rv) = @_;
15
16 print "@$_\n"
17 for @$rows;
18
19 $cv->broadcast;
20 });
21
22 # asynchronously do sth. else here
23
24 $cv->wait;
8 25
9=head1 DESCRIPTION 26=head1 DESCRIPTION
10 27
11This module is an L<AnyEvent> user, you need to make sure that you use and 28This module is an L<AnyEvent> user, you need to make sure that you use and
12run a supported event loop. 29run a supported event loop.
13 30
14This module implements asynchronous DBI access my forking or executing 31This module implements asynchronous DBI access by forking or executing
15separate "DBI-Server" processes and sending them requests. 32separate "DBI-Server" processes and sending them requests.
16 33
17It means that you can run DBI requests in parallel to other tasks. 34It means that you can run DBI requests in parallel to other tasks.
35
36The overhead for very simple statements ("select 0") is somewhere
37around 120% to 200% (dual/single core CPU) compared to an explicit
38prepare_cached/execute/fetchrow_arrayref/finish combination.
18 39
19=cut 40=cut
20 41
21package AnyEvent::DBI; 42package AnyEvent::DBI;
22 43
31use DBI (); 52use DBI ();
32 53
33use AnyEvent (); 54use AnyEvent ();
34use AnyEvent::Util (); 55use AnyEvent::Util ();
35 56
36our $VERSION = '1.0'; 57our $VERSION = '1.1';
37 58
38# this is the forked server code 59# this is the forked server code
39 60
40our $DBH; 61our $DBH;
41 62
43 my (undef, $dbi, $user, $pass, %attr) = @{+shift}; 64 my (undef, $dbi, $user, $pass, %attr) = @{+shift};
44 65
45 $DBH = DBI->connect ($dbi, $user, $pass, \%attr); 66 $DBH = DBI->connect ($dbi, $user, $pass, \%attr);
46 67
47 [1] 68 [1]
69}
70
71sub req_exec {
72 my (undef, $st, @args) = @{+shift};
73
74 my $sth = $DBH->prepare_cached ($st, undef, 1);
75
76 my $rv = $sth->execute (@args)
77 or die $sth->errstr;
78
79 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }]
48} 80}
49 81
50sub serve { 82sub serve {
51 my ($fh) = @_; 83 my ($fh) = @_;
52 84
79 } 111 }
80 } 112 }
81 } 113 }
82 }; 114 };
83 115
84 warn $@;#d# 116 if (AnyEvent::WIN32) {
85
86 kill 9, $$; # no other way on the broken windows platform 117 kill 9, $$; # no other way on the broken windows platform
118 # and the above doesn't even work on windows, it seems the only
119 # way to is to leak memory and kill 9 from the parent. yay.
120 }
121
122 require POSIX;
123 POSIX::_exit (0);
124 # and the above kills the parent process on windows
87} 125}
88 126
89=head2 METHODS 127=head2 METHODS
90 128
91=over 4 129=over 4
117 155
118If this callback returns and this was a fatal error (C<$fatal> is true) 156If this callback returns and this was a fatal error (C<$fatal> is true)
119then AnyEvent::DBI die's, otherwise it calls the original request callback 157then AnyEvent::DBI die's, otherwise it calls the original request callback
120without any arguments. 158without any arguments.
121 159
122If omitted, then C<die> will be called on any fatal errors, others will be ignored. 160If omitted, then C<die> will be called on any errors, fatal or not.
123 161
124=back 162=back
125 163
126=cut 164=cut
127 165
164 if (defined $res->[0]) { 202 if (defined $res->[0]) {
165 $req->[0](@$res); 203 $req->[0](@$res);
166 } else { 204 } else {
167 my $cb = shift @$req; 205 my $cb = shift @$req;
168 $wself->_error ($res->[1], @$req); 206 $wself->_error ($res->[1], @$req);
169 $cb->[0](); 207 $cb->();
170 } 208 }
171 } 209 }
172 210
173 } elsif (defined $len) { 211 } elsif (defined $len) {
174 $wself->_error ("unexpected eof", @caller, 1); 212 $wself->_error ("unexpected eof", @caller, 1);
175 } else { 213 } else {
176 $wself->_error ("read error: $!", @caller, 1); 214 $wself->_error ("read error: $!", @caller, 1);
177 } 215 }
178 }); 216 });
179 217
218 $self->{ww_cb} = sub {
219 my $len = syswrite $client, $wself->{wbuf}
220 or return delete $wself->{ww};
221
222 substr $wself->{wbuf}, 0, $len, "";
223 };
224
180 my $pid = fork; 225 my $pid = fork;
181 226
182 if ($pid) { 227 if ($pid) {
183 # parent 228 # parent
184 close $server; 229 close $server;
205 delete $self->{ww}; 250 delete $self->{ww};
206 delete $self->{fh}; 251 delete $self->{fh};
207 252
208 $@ = $error; 253 $@ = $error;
209 254
255 if ($self->{on_error}) {
210 $self->{on_error}($self, $filename, $line, $fatal) 256 $self->{on_error}($self, $filename, $line, $fatal);
211 if $self->{on_error}; 257 return unless $fatal;
258 }
212 259
213 die "$error at $filename, line $line\n" 260 die "$error at $filename, line $line\n";
214 if $fatal;
215} 261}
216 262
217sub _req { 263sub _req {
218 warn "<req(@_>\n";#d#
219 my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, (); 264 my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, ();
220 265
221 push @{ $self->{queue} }, [$cb, $filename, $line, $fatal]; 266 push @{ $self->{queue} }, [$cb, $filename, $line, $fatal];
222 267
223 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_; 268 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
224 269
225 unless ($self->{ww}) { 270 unless ($self->{ww}) {
226 my $len = syswrite $self->{fh}, $self->{wbuf}; 271 my $len = syswrite $self->{fh}, $self->{wbuf};
227 substr $self->{wbuf}, 0, $len, ""; 272 substr $self->{wbuf}, 0, $len, "";
228 273
229 #TODO, ww_cb
230 # still any left? then install a write watcher 274 # still any left? then install a write watcher
231 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb}) 275 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
232 if length $self->{wbuf}; 276 if length $self->{wbuf};
233 } 277 }
234} 278}
235 279
236=item $dbh->exec ("statement", @args, $cb->($rows, %extra)) 280=item $dbh->exec ("statement", @args, $cb->($rows, $rv, ...))
237 281
238Executes the given SQL statement with placeholders replaced by 282Executes the given SQL statement with placeholders replaced by
239C<@args>. The statement will be prepared and cached on the 283C<@args>. The statement will be prepared and cached on the server side, so
240server side, so using placeholders is compulsory. 284using placeholders is compulsory.
241 285
242The callback will be called with the result of C<fetchall_arrayref> as 286The callback will be called with the result of C<fetchall_arrayref> as
243first argument and possibly a hash reference with additional information. 287first argument (or C<undef> if the statement wasn't a select statement)
288and the return value of C<execute> as second argument. Additional
289arguments might get passed as well.
290
291If an error occurs and the C<on_error> callback returns, then no arguments
292will be passed and C<$@> contains the error message.
244 293
245=cut 294=cut
246 295
247sub exec { 296sub exec {
248 my $cb = pop; 297 my $cb = pop;
249 splice @_, 1, 0, $cb, (caller)[1,2], 0, "exec"; 298 splice @_, 1, 0, $cb, (caller)[1,2], 0, "req_exec";
250 299
251 goto &_req; 300 goto &_req;
252} 301}
253 302
254=back 303=back
257 306
258L<AnyEvent>, L<DBI>. 307L<AnyEvent>, L<DBI>.
259 308
260=head1 AUTHOR 309=head1 AUTHOR
261 310
262 Marc Lehmann <schmorp@schmorp.de> 311 Marc Lehmann <schmorp@schmorp.de>
263 http://home.schmorp.de/ 312 http://home.schmorp.de/
264 313
265=cut 314=cut
266 315
2671 3161
268 317

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines