ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.9
Committed: Thu Nov 6 13:56:58 2008 UTC (15 years, 7 months ago) by root
Branch: MAIN
CVS Tags: rel-1_1
Changes since 1.8: +4 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::DBI - asynchronous DBI access
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::DBI;
8    
9 root 1.5 my $cv = AnyEvent->condvar;
10    
11     my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12    
13     $dbh->exec ("select * from test where num=?", 10, sub {
14 root 1.7 my ($rows, $rv) = @_;
15 root 1.5
16     print "@$_\n"
17     for @$rows;
18    
19     $cv->broadcast;
20     });
21    
22     # asynchronously do sth. else here
23    
24     $cv->wait;
25    
26 root 1.1 =head1 DESCRIPTION
27    
28     This module is an L<AnyEvent> user; you need to make sure that you use and
29     run a supported event loop.
30    
31 root 1.8 This module implements asynchronous DBI access by forking or executing
32 root 1.1 separate "DBI-Server" processes and sending them requests.
33    
34     It means that you can run DBI requests in parallel to other tasks.
35    
36 root 1.3 The overhead for very simple statements ("select 0") is somewhere
37 root 1.6 around 120% to 200% (dual/single core CPU) compared to an explicit
38 root 1.3 prepare_cached/execute/fetchrow_arrayref/finish combination.
39    
40 root 1.1 =cut
41    
42     package AnyEvent::DBI;
43    
44     use strict;
45     no warnings;
46    
47     use Carp;
48     use Socket ();
49     use Scalar::Util ();
50     use Storable ();
51    
52     use DBI ();
53    
54     use AnyEvent ();
55     use AnyEvent::Util ();
56    
57 root 1.7 our $VERSION = '1.1';
58 root 1.1
59     # this is the forked server code
60    
61     our $DBH;
62    
# Server-side handler for the "req_open" request: connect to the database.
#
# Takes the thawed request array [$name, $dsn, $user, $pass, %attr] and
# stores the resulting handle in the package variable $DBH, which the other
# request handlers of this server process operate on.
#
# Returns [1] on success. Dies with the DBI error message when the
# connection cannot be established, so the failure is reported for this
# request instead of surfacing later as a method call on undef.
sub req_open {
   my (undef, $dbi, $user, $pass, %attr) = @{+shift};

   $DBH = DBI->connect ($dbi, $user, $pass, \%attr)
      or die $DBI::errstr;

   [1]
}
70    
# Server-side handler for the "req_exec" request: run one SQL statement.
#
# Takes the thawed request array [$name, $statement, @bind_values] and
# executes the statement on the connection stored in $DBH.
#
# Returns [1, $rows, { rv => $rv }], where $rows is the result of
# fetchall_arrayref when the statement produced result columns (undef
# otherwise) and $rv is the return value of execute. Dies with the database
# error message when preparing or executing the statement fails.
#
# NOTE(review): the POD for exec promises the plain return value of execute
# as the second callback argument, but a hash reference is sent here --
# confirm which of the two is intended before changing the wire format.
sub req_exec {
   my (undef, $st, @args) = @{+shift};

   # the trailing 1 is prepare_cached's $if_active flag
   my $sth = $DBH->prepare_cached ($st, undef, 1)
      or die $DBH->errstr;

   my $rv = $sth->execute (@args)
      or die $sth->errstr;

   [1, $sth->{NUM_OF_FIELDS} ? $sth->fetchall_arrayref : undef, { rv => $rv }]
}
81    
# Main loop of the forked "DBI-Server" process; never returns.
#
# Reads requests from $fh, dispatches them and writes the replies back to
# $fh. Every message in either direction is a native 32 bit length
# (pack "L") followed by that many bytes of Storable-frozen data.
#
# A request is an array whose first element is the name of the handler sub,
# which is called with the request array itself. The reply is whatever the
# handler returns, or [undef, $error_message] when the handler died.
#
# The loop ends on end of file, on a read error or when a reply cannot be
# written; the process then exits without returning to the caller.
sub serve {
   my ($fh) = @_;

   no strict; # handlers are called by name (symbolic code reference)

   eval {
      my $rbuf;

      while () {
         sysread $fh, $rbuf, 16384, length $rbuf
            or last;

         while () {
            my $len = unpack "L", $rbuf;

            # full request available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly the bytes of this request, not whatever else
            # has already been buffered behind it
            my $req = Storable::thaw substr $rbuf, 4, $len;
            substr $rbuf, 0, $len + 4, ""; # remove length + request

            my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };

            $wbuf = pack "L/a*", Storable::freeze [undef, "$@"]
               if $@;

            # syswrite may write only part of the reply, so loop until done
            for (my $ofs = 0; $ofs < length $wbuf; ) {
               $ofs += (syswrite $fh, substr $wbuf, $ofs
                        or die "unable to write results");
            }
         }
      }
   };

   if (AnyEvent::WIN32) {
      kill 9, $$; # no other way on the broken windows platform
      # and the above doesn't even work on windows, it seems the only
      # way is to leak memory and kill 9 from the parent. yay.
   }

   require POSIX;
   POSIX::_exit (0);
   # and the above kills the parent process on windows
}
126    
127     =head2 METHODS
128    
129     =over 4
130    
131     =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
132    
133     Returns a database handle for the given database. Each database handle
134     has an associated server process that executes statements in order. If
135     you want to run more than one statement in parallel, you need to create
136     additional database handles.
137    
138     The advantage of this approach is that transactions work as state is
139     preserved.
140    
141     Example:
142    
143     $dbh = new AnyEvent::DBI
144     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
145    
146     Additional key-value pairs can be used to adjust behaviour:
147    
148     =over 4
149    
150     =item on_error => $callback->($dbh, $filename, $line, $fatal)
151    
152     When an error occurs, then this callback will be invoked. On entry, C<$@>
153     is set to the error message. C<$filename> and C<$line> are where the
154     original request was submitted.
155    
156     If this callback returns and this was a fatal error (C<$fatal> is true)
157     then AnyEvent::DBI dies, otherwise it calls the original request callback
158     without any arguments.
159    
160 root 1.2 If omitted, then C<die> will be called on any errors, fatal or not.
161 root 1.1
162     =back
163    
164     =cut
165    
# Storable autoloads its functions on first use, so do one throwaway
# freeze/thaw round trip right when this module is loaded.
Storable::thaw (Storable::freeze ([]));
168    
# Constructor: creates the handle object, forks the server process and
# queues the initial "req_open" request.
#
# Arguments are the DBI data source, user and password, followed by
# key => value pairs (such as on_error) that become the object's fields.
# Croaks when the socket pair or the server process cannot be created.
#
# Parent and server talk over a socket pair using messages that consist of
# a native 32 bit length (pack "L") followed by Storable-frozen data.
sub new {
   my ($class, $dbi, $user, $pass, %arg) = @_;

   socketpair my $client, my $server, Socket::AF_UNIX (), Socket::SOCK_STREAM (), Socket::PF_UNSPEC ()
      or croak "unable to create dbi communication pipe: $!";

   my $self = bless \%arg, $class;

   $self->{fh} = $client;

   # the watcher callbacks below only hold a weak reference to the object
   Scalar::Util::weaken (my $wself = $self);

   AnyEvent::Util::fh_nonblocking ($client, 1);

   my $rbuf;
   my @caller = (caller)[1,2]; # the "default" caller

   $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
      my $got = sysread $client, $rbuf, 65536, length $rbuf;

      if ($got > 0) {

         while () {
            my $len = unpack "L", $rbuf;

            # full response available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly the bytes of this response
            my $res = Storable::thaw substr $rbuf, 4, $len;
            substr $rbuf, 0, $len + 4, ""; # remove length + response

            # responses arrive in the order the requests were queued
            my $req = shift @{ $wself->{queue} };

            if (defined $res->[0]) {
               # the first element is only the internal success flag; the
               # callback is documented to receive the rows first
               my (undef, @result) = @$res;
               $req->[0](@result);
            } else {
               my $cb = shift @$req;
               $wself->_error ($res->[1], @$req);
               $cb->();
            }
         }

      } elsif (defined $got) {
         $wself->_error ("unexpected eof", @caller, 1);
      } elsif (!$!{EAGAIN} && !$!{EWOULDBLOCK} && !$!{EINTR}) {
         # a spurious wakeup on the non-blocking socket is not an error
         $wself->_error ("read error: $!", @caller, 1);
      }
   });

   # write watcher callback, installed by _req while data is pending
   $self->{ww_cb} = sub {
      my $len = syswrite $client, $wself->{wbuf}
         or return delete $wself->{ww};

      substr $wself->{wbuf}, 0, $len, "";
   };

   my $pid = fork;

   if ($pid) {
      # parent
      close $server;

   } elsif (defined $pid) {
      # child
      close $client;
      @_ = $server;
      goto &serve;

   } else {
      croak "fork: $!";
   }

   # failing to open the database is a fatal error
   $self->_req (sub { }, @caller, 1, req_open => $dbi, $user, $pass);

   $self
}
245    
# Reports an error for the request that was submitted at $filename/$line.
#
# $@ is set to $error and the on_error callback, if any, is invoked with
# ($self, $filename, $line, $fatal). When there is no on_error callback, or
# when the error is fatal, this dies with "$error at $filename, line $line".
#
# Only fatal errors shut down the connection (watchers and file handle):
# after a non-fatal error the request callback is still invoked and the
# handle has to stay usable for further requests.
sub _error {
   my ($self, $error, $filename, $line, $fatal) = @_;

   if ($fatal) {
      delete $self->{rw};
      delete $self->{ww};
      delete $self->{fh};
   }

   $@ = $error;

   if ($self->{on_error}) {
      $self->{on_error}($self, $filename, $line, $fatal);
      return unless $fatal;
   }

   die "$error at $filename, line $line\n";
}
262    
# Queues one request for the server process.
#
# Called as _req ($self, $cb, $filename, $line, $fatal, $name, @args): the
# first four values after $self are remembered in the reply queue, the rest
# is frozen and sent as the request. The request is written immediately
# when no write watcher is active; a watcher is installed for whatever
# could not be written.
sub _req {
   my $self = shift;
   my @record = splice @_, 0, 4; # $cb, $filename, $line, $fatal

   push @{ $self->{queue} }, [@record[0 .. 3]];

   # what is left in @_ is the request proper: handler name + arguments
   my $frame = pack "L/a*", Storable::freeze (\@_);
   $self->{wbuf} .= $frame;

   unless ($self->{ww}) {
      my $written = syswrite $self->{fh}, $self->{wbuf};
      substr $self->{wbuf}, 0, $written, "";

      # anything not written yet is flushed by the write watcher
      $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
         if length $self->{wbuf};
   }
}
279    
280 root 1.7 =item $dbh->exec ("statement", @args, $cb->($rows, $rv, ...))
281 root 1.1
282     Executes the given SQL statement with placeholders replaced by
283 root 1.2 C<@args>. The statement will be prepared and cached on the server side, so
284     using placeholders is compulsory.
285 root 1.1
286     The callback will be called with the result of C<fetchall_arrayref> as
287 root 1.7 first argument (or C<undef> if the statement wasn't a select statement)
288     and the return value of C<execute> as second argument. Additional
289     arguments might get passed as well.
290 root 1.1
291 root 1.2 If an error occurs and the C<on_error> callback returns, then no arguments
292     will be passed and C<$@> contains the error message.
293    
294 root 1.1 =cut
295    
# Public entry point: $dbh->exec ($statement, @args, $cb).
#
# Rearranges the arguments into the form _req expects -- the callback, the
# caller's file name and line, a false "fatal" flag and the request name
# "req_exec" are placed in front of the statement -- and then hands over to
# _req.
sub exec {
   my $callback = pop;
   my @origin = (caller)[1, 2];
   my $self = shift;

   @_ = ($self, $callback, @origin, 0, "req_exec", @_);

   goto &_req;
}
302    
303     =back
304    
305     =head1 SEE ALSO
306    
307     L<AnyEvent>, L<DBI>.
308    
309     =head1 AUTHOR
310    
311 root 1.4 Marc Lehmann <schmorp@schmorp.de>
312     http://home.schmorp.de/
313 root 1.1
314     =cut
315    
316     1
317