ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.6
Committed: Thu Jun 12 11:56:59 2008 UTC (16 years ago) by root
Branch: MAIN
Changes since 1.5: +1 -1 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::DBI - asynchronous DBI access
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::DBI;
8    
9 root 1.5 my $cv = AnyEvent->condvar;
10    
11     my $dbh = new AnyEvent::DBI "DBI:SQLite:dbname=test.db", "", "";
12    
13     $dbh->exec ("select * from test where num=?", 10, sub {
14     my ($rows) = @_;
15    
16     print "@$_\n"
17     for @$rows;
18    
19     $cv->broadcast;
20     });
21    
22     # asynchronously do sth. else here
23    
24     $cv->wait;
25    
26 root 1.1 =head1 DESCRIPTION
27    
28     This module is an L<AnyEvent> user, you need to make sure that you use and
29     run a supported event loop.
30    
31     This module implements asynchronous DBI access by forking or executing
32     separate "DBI-Server" processes and sending them requests.
33    
34     It means that you can run DBI requests in parallel to other tasks.
35    
36 root 1.3 The overhead for very simple statements ("select 0") is somewhere
37 root 1.6 around 120% to 200% (dual/single core CPU) compared to an explicit
38 root 1.3 prepare_cached/execute/fetchrow_arrayref/finish combination.
39    
40 root 1.1 =cut
41    
42     package AnyEvent::DBI;
43    
44     use strict;
45     no warnings;
46    
47     use Carp;
48     use Socket ();
49     use Scalar::Util ();
50     use Storable ();
51    
52     use DBI ();
53    
54     use AnyEvent ();
55     use AnyEvent::Util ();
56    
57     our $VERSION = '1.0';
58    
59     # this is the forked server code
60    
61     our $DBH;
62    
# Server-side handler for the "req_open" request: connects to the database.
#
# Receives the thawed request array
#    [$request_name, $dbi, $user, $pass, attr_key => attr_value, ...]
# and stores the new handle in the package global $DBH, which req_exec uses.
#
# Returns [1] on success. Dies with the DBI error string when the connection
# cannot be established, so the failure is reported for the open request
# itself instead of surfacing later as a method call on an undefined handle.
sub req_open {
    my (undef, $dbi, $user, $pass, %attr) = @{ +shift };

    $DBH = DBI->connect($dbi, $user, $pass, \%attr)
        or die $DBI::errstr || "unable to connect to $dbi";

    [1]
}
70    
# Server-side handler for the "req_exec" request: runs one SQL statement.
#
# Receives the thawed request array [$request_name, $statement, @bind_values]
# and executes the statement on the package global $DBH.
#
# Returns a one-element array reference holding the result of
# fetchall_arrayref. Dies with the DBI error string if the statement cannot
# be prepared or executed.
sub req_exec {
    my (undef, $st, @args) = @{ +shift };

    # if_active = 1: a cached handle that is still active is finished silently
    my $sth = $DBH->prepare_cached($st, undef, 1)
        or die $DBH->errstr;

    $sth->execute(@args)
        or die $sth->errstr;

    [$sth->fetchall_arrayref]
}
81    
# Main loop of the forked DBI server process.
#
# Reads requests from $fh. Each request is a native 32 bit length (pack "L")
# followed by that many bytes of Storable-frozen data, which thaw to an
# array whose first element names the handler sub to call. The handler gets
# the whole request array and its return value is frozen and written back
# using the same framing. If the handler dies, [undef, $message] is sent
# instead.
#
# Does not return: when reading stops (EOF or error) or a write fails, the
# process kills itself.
sub serve {
    my ($fh) = @_;

    # handlers are called by name, i.e. through a symbolic code reference
    no strict 'refs';

    eval {
        my $rbuf = "";

        while (1) {
            sysread $fh, $rbuf, 16384, length $rbuf
                or last;

            while (1) {
                my $len = unpack "L", $rbuf;

                # full request available?
                last unless $len && $len + 4 <= length $rbuf;

                # thaw exactly this request, not the requests queued behind it
                my $req = Storable::thaw(substr $rbuf, 4, $len);
                substr $rbuf, 0, $len + 4, ""; # remove length + request

                my $wbuf = eval { pack "L/a*", Storable::freeze($req->[0]($req)) };

                $wbuf = pack "L/a*", Storable::freeze([undef, "$@"])
                    if $@;

                # syswrite may write only part of the buffer, so loop
                for (my $ofs = 0; $ofs < length $wbuf; ) {
                    $ofs += (syswrite $fh, substr $wbuf, $ofs
                             or die "unable to write results");
                }
            }
        }
    };

    kill 9, $$; # no other way on the broken windows platform
}
118    
119     =head2 METHODS
120    
121     =over 4
122    
123     =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
124    
125     Returns a database handle for the given database. Each database handle
126     has an associated server process that executes statements in order. If
127     you want to run more than one statement in parallel, you need to create
128     additional database handles.
129    
130     The advantage of this approach is that transactions work as state is
131     preserved.
132    
133     Example:
134    
135     $dbh = new AnyEvent::DBI
136     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
137    
138     Additional key-value pairs can be used to adjust behaviour:
139    
140     =over 4
141    
142     =item on_error => $callback->($dbh, $filename, $line, $fatal)
143    
144     When an error occurs, then this callback will be invoked. On entry, C<$@>
145     is set to the error message. C<$filename> and C<$line> are where the
146     original request was submitted.
147    
148     If this callback returns and this was a fatal error (C<$fatal> is true)
149     then AnyEvent::DBI dies, otherwise it calls the original request callback
150     without any arguments.
151    
152 root 1.2 If omitted, then C<die> will be called on any errors, fatal or not.
153 root 1.1
154     =back
155    
156     =cut
157    
# Storable autoloads its implementation on first use; force that now with a
# throw-away freeze/thaw round trip of an empty array.
Storable::thaw (Storable::freeze ([]));
160    
# Constructor: creates the client object, forks the server process and
# queues the initial "req_open" request.
#
# Arguments: $class, $dbi, $user, $pass, followed by key => value options
# (e.g. on_error). The options hash itself becomes the object.
#
# Croaks if the socket pair cannot be created or fork fails.
sub new {
    my ($class, $dbi, $user, $pass, %arg) = @_;

    socketpair my $client, my $server,
               Socket::AF_UNIX(), Socket::SOCK_STREAM(), Socket::PF_UNSPEC()
        or croak "unable to create dbi communication pipe: $!";

    my $self = bless \%arg, $class;

    $self->{fh} = $client;

    # the watcher callbacks live inside $self, so they must not keep it alive
    Scalar::Util::weaken(my $wself = $self);

    AnyEvent::Util::fh_nonblocking($client, 1);

    my $rbuf   = "";
    my @caller = (caller)[1, 2]; # the "default" caller

    $self->{rw} = AnyEvent->io(fh => $client, poll => "r", cb => sub {
        my $got = sysread $client, $rbuf, 65536, length $rbuf;

        if (!defined $got) {
            # a non-blocking read that would block or was interrupted is
            # not an error, the watcher simply fires again later
            $wself->_error("read error: $!", @caller, 1)
                unless $!{EAGAIN} || $!{EINTR};

        } elsif ($got == 0) {
            $wself->_error("unexpected eof", @caller, 1);

        } else {
            while (1) {
                my $len = unpack "L", $rbuf;

                # full response available?
                last unless $len && $len + 4 <= length $rbuf;

                # thaw exactly this response, not the ones queued behind it
                my $res = Storable::thaw(substr $rbuf, 4, $len);
                substr $rbuf, 0, $len + 4, ""; # remove length + response

                # responses arrive in request order
                my $req = shift @{ $wself->{queue} };

                if (defined $res->[0]) {
                    $req->[0](@$res);
                } else {
                    # the server sends [undef, $message] for a failed request
                    my $cb = shift @$req;
                    $wself->_error($res->[1], @$req);
                    $cb->();
                }
            }
        }
    });

    $self->{ww_cb} = sub {
        my $len = syswrite $client, $wself->{wbuf};

        if ($len) {
            substr $wself->{wbuf}, 0, $len, "";
        } elsif (defined $len || !($!{EAGAIN} || $!{EINTR})) {
            # nothing left to send, or a hard error: stop polling
            delete $wself->{ww};
        }
        # otherwise the write would have blocked: keep the watcher so the
        # pending data is sent once the socket is writable again
    };

    my $pid = fork;

    if ($pid) {
        # parent
        close $server;

    } elsif (defined $pid) {
        # child
        close $client;
        @_ = $server;
        goto &serve;

    } else {
        croak "fork: $!";
    }

    # NOTE(review): req_open accepts extra connect attribute pairs, but none
    # are sent here - confirm whether that is intended.
    $self->_req(sub { }, @caller, 1, req_open => $dbi, $user, $pass);

    $self
}
237    
# Reports an error for a request.
#
# Arguments: $self, the error message, the file name and line where the
# request was submitted, and a flag telling whether the error is fatal.
#
# $@ is set to the message and the on_error callback (if any) is invoked
# with ($self, $filename, $line, $fatal). Afterwards:
#
#  - fatal error, or no on_error callback: the watchers and the file handle
#    are dropped and the sub dies with "$error at $filename, line $line\n".
#  - non-fatal error with an on_error callback that returns: the sub returns
#    normally with $@ still set to the message, so the caller can invoke the
#    original request callback and the handle stays usable.
sub _error {
    my ($self, $error, $filename, $line, $fatal) = @_;

    my $on_error = $self->{on_error};
    my $must_die = $fatal || !$on_error;

    # only give up the connection when we are not going to continue
    delete @$self{qw(rw ww fh)}
        if $must_die;

    $@ = $error;

    $on_error->($self, $filename, $line, $fatal)
        if $on_error;

    die "$error at $filename, line $line\n"
        if $must_die;

    # the callback may have clobbered $@, the request callback relies on it
    $@ = $error;

    return;
}
252    
# Queues a request for the server process and starts sending it.
#
# Arguments: $self, the result callback, the file name and line of the
# submitter, the fatal flag, then the request itself (handler name followed
# by its arguments). The first four are remembered in $self->{queue}; the
# request is frozen with Storable, length-prefixed and appended to
# $self->{wbuf}.
#
# Croaks if the handle has no file handle any more (it is removed when an
# error tears the connection down), before anything is queued.
sub _req {
    my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, ();

    defined $self->{fh}
        or croak "AnyEvent::DBI: database handle is closed, unable to send request";

    push @{ $self->{queue} }, [$cb, $filename, $line, $fatal];

    $self->{wbuf} .= pack "L/a*", Storable::freeze(\@_);

    # with a write watcher active the data is sent from its callback
    unless ($self->{ww}) {
        my $len = syswrite $self->{fh}, $self->{wbuf};

        # $len is undefined when nothing could be written right now
        substr $self->{wbuf}, 0, $len, ""
            if $len;

        # still any left? then install a write watcher
        $self->{ww} = AnyEvent->io(fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
            if length $self->{wbuf};
    }
}
269    
270     =item $dbh->exec ("statement", @args, $cb->($rows, %extra))
271    
272     Executes the given SQL statement with placeholders replaced by
273 root 1.2 C<@args>. The statement will be prepared and cached on the server side, so
274     using placeholders is compulsory.
275 root 1.1
276     The callback will be called with the result of C<fetchall_arrayref> as
277     first argument and possibly a hash reference with additional information.
278    
279 root 1.2 If an error occurs and the C<on_error> callback returns, then no arguments
280     will be passed and C<$@> contains the error message.
281    
282 root 1.1 =cut
283    
# Public entry point for running a statement: takes the object, the
# statement, its bind values and, as last argument, the result callback.
# Rearranges the arguments into the layout _req expects and tail-calls it.
sub exec {
    my $cb = pop;

    # whatever precedes the statement (normally just the object)
    my @head = splice @_, 0, 1;

    # callback, submitter file/line, non-fatal flag, server-side handler name
    @_ = (@head, $cb, (caller)[1, 2], 0, "req_exec", @_);

    goto &_req;
}
290    
291     =back
292    
293     =head1 SEE ALSO
294    
295     L<AnyEvent>, L<DBI>.
296    
297     =head1 AUTHOR
298    
299 root 1.4 Marc Lehmann <schmorp@schmorp.de>
300     http://home.schmorp.de/
301 root 1.1
302     =cut
303    
304     1
305