ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.3
Committed: Fri Jun 6 16:13:07 2008 UTC (16 years ago) by root
Branch: MAIN
Changes since 1.2: +11 -1 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::DBI - asynchronous DBI access
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::DBI;
8
9 =head1 DESCRIPTION
10
11 This module is an L<AnyEvent> user; you need to make sure that you use and
12 run a supported event loop.
13
14 This module implements asynchronous DBI access by forking or executing
15 separate "DBI-Server" processes and sending them requests.
16
17 It means that you can run DBI requests in parallel to other tasks.
18
19 The overhead for very simple statements ("select 0") is somewhere
20 around 120% to 200% (single/dual core CPU) compared to an explicit
21 prepare_cached/execute/fetchrow_arrayref/finish combination.
22
23 =cut
24
25 package AnyEvent::DBI;
26
27 use strict;
28 no warnings;
29
30 use Carp;
31 use Socket ();
32 use Scalar::Util ();
33 use Storable ();
34
35 use DBI ();
36
37 use AnyEvent ();
38 use AnyEvent::Util ();
39
40 our $VERSION = '1.0';
41
# this is the forked server code

# Database handle of the server process: assigned by req_open, used by
# req_exec.
our $DBH;

# Request handler "req_open": connect to the database.
#
# Takes the request as an array reference of the form
# [$name, $dsn, $user, $pass, key => value...]; the trailing key/value
# pairs are passed to DBI->connect as its attribute hash.
#
# Returns [1] on success. Dies with $DBI::errstr when no connection could
# be established (the request loop in serve turns a die into an error
# reply), so a failed connect is no longer acknowledged as a success.
sub req_open {
   my (undef, $dbi, $user, $pass, %attr) = @{+shift};

   $DBH = DBI->connect ($dbi, $user, $pass, \%attr)
      or die $DBI::errstr;

   [1]
}
53
# Request handler "req_exec": run one SQL statement and return all its rows.
#
# Takes the request as an array reference of the form
# [$name, $statement, @bind_values]. The statement is prepared with
# prepare_cached and executed with the bind values.
#
# Returns [$rows], where $rows is whatever fetchall_arrayref returns.
# Dies with the handle's error string when preparing or executing fails.
sub req_exec {
   my (undef, $st, @args) = @{+shift};

   # prepare_cached can return undef on failure; report the database error
   # instead of failing on a method call on undef further down
   my $sth = $DBH->prepare_cached ($st, undef, 1)
      or die $DBH->errstr;

   $sth->execute (@args)
      or die $sth->errstr;

   [$sth->fetchall_arrayref]
}
64
# Request loop of the forked server process.
#
# Reads requests from $fh. Each request is a length (as packed by "L")
# followed by that many bytes of Storable data. The thawed request is an
# array reference whose first element names the handler sub; the handler is
# called with the request as its only argument and its result is frozen and
# written back with the same length prefix. When the handler (or freezing
# its result) dies, [undef, $error_message] is sent instead.
#
# Never returns: when the read loop ends (end of file or a failed read) or a
# reply cannot be written, the process kills itself with signal 9.
sub serve {
   my ($fh) = @_;

   no strict; # handlers are called by name

   eval {
      my $rbuf;

      while () {
         sysread $fh, $rbuf, 16384, length $rbuf
            or last;

         while () {
            my $len = unpack "L", $rbuf;

            # full request available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly this request, not everything that happens to be buffered behind it
            my $req = Storable::thaw (substr $rbuf, 4, $len);
            substr $rbuf, 0, $len + 4, ""; # remove length + request

            my $wbuf = eval { pack "L/a*", Storable::freeze ($req->[0]($req)) };

            $wbuf = pack "L/a*", Storable::freeze ([undef, "$@"])
               if $@;

            # syswrite may write only part of the reply, so loop until all of it is out
            for (my $ofs = 0; $ofs < length $wbuf; ) {
               $ofs += (syswrite $fh, substr $wbuf, $ofs
                        or die "unable to write results");
            }
         }
      }
   };

   kill 9, $$; # no other way on the broken windows platform
}
101
102 =head2 METHODS
103
104 =over 4
105
106 =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
107
108 Returns a database handle for the given database. Each database handle
109 has an associated server process that executes statements in order. If
110 you want to run more than one statement in parallel, you need to create
111 additional database handles.
112
113 The advantage of this approach is that transactions work as state is
114 preserved.
115
116 Example:
117
118 $dbh = new AnyEvent::DBI
119 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
120
121 Additional key-value pairs can be used to adjust behaviour:
122
123 =over 4
124
125 =item on_error => $callback->($dbh, $filename, $line, $fatal)
126
127 When an error occurs, then this callback will be invoked. On entry, C<$@>
128 is set to the error message. C<$filename> and C<$line> are where the
129 original request was submitted.
130
131 If this callback returns and this was a fatal error (C<$fatal> is true)
132 then AnyEvent::DBI dies, otherwise it calls the original request callback
133 without any arguments.
134
135 If omitted, then C<die> will be called on any errors, fatal or not.
136
137 =back
138
139 =cut
140
# Storable autoloads its code lazily: one throw-away freeze/thaw round trip
# forces that to happen right here, when the module is loaded
# (presumably so that forked server processes inherit the loaded code)
Storable::thaw (Storable::freeze ([]));
143
# Constructor; the public interface is described under "METHODS" in the POD.
#
# Arguments: ($class, $dbi, $user, $pass, key => value...). The key/value
# pairs become the object itself (on_error is read from there).
#
# Creates a socket pair, installs a read watcher and a write callback for
# the client end, forks the server process (the child runs serve on the
# other end and never comes back) and queues the initial req_open request,
# flagged as fatal.
#
# Returns the new object. Croaks when the socket pair cannot be created or
# when fork fails.
sub new {
   my ($class, $dbi, $user, $pass, %arg) = @_;

   # explicit empty argument lists, so the constant subs never get to see our @_
   socketpair my $client, my $server, Socket::AF_UNIX (), Socket::SOCK_STREAM (), Socket::PF_UNSPEC ()
      or croak "unable to create dbi communication pipe: $!";

   my $self = bless \%arg, $class;

   $self->{fh} = $client;

   # the callbacks below only see a weak reference, so they do not keep the object alive
   Scalar::Util::weaken (my $wself = $self);

   AnyEvent::Util::fh_nonblocking ($client, 1);

   my $rbuf;
   my @caller = (caller)[1,2]; # the "default" caller

   $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
      my $bytes = sysread $client, $rbuf, 65536, length $rbuf;

      if ($bytes > 0) {

         while () {
            my $len = unpack "L", $rbuf;

            # full response available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly this response, not everything buffered behind it
            my $res = Storable::thaw (substr $rbuf, 4, $len);
            substr $rbuf, 0, $len + 4, ""; # remove length + response

            # responses arrive in the order the requests were queued
            my $req = shift @{ $wself->{queue} };

            if (defined $res->[0]) {
               $req->[0](@$res);
            } else {
               # error reply: [undef, $message]
               my $cb = shift @$req;
               $wself->_error ($res->[1], @$req);
               $cb->();
            }
         }

      } elsif (defined $bytes) {
         $wself->_error ("unexpected eof", @caller, 1);
      } else {
         $wself->_error ("read error: $!", @caller, 1);
      }
   });

   # write watcher callback: send more of the buffer, drop the watcher
   # as soon as syswrite has nothing (more) to report
   $self->{ww_cb} = sub {
      my $len = syswrite $client, $wself->{wbuf}
         or return delete $wself->{ww};

      substr $wself->{wbuf}, 0, $len, "";
   };

   my $pid = fork;

   if ($pid) {
      # parent
      close $server;

   } elsif (defined $pid) {
      # child
      close $client;
      @_ = $server;
      goto &serve;

   } else {
      croak "fork: $!";
   }

   $self->_req (sub { }, @caller, 1, req_open => $dbi, $user, $pass);

   $self
}
220
# Report an error to the user of the object.
#
# Arguments: ($self, $error, $filename, $line, $fatal). $error is the
# message, $filename and $line are passed through to the callback and used
# in the die message, and $fatal says whether the connection is unusable.
#
# For fatal errors the watchers and the handle are dropped. $@ is set to
# $error, then the on_error callback (if any) is called with
# ($self, $filename, $line, $fatal).
#
# Returns only when an on_error callback exists and the error was not
# fatal, which is what the on_error documentation promises; $@ still holds
# $error then. In every other case it dies with
# "$error at $filename, line $line\n".
sub _error {
   my ($self, $error, $filename, $line, $fatal) = @_;

   if ($fatal) {
      # the connection is gone for good
      delete $self->{rw};
      delete $self->{ww};
      delete $self->{fh};
   }

   $@ = $error;

   if (my $on_error = $self->{on_error}) {
      $on_error->($self, $filename, $line, $fatal);

      unless ($fatal) {
         # the callback may have clobbered $@, and the caller relies on it
         $@ = $error;
         return;
      }
   }

   die "$error at $filename, line $line\n";
}
235
# Queue a request for the server process and try to send it immediately.
#
# Arguments: ($self, $cb, $filename, $line, $fatal, @request).
# [$cb, $filename, $line, $fatal] is appended to the queue of outstanding
# requests; @request is frozen with Storable and appended to the write
# buffer with a length prefix (pack "L/a*").
#
# When no write watcher is active the buffer is written right away; a write
# watcher using the ww_cb callback is installed only if data is left over.
#
# Croaks when the object has no handle any more. The return value is not
# meaningful.
sub _req {
   my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, ();

   # fail early and clearly, before queue and buffer are touched
   croak "AnyEvent::DBI: database handle is no longer connected"
      unless $self->{fh};

   push @{ $self->{queue} }, [$cb, $filename, $line, $fatal];

   $self->{wbuf} .= pack "L/a*", Storable::freeze (\@_);

   unless ($self->{ww}) {
      my $len = syswrite $self->{fh}, $self->{wbuf};

      # $len is undef when nothing could be written; keep the whole buffer then
      substr $self->{wbuf}, 0, $len, ""
         if $len;

      # still any left? then install a write watcher
      $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
         if length $self->{wbuf};
   }
}
252
253 =item $dbh->exec ("statement", @args, $cb->($rows, %extra))
254
255 Executes the given SQL statement with placeholders replaced by
256 C<@args>. The statement will be prepared and cached on the server side, so
257 using placeholders is compulsory.
258
259 The callback will be called with the result of C<fetchall_arrayref> as
260 first argument and possibly a hash reference with additional information.
261
262 If an error occurs and the C<on_error> callback returns, then no arguments
263 will be passed and C<$@> contains the error message.
264
265 =cut
266
# Public entry point for running a statement, see the =item entry for
# $dbh->exec in the POD.
#
# Called as $dbh->exec ($statement, @args, $cb): the callback is taken off
# the end of the argument list and the call is turned into
# _req ($dbh, $cb, $filename, $line, 0, "req_exec", $statement, @args),
# with $filename/$line being the place exec was called from and 0 marking
# the request as non-fatal.
sub exec {
   my $callback = pop @_;
   my ($filename, $line) = (caller)[1, 2];

   # insert the bookkeeping values between the object and the statement
   splice @_, 1, 0, $callback, $filename, $line, 0, "req_exec";

   # tail call: _req takes over our @_
   goto &_req;
}
273
274 =back
275
276 =head1 SEE ALSO
277
278 L<AnyEvent>, L<DBI>.
279
280 =head1 AUTHOR
281
282 Marc Lehmann <schmorp@schmorp.de>
283 http://home.schmorp.de/
284
285 =cut
286
287 1
288