ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/AnyEvent-DBI/DBI.pm
Revision: 1.1
Committed: Fri Jun 6 15:35:46 2008 UTC (16 years ago) by root
Branch: MAIN
Log Message:
*** empty log message ***

File Contents

# Content
1 =head1 NAME
2
3 AnyEvent::DBI - asynchronous DBI access
4
5 =head1 SYNOPSIS
6
7 use AnyEvent::DBI;
8
9 =head1 DESCRIPTION
10
11 This module is an L<AnyEvent> user; you need to make sure that you use and
12 run a supported event loop.
13
14 This module implements asynchronous DBI access by forking or executing
15 separate "DBI-Server" processes and sending them requests.
16
17 It means that you can run DBI requests in parallel to other tasks.
18
19 =cut
20
21 package AnyEvent::DBI;
22
23 use strict;
24 no warnings;
25
26 use Carp;
27 use Socket ();
28 use Scalar::Util ();
29 use Storable ();
30
31 use DBI ();
32
33 use AnyEvent ();
34 use AnyEvent::Util ();
35
36 our $VERSION = '1.0';
37
38 # this is the forked server code
39
40 our $DBH;
41
# Server-side handler for the "req_open" request: connects this process
# to the database.
#
# The single argument is the thawed request array: the handler name
# (ignored here), the DBI data source, the user name, the password and,
# optionally, DBI connect attributes as key-value pairs.
#
# On success the handle is stored in $DBH and [1] is returned. If the
# connect fails, this dies with the DBI error message instead of
# pretending success, so the caller of the handler can report the failure.
sub req_open {
   my (undef, $dbi, $user, $pass, %attr) = @{+shift};

   $DBH = DBI->connect ($dbi, $user, $pass, \%attr)
      or die +(defined $DBI::errstr ? $DBI::errstr : "unable to connect to database") . "\n";

   [1]
}
49
# Main loop of the forked server process.
#
# Reads requests from $fh, runs each one and writes the reply back on the
# same handle. Both directions use the same framing: a native 32-bit
# length ("L") followed by that many bytes of Storable-frozen data.
#
# A request is an array whose first element names the handler function;
# the handler is called with the request array and must return the reply
# array. If the handler (or freezing its result) dies, the reply is
# [undef, "error message"] instead.
#
# This function never returns: on eof or on any I/O error the process
# kills itself.
sub serve {
   my ($fh) = @_;

   eval {
      my $rbuf = "";

      while () {
         my $got = sysread $fh, $rbuf, 16384, length $rbuf;

         unless (defined $got) {
            # an interrupted read is not the end of the connection
            next if $!{EINTR};
            die "unable to read requests: $!\n";
         }

         # eof: the other side closed the connection
         last unless $got;

         while () {
            my $len = unpack "L", $rbuf;

            # full request available?
            last unless $len && $len + 4 <= length $rbuf;

            # thaw exactly this request, further ones may already follow it
            my $req = Storable::thaw (substr $rbuf, 4, $len);
            substr $rbuf, 0, $len + 4, ""; # remove length + request

            my $wbuf = eval {
               # the handler is looked up by the name sent in the request
               no strict 'refs';
               pack "L/a*", Storable::freeze ($req->[0]($req))
            };

            $wbuf = pack "L/a*", Storable::freeze ([undef, "$@"])
               if $@;

            # syswrite may write less than requested, so loop until done
            for (my $ofs = 0; $ofs < length $wbuf; ) {
               $ofs += (syswrite $fh, substr $wbuf, $ofs
                        or die "unable to write results");
            }
         }
      }
   };

   # only report real failures, a clean eof leaves $@ empty
   warn $@ if $@;

   kill 9, $$; # no other way on the broken windows platform
}
88
89 =head2 METHODS
90
91 =over 4
92
93 =item $dbh = new AnyEvent::DBI $database, $user, $pass, [key => value]...
94
95 Returns a database handle for the given database. Each database handle
96 has an associated server process that executes statements in order. If
97 you want to run more than one statement in parallel, you need to create
98 additional database handles.
99
100 The advantage of this approach is that transactions work as state is
101 preserved.
102
103 Example:
104
105 $dbh = new AnyEvent::DBI
106 "DBI:mysql:test;mysql_read_default_file=/root/.my.cnf", "", "";
107
108 Additional key-value pairs can be used to adjust behaviour:
109
110 =over 4
111
112 =item on_error => $callback->($dbh, $filename, $line, $fatal)
113
114 When an error occurs, then this callback will be invoked. On entry, C<$@>
115 is set to the error message. C<$filename> and C<$line> are where the
116 original request was submitted.
117
118 If this callback returns and this was a fatal error (C<$fatal> is true)
119 then AnyEvent::DBI dies, otherwise it calls the original request callback
120 without any arguments.
121
122 If omitted, then C<die> will be called on any fatal error; other errors will be ignored.
123
124 =back
125
126 =cut
127
128 # stupid Storable autoloading, total loss-loss situation
129 Storable::thaw Storable::freeze [];
130
# Constructor: creates the socket pair, forks the server process and
# queues the initial connect request.
#
# Arguments: the class name, the DBI data source, user name and password,
# followed by key-value options. The options hash itself becomes the
# object; on_error is the option this file looks at.
#
# Returns the new handle. Croaks if the socket pair or the fork fails.
sub new {
   my ($class, $dbi, $user, $pass, %arg) = @_;

   socketpair my $client, my $server, Socket::AF_UNIX (), Socket::SOCK_STREAM (), Socket::PF_UNSPEC ()
      or croak "unable to create dbi communication pipe: $!";

   my $self = bless \%arg, $class;

   $self->{fh} = $client;

   # the watcher callback is stored inside the object, so it must only
   # hold a weak reference back to it
   Scalar::Util::weaken (my $wself = $self);

   AnyEvent::Util::fh_nonblocking ($client, 1);

   my $rbuf = "";
   my @caller = (caller)[1,2]; # the "default" caller

   $self->{rw} = AnyEvent->io (fh => $client, poll => "r", cb => sub {
      my $got = sysread $client, $rbuf, 65536, length $rbuf;

      unless (defined $got) {
         # nothing to read right now on the non-blocking socket, not an error
         return if $!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR};

         return $wself->_error ("read error: $!", @caller, 1);
      }

      $got
         or return $wself->_error ("unexpected eof", @caller, 1);

      while () {
         my $len = unpack "L", $rbuf;

         # full reply available?
         last unless $len && $len + 4 <= length $rbuf;

         # thaw exactly this reply, further ones may already follow it
         my $res = Storable::thaw (substr $rbuf, 4, $len);
         substr $rbuf, 0, $len + 4, ""; # remove length + reply

         # replies arrive in the order the requests were queued
         my $req = shift @{ $wself->{queue} };

         if (defined $res->[0]) {
            $req->[0](@$res);
         } else {
            # error reply: the remaining entries of $req are
            # filename, line and the fatal flag
            my $cb = shift @$req;
            $wself->_error ($res->[1], @$req);
            $cb->();
         }
      }
   });

   my $pid = fork;

   if ($pid) {
      # parent
      close $server;

   } elsif (defined $pid) {
      # child
      close $client;
      @_ = $server;
      goto &serve;

   } else {
      croak "fork: $!";
   }

   # the first request connects to the database, failure is fatal
   $self->_req (sub { }, @caller, 1, req_open => $dbi, $user, $pass);

   $self
}
200
# Reports an error for a request.
#
# Arguments: the handle, the error message, the filename and line where
# the request was submitted, and a flag telling whether the error is fatal.
#
# $@ is set to the error message and the on_error callback (if any) is
# called with the handle, filename, line and fatal flag.
#
# A fatal error also drops the watchers and the filehandle, as the handle
# cannot be used any further, and then dies with the message and the
# location. A non-fatal error leaves the connection untouched, so later
# requests still work.
sub _error {
   my ($self, $error, $filename, $line, $fatal) = @_;

   # only tear the connection down when it really is unusable
   delete @$self{qw(rw ww fh)}
      if $fatal;

   $@ = $error;

   $self->{on_error}($self, $filename, $line, $fatal)
      if $self->{on_error};

   die "$error at $filename, line $line\n"
      if $fatal;
}
216
# Queues a request and starts sending it.
#
# Arguments: the handle, the callback for the reply, the filename and line
# of the submitter, the fatal flag, and then the request itself (handler
# name followed by its arguments).
#
# The callback and location are remembered in the queue, the request is
# frozen and appended to the write buffer with a 32-bit length prefix.
# As much as possible is written immediately; whatever is left over is
# flushed by a write watcher.
sub _req {
   my ($self, $cb, $filename, $line, $fatal) = splice @_, 0, 5, ();

   push @{ $self->{queue} }, [$cb, $filename, $line, $fatal];

   # what is left in @_ is the request proper
   $self->{wbuf} .= pack "L/a*", Storable::freeze (\@_);

   # an active write watcher will send the new data as well
   return if $self->{ww};

   my $len = syswrite $self->{fh}, $self->{wbuf};
   substr $self->{wbuf}, 0, $len, ""
      if $len;

   # everything written? then there is nothing left to do
   return unless length $self->{wbuf};

   # the watcher is stored inside the object, so its callback must only
   # hold a weak reference back to it
   Scalar::Util::weaken (my $wself = $self);

   # a failed first write is retried here as well, so a real write error
   # is reported from the watcher
   my $flush = $self->{ww_cb} || sub {
      my $self = $wself
         or return;

      my $len = syswrite $self->{fh}, $self->{wbuf};

      if (defined $len) {
         substr $self->{wbuf}, 0, $len, "";

         # buffer drained, no need to be woken up again
         delete $self->{ww}
            unless length $self->{wbuf};

      } elsif (!($!{EAGAIN} || $!{EWOULDBLOCK} || $!{EINTR})) {
         $self->_error ("write error: $!", $filename, $line, 1);
      }
   };

   $self->{ww} = AnyEvent->io (fh => $self->{fh}, poll => "w", cb => $flush);
}
235
236 =item $dbh->exec ("statement", @args, $cb->($rows, %extra))
237
238 Executes the given SQL statement with placeholders replaced by
239 C<@args>. The statement will be prepared and cached on the
240 server side, so using placeholders is compulsory.
241
242 The callback will be called with the result of C<fetchall_arrayref> as
243 first argument and possibly a hash reference with additional information.
244
245 =cut
246
# Submits a statement: rearranges the arguments into the layout _req
# expects and hands over to it.
#
# The last argument is the reply callback; it is moved to the position
# right after the invocant, followed by the caller's filename and line, a
# false "fatal" flag and the request name. The remaining arguments (the
# statement and its bind values) follow unchanged.
#
# NOTE(review): the request name sent is "exec"; no server-side handler
# for that name is visible next to req_open - confirm that one exists.
sub exec {
   my $callback = pop @_;
   my @invocant = splice @_, 0, 1;
   my ($file, $line) = (caller)[1, 2];

   @_ = (@invocant, $callback, $file, $line, 0, "exec", @_);

   goto &_req;
}
253
254 =back
255
256 =head1 SEE ALSO
257
258 L<AnyEvent>, L<DBI>.
259
260 =head1 AUTHOR
261
262 Marc Lehmann <schmorp@schmorp.de>
263 http://home.schmorp.de/
264
265 =cut
266
267 1
268