ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.7
Committed: Mon Jul 21 02:34:40 2008 UTC (15 years, 10 months ago) by root
Branch: MAIN
Changes since 1.6: +17 -7 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::DBI - asynchronous DBI access
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::DBI;
8
9 my $cv = AnyEvent->condvar;
10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12
13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($rows, $rv) = @_;
15
16 print "@$_\n"
17 for @$rows;
18
19 $cv->broadcast;
20 });
21
22 # asynchronously do sth. else here
23
24 $cv->wait;
25
26 =head1 DESCRIPTION
27
28 This module is an L<AnyEvent> user, you need to make sure that you use and
29 run a supported event loop.
30
31 This module implements asynchronous DBI access my forking or executing
32 separate "DBI-Server" processes and sending them requests.
33
34 It means that you can run DBI requests in parallel to other tasks.
35
36 The overhead for very simple statements ("select 0") is somewhere
37 around 120% to 200% (dual/single core CPU) compared to an explicit
38 prepare_cached/execute/fetchrow_arrayref/finish combination.
39
40 =cut
41
42 package AnyEvent::DBI;
43
44 use strict;
45 no warnings;
46
47 use Carp;
48 use Socket ();
49 use Scalar::Util ();
50 use Storable ();
51
52 use DBI ();
53
54 use AnyEvent ();
55 use AnyEvent::Util ();
56
57 our $VERSION = '1.1';
58
59 # this is the forked server code
60
61 our $DBH;
62
63 sub req_open {
64 my (undef, $dbi, $user, $pass, %attr) = @{+shift};
65
66 $DBH = DBI->connect ($dbi, $user, $pass, \%attr);
67
68 [1]
69 }
70
71 sub req_exec {
72 my (undef, $st, @args) = @{+shift};
73
74 my $sth = $DBH->prepare_cached ($st, undef, 1);
75
76 my $rv = $sth->execute (@args)
77 or die $sth->errstr;
78
79 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }]
80 }
81
82 sub serve {
83 my ($fh) = @_;
84
85 no strict;
86
87 eval {
88 my $rbuf;
89
90 while () {
91 sysread $fh, $rbuf, 16384, length $rbuf
92 or last;
93
94 while () {
95 my $len = unpack "L", $rbuf;
96
97 # full request available?
98 last unless $len && $len + 4 <= length $rbuf;
99
100 my $req = Storable::thaw substr $rbuf, 4;
101 substr $rbuf, 0, $len + 4, ""; # remove length + request
102
103 my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
104
105 $wbuf = pack "L/a*", Storable::freeze [undef, "$@"]
106 if $@;
107
108 for (my $ofs = 0; $ofs < length $wbuf; ) {
109 $ofs += (syswrite $fh, substr $wbuf, $ofs
110 or die "unable to write results");
111 }
112 }
113 }
114 };
115
116 if (AnyEvent::WIN32) {
117 kill 9, $$; # no other way on the broken windows platform
118 # and the above doesn't even work on windows, it seems the only
119 # way to is to leak memory and kill 9 from the parent. yay.
120 }
121
122 require POSIX;
123 POSIX::_exit (0);
124 # and the above kills the parent process on windows
125 }
126
127 =head2 METHODS
128
129 =over 4
130
131 =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
132
133 Returns a database handle for the given database. Each database handle
134 has an associated server process that executes statements in order. If
135 you want to run more than one statement in parallel, you need to create
136 additional database handles.
137
138 The advantage of this approach is that transactions work as state is
139 preserved.
140
141 Example:
142
143 $dbh = new AnyEvent::DBI
144 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
145
146 Additional key-value pairs can be used to adjust behaviour:
147
148 =over 4
149
150 =item on_error => $callback->($dbh, $filename, $line, $fatal)
151
152 When an error occurs, then this callback will be invoked. On entry, C<$@>
153 is set to the error message. C<$filename> and C<$line> is where the
154 original request was submitted.
155
156 If this callback returns and this was a fatal error (C<$fatal> is true)
157 then AnyEvent::DBI die's, otherwise it calls the original request callback
158 without any arguments.
159
160 If omitted, then C<die> will be called on any errors, fatal or not.
161
162 =back
163
164 =cut
165
166 # stupid Storable autoloading, total loss-loss situation
167 Storable::thaw Storable::freeze [];
168
169 sub new {
170 my ($class, $dbi, $user, $pass, %arg) = @_;
171
172 socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC
173 or croak "unable to create dbi communicaiton pipe: $!";
174
175 my $self = bless \%arg, $class;
176
177 $self->{fh} = $client;
178
179 Scalar::Util::weaken (my $wself = $self);
180
181 AnyEvent::Util::fh_nonblocking $client, 1;
182
183 my $rbuf;
184 my @caller = (caller)[1,2]; # the "default" caller
185
186 $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
187 my $len = sysread $client, $rbuf, 65536, length $rbuf;
188
189 if ($len > 0) {
190
191 while () {
192 my $len = unpack "L", $rbuf;
193
194 # full request available?
195 last unless $len && $len + 4 <= length $rbuf;
196
197 my $res = Storable::thaw substr $rbuf, 4;
198 substr $rbuf, 0, $len + 4, ""; # remove length + request
199
200 my $req = shift @{ $wself->{queue} };
201
202 if (defined $res->[0]) {
203 $req->[0](@$res);
204 } else {
205 my $cb = shift @$req;
206 $wself->_error ($res->[1], @$req);
207 $cb->();
208 }
209 }
210
211 } elsif (defined $len) {
212 $wself->_error ("unexpected eof", @caller, 1);
213 } else {
214 $wself->_error ("read error: $!", @caller, 1);
215 }
216 });
217
218 $self->{ww_cb} = sub {
219 my $len = syswrite $client, $wself->{wbuf}
220 or return delete $wself->{ww};
221
222 substr $wself->{wbuf}, 0, $len, "";
223 };
224
225 my $pid = fork;
226
227 if ($pid) {
228 # parent
229 close $server;
230
231 } elsif (defined $pid) {
232 # child
233 close $client;
234 @_ = $server;
235 goto &serve;
236
237 } else {
238 croak "fork: $!";
239 }
240
241 $self->_req (sub { }, (caller)[1,2], 1, req_open => $dbi, $user, $pass);
242
243 $self
244 }
245
246 sub _error {
247 my ($self, $error, $filename, $line, $fatal) = @_;
248
249 delete $self->{rw};
250 delete $self->{ww};
251 delete $self->{fh};
252
253 $@ = $error;
254
255 $self->{on_error}($self, $filename, $line, $fatal)
256 if $self->{on_error};
257
258 die "$error at $filename, line $line\n";
259 }
260
261 sub _req {
262 my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, ();
263
264 push @{ $self->{queue} }, [$cb, $filename, $line, $fatal];
265
266 $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
267
268 unless ($self->{ww}) {
269 my $len = syswrite $self->{fh}, $self->{wbuf};
270 substr $self->{wbuf}, 0, $len, "";
271
272 # still any left? then install a write watcher
273 $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
274 if length $self->{wbuf};
275 }
276 }
277
278 =item $dbh->exec ("statement", @args, $cb->($rows, $rv, ...))
279
280 Executes the given SQL statement with placeholders replaced by
281 C<@args>. The statement will be prepared and cached on the server side, so
282 using placeholders is compulsory.
283
284 The callback will be called with the result of C<fetchall_arrayref> as
285 first argument (or C<undef> if the statement wasn't a select statement)
286 and the return value of C<execute> as second argument. Additional
287 arguments might get passed as well.
288
289 If an error occurs and the C<on_error> callback returns, then no arguments
290 will be passed and C<$@> contains the error message.
291
292 =cut
293
294 sub exec {
295 my $cb = pop;
296 splice @_, 1, 0, $cb, (caller)[1,2], 0, "req_exec";
297
298 goto &_req;
299 }
300
301 =back
302
303 =head1 SEE ALSO
304
305 L<AnyEvent>, L<DBI>.
306
307 =head1 AUTHOR
308
309 Marc Lehmann <schmorp@schmorp.de>
310 http://home.schmorp.de/
311
312 =cut
313
314 1
315