ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.9
Committed: Thu Nov 6 13:56:58 2008 UTC (15 years, 6 months ago) by root
Branch: MAIN
CVS Tags: rel-1_1
Changes since 1.8: +4 -2 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::DBI - asynchronous DBI access
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::DBI;
8
9 my $cv = AnyEvent->condvar;
10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12
13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($rows, $rv) = @_;
15
16 print "@$_\n"
17 for @$rows;
18
19 $cv->broadcast;
20 });
21
22 # asynchronously do sth. else here
23
24 $cv->wait;
25
26 =head1 DESCRIPTION
27
28 This module is an L<AnyEvent> user; you need to make sure that you use and
29 run a supported event loop.
30
31 This module implements asynchronous DBI access by forking or executing
32 separate "DBI-Server" processes and sending them requests.
33
34 It means that you can run DBI requests in parallel to other tasks.
35
36 The overhead for very simple statements ("select 0") is somewhere
37 around 120% to 200% (dual/single core CPU) compared to an explicit
38 prepare_cached/execute/fetchrow_arrayref/finish combination.
39
40 =cut
41
42 package AnyEvent::DBI;
43
44 use strict;
45 no warnings;
46
47 use Carp;
48 use Socket ();
49 use Scalar::Util ();
50 use Storable ();
51
52 use DBI ();
53
54 use AnyEvent ();
55 use AnyEvent::Util ();
56
57 our $VERSION = '1.1';
58
59 # this is the forked server code
60
61 our $DBH;
62
# Server-side request handler: opens the DBI connection that later requests
# in this server process operate on.
#
# Argument: the thawed request, an array ref of the form
#    [$handler_name, $dsn, $user, $pass, key => value, ...]
# where the trailing key/value pairs are passed to DBI->connect as the
# attribute hash.
#
# Stores the new handle in the package variable $DBH and returns [1].
# Dies with the DBI error message when no connection could be established,
# so the failure is reported for this request instead of surfacing later as
# a method call on an undefined handle.
sub req_open {
   my (undef, $dbi, $user, $pass, %attr) = @{+shift};

   $DBH = DBI->connect ($dbi, $user, $pass, \%attr)
      or die $DBI::errstr || "unable to connect to database";

   [1]
}
70
# Server-side request handler: prepares (cached) and executes one statement.
#
# Argument: the thawed request, an array ref of the form
#    [$handler_name, $statement, @bind_values]
#
# Returns [1, $rows, $rv]: $rows is the result of fetchall_arrayref when the
# statement produces columns (NUM_OF_FIELDS is true) and undef otherwise,
# $rv is the return value of execute.
#
# Dies when there is no open connection, when the statement cannot be
# prepared or when executing it fails.
sub req_exec {
   my (undef, $st, @args) = @{+shift};

   $DBH
      or die "no database connection";

   # the third argument (1) is prepare_cached's $if_active flag
   my $sth = $DBH->prepare_cached ($st, undef, 1)
      or die $DBH->errstr;

   my $rv = $sth->execute (@args)
      or die $sth->errstr;

   my $rows = $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef;

   [1, $rows, $rv]
}
81
# Request loop of the forked DBI server process; does not return.
#
# $fh is the server side of the socketpair. Requests arrive as frames made
# of a native unsigned long length (pack "L") followed by that many bytes of
# Storable data. Each thawed request is an array ref whose first element
# names the handler (e.g. req_open, req_exec); the handler is called with the
# request itself. Its result - or [undef, $error] when it died - is frozen
# and written back using the same framing.
#
# The loop ends when sysread returns a false value (eof or error) or when a
# reply cannot be written; afterwards the process is terminated.
sub serve {
   my ($fh) = @_;

   no strict; # handlers are invoked by name, i.e. via symbolic code refs

   eval {
      my $rbuf;

      while () {
         sysread $fh, $rbuf, 16384, length $rbuf
            or last;

         # process every complete frame that is currently buffered
         while () {
            my $len = unpack "L", $rbuf;

            # full request available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly this frame, not everything that happens to be buffered
            my $req = Storable::thaw (substr $rbuf, 4, $len);
            substr $rbuf, 0, $len + 4, ""; # remove length + request

            my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };

            # handler died: report the stringified error instead of a result
            $wbuf = pack "L/a*", Storable::freeze [undef, "$@"]
               if $@;

            # syswrite may write only part of the reply, so loop until done
            for (my $ofs = 0; $ofs < length $wbuf; ) {
               $ofs += (syswrite $fh, substr $wbuf, $ofs
                        or die "unable to write results");
            }
         }
      }
   };

   if (AnyEvent::WIN32) {
      kill 9, $$; # no other way on the broken windows platform
      # and the above doesn't even work on windows, it seems the only
      # way is to leak memory and kill 9 from the parent. yay.
   }

   require POSIX;
   POSIX::_exit (0);
   # and the above kills the parent process on windows
}
126
127 =head2 METHODS
128
129 =over 4
130
131 =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
132
133 Returns a database handle for the given database. Each database handle
134 has an associated server process that executes statements in order. If
135 you want to run more than one statement in parallel, you need to create
136 additional database handles.
137
138 The advantage of this approach is that transactions work as state is
139 preserved.
140
141 Example:
142
143 $dbh = new AnyEvent::DBI
144 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
145
146 Additional key-value pairs can be used to adjust behaviour:
147
148 =over 4
149
150 =item on_error => $callback->($dbh, $filename, $line, $fatal)
151
152 When an error occurs, then this callback will be invoked. On entry, C<$@>
153 is set to the error message. C<$filename> and C<$line> are where the
154 original request was submitted.
155
156 If this callback returns and this was a fatal error (C<$fatal> is true)
157 then AnyEvent::DBI dies; otherwise it calls the original request callback
158 without any arguments.
159
160 If omitted, then C<die> will be called on any errors, fatal or not.
161
162 =back
163
164 =cut
165
# Storable autoloads parts of itself on first use: do one freeze/thaw round
# trip at module load time so that this has already happened
# (presumably so it is done before any server process is forked - confirm).
Storable::thaw (Storable::freeze ([]));
168
# Constructor: creates the client object, forks the DBI server process and
# queues the initial "open" request.
#
# Arguments: $class, $dbi (data source), $user, $pass, followed by key/value
# pairs that become the object's initial fields (e.g. on_error).
#
# Client and server talk over a socketpair using frames made of a native
# unsigned long length (pack "L") followed by Storable data. The object holds:
#    fh    - client side of the socketpair (set non-blocking)
#    rw    - read watcher that decodes responses and runs request callbacks
#    ww_cb - write callback used to flush {wbuf} when a write watcher is active
#
# Croaks when the socketpair cannot be created or fork fails.
sub new {
   my ($class, $dbi, $user, $pass, %arg) = @_;

   socketpair my $client, my $server, Socket::AF_UNIX (), Socket::SOCK_STREAM (), Socket::PF_UNSPEC ()
      or croak "unable to create dbi communicaiton pipe: $!";

   my $self = bless \%arg, $class;

   $self->{fh} = $client;

   # the watcher callbacks are stored inside $self, so they must only hold a
   # weak reference to it - otherwise the object could never be freed
   Scalar::Util::weaken (my $wself = $self);

   AnyEvent::Util::fh_nonblocking $client, 1;

   my $rbuf;
   my @caller = (caller)[1,2]; # the "default" caller

   $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
      my $len = sysread $client, $rbuf, 65536, length $rbuf;

      if ($len > 0) {

         # handle every complete response that is currently buffered
         while () {
            my $len = unpack "L", $rbuf;

            # full response available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly this frame, not everything that happens to be buffered
            my $res = Storable::thaw (substr $rbuf, 4, $len);
            substr $rbuf, 0, $len + 4, ""; # remove length + response

            # responses arrive in request order
            my $req = shift @{ $wself->{queue} };

            if (defined $res->[0]) {
               # $res->[0] is only the success marker: the callback receives
               # the payload ($rows, $rv, ...) as documented
               my (undef, @result) = @$res;
               $req->[0](@result);
            } else {
               # [undef, $error]: report the error, then call the request
               # callback without arguments
               my $cb = shift @$req;
               $wself->_error ($res->[1], @$req);
               $cb->();
            }
         }

      } elsif (defined $len) {
         $wself->_error ("unexpected eof", @caller, 1);
      } else {
         $wself->_error ("read error: $!", @caller, 1);
      }
   });

   $self->{ww_cb} = sub {
      # nothing written (empty buffer or error): drop the write watcher
      my $len = syswrite $client, $wself->{wbuf}
         or return delete $wself->{ww};

      substr $wself->{wbuf}, 0, $len, "";
   };

   my $pid = fork;

   if ($pid) {
      # parent
      close $server;

   } elsif (defined $pid) {
      # child
      close $client;
      @_ = $server;
      goto &serve;

   } else {
      croak "fork: $!";
   }

   # errors while opening the database are fatal for this handle
   $self->_req (sub { }, @caller, 1, req_open => $dbi, $user, $pass);

   $self
}
245
# Reports an error for a request that was submitted at $filename/$line.
#
# Fatal errors ($fatal true) make the connection unusable, so the watchers
# and the socket are released. Non-fatal errors (e.g. a failing statement)
# leave the connection intact so that further requests can be made.
#
# $@ is set to $error before the on_error callback is invoked with
# ($self, $filename, $line, $fatal). Returns normally only when an on_error
# callback exists and the error is not fatal; in every other case it dies
# with "$error at $filename, line $line\n".
sub _error {
   my ($self, $error, $filename, $line, $fatal) = @_;

   # only a fatal error invalidates the connection
   delete @$self{qw(rw ww fh)}
      if $fatal;

   $@ = $error;

   if ($self->{on_error}) {
      $self->{on_error}($self, $filename, $line, $fatal);
      return unless $fatal;
   }

   die "$error at $filename, line $line\n";
}
262
# Queues one request and sends it to the server process.
#
# Arguments: $self, $cb (request callback), $filename and $line (where the
# request was submitted, used for error reports), $fatal (whether an error
# for this request is fatal), followed by the request itself: the handler
# name and its arguments. The request is frozen with Storable and framed
# with a native unsigned long length prefix (pack "L/a*").
#
# Croaks - without queueing anything - when the object no longer has a
# socket, which is the case after a fatal error.
sub _req {
   my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, ();

   $self->{fh}
      or croak "AnyEvent::DBI: database handle is no longer usable (previous fatal error?)";

   # responses arrive in request order, so a simple fifo is enough
   push @{ $self->{queue} }, [$cb, $filename, $line, $fatal];

   $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;

   # with an active write watcher the buffer is flushed by its callback
   unless ($self->{ww}) {
      # try to write immediately, this usually avoids the watcher completely
      my $len = syswrite $self->{fh}, $self->{wbuf};
      substr $self->{wbuf}, 0, $len, ""
         if $len;

      # still any left? then install a write watcher
      $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
         if length $self->{wbuf};
   }
}
279
280 =item $dbh->exec ("statement", @args, $cb->($rows, $rv, ...))
281
282 Executes the given SQL statement with placeholders replaced by
283 C<@args>. The statement will be prepared and cached on the server side, so
284 using placeholders is compulsory.
285
286 The callback will be called with the result of C<fetchall_arrayref> as
287 first argument (or C<undef> if the statement wasn't a select statement)
288 and the return value of C<execute> as second argument. Additional
289 arguments might get passed as well.
290
291 If an error occurs and the C<on_error> callback returns, then no arguments
292 will be passed and C<$@> contains the error message.
293
294 =cut
295
# Public entry point for executing a statement: rearranges the argument
# list into the form _req expects and continues there.
sub exec {
   # the callback is always the last argument
   my $cb = pop @_;

   # remember where the request was submitted, for error reports
   my ($filename, $line) = (caller)[1, 2];

   # insert callback, origin, "not fatal" flag and handler name after $self
   splice @_, 1, 0, ($cb, $filename, $line, 0, "req_exec");

   # tail call: _req sees the rearranged @_
   goto &_req;
}
302
303 =back
304
305 =head1 SEE ALSO
306
307 L<AnyEvent>, L<DBI>.
308
309 =head1 AUTHOR
310
311 Marc Lehmann <schmorp@schmorp.de>
312 http://home.schmorp.de/
313
314 =cut
315
316 1
317