ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.7
Committed: Mon Jul 21 02:34:40 2008 UTC (15 years, 10 months ago) by root
Branch: MAIN
Changes since 1.6: +17 -7 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::DBI - asynchronous DBI access
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::DBI;
8    
9 root 1.5 my $cv = AnyEvent->condvar;
10    
11     my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12    
13     $dbh->exec ("select * from test where num=?", 10, sub {
14 root 1.7 my ($rows, $rv) = @_;
15 root 1.5
16     print "@$_\n"
17     for @$rows;
18    
19     $cv->broadcast;
20     });
21    
22     # asynchronously do sth. else here
23    
24     $cv->wait;
25    
26 root 1.1 =head1 DESCRIPTION
27    
28     This module is an L<AnyEvent> user, you need to make sure that you use and
29     run a supported event loop.
30    
31     This module implements asynchronous DBI access my forking or executing
32     separate "DBI-Server" processes and sending them requests.
33    
34     It means that you can run DBI requests in parallel to other tasks.
35    
36 root 1.3 The overhead for very simple statements ("select 0") is somewhere
37 root 1.6 around 120% to 200% (dual/single core CPU) compared to an explicit
38 root 1.3 prepare_cached/execute/fetchrow_arrayref/finish combination.
39    
40 root 1.1 =cut
41    
42     package AnyEvent::DBI;
43    
44     use strict;
45     no warnings;
46    
47     use Carp;
48     use Socket ();
49     use Scalar::Util ();
50     use Storable ();
51    
52     use DBI ();
53    
54     use AnyEvent ();
55     use AnyEvent::Util ();
56    
57 root 1.7 our $VERSION = '1.1';
58 root 1.1
59     # this is the forked server code
60    
61     our $DBH;
62    
63     sub req_open {
64     my (undef, $dbi, $user, $pass, %attr) = @{+shift};
65    
66     $DBH = DBI->connect ($dbi, $user, $pass, \%attr);
67    
68     [1]
69     }
70    
71 root 1.2 sub req_exec {
72     my (undef, $st, @args) = @{+shift};
73    
74     my $sth = $DBH->prepare_cached ($st, undef, 1);
75    
76 root 1.7 my $rv = $sth->execute (@args)
77 root 1.2 or die $sth->errstr;
78    
79 root 1.7 [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }]
80 root 1.2 }
81    
82 root 1.1 sub serve {
83     my ($fh) = @_;
84    
85     no strict;
86    
87     eval {
88     my $rbuf;
89    
90     while () {
91     sysread $fh, $rbuf, 16384, length $rbuf
92     or last;
93    
94     while () {
95     my $len = unpack "L", $rbuf;
96    
97     # full request available?
98     last unless $len && $len + 4 <= length $rbuf;
99    
100     my $req = Storable::thaw substr $rbuf, 4;
101     substr $rbuf, 0, $len + 4, ""; # remove length + request
102    
103     my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
104    
105     $wbuf = pack "L/a*", Storable::freeze [undef, "$@"]
106     if $@;
107    
108     for (my $ofs = 0; $ofs < length $wbuf; ) {
109     $ofs += (syswrite $fh, substr $wbuf, $ofs
110     or die "unable to write results");
111     }
112     }
113     }
114     };
115    
116 root 1.7 if (AnyEvent::WIN32) {
117     kill 9, $$; # no other way on the broken windows platform
118     # and the above doesn't even work on windows, it seems the only
119     # way to is to leak memory and kill 9 from the parent. yay.
120     }
121    
122     require POSIX;
123     POSIX::_exit (0);
124     # and the above kills the parent process on windows
125 root 1.1 }
126    
127     =head2 METHODS
128    
129     =over 4
130    
131     =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
132    
133     Returns a database handle for the given database. Each database handle
134     has an associated server process that executes statements in order. If
135     you want to run more than one statement in parallel, you need to create
136     additional database handles.
137    
138     The advantage of this approach is that transactions work as state is
139     preserved.
140    
141     Example:
142    
143     $dbh = new AnyEvent::DBI
144     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
145    
146     Additional key-value pairs can be used to adjust behaviour:
147    
148     =over 4
149    
150     =item on_error => $callback->($dbh, $filename, $line, $fatal)
151    
152     When an error occurs, then this callback will be invoked. On entry, C<$@>
153     is set to the error message. C<$filename> and C<$line> is where the
154     original request was submitted.
155    
156     If this callback returns and this was a fatal error (C<$fatal> is true)
157     then AnyEvent::DBI die's, otherwise it calls the original request callback
158     without any arguments.
159    
160 root 1.2 If omitted, then C<die> will be called on any errors, fatal or not.
161 root 1.1
162     =back
163    
164     =cut
165    
166     # stupid Storable autoloading, total loss-loss situation
167     Storable::thaw Storable::freeze [];
168    
169     sub new {
170     my ($class, $dbi, $user, $pass, %arg) = @_;
171    
172     socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC
173     or croak "unable to create dbi communicaiton pipe: $!";
174    
175     my $self = bless \%arg, $class;
176    
177     $self->{fh} = $client;
178    
179     Scalar::Util::weaken (my $wself = $self);
180    
181     AnyEvent::Util::fh_nonblocking $client, 1;
182    
183     my $rbuf;
184     my @caller = (caller)[1,2]; # the "default" caller
185    
186     $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
187     my $len = sysread $client, $rbuf, 65536, length $rbuf;
188    
189     if ($len > 0) {
190    
191     while () {
192     my $len = unpack "L", $rbuf;
193    
194     # full request available?
195     last unless $len && $len + 4 <= length $rbuf;
196    
197     my $res = Storable::thaw substr $rbuf, 4;
198     substr $rbuf, 0, $len + 4, ""; # remove length + request
199    
200     my $req = shift @{ $wself->{queue} };
201    
202     if (defined $res->[0]) {
203     $req->[0](@$res);
204     } else {
205     my $cb = shift @$req;
206     $wself->_error ($res->[1], @$req);
207 root 1.2 $cb->();
208 root 1.1 }
209     }
210    
211     } elsif (defined $len) {
212     $wself->_error ("unexpected eof", @caller, 1);
213     } else {
214     $wself->_error ("read error: $!", @caller, 1);
215     }
216     });
217    
218 root 1.3 $self->{ww_cb} = sub {
219     my $len = syswrite $client, $wself->{wbuf}
220     or return delete $wself->{ww};
221    
222     substr $wself->{wbuf}, 0, $len, "";
223     };
224    
225 root 1.1 my $pid = fork;
226    
227     if ($pid) {
228     # parent
229     close $server;
230    
231     } elsif (defined $pid) {
232     # child
233     close $client;
234     @_ = $server;
235     goto &serve;
236    
237     } else {
238     croak "fork: $!";
239     }
240    
241     $self->_req (sub { }, (caller)[1,2], 1, req_open => $dbi, $user, $pass);
242    
243     $self
244     }
245    
246     sub _error {
247     my ($self, $error, $filename, $line, $fatal) = @_;
248    
249     delete $self->{rw};
250     delete $self->{ww};
251     delete $self->{fh};
252    
253     $@ = $error;
254    
255     $self->{on_error}($self, $filename, $line, $fatal)
256     if $self->{on_error};
257    
258 root 1.2 die "$error at $filename, line $line\n";
259 root 1.1 }
260    
261     sub _req {
262     my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, ();
263    
264     push @{ $self->{queue} }, [$cb, $filename, $line, $fatal];
265    
266     $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
267    
268     unless ($self->{ww}) {
269     my $len = syswrite $self->{fh}, $self->{wbuf};
270     substr $self->{wbuf}, 0, $len, "";
271    
272     # still any left? then install a write watcher
273     $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
274     if length $self->{wbuf};
275     }
276     }
277    
278 root 1.7 =item $dbh->exec ("statement", @args, $cb->($rows, $rv, ...))
279 root 1.1
280     Executes the given SQL statement with placeholders replaced by
281 root 1.2 C<@args>. The statement will be prepared and cached on the server side, so
282     using placeholders is compulsory.
283 root 1.1
284     The callback will be called with the result of C<fetchall_arrayref> as
285 root 1.7 first argument (or C<undef> if the statement wasn't a select statement)
286     and the return value of C<execute> as second argument. Additional
287     arguments might get passed as well.
288 root 1.1
289 root 1.2 If an error occurs and the C<on_error> callback returns, then no arguments
290     will be passed and C<$@> contains the error message.
291    
292 root 1.1 =cut
293    
294     sub exec {
295     my $cb = pop;
296 root 1.2 splice @_, 1, 0, $cb, (caller)[1,2], 0, "req_exec";
297 root 1.1
298     goto &_req;
299     }
300    
301     =back
302    
303     =head1 SEE ALSO
304    
305     L<AnyEvent>, L<DBI>.
306    
307     =head1 AUTHOR
308    
309 root 1.4 Marc Lehmann <schmorp@schmorp.de>
310     http://home.schmorp.de/
311 root 1.1
312     =cut
313    
314     1
315