ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/deliantra/Deliantra-Client/DC/DB.pm
(Generate patch)

Comparing deliantra/Deliantra-Client/DC/DB.pm (file contents):
Revision 1.26 by root, Wed Dec 26 18:09:30 2007 UTC vs.
Revision 1.33 by root, Tue Mar 25 21:11:51 2008 UTC

1=head1 NAME 1=head1 NAME
2 2
3CFPlus::DB - async. database and filesystem access for cfplus 3DC::DB - async. database and filesystem access for deliantra
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use CFPlus::DB; 7 use DC::DB;
8 8
9=head1 DESCRIPTION 9=head1 DESCRIPTION
10 10
11=over 4 11=over 4
12 12
13=cut 13=cut
14 14
15package CFPlus::DB; 15package DC::DB;
16 16
17use strict; 17use strict;
18use utf8; 18use utf8;
19 19
20use Carp (); 20use Carp ();
21use Storable (); 21use Storable ();
22use Config; 22use Config;
23use BDB; 23use BDB;
24 24
25use CFPlus; 25use DC;
26 26
27our $DB_HOME = "$Deliantra::VARDIR/cfplus-" . BDB::VERSION . "-$Config{archname}"; 27our $ODBDIR = "cfplus-" . BDB::VERSION . "-$Config{archname}";
28our $DBDIR = "client-" . BDB::VERSION . "-$Config{archname}";
29our $DB_HOME = "$Deliantra::VARDIR/$DBDIR";
30
31if (!-e $DB_HOME and -e "$Deliantra::VARDIR/$ODBDIR") {
32 rename "$Deliantra::VARDIR/$ODBDIR", $DB_HOME;
33 print STDERR "INFO: moved old database from $Deliantra::VARDIR/$ODBDIR to $DB_HOME\n";
34}
35
36if (!-e $DB_HOME and -e "$Deliantra::OLDDIR/$ODBDIR") {
37 rename "$Deliantra::OLDDIR/$DBDIR", $DB_HOME;
38 print STDERR "INFO: moved old database from $Deliantra::OLDDIR/$ODBDIR to $DB_HOME\n";
39}
40
41BDB::max_poll_time 0.03;
42BDB::max_parallel 1;
28 43
29our $DB_ENV; 44our $DB_ENV;
30our $DB_STATE; 45our $DB_STATE;
31our %DB_TABLE; 46our %DB_TABLE;
32 47
33sub open_db { 48sub try_open_db {
34 mkdir $DB_HOME, 0777; 49 mkdir $DB_HOME, 0777;
35 50
36 $DB_ENV = db_env_create; 51 $DB_ENV = db_env_create;
37 52
38 $DB_ENV->set_errfile (\*STDERR); 53 $DB_ENV->set_errfile (\*STDERR);
69 } 84 }
70} 85}
71 86
72############################################################################# 87#############################################################################
73 88
74unless (eval { open_db }) { 89our $WATCHER;
75 warn "$@";#d# 90our $SYNC;
76 eval { File::Path::rmtree $DB_HOME };
77 open_db;
78}
79
80our $WATCHER = EV::io BDB::poll_fileno, EV::READ, \&BDB::poll_cb;
81
82our $SYNC = EV::timer_ns 0, 60, sub {
83 $_[0]->stop;
84 db_env_txn_checkpoint $DB_ENV, 0, 0, 0, sub { };
85};
86
87our $tilemap; 91our $tilemap;
88 92
89sub exists($$$) { 93sub exists($$$) {
90 my ($db, $key, $cb) = @_; 94 my ($db, $key, $cb) = @_;
91 95
160 die "maximum number of transaction retries reached - database problems?"; 164 die "maximum number of transaction retries reached - database problems?";
161} 165}
162 166
163sub get_tile_id_sync($) { 167sub get_tile_id_sync($) {
164 my ($name) = @_; 168 my ($name) = @_;
169
170 $tilemap->{$name} ||= do {
171 my $id;
172 do_get_tile_id $name, sub {
173 $id = $_[0];
174 };
175 BDB::flush;
176 $id
177 }
178}
179
180#############################################################################
181
182sub path_of_res($) {
183 utf8::downgrade $_[0]; # bug in unpack "H*"
184 "$DB_HOME/res-data-" . unpack "H*", $_[0]
185}
186
187sub sync {
188 # for debugging
189 #DC::DB::Server::req (sync => sub { });
190 DC::DB::Server::sync ();
191}
192
193sub unlink($$) {
194 DC::DB::Server::req (unlink => @_);
195}
196
197sub read_file($$) {
198 DC::DB::Server::req (read_file => @_);
199}
200
201sub write_file($$$) {
202 DC::DB::Server::req (write_file => @_);
203}
204
205sub prefetch_file($$$) {
206 DC::DB::Server::req (prefetch_file => @_);
207}
208
209sub logprint($$$) {
210 DC::DB::Server::req (logprint => @_);
211}
212
213#############################################################################
214
215package DC::DB::Server;
216
217use strict;
218
219use EV ();
220use Fcntl;
221
222our %CB;
223our $FH;
224our $ID = "aaa0";
225our ($fh_r_watcher, $fh_w_watcher);
226our $sync_timer;
227our $write_buf;
228our $read_buf;
229
230sub fh_write {
231 my $len = syswrite $FH, $write_buf;
232
233 substr $write_buf, 0, $len, "";
234
235 $fh_w_watcher->stop
236 unless length $write_buf;
237}
238
239sub fh_read {
240 my $status = sysread $FH, $read_buf, 16384, length $read_buf;
241
242 die "FATAL: database process died\n"
243 if $status == 0 && defined $status;
244
245 while () {
246 return if 4 > length $read_buf;
247 my $len = unpack "N", $read_buf;
248
249 return if $len + 4 > length $read_buf;
250
251 substr $read_buf, 0, 4, "";
252 my $res = Storable::thaw substr $read_buf, 0, $len, "";
253
254 my ($id, @args) = @$res;
255 (delete $CB{$id})->(@args);
256 }
257}
258
259sub sync {
260 # biggest mess evarr
261 my $fds; (vec $fds, fileno $FH, 1) = 1;
262
263 while (1 < scalar keys %CB) {
264 my $r = $fds;
265 my $w = length $write_buf ? $fds : undef;
266 select $r, $w, undef, undef;
267
268 fh_write if vec $w, fileno $FH, 1;
269 fh_read if vec $r, fileno $FH, 1;
270 }
271}
272
273sub req {
274 my ($type, @args) = @_;
275 my $cb = pop @args;
276
277 my $id = ++$ID;
278 $write_buf .= pack "N/a*", Storable::freeze [$id, $type, @args];
279 $CB{$id} = $cb;
280
281 $fh_w_watcher->start;
282}
283
284sub do_unlink {
285 unlink $_[0];
286}
287
288sub do_read_file {
289 my ($path) = @_;
290
291 utf8::downgrade $path;
292 open my $fh, "<:raw", $path
293 or return;
294 sysread $fh, my $buf, -s $fh;
295
296 $buf
297}
298
299sub do_write_file {
300 my ($path, $data) = @_;
301
302 utf8::downgrade $path;
303 utf8::downgrade $data;
304 open my $fh, ">:raw", $path
305 or return;
306 syswrite $fh, $data;
307 close $fh;
308
309 1
310}
311
312sub do_prefetch_file {
313 my ($path, $size) = @_;
314
315 utf8::downgrade $path;
316 open my $fh, "<:raw", $path
317 or return;
318 sysread $fh, my $buf, $size;
319
320 1
321}
322
323our %LOG_FH;
324
325sub do_logprint {
326 my ($path, $line) = @_;
327
328 $LOG_FH{$path} ||= do {
329 open my $fh, ">>:utf8", $path
330 or warn "Couldn't open logfile $path: $!";
331
332 $fh->autoflush (1);
333
334 $fh
335 };
336
337 my ($sec, $min, $hour, $mday, $mon, $year) = localtime time;
338
339 my $ts = sprintf "%04d-%02d-%02d %02d:%02d:%02d",
340 $year + 1900, $mon + 1, $mday, $hour, $min, $sec;
341
342 print { $LOG_FH{$path} } "$ts $line\n"
343}
344
345sub run {
346 ($FH, my $fh) = DC::socketpipe;
347
348 my $oldfh = select $FH; $| = 1; select $oldfh;
349 my $oldfh = select $fh; $| = 1; select $oldfh;
350
351 my $pid = fork;
352
353 if (defined $pid && !$pid) {
354 local $SIG{QUIT} = "IGNORE";
355 local $SIG{__DIE__};
356 local $SIG{__WARN__};
357 eval {
358 close $FH;
359
360 while () {
361 4 == read $fh, my $len, 4
362 or last;
363 $len = unpack "N", $len;
364 $len == read $fh, my $req, $len
365 or die "unexpected eof while reading request";
366
367 $req = Storable::thaw $req;
368
369 my ($id, $type, @args) = @$req;
370 my $cb = DC::DB::Server->can ("do_$type")
371 or die "$type: unknown database request type\n";
372 my $res = pack "N/a*", Storable::freeze [$id, $cb->(@args)];
373 (syswrite $fh, $res) == length $res
374 or die "DB::write: $!";
375 }
376 };
377
378 my $error = $@;
379
380 eval {
381 Storable::store_fd [die => $error], $fh;
382 };
383
384 warn $error
385 if $error;
386
387 DC::_exit 0;
388 }
389
390 close $fh;
391 DC::fh_nonblocking $FH, 1;
392
393 $CB{die} = sub { die shift };
394
395 $fh_r_watcher = EV::io $FH, EV::READ , \&fh_read;
396 $fh_w_watcher = EV::io $FH, EV::WRITE, \&fh_write;
397}
398
399sub stop {
400 close $FH;
401}
402
403package DC::DB;
404
405sub open_db {
406 unless (eval { try_open_db }) {
407 warn "$@";#d#
408 eval { File::Path::rmtree $DB_HOME };
409 try_open_db;
410 }
165 411
166 # fetch the full face table first 412 # fetch the full face table first
167 unless ($tilemap) { 413 unless ($tilemap) {
168 do_table facemap => sub { 414 do_table facemap => sub {
169 $tilemap = $_[0]; 415 $tilemap = $_[0];
170 delete $tilemap->{id}; 416 delete $tilemap->{id};
171 my %maptile = reverse %$tilemap;#d# 417 my %maptile = reverse %$tilemap;#d#
172 if ((scalar keys %$tilemap) != (scalar keys %maptile)) {#d# 418 if ((scalar keys %$tilemap) != (scalar keys %maptile)) {#d#
173 $tilemap = { };#d# 419 $tilemap = { };#d#
174 CFPlus::error "FATAL: facemap is not a 1:1 mapping, please report this and delete your $DB_HOME directory!\n";#d# 420 DC::error "FATAL: facemap is not a 1:1 mapping, please report this and delete your $DB_HOME directory!\n";#d#
175 }#d# 421 }#d#
176 }; 422 };
177 BDB::flush;
178 }
179
180 $tilemap->{$name} ||= do {
181 my $id;
182 do_get_tile_id $name, sub {
183 $id = $_[0];
184 };
185 BDB::flush;
186 $id
187 }
188}
189
190#############################################################################
191
192sub path_of_res($) {
193 utf8::downgrade $_[0]; # bug in unpack "H*"
194 "$DB_HOME/res-data-" . unpack "H*", $_[0]
195}
196
197sub sync {
198 # for debugging
199 #CFPlus::DB::Server::req (sync => sub { });
200 CFPlus::DB::Server::sync ();
201}
202
203sub unlink($$) {
204 CFPlus::DB::Server::req (unlink => @_);
205}
206
207sub read_file($$) {
208 CFPlus::DB::Server::req (read_file => @_);
209}
210
211sub write_file($$$) {
212 CFPlus::DB::Server::req (write_file => @_);
213}
214
215sub prefetch_file($$$) {
216 CFPlus::DB::Server::req (prefetch_file => @_);
217}
218
219sub logprint($$$) {
220 CFPlus::DB::Server::req (logprint => @_);
221}
222
223package CFPlus::DB::Server;
224
225use strict;
226
227use EV ();
228use Fcntl;
229
230our %CB;
231our $FH;
232our $ID = "aaa0";
233our ($fh_r_watcher, $fh_w_watcher);
234our $sync_timer;
235our $write_buf;
236our $read_buf;
237
238sub fh_write {
239 my $len = syswrite $FH, $write_buf;
240
241 substr $write_buf, 0, $len, "";
242
243 $fh_w_watcher->stop
244 unless length $write_buf;
245}
246
247sub fh_read {
248 my $status = sysread $FH, $read_buf, 16384, length $read_buf;
249
250 die "FATAL: database process died\n"
251 if $status == 0 && defined $status;
252
253 while () {
254 return if 4 > length $read_buf;
255 my $len = unpack "N", $read_buf;
256
257 return if $len + 4 > length $read_buf;
258
259 substr $read_buf, 0, 4, "";
260 my $res = Storable::thaw substr $read_buf, 0, $len, "";
261
262 my ($id, @args) = @$res;
263 (delete $CB{$id})->(@args);
264 }
265}
266
267sub sync {
268 # biggest mess evarr
269 my $fds; (vec $fds, fileno $FH, 1) = 1;
270
271 while (1 < scalar keys %CB) {
272 my $r = $fds;
273 my $w = length $write_buf ? $fds : undef;
274 select $r, $w, undef, undef;
275
276 fh_write if vec $w, fileno $FH, 1;
277 fh_read if vec $r, fileno $FH, 1;
278 }
279}
280
281sub req {
282 my ($type, @args) = @_;
283 my $cb = pop @args;
284
285 my $id = ++$ID;
286 $write_buf .= pack "N/a*", Storable::freeze [$id, $type, @args];
287 $CB{$id} = $cb;
288
289 $fh_w_watcher->start;
290}
291
292sub do_unlink {
293 unlink $_[0];
294}
295
296sub do_read_file {
297 my ($path) = @_;
298
299 utf8::downgrade $path;
300 open my $fh, "<:raw", $path
301 or return;
302 sysread $fh, my $buf, -s $fh;
303
304 $buf
305}
306
307sub do_write_file {
308 my ($path, $data) = @_;
309
310 utf8::downgrade $path;
311 utf8::downgrade $data;
312 open my $fh, ">:raw", $path
313 or return;
314 syswrite $fh, $data;
315 close $fh;
316
317 1 423 }
318}
319 424
320sub do_prefetch_file { 425 $WATCHER = EV::io BDB::poll_fileno, EV::READ, \&BDB::poll_cb;
321 my ($path, $size) = @_; 426 $SYNC = EV::timer_ns 0, 60, sub {
322 427 $_[0]->stop;
323 utf8::downgrade $path; 428 db_env_txn_checkpoint $DB_ENV, 0, 0, 0, sub { };
324 open my $fh, "<:raw", $path
325 or return;
326 sysread $fh, my $buf, $size;
327
328 1
329}
330
331our %LOG_FH;
332
333sub do_logprint {
334 my ($path, $line) = @_;
335
336 $LOG_FH{$path} ||= do {
337 open my $fh, ">>:utf8", $path
338 or warn "Couldn't open logfile $path: $!";
339
340 $fh->autoflush (1);
341
342 $fh
343 }; 429 };
344
345 my ($sec, $min, $hour, $mday, $mon, $year) = localtime time;
346
347 my $ts = sprintf "%04d-%02d-%02d %02d:%02d:%02d",
348 $year + 1900, $mon + 1, $mday, $hour, $min, $sec;
349
350 print { $LOG_FH{$path} } "$ts $line\n"
351} 430}
352 431
353sub run { 432END {
354 ($FH, my $fh) = CFPlus::socketpipe; 433 %DB_TABLE = ();
355 434 undef $DB_ENV;
356 my $oldfh = select $FH; $| = 1; select $oldfh;
357 my $oldfh = select $fh; $| = 1; select $oldfh;
358
359 my $pid = fork;
360
361 if (defined $pid && !$pid) {
362 local $SIG{QUIT};
363 local $SIG{__DIE__};
364 local $SIG{__WARN__};
365 eval {
366 close $FH;
367
368 while () {
369 4 == read $fh, my $len, 4
370 or last;
371 $len = unpack "N", $len;
372 $len == read $fh, my $req, $len
373 or die "unexpected eof while reading request";
374
375 $req = Storable::thaw $req;
376
377 my ($id, $type, @args) = @$req;
378 my $cb = CFPlus::DB::Server->can ("do_$type")
379 or die "$type: unknown database request type\n";
380 my $res = pack "N/a*", Storable::freeze [$id, $cb->(@args)];
381 (syswrite $fh, $res) == length $res
382 or die "DB::write: $!";
383 }
384 };
385
386 my $error = $@;
387
388 eval {
389 Storable::store_fd [die => $error], $fh;
390 };
391
392 warn $error
393 if $error;
394
395 CFPlus::_exit 0;
396 }
397
398 close $fh;
399 CFPlus::fh_nonblocking $FH, 1;
400
401 $CB{die} = sub { die shift };
402
403 $fh_r_watcher = EV::io $FH, EV::READ , \&fh_read;
404 $fh_w_watcher = EV::io $FH, EV::WRITE, \&fh_write;
405}
406
407sub stop {
408 close $FH;
409} 435}
410 436
4111; 4371;
412 438
413=back 439=back

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines