ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.5
Committed: Mon Jun 9 15:30:41 2008 UTC (15 years, 11 months ago) by root
Branch: MAIN
Changes since 1.4: +17 -0 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::DBI - asynchronous DBI access
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::DBI;
8
9 my $cv = AnyEvent->condvar;
10
11 my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12
13 $dbh->exec ("select * from test where num=?", 10, sub {
14 my ($rows) = @_;
15
16 print "@$_\n"
17 for @$rows;
18
19 $cv->broadcast;
20 });
21
22 # asynchronously do sth. else here
23
24 $cv->wait;
25
26 =head1 DESCRIPTION
27
28 This module is an L<AnyEvent> user; you need to make sure that you use and
29 run a supported event loop.
30
31 This module implements asynchronous DBI access by forking or executing
32 separate "DBI-Server" processes and sending them requests.
33
34 It means that you can run DBI requests in parallel to other tasks.
35
36 The overhead for very simple statements ("select 0") is somewhere
37 around 120% to 200% (single/dual core CPU) compared to an explicit
38 prepare_cached/execute/fetchrow_arrayref/finish combination.
39
40 =cut
41
42 package AnyEvent::DBI;
43
44 use strict;
45 no warnings;
46
47 use Carp;
48 use Socket ();
49 use Scalar::Util ();
50 use Storable ();
51
52 use DBI ();
53
54 use AnyEvent ();
55 use AnyEvent::Util ();
56
57 our $VERSION = '1.0';
58
59 # this is the forked server code
60
61 our $DBH;
62
# Server-side handler for the "req_open" request: connects this server
# process to the database and stores the handle in the package variable $DBH.
#
# Takes the request array reference
#   [request name, $dbi, $user, $pass, attribute key/value pairs...]
#
# Returns [1] on success. Dies with the DBI error message when the connection
# cannot be established; serve() turns that exception into an error reply.
sub req_open {
    my (undef, $dbi, $user, $pass, %attr) = @{+shift};

    # DBI->connect returns undef on failure (unless RaiseError is requested),
    # so the result has to be checked instead of blindly reporting success.
    $DBH = DBI->connect ($dbi, $user, $pass, \%attr)
        or die $DBI::errstr;

    [1]
}
70
# Server-side handler for the "req_exec" request: prepares (cached) and
# executes one SQL statement on $DBH and fetches all result rows.
#
# Takes the request array reference
#   [request name, $statement, @bind_values]
#
# Returns [$rows], where $rows is the result of fetchall_arrayref.
# Dies with the database error message when preparing or executing fails.
sub req_exec {
    my (undef, $st, @args) = @{+shift};

    # The third argument (1) is DBI's $if_active flag; per the DBI docs it
    # finishes a still-active cached handle without emitting a warning.
    my $sth = $DBH->prepare_cached ($st, undef, 1)
        or die $DBH->errstr;

    $sth->execute (@args)
        or die $sth->errstr;

    [$sth->fetchall_arrayref]
}
81
# Main loop of the forked server process.
#
# Reads length-prefixed (native "L") Storable-frozen requests from $fh,
# dispatches each one to the sub named by its first element, and writes the
# frozen reply back using the same framing. An exception thrown by a handler
# is sent back as [undef, $error_message].
#
# Never returns: the process kills itself once the loop ends.
sub serve {
    my ($fh) = @_;

    # Request names are turned into sub calls by symbolic reference; that is
    # the only stricture that has to be relaxed here.
    no strict 'refs';

    eval {
        my $rbuf = "";

        while () {
            # 0 (eof) and undef (read error) both end the server
            sysread $fh, $rbuf, 16384, length $rbuf
                or last;

            while () {
                my $len = unpack "L", $rbuf;

                # full request available?
                last unless $len && $len + 4 <= length $rbuf;

                # thaw exactly one frame, not whatever else is already buffered
                my $req = Storable::thaw (substr $rbuf, 4, $len);
                substr $rbuf, 0, $len + 4, ""; # remove length + request

                my $wbuf = eval { pack "L/a*", Storable::freeze ($req->[0]($req)) };

                $wbuf = pack "L/a*", Storable::freeze ([undef, "$@"])
                    if $@;

                # syswrite may write only part of the reply, so loop until done
                for (my $ofs = 0; $ofs < length $wbuf; ) {
                    $ofs += (syswrite $fh, substr $wbuf, $ofs
                             or die "unable to write results");
                }
            }
        }
    };

    kill 9, $$; # no other way on the broken windows platform
}
118
119 =head2 METHODS
120
121 =over 4
122
123 =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
124
125 Returns a database handle for the given database. Each database handle
126 has an associated server process that executes statements in order. If
127 you want to run more than one statement in parallel, you need to create
128 additional database handles.
129
130 The advantage of this approach is that transactions work as state is
131 preserved.
132
133 Example:
134
135 $dbh = new AnyEvent::DBI
136 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
137
138 Additional key-value pairs can be used to adjust behaviour:
139
140 =over 4
141
142 =item on_error => $callback->($dbh, $filename, $line, $fatal)
143
144 When an error occurs, then this callback will be invoked. On entry, C<$@>
145 is set to the error message. C<$filename> and C<$line> identify where the
146 original request was submitted.
147
148 If this callback returns and this was a fatal error (C<$fatal> is true)
149 then AnyEvent::DBI dies; otherwise it calls the original request callback
150 without any arguments.
151
152 If omitted, then C<die> will be called on any errors, fatal or not.
153
154 =back
155
156 =cut
157
158 # stupid Storable autoloading, total loss-loss situation
159 Storable::thaw Storable::freeze [];
160
# Constructor: creates the client object, forks the server process and queues
# the initial "req_open" request.
#
#   $class               - class to bless into
#   $dbi, $user, $pass   - forwarded to the server's req_open handler
#   %arg                 - extra key/value pairs (e.g. on_error); they become
#                          the object's initial fields
#
# Returns the new object. Croaks when the socket pair or the fork fails.
sub new {
    my ($class, $dbi, $user, $pass, %arg) = @_;

    socketpair my $client, my $server,
               Socket::AF_UNIX (), Socket::SOCK_STREAM (), Socket::PF_UNSPEC ()
        or croak "unable to create dbi communication pipe: $!";

    my $self = bless \%arg, $class;

    $self->{fh} = $client;

    # the callbacks below must not keep the object alive themselves
    Scalar::Util::weaken (my $wself = $self);

    AnyEvent::Util::fh_nonblocking ($client, 1);

    my $rbuf   = "";
    my @caller = (caller)[1,2]; # the "default" caller

    $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
        my $len = sysread $client, $rbuf, 65536, length $rbuf;

        if ($len) {
            while () {
                my $flen = unpack "L", $rbuf;

                # full reply available?
                last unless $flen && $flen + 4 <= length $rbuf;

                # thaw exactly one frame, not whatever else is already buffered
                my $res = Storable::thaw (substr $rbuf, 4, $flen);
                substr $rbuf, 0, $flen + 4, ""; # remove length + reply

                # replies arrive in request order
                my $req = shift @{ $wself->{queue} };

                if (defined $res->[0]) {
                    $req->[0](@$res);
                } else {
                    # error reply: [undef, $message]
                    my $cb = shift @$req;
                    $wself->_error ($res->[1], @$req);
                    $cb->();
                }
            }

        } elsif (defined $len) {
            $wself->_error ("unexpected eof", @caller, 1);
        } elsif (!($!{EAGAIN} || $!{EINTR})) {
            # the handle is non-blocking, so a spurious wakeup is not an error
            $wself->_error ("read error: $!", @caller, 1);
        }
    });

    $self->{ww_cb} = sub {
        my $len = syswrite $client, $wself->{wbuf};

        unless ($len) {
            # transient condition: keep the watcher and retry later
            return if !defined $len && ($!{EAGAIN} || $!{EINTR});

            return delete $wself->{ww};
        }

        substr $wself->{wbuf}, 0, $len, "";

        # everything sent: the write watcher is no longer needed
        delete $wself->{ww}
            unless length $wself->{wbuf};
    };

    my $pid = fork;

    if ($pid) {
        # parent
        close $server;

    } elsif (defined $pid) {
        # child
        close $client;
        @_ = $server;
        goto &serve;

    } else {
        croak "fork: $!";
    }

    # connection problems are always fatal
    $self->_req (sub { }, @caller, 1, req_open => $dbi, $user, $pass);

    $self
}
237
# Reports an error to the user.
#
#   $error            - the error message; stored in $@ for the on_error handler
#   $filename, $line  - where the failing request was submitted
#   $fatal            - true when the handle cannot be used any longer
#
# Fatal errors tear down the watchers and the file handle. When an on_error
# handler is installed it is called as ($self, $filename, $line, $fatal).
# If it returns from a non-fatal error, this sub returns as well (with $@
# still set to $error) so the caller can invoke the request callback.
# In every other case (fatal error, or no handler) this sub dies with
# "$error at $filename, line $line\n".
sub _error {
    my ($self, $error, $filename, $line, $fatal) = @_;

    # only a fatal error makes the handle unusable
    delete @$self{qw(rw ww fh)}
        if $fatal;

    $@ = $error;

    if ($self->{on_error}) {
        $self->{on_error}($self, $filename, $line, $fatal);

        unless ($fatal) {
            $@ = $error; # the handler may have clobbered it
            return;
        }
    }

    die "$error at $filename, line $line\n";
}
252
# Queues one request for the server process.
#
#   $self             - the client object
#   $cb               - callback to invoke with the reply
#   $filename, $line  - where the request was submitted (for error reports)
#   $fatal            - whether an error reply to this request is fatal
#   remaining args    - the request itself: handler name, then its arguments
#
# The request is frozen with Storable, prefixed with its length (native "L")
# and appended to the write buffer. When no write watcher is active an
# immediate write is attempted; a watcher is installed only for what is left.
#
# Reports a fatal "no database connection" error through _error when the
# file handle is gone.
sub _req {
    my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, ();

    unless ($self->{fh}) {
        # the handle was torn down by an earlier fatal error
        $self->_error ("no database connection", $filename, $line, 1);
        return;
    }

    push @{ $self->{queue} }, [$cb, $filename, $line, $fatal];

    $self->{wbuf} .= pack "L/a*", Storable::freeze (\@_);

    unless ($self->{ww}) {
        my $len = syswrite $self->{fh}, $self->{wbuf};

        # undef means nothing could be written right now
        substr $self->{wbuf}, 0, $len, ""
            if $len;

        # still any left? then install a write watcher
        $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
            if length $self->{wbuf};
    }
}
269
270 =item $dbh->exec ("statement", @args, $cb->($rows, %extra))
271
272 Executes the given SQL statement with placeholders replaced by
273 C<@args>. The statement will be prepared and cached on the server side, so
274 using placeholders is compulsory.
275
276 The callback will be called with the result of C<fetchall_arrayref> as
277 first argument and possibly a hash reference with additional information.
278
279 If an error occurs and the C<on_error> callback returns, then no arguments
280 will be passed and C<$@> contains the error message.
281
282 =cut
283
# Public entry point for running a statement: $dbh->exec ($statement, @args, $cb).
#
# The trailing callback is moved to the front, followed by the caller's file
# name and line number, a false "fatal" flag and the "req_exec" request name;
# the statement and its bind values stay behind them. The rearranged argument
# list is then handed to _req by a tail call.
sub exec {
    my $callback = pop;
    my @origin   = (caller)[1,2];

    # insert right after the invocant: callback, origin, non-fatal, request name
    splice @_, 1, 0, $callback, @origin, 0, "req_exec";

    goto &_req;
}
290
291 =back
292
293 =head1 SEE ALSO
294
295 L<AnyEvent>, L<DBI>.
296
297 =head1 AUTHOR
298
299 Marc Lehmann <schmorp@schmorp.de>
300 http://home.schmorp.de/
301
302 =cut
303
304 1
305