ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.2
Committed: Fri Jun 6 15:44:34 2008 UTC (16 years ago) by root
Branch: MAIN
Changes since 1.1: +20 -10 lines
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::DBI - asynchronous DBI access
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::DBI;
8
9 =head1 DESCRIPTION
10
11 This module is an L<AnyEvent> user, you need to make sure that you use and
12 run a supported event loop.
13
14 This module implements asynchronous DBI access by forking or executing
15 separate "DBI-Server" processes and sending them requests.
16
17 It means that you can run DBI requests in parallel to other tasks.
18
19 =cut
20
21 package AnyEvent::DBI;
22
23 use strict;
24 no warnings;
25
26 use Carp;
27 use Socket ();
28 use Scalar::Util ();
29 use Storable ();
30
31 use DBI ();
32
33 use AnyEvent ();
34 use AnyEvent::Util ();
35
36 our $VERSION = '1.0';
37
38 # this is the forked server code
39
40 our $DBH;
41
# Server-side handler for "req_open" requests: connects to the database.
#
# Takes the thawed request as its only argument, an array reference of the
# form [$name, $dsn, $user, $pass, key => value...]. Any trailing key-value
# pairs are handed to DBI->connect as the attribute hash.
#
# On success the handle is stored in the package variable $DBH and [1] is
# returned. On failure this dies with the DBI error message.
sub req_open {
   my (undef, $dbi, $user, $pass, %attr) = @{+shift};

   # connect returns undef on failure unless RaiseError is set, so check
   # explicitly instead of reporting success with an unusable $DBH
   $DBH = DBI->connect ($dbi, $user, $pass, \%attr)
      or die $DBI::errstr;

   [1]
}
49
# Server-side handler for "req_exec" requests: runs one SQL statement on $DBH.
#
# Takes the thawed request as its only argument, an array reference of the
# form [$name, $statement, @bind_values].
#
# Returns [$rows], where $rows is the result of fetchall_arrayref, or an
# empty array reference for statements that produce no result set. Dies with
# the DBI error message if preparing, executing or fetching fails.
sub req_exec {
   my (undef, $st, @args) = @{+shift};

   # third argument 1: reuse the cached handle even if it is still active
   my $sth = $DBH->prepare_cached ($st, undef, 1)
      or die $DBH->errstr;

   $sth->execute (@args)
      or die $sth->errstr;

   # statements without result columns (INSERT, UPDATE, ...) cannot be
   # fetched from, so report an empty (but defined) row list for them
   return [[]]
      unless $sth->{NUM_OF_FIELDS};

   my $rows = $sth->fetchall_arrayref;

   # on errors fetchall_arrayref returns the rows fetched so far, so the
   # only way to notice a truncated result is to check err afterwards
   die $sth->errstr
      if $sth->err;

   [$rows]
}
60
# Main loop of the forked server process.
#
# Reads requests from $fh, runs them one at a time and writes the replies
# back to $fh. Both directions use the same framing: a native 32 bit length
# ("L") followed by that many bytes of Storable data.
#
# A request is an array reference whose first element is the name of the
# handler sub, which is called with the request itself and must return the
# reply structure. If the handler dies, the reply is [undef, $error_message].
#
# Never returns: once the handle hits eof or an I/O error, the process
# kills itself.
sub serve {
   my ($fh) = @_;

   # handlers are named by string in the request and called symbolically
   no strict 'refs';

   eval {
      my $rbuf;

      while () {
         # a false result means eof (0) or a read error (undef)
         sysread $fh, $rbuf, 16384, length $rbuf
            or last;

         # run every request that is completely buffered
         while () {
            my $len = unpack "L", $rbuf;

            # full request available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly one frame, not everything queued up behind it
            my $req = Storable::thaw (substr $rbuf, 4, $len);
            substr $rbuf, 0, $len + 4, ""; # remove length + request

            my $wbuf = eval { pack "L/a*", Storable::freeze ($req->[0]($req)) };

            $wbuf = pack "L/a*", Storable::freeze ([undef, "$@"])
               if $@;

            # syswrite may write less than asked for, so loop until done
            for (my $ofs = 0; $ofs < length $wbuf; ) {
               $ofs += (syswrite $fh, substr $wbuf, $ofs
                        or die "unable to write results");
            }
         }
      }
   };

   kill 9, $$; # no other way on the broken windows platform
}
97
98 =head2 METHODS
99
100 =over 4
101
102 =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
103
104 Returns a database handle for the given database. Each database handle
105 has an associated server process that executes statements in order. If
106 you want to run more than one statement in parallel, you need to create
107 additional database handles.
108
109 The advantage of this approach is that transactions work as state is
110 preserved.
111
112 Example:
113
114 $dbh = new AnyEvent::DBI
115 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
116
117 Additional key-value pairs can be used to adjust behaviour:
118
119 =over 4
120
121 =item on_error => $callback->($dbh, $filename, $line, $fatal)
122
123 When an error occurs, then this callback will be invoked. On entry, C<$@>
124 is set to the error message. C<$filename> and C<$line> are where the
125 original request was submitted.
126
127 If this callback returns and this was a fatal error (C<$fatal> is true)
128 then AnyEvent::DBI dies; otherwise it calls the original request callback
129 without any arguments.
130
131 If omitted, then C<die> will be called on any errors, fatal or not.
132
133 =back
134
135 =cut
136
# Storable autoloads its freeze/thaw code on first use, so do one dummy
# round trip at module load time to have it loaded up front (presumably so
# that forked server processes start with it already in place).
Storable::thaw (Storable::freeze ([]));
139
# Constructor: creates the handle object, forks the server process and
# queues the initial "req_open" request.
#
# Arguments are the DBI data source, user and password, followed by
# key-value options which become the fields of the object (on_error is the
# one consulted by the error handling).
#
# Croaks if the socket pair or the fork cannot be created. Returns the new
# object in the parent; the child never returns from here.
sub new {
   my ($class, $dbi, $user, $pass, %arg) = @_;

   socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC
      or croak "unable to create dbi communication pipe: $!";

   my $self = bless \%arg, $class;

   $self->{fh} = $client;

   # the read watcher below is stored inside $self, so its callback must
   # only hold a weak reference or the object would never be freed
   Scalar::Util::weaken (my $wself = $self);

   AnyEvent::Util::fh_nonblocking ($client, 1);

   my $rbuf;
   my @caller = (caller)[1,2]; # the "default" caller

   $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
      my $got = sysread $client, $rbuf, 65536, length $rbuf;

      if ($got > 0) {

         # replies are framed as a native 32 bit length plus Storable data
         while () {
            my $len = unpack "L", $rbuf;

            # full reply available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly one frame, not everything queued up behind it
            my $res = Storable::thaw (substr $rbuf, 4, $len);
            substr $rbuf, 0, $len + 4, ""; # remove length + reply

            # replies arrive in request order, so the oldest entry matches
            my $req = shift @{ $wself->{queue} };

            if (defined $res->[0]) {
               $req->[0](@$res);
            } else {
               # error reply: [undef, $message]; the rest of the queue
               # entry is ($filename, $line, $fatal)
               my $cb = shift @$req;
               $wself->_error ($res->[1], @$req);
               $cb->();
            }
         }

      } elsif (defined $got) {
         $wself->_error ("unexpected eof", @caller, 1);
      } elsif (!$!{EAGAIN} && !$!{EINTR}) {
         # the socket is non-blocking, so "nothing to read right now" is
         # not an error, just wait for the next readiness notification
         $wself->_error ("read error: $!", @caller, 1);
      }
   });

   my $pid = fork;

   if ($pid) {
      # parent
      close $server;

   } elsif (defined $pid) {
      # child: becomes the server and never returns
      close $client;
      @_ = $server;
      goto &serve;

   } else {
      croak "fork: $!";
   }

   # failing to open the database is a fatal error
   $self->_req (sub { }, (caller)[1,2], 1, req_open => $dbi, $user, $pass);

   $self
}
209
# Reports an error for the request that was submitted at $filename/$line.
#
# $@ is set to $error and the on_error callback, if there is one, is called
# as $on_error->($self, $filename, $line, $fatal).
#
# Fatal errors tear the connection down (watchers and handle are dropped)
# and always end in die. Non-fatal errors leave the connection usable: if
# on_error returns, this returns as well, with $@ still set to $error, so
# the caller can go on to invoke the request callback. Without an on_error
# callback every error dies.
sub _error {
   my ($self, $error, $filename, $line, $fatal) = @_;

   # only a fatal error makes the connection unusable
   if ($fatal) {
      delete $self->{rw};
      delete $self->{ww};
      delete $self->{fh};
   }

   $@ = $error;

   if (my $on_error = $self->{on_error}) {
      $on_error->($self, $filename, $line, $fatal);

      unless ($fatal) {
         # the callback may have clobbered $@, restore it for the caller
         $@ = $error;
         return;
      }
   }

   die "$error at $filename, line $line\n";
}
224
# Queues one request for the server process and starts sending it.
#
# Called as _req ($self, $cb, $filename, $line, $fatal, $name, @args):
# $cb is the completion callback, $filename/$line say where the request was
# submitted (for error messages), $fatal marks requests whose failure is
# fatal, and $name plus @args form the request that is sent to the server.
#
# Croaks if the connection has already been torn down.
sub _req {
   my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, ();

   $self->{fh}
      or Carp::croak ("AnyEvent::DBI: no database connection");

   # replies come back in order, so remember what to do with this one
   push @{ $self->{queue} }, [$cb, $filename, $line, $fatal];

   # what is left in @_ is the request proper; frame it as a native 32 bit
   # length followed by the Storable data
   $self->{wbuf} .= pack "L/a*", Storable::freeze (\@_);

   # a pending write watcher will send the new data as well
   return if $self->{ww};

   # try to write right away, in the common case this sends everything
   my $len = syswrite $self->{fh}, $self->{wbuf};
   substr $self->{wbuf}, 0, $len, ""
      if $len;

   return unless length $self->{wbuf};

   # still any left? then install a write watcher that sends the rest. the
   # watcher is stored in $self, so its callback only gets a weak reference
   Scalar::Util::weaken (my $wself = $self);

   # a ww_cb stored in the object takes precedence over the default flusher
   $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb} || sub {
      my $len = syswrite $wself->{fh}, $wself->{wbuf};

      if (defined $len) {
         substr $wself->{wbuf}, 0, $len, "";

         # all sent, stop watching until there is something to write again
         delete $wself->{ww}
            unless length $wself->{wbuf};
      } elsif (!$!{EAGAIN} && !$!{EINTR}) {
         # reported against the request that had to install the watcher
         $wself->_error ("write error: $!", $filename, $line, 1);
      }
   });
}
242
243 =item $dbh->exec ("statement", @args, $cb->($rows, %extra))
244
245 Executes the given SQL statement with placeholders replaced by
246 C<@args>. The statement will be prepared and cached on the server side, so
247 using placeholders is compulsory.
248
249 The callback will be called with the result of C<fetchall_arrayref> as
250 first argument and possibly a hash reference with additional information.
251
252 If an error occurs and the C<on_error> callback returns, then no arguments
253 will be passed and C<$@> contains the error message.
254
255 =cut
256
# Public entry point for running a statement: called with the object, the
# statement, its bind values and, as last argument, the completion callback.
sub exec {
   # the completion callback is always the last argument
   my $done = pop @_;

   # file and line of the caller, used when reporting errors
   my @origin = (caller)[1,2];

   # rearrange @_ into what _req expects: after the object come callback,
   # origin, a false "fatal" flag and the handler name, then the statement
   # and bind values that were already there
   splice @_, 1, 0, $done, @origin, 0, "req_exec";

   # hand the rebuilt @_ over to _req
   goto &_req;
}
263
264 =back
265
266 =head1 SEE ALSO
267
268 L<AnyEvent>, L<DBI>.
269
270 =head1 AUTHOR
271
272 Marc Lehmann <schmorp@schmorp.de>
273 http://home.schmorp.de/
274
275 =cut
276
277 1
278