ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.4
Committed: Fri Jun 6 20:06:34 2008 UTC (16 years ago) by root
Branch: MAIN
CVS Tags: rel-1_0
Changes since 1.3: +2 -2 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::DBI - asynchronous DBI access
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::DBI;
8    
9     =head1 DESCRIPTION
10    
11     This module is an L<AnyEvent> user; you need to make sure that you use and
12     run a supported event loop.
13    
14     This module implements asynchronous DBI access by forking or executing
15     separate "DBI-Server" processes and sending them requests.
16    
17     It means that you can run DBI requests in parallel to other tasks.
18    
19 root 1.3 The overhead for very simple statements ("select 0") is somewhere
20     around 120% to 200% (single/dual core CPU) compared to an explicit
21     prepare_cached/execute/fetchrow_arrayref/finish combination.
22    
23 root 1.1 =cut
24    
25     package AnyEvent::DBI;
26    
27     use strict;
28     no warnings;
29    
30     use Carp;
31     use Socket ();
32     use Scalar::Util ();
33     use Storable ();
34    
35     use DBI ();
36    
37     use AnyEvent ();
38     use AnyEvent::Util ();
39    
40     our $VERSION = '1.0';
41    
42     # this is the forked server code
43    
44     our $DBH;
45    
# Server side: handle a "req_open" request by connecting to the database.
#
# The single argument is the request array reference:
#    [request-name, $dbi, $user, $pass, key => value...]
# The trailing key/value pairs are passed to DBI->connect as its
# attribute hash.
#
# On success the new handle is stored in the package global $DBH and [1]
# is returned as the response.  If the connect fails this dies with
# $DBI::errstr instead of pretending success, so the failure is reported
# for the open request itself rather than surfacing later as a method
# call on an undefined handle.
sub req_open {
   my (undef, $dbi, $user, $pass, %attr) = @{+shift};

   $DBH = DBI->connect ($dbi, $user, $pass, \%attr)
      or die $DBI::errstr;

   [1]
}
53    
# Server side: handle a "req_exec" request by running one SQL statement.
#
# The single argument is the request array reference:
#    [request-name, $statement, @bind_values]
#
# The statement is prepared through prepare_cached on the global $DBH
# (the third argument, 1, is DBI's $if_active flag), executed with the
# bind values, and the response is a one-element array holding the result
# of fetchall_arrayref.
#
# Dies with the handle's errstr if either the prepare or the execute
# fails.
sub req_exec {
   my (undef, $st, @args) = @{+shift};

   # without this check a failed prepare would only show up as
   # "can't call method execute on an undefined value"
   my $sth = $DBH->prepare_cached ($st, undef, 1)
      or die $DBH->errstr;

   $sth->execute (@args)
      or die $sth->errstr;

   [$sth->fetchall_arrayref]
}
64    
# Server main loop; runs in the forked child on its end of the socketpair.
#
# Wire format in both directions: a native unsigned 32 bit length (pack
# "L") followed by that many bytes of Storable-frozen data.
#
# Each request is an array reference whose first element names the
# handler sub (e.g. "req_open"); the handler is called with the whole
# request and its return value is frozen and written back.  If the
# handler dies, [undef, "error text"] is sent instead.
#
# The loop ends on eof/read failure or when a response cannot be
# written; the process then kills itself, so this sub never returns.
sub serve {
   my ($fh) = @_;

   no strict; # handlers are called by name, i.e. as symbolic code references

   eval {
      my $rbuf;

      while () {
         sysread $fh, $rbuf, 16384, length $rbuf
            or last;

         while () {
            my $len = unpack "L", $rbuf;

            # full request available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly one frame: handing over the rest of the buffer
            # would copy every pipelined request and rely on Storable
            # ignoring the trailing bytes
            my $req = Storable::thaw (substr $rbuf, 4, $len);
            substr $rbuf, 0, $len + 4, ""; # remove length + request

            my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };

            $wbuf = pack "L/a*", Storable::freeze [undef, "$@"]
               if $@;

            # syswrite may write only part of the response
            for (my $ofs = 0; $ofs < length $wbuf; ) {
               $ofs += (syswrite $fh, substr $wbuf, $ofs
                        or die "unable to write results");
            }
         }
      }
   };

   kill 9, $$; # no other way on the broken windows platform
}
101    
102     =head2 METHODS
103    
104     =over 4
105    
106     =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
107    
108     Returns a database handle for the given database. Each database handle
109     has an associated server process that executes statements in order. If
110     you want to run more than one statement in parallel, you need to create
111     additional database handles.
112    
113     The advantage of this approach is that transactions work as state is
114     preserved.
115    
116     Example:
117    
118     $dbh = new AnyEvent::DBI
119     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
120    
121     Additional key-value pairs can be used to adjust behaviour:
122    
123     =over 4
124    
125     =item on_error => $callback->($dbh, $filename, $line, $fatal)
126    
127     When an error occurs, then this callback will be invoked. On entry, C<$@>
128     is set to the error message. C<$filename> and C<$line> are where the
129     original request was submitted.
130    
131     If this callback returns and this was a fatal error (C<$fatal> is true)
132     then AnyEvent::DBI dies, otherwise it calls the original request callback
133     without any arguments.
134    
135 root 1.2 If omitted, then C<die> will be called on any errors, fatal or not.
136 root 1.1
137     =back
138    
139     =cut
140    
141     # stupid Storable autoloading, total loss-loss situation
142     Storable::thaw Storable::freeze [];
143    
# Constructor: create the client-side handle and fork its server process.
#
# Arguments: the class, the DBI data source, user name and password,
# followed by key => value options (on_error is the one used in this
# file).  The options hash itself is blessed and becomes the object.
#
# Sets up a socketpair, a read watcher that decodes the length-prefixed
# Storable responses and hands them to the queued request callbacks, and
# a write callback that drains $self->{wbuf}.  The child process runs
# serve() on its end of the socketpair; the parent queues the initial
# req_open request (flagged as fatal) and returns the new object.
#
# Croaks if the socketpair or the fork fails.
sub new {
   my ($class, $dbi, $user, $pass, %arg) = @_;

   socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC
      or croak "unable to create dbi communicaiton pipe: $!";

   my $self = bless \%arg, $class;

   $self->{fh} = $client;

   # the watcher callbacks only hold a weak reference to the object
   Scalar::Util::weaken (my $wself = $self);

   AnyEvent::Util::fh_nonblocking $client, 1;

   my $rbuf;
   my @caller = (caller)[1,2]; # the "default" caller

   $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
      my $len = sysread $client, $rbuf, 65536, length $rbuf;

      if ($len > 0) {

         while () {
            my $len = unpack "L", $rbuf;

            # full response available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly one frame, not the rest of the buffer
            my $res = Storable::thaw (substr $rbuf, 4, $len);
            substr $rbuf, 0, $len + 4, ""; # remove length + response

            my $req = shift @{ $wself->{queue} };

            if (defined $res->[0]) {
               $req->[0](@$res);
            } else {
               # error response: [undef, message]
               my $cb = shift @$req;
               $wself->_error ($res->[1], @$req);
               $cb->();
            }
         }

      } elsif (defined $len) {
         $wself->_error ("unexpected eof", @caller, 1);
      } elsif (!$!{EAGAIN} && !$!{EINTR}) {
         # the socket is non-blocking, so a transient failure is not an
         # error - just wait for the next readable event
         $wself->_error ("read error: $!", @caller, 1);
      }
   });

   $self->{ww_cb} = sub {
      my $len = syswrite $client, $wself->{wbuf};

      if (defined $len) {
         substr $wself->{wbuf}, 0, $len, "";
      } elsif ($!{EAGAIN} || $!{EINTR}) {
         return; # transient: keep the watcher so the pending data is retried
      }

      # stop watching once everything is written; as before, a hard
      # write error also just removes the watcher
      delete $wself->{ww}
         if !defined $len || !length $wself->{wbuf};
   };

   my $pid = fork;

   if ($pid) {
      # parent
      close $server;

   } elsif (defined $pid) {
      # child
      close $client;
      @_ = $server;
      goto &serve;

   } else {
      croak "fork: $!";
   }

   $self->_req (sub { }, @caller, 1, req_open => $dbi, $user, $pass);

   $self
}
220    
# Report an error for a request.
#
# Arguments: the object, the error message, the file name and line where
# the request was submitted, and a flag telling whether the error is
# fatal.
#
# $@ is set to the error message.  On a fatal error the watchers and the
# file handle are dropped first, as the connection is unusable.  Then:
#
# - with an on_error handler: the handler is called with
#   ($self, $filename, $line, $fatal).  If it returns and the error was
#   not fatal, this sub returns (with $@ still set to the message) so the
#   caller can invoke the request callback without arguments.  If the
#   error was fatal, it dies.
# - without a handler: it always dies.
#
# The die message is "$error at $filename, line $line\n".
sub _error {
   my ($self, $error, $filename, $line, $fatal) = @_;

   # only a fatal error invalidates the connection; a failed statement
   # must leave the handle usable for further requests
   if ($fatal) {
      delete $self->{rw};
      delete $self->{ww};
      delete $self->{fh};
   }

   $@ = $error;

   if ($self->{on_error}) {
      $self->{on_error}($self, $filename, $line, $fatal);

      unless ($fatal) {
         $@ = $error; # the handler may have clobbered it
         return;
      }
   }

   die "$error at $filename, line $line\n";
}
235    
# Queue a request and start sending it to the server process.
#
# Arguments: the object, the result callback, the file name and line of
# the submitter, the fatal flag, then the request itself (handler name
# followed by its arguments).
#
# The callback and caller information are appended to $self->{queue};
# the request is Storable-frozen, prefixed with its length (pack "L/a*")
# and appended to $self->{wbuf}.  Unless a write watcher is already
# active, one write is attempted right away, and a watcher using
# $self->{ww_cb} is installed only if data is left over.
sub _req {
   my ($self, $on_result, $file, $lineno, $is_fatal, @request) = @_;

   # remember who to notify, in submission order
   push @{ $self->{queue} }, [$on_result, $file, $lineno, $is_fatal];

   my $payload = Storable::freeze (\@request);
   $self->{wbuf} .= pack "L/a*", $payload;

   unless ($self->{ww}) {
      # nothing pending before us, so try to write directly
      my $written = syswrite $self->{fh}, $self->{wbuf};
      substr $self->{wbuf}, 0, $written, "";

      # still any left? then install a write watcher
      if (length $self->{wbuf}) {
         $self->{ww} = AnyEvent->io (
            fh   => $self->{fh},
            poll => "w",
            cb   => $self->{ww_cb},
         );
      }
   }
}
252    
253     =item $dbh->exec ("statement", @args, $cb->($rows, %extra))
254    
255     Executes the given SQL statement with placeholders replaced by
256 root 1.2 C<@args>. The statement will be prepared and cached on the server side, so
257     using placeholders is compulsory.
258 root 1.1
259     The callback will be called with the result of C<fetchall_arrayref> as
260     first argument and possibly a hash reference with additional information.
261    
262 root 1.2 If an error occurs and the C<on_error> callback returns, then no arguments
263     will be passed and C<$@> contains the error message.
264    
265 root 1.1 =cut
266    
# Public entry point behind $dbh->exec ($statement, @args, $cb).
#
# Takes the callback off the end of the argument list and rebuilds @_
# into the form _req expects: object, callback, file name and line of
# our caller, fatal flag 0, the handler name "req_exec", then the
# statement and its bind values.  Control is handed to _req via goto.
sub exec {
   my $on_result = pop @_;
   my ($self, @statement_and_args) = @_;

   @_ = ($self, $on_result, (caller)[1,2], 0, "req_exec", @statement_and_args);

   goto &_req;
}
273    
274     =back
275    
276     =head1 SEE ALSO
277    
278     L<AnyEvent>, L<DBI>.
279    
280     =head1 AUTHOR
281    
282 root 1.4 Marc Lehmann <schmorp@schmorp.de>
283     http://home.schmorp.de/
284 root 1.1
285     =cut
286    
287     1
288