ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.1
Committed: Fri Jun 6 15:35:46 2008 UTC (15 years, 11 months ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# User Rev Content
1 root 1.1 =head1 NAME
2    
3     AnyEvent::DBI - asynchronous DBI access
4    
5     =head1 SYNOPSIS
6    
7     use AnyEvent::DBI;
8    
9     =head1 DESCRIPTION
10    
11     This module is an L<AnyEvent> user, you need to make sure that you use and
12     run a supported event loop.
13    
14     This module implements asynchronous DBI access my forking or executing
15     separate "DBI-Server" processes and sending them requests.
16    
17     It means that you can run DBI requests in parallel to other tasks.
18    
19     =cut
20    
21     package AnyEvent::DBI;
22    
23     use strict;
24     no warnings;
25    
26     use Carp;
27     use Socket ();
28     use Scalar::Util ();
29     use Storable ();
30    
31     use DBI ();
32    
33     use AnyEvent ();
34     use AnyEvent::Util ();
35    
36     our $VERSION = '1.0';
37    
38     # this is the forked server code
39    
40     our $DBH;
41    
42     sub req_open {
43     my (undef, $dbi, $user, $pass, %attr) = @{+shift};
44    
45     $DBH = DBI->connect ($dbi, $user, $pass, \%attr);
46    
47     [1]
48     }
49    
50     sub serve {
51     my ($fh) = @_;
52    
53     no strict;
54    
55     eval {
56     my $rbuf;
57    
58     while () {
59     sysread $fh, $rbuf, 16384, length $rbuf
60     or last;
61    
62     while () {
63     my $len = unpack "L", $rbuf;
64    
65     # full request available?
66     last unless $len && $len + 4 <= length $rbuf;
67    
68     my $req = Storable::thaw substr $rbuf, 4;
69     substr $rbuf, 0, $len + 4, ""; # remove length + request
70    
71     my $wbuf = eval { pack "L/a*", Storable::freeze $req->[0]($req) };
72    
73     $wbuf = pack "L/a*", Storable::freeze [undef, "$@"]
74     if $@;
75    
76     for (my $ofs = 0; $ofs < length $wbuf; ) {
77     $ofs += (syswrite $fh, substr $wbuf, $ofs
78     or die "unable to write results");
79     }
80     }
81     }
82     };
83    
84     warn $@;#d#
85    
86     kill 9, $$; # no other way on the broken windows platform
87     }
88    
89     =head2 METHODS
90    
91     =over 4
92    
93     =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
94    
95     Returns a database handle for the given database. Each database handle
96     has an associated server process that executes statements in order. If
97     you want to run more than one statement in parallel, you need to create
98     additional database handles.
99    
100     The advantage of this approach is that transactions work as state is
101     preserved.
102    
103     Example:
104    
105     $dbh = new AnyEvent::DBI
106     "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
107    
108     Additional key-value pairs can be used to adjust behaviour:
109    
110     =over 4
111    
112     =item on_error => $callback->($dbh, $filename, $line, $fatal)
113    
114     When an error occurs, then this callback will be invoked. On entry, C<$@>
115     is set to the error message. C<$filename> and C<$line> is where the
116     original request was submitted.
117    
118     If this callback returns and this was a fatal error (C<$fatal> is true)
119     then AnyEvent::DBI die's, otherwise it calls the original request callback
120     without any arguments.
121    
122     If omitted, then C<die> will be called on any fatal errors, others will be ignored.
123    
124     =back
125    
126     =cut
127    
128     # stupid Storable autoloading, total loss-loss situation
129     Storable::thaw Storable::freeze [];
130    
131     sub new {
132     my ($class, $dbi, $user, $pass, %arg) = @_;
133    
134     socketpair my $client, my $server, &Socket::AF_UNIX, &Socket::SOCK_STREAM, &Socket::PF_UNSPEC
135     or croak "unable to create dbi communicaiton pipe: $!";
136    
137     my $self = bless \%arg, $class;
138    
139     $self->{fh} = $client;
140    
141     Scalar::Util::weaken (my $wself = $self);
142    
143     AnyEvent::Util::fh_nonblocking $client, 1;
144    
145     my $rbuf;
146     my @caller = (caller)[1,2]; # the "default" caller
147    
148     $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
149     my $len = sysread $client, $rbuf, 65536, length $rbuf;
150    
151     if ($len > 0) {
152    
153     while () {
154     my $len = unpack "L", $rbuf;
155    
156     # full request available?
157     last unless $len && $len + 4 <= length $rbuf;
158    
159     my $res = Storable::thaw substr $rbuf, 4;
160     substr $rbuf, 0, $len + 4, ""; # remove length + request
161    
162     my $req = shift @{ $wself->{queue} };
163    
164     if (defined $res->[0]) {
165     $req->[0](@$res);
166     } else {
167     my $cb = shift @$req;
168     $wself->_error ($res->[1], @$req);
169     $cb->[0]();
170     }
171     }
172    
173     } elsif (defined $len) {
174     $wself->_error ("unexpected eof", @caller, 1);
175     } else {
176     $wself->_error ("read error: $!", @caller, 1);
177     }
178     });
179    
180     my $pid = fork;
181    
182     if ($pid) {
183     # parent
184     close $server;
185    
186     } elsif (defined $pid) {
187     # child
188     close $client;
189     @_ = $server;
190     goto &serve;
191    
192     } else {
193     croak "fork: $!";
194     }
195    
196     $self->_req (sub { }, (caller)[1,2], 1, req_open => $dbi, $user, $pass);
197    
198     $self
199     }
200    
201     sub _error {
202     my ($self, $error, $filename, $line, $fatal) = @_;
203    
204     delete $self->{rw};
205     delete $self->{ww};
206     delete $self->{fh};
207    
208     $@ = $error;
209    
210     $self->{on_error}($self, $filename, $line, $fatal)
211     if $self->{on_error};
212    
213     die "$error at $filename, line $line\n"
214     if $fatal;
215     }
216    
217     sub _req {
218     warn "<req(@_>\n";#d#
219     my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, ();
220    
221     push @{ $self->{queue} }, [$cb, $filename, $line, $fatal];
222    
223     $self->{wbuf} .= pack "L/a*", Storable::freeze \@_;
224    
225     unless ($self->{ww}) {
226     my $len = syswrite $self->{fh}, $self->{wbuf};
227     substr $self->{wbuf}, 0, $len, "";
228    
229     #TODO, ww_cb
230     # still any left? then install a write watcher
231     $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $self->{ww_cb})
232     if length $self->{wbuf};
233     }
234     }
235    
236     =item $dbh->exec ("statement", @args, $cb->($rows, %extra))
237    
238     Executes the given SQL statement with placeholders replaced by
239     C<@args>. The statement will be prepared and cached on the
240     server side, so using placeholders is compulsory.
241    
242     The callback will be called with the result of C<fetchall_arrayref> as
243     first argument and possibly a hash reference with additional information.
244    
245     =cut
246    
247     sub exec {
248     my $cb = pop;
249     splice @_, 1, 0, $cb, (caller)[1,2], 0, "exec";
250    
251     goto &_req;
252     }
253    
254     =back
255    
256     =head1 SEE ALSO
257    
258     L<AnyEvent>, L<DBI>.
259    
260     =head1 AUTHOR
261    
262     Marc Lehmann <schmorp@schmorp.de>
263     http://home.schmorp.de/
264    
265     =cut
266    
267     1
268