ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.2
Committed: Fri Jun 6 15:44:34 2008 UTC (16 years ago) by root
Branch: MAIN
Changes since 1.1: +20 -10 lines
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::DBI - asynchronous DBI access
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::DBI;
8    
9     =head1 DESCRIPTION
10    
11     This module is an L<AnyEvent> user, you need to make sure that you use and
12     run a supported event loop.
13    
14     This module implements asynchronous DBI access by forking or executing
15     separate "DBI-Server" processes and sending them requests.
16    
17     It means that you can run DBI requests in parallel to other tasks.
18    
19     =cut
20    
21     package AnyEvent::DBI;
22    
23     use strict;
24     no warnings;
25    
26     use Carp;
27     use Socket ();
28     use Scalar::Util ();
29     use Storable ();
30    
31     use DBI ();
32    
33     use AnyEvent ();
34     use AnyEvent::Util ();
35    
36     our $VERSION = '1.0';
37    
38     # this is the forked server code
39    
40     our $DBH;
41    
# server-side handler for the "req_open" request: connects to the database.
#
# the request is an array reference of the form
#    [$request_name, $dsn, $user, $pass, key => value...]
# where the trailing key/value pairs are handed to DBI->connect as its
# attribute hash.
#
# on success the handle is stored in the package variable $DBH and [1] is
# returned. on failure this dies with the DBI error string, so the caller
# of the handler sees the failure instead of a bogus success reply.
sub req_open {
   my (undef, $dbi, $user, $pass, %attr) = @{+shift};

   $DBH = DBI->connect ($dbi, $user, $pass, \%attr)
      or die $DBI::errstr || "unable to connect to database";

   [1]
}
49    
# server-side handler for the "req_exec" request: runs one statement.
#
# the request is an array reference of the form
#    [$request_name, $statement, @bind_values]
#
# the statement is prepared with prepare_cached (third argument 1, see the
# DBI documentation for the "if_active" parameter) and executed with the
# bind values. returns a one-element array reference containing the result
# of fetchall_arrayref.
#
# dies with an error message when there is no database handle, or when
# preparing or executing the statement fails.
sub req_exec {
   my (undef, $st, @args) = @{+shift};

   # without this check a missing handle only shows up as
   # "Can't call method ... on an undefined value"
   $DBH
      or die "no database connection (req_open has not succeeded)";

   my $sth = $DBH->prepare_cached ($st, undef, 1)
      or die $DBH->errstr || "unable to prepare statement";

   $sth->execute (@args)
      or die $sth->errstr;

   [$sth->fetchall_arrayref]
}
60    
# request loop of the server process.
#
# $fh is the handle requests are read from and replies are written to.
#
# wire format, in both directions: a length packed with "L", followed by
# that many bytes of Storable-frozen data. each request thaws to an array
# reference whose first element names the handler sub to call; the handler
# receives the whole request and its return value is frozen and sent back.
# when the handler dies, the reply is [undef, $error_message] instead.
#
# the loop ends on end of file, on a read error or when a reply cannot be
# written; the process then kills itself. this sub never returns.
sub serve {
   my ($fh) = @_;

   no strict; # handlers are called by name, i.e. through symbolic references

   eval {
      my $rbuf;

      while () {
         # append to whatever partial request is still buffered
         sysread $fh, $rbuf, 16384, length $rbuf
            or last;

         while () {
            my $len = unpack "L", $rbuf;

            # full request available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly one frame, not everything that happens to be buffered
            my $req = Storable::thaw (substr $rbuf, 4, $len);
            substr $rbuf, 0, $len + 4, ""; # remove length + request

            my $wbuf = eval { pack "L/a*", Storable::freeze ($req->[0]($req)) };

            $wbuf = pack "L/a*", Storable::freeze ([undef, "$@"])
               if $@;

            # syswrite may write less than requested, so loop until done
            for (my $ofs = 0; $ofs < length $wbuf; ) {
               $ofs += (syswrite $fh, substr $wbuf, $ofs
                        or die "unable to write results");
            }
         }
      }
   };

   kill 9, $$; # no other way on the broken windows platform
}
97    
98     =head2 METHODS
99    
100     =over 4
101    
102     =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
103    
104     Returns a database handle for the given database. Each database handle
105     has an associated server process that executes statements in order. If
106     you want to run more than one statement in parallel, you need to create
107     additional database handles.
108    
109     The advantage of this approach is that transactions work as state is
110     preserved.
111    
112     Example:
113    
114     $dbh = new AnyEvent::DBI
115     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
116    
117     Additional key-value pairs can be used to adjust behaviour:
118    
119     =over 4
120    
121     =item on_error => $callback->($dbh, $filename, $line, $fatal)
122    
123     When an error occurs, then this callback will be invoked. On entry, C<$@>
124     is set to the error message. C<$filename> and C<$line> are where the
125     original request was submitted.
126    
127     If this callback returns and this was a fatal error (C<$fatal> is true)
128     then AnyEvent::DBI dies; otherwise it calls the original request callback
129     without any arguments.
130    
131 root 1.2 If omitted, then C<die> will be called on any errors, fatal or not.
132 root 1.1
133     =back
134    
135     =cut
136    
# one freeze/thaw round trip at load time, result discarded - works around
# Storable autoloading (presumably so both functions are already loaded
# before any server process is forked; a loss-loss situation either way)
Storable::thaw (Storable::freeze ([]));
139    
# constructor: creates the handle, forks the server process and queues the
# initial "req_open" request.
#
# arguments: the class, the DBI data source, user name and password,
# followed by key => value pairs that become the fields of the object
# (on_error is the one read in this file).
#
# returns the new object. croaks when the socket pair cannot be created or
# when fork fails. in the forked child this sub does not return, it
# continues in serve.
sub new {
   my ($class, $dbi, $user, $pass, %arg) = @_;

   socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC
      or croak "unable to create dbi communicaiton pipe: $!";

   my $self = bless \%arg, $class;

   $self->{fh} = $client;

   # the read watcher is stored inside $self, so its callback must only
   # hold a weak reference to it
   Scalar::Util::weaken (my $wself = $self);

   AnyEvent::Util::fh_nonblocking ($client, 1);

   my $rbuf;
   my @caller = (caller)[1,2]; # the "default" caller

   $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
      my $len = sysread $client, $rbuf, 65536, length $rbuf;

      if ($len > 0) {

         while () {
            my $len = unpack "L", $rbuf;

            # full reply available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly one frame, not everything that happens to be buffered
            my $res = Storable::thaw (substr $rbuf, 4, $len);
            substr $rbuf, 0, $len + 4, ""; # remove length + reply

            # replies arrive in the order the requests were queued
            my $req = shift @{ $wself->{queue} };

            if (defined $res->[0]) {
               $req->[0](@$res);
            } else {
               # error reply: [undef, $message]
               my $cb = shift @$req;
               $wself->_error ($res->[1], @$req);
               $cb->();
            }
         }

      } elsif (defined $len) {
         $wself->_error ("unexpected eof", @caller, 1);
      } elsif (!($!{EAGAIN} || $!{EINTR})) {
         # the socket is non-blocking, so "nothing to read right now" and
         # an interrupted call are not errors - just wait for the next event
         $wself->_error ("read error: $!", @caller, 1);
      }
   });

   my $pid = fork;

   if ($pid) {
      # parent
      close $server;

   } elsif (defined $pid) {
      # child
      close $client;
      @_ = $server;
      goto &serve;

   } else {
      croak "fork: $!";
   }

   # the open request is flagged as fatal (the 1 after @caller)
   $self->_req (sub { }, @caller, 1, req_open => $dbi, $user, $pass);

   $self
}
209    
# reports an error, following the rules documented for on_error.
#
# arguments: the object, the error message, the file name and line of the
# request the error belongs to, and a flag telling whether the error is
# fatal.
#
# fatal errors remove the watchers and the file handle from the object.
# $@ is set to the error message and the on_error callback, if any, is
# invoked as $callback->($self, $filename, $line, $fatal).
#
# returns (with $@ still set to the message) only for a non-fatal error
# when an on_error callback exists. in every other case this dies with
# "$error at $filename, line $line\n".
sub _error {
   my ($self, $error, $filename, $line, $fatal) = @_;

   # a non-fatal error must leave the handle usable for further requests
   if ($fatal) {
      delete $self->{rw};
      delete $self->{ww};
      delete $self->{fh};
   }

   $@ = $error;

   if ($self->{on_error}) {
      $self->{on_error}($self, $filename, $line, $fatal);

      unless ($fatal) {
         $@ = $error; # the callback might have clobbered it
         return;
      }
   }

   die "$error at $filename, line $line\n";
}
224    
# queues a request and starts sending it to the server process.
#
# arguments: the object, the callback for the reply, the file name and
# line the request was submitted from, the fatal flag, and then the request
# itself (handler name followed by its arguments).
#
# the callback and its bookkeeping data are appended to $self->{queue}.
# the request is frozen with Storable, prefixed with its length (pack "L")
# and appended to $self->{wbuf}. as much as possible is written at once;
# whatever remains is flushed by a write watcher stored in $self->{ww}.
sub _req {
   my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, ();

   push @{ $self->{queue} }, [$cb, $filename, $line, $fatal];

   $self->{wbuf} .= pack "L/a*", Storable::freeze (\@_);

   # an existing watcher will pick up the appended data by itself
   unless ($self->{ww}) {
      my $len = syswrite $self->{fh}, $self->{wbuf};
      substr $self->{wbuf}, 0, $len, "" if $len;

      # still any left? then install a write watcher
      if (length $self->{wbuf}) {
         # the callback is stored inside $self, so it must only hold a
         # weak reference to it
         Scalar::Util::weaken (my $wself = $self);

         $self->{ww_cb} ||= sub {
            return unless $wself;

            my $len = syswrite $wself->{fh}, $wself->{wbuf};

            if ($len) {
               substr $wself->{wbuf}, 0, $len, "";

               # everything flushed, the watcher is no longer needed
               delete $wself->{ww}
                  unless length $wself->{wbuf};
            } elsif (defined $len || !($!{EAGAIN} || $!{EINTR})) {
               # nothing written, or a real write error: stop polling
               # instead of spinning on a handle that cannot be written
               delete $wself->{ww};
            }
         };

         $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb});
      }
   }
}
242    
243     =item $dbh->exec ("statement", @args, $cb->($rows, %extra))
244    
245     Executes the given SQL statement with placeholders replaced by
246 root 1.2 C<@args>. The statement will be prepared and cached on the server side, so
247     using placeholders is compulsory.
248 root 1.1
249     The callback will be called with the result of C<fetchall_arrayref> as
250     first argument and possibly a hash reference with additional information.
251    
252 root 1.2 If an error occurs and the C<on_error> callback returns, then no arguments
253     will be passed and C<$@> contains the error message.
254    
255 root 1.1 =cut
256    
# public entry point for running a statement: the last argument is the
# result callback, everything between the object and the callback is the
# statement and its bind values.
sub exec {
   my $callback = pop @_;
   my @origin   = (caller)[1, 2]; # file name and line of our caller

   # rewrite ($self, $statement, @args) into what _req expects:
   # ($self, $callback, $filename, $line, $fatal, "req_exec", $statement, @args)
   splice @_, 1, 0, $callback, @origin, 0, "req_exec";

   goto &_req;
}
263    
264     =back
265    
266     =head1 SEE ALSO
267    
268     L<AnyEvent>, L<DBI>.
269    
270     =head1 AUTHOR
271    
272     Marc Lehmann <schmorp@schmorp.de>
273     http://home.schmorp.de/
274    
275     =cut
276    
277     1
278