--- deliantra/Deliantra-Client/DC/DB.pm 2007/04/16 20:35:29 1.5 +++ deliantra/Deliantra-Client/DC/DB.pm 2007/11/12 23:25:37 1.21 @@ -1,6 +1,6 @@ =head1 NAME -CFPlus::DB - async. database access for cfplus +CFPlus::DB - async. database and filesystem access for cfplus =head1 SYNOPSIS @@ -18,17 +18,28 @@ use utf8; use Carp (); -use AnyEvent (); use Storable (); +use Config; use CFPlus; +our $DB_HOME = "$Crossfire::VARDIR/cfplus-$BerkeleyDB::db_version-$Config{archname}"; + +sub path_of_res($) { + utf8::downgrade $_[0]; # bug in unpack "H*" + "$DB_HOME/res-data-" . unpack "H*", $_[0] +} + sub sync { # for debugging #CFPlus::DB::Server::req (sync => sub { }); CFPlus::DB::Server::sync (); } +sub exists($$$) { + CFPlus::DB::Server::req (exists => @_); +} + sub get($$$) { CFPlus::DB::Server::req (get => @_); } @@ -37,20 +48,48 @@ CFPlus::DB::Server::req (put => @_); } +sub unlink($$) { + CFPlus::DB::Server::req (unlink => @_); +} + +sub read_file($$) { + CFPlus::DB::Server::req (read_file => @_); +} + +sub write_file($$$) { + CFPlus::DB::Server::req (write_file => @_); +} + +sub prefetch_file($$$) { + CFPlus::DB::Server::req (prefetch_file => @_); +} + +sub logprint($$$) { + CFPlus::DB::Server::req (logprint => @_); +} + our $tilemap; sub get_tile_id_sync($) { - my ($hash) = @_; + my ($name) = @_; # fetch the full face table first unless ($tilemap) { - CFPlus::DB::Server::req (table => facemap => sub { $tilemap = $_[0] }); + CFPlus::DB::Server::req (table => facemap => sub { + $tilemap = $_[0]; + delete $tilemap->{id}; + my %maptile = reverse %$tilemap;#d# + if ((scalar keys %$tilemap) != (scalar keys %maptile)) {#d# + $tilemap = { };#d# + CFPlus::error "FATAL: facemap is not a 1:1 mapping, please report this and delete your $DB_HOME directory!\n";#d# + }#d# + }); sync; } - $tilemap->{$hash} ||= do { + $tilemap->{$name} ||= do { my $id; - CFPlus::DB::Server::req (get_tile_id => $hash, sub { $id = $_[0] }); + CFPlus::DB::Server::req (get_tile_id => $name, sub { $id = $_[0] }); sync; $id } @@ -63,7 +102,6 @@ use Fcntl; use BerkeleyDB; -our $DB_HOME = "$Crossfire::VARDIR/cfplus"; our $DB_ENV; our $DB_STATE; our %DB_TABLE; @@ -81,7 +119,7 @@ # -ErrPrefix => "DATABASE", -Verbose => 1, -Flags => DB_CREATE | DB_RECOVER | DB_INIT_MPOOL | DB_INIT_LOCK | DB_INIT_TXN | $recover, - -SetFlags => DB_AUTO_COMMIT | DB_LOG_AUTOREMOVE, + -SetFlags => DB_AUTO_COMMIT | DB_LOG_AUTOREMOVE | DB_TXN_WRITE_NOSYNC, or die "unable to create/open database home $DB_HOME: $BerkeleyDB::Error"; 1 @@ -99,13 +137,11 @@ # -Filename => "database", # -Subname => $table, -Property => DB_CHKSUM, - -Flags => DB_CREATE | DB_UPGRADE, + -Flags => DB_AUTO_COMMIT | DB_CREATE | DB_UPGRADE, or die "unable to create/open database table $_[0]: $BerkeleyDB::Error" } } -our $SYNC_INTERVAL = 60; - our %CB; our $FH; our $ID = "aaa0"; @@ -114,12 +150,17 @@ our $write_buf; our $read_buf; +our $SYNC = EV::timer_ns 0, 60, sub { + $_[0]->stop; + CFPlus::DB::Server::req (sync => sub { }); +}; + sub fh_write { my $len = syswrite $FH, $write_buf; substr $write_buf, 0, $len, ""; - undef $fh_w_watcher + $fh_w_watcher->stop unless length $write_buf; } @@ -165,12 +206,8 @@ $write_buf .= pack "N/a*", Storable::freeze [$id, $type, @args]; $CB{$id} = $cb; - $fh_w_watcher = AnyEvent->io (fh => $FH, poll => 'w', cb => \&fh_write); -} - -sub sync_tick { - req "sync", sub { }; - $sync_timer = AnyEvent->timer (after => $SYNC_INTERVAL, cb => \&sync_tick); + $fh_w_watcher->start; + $SYNC->again unless $SYNC->is_active; } sub do_sync { @@ -178,6 +215,16 @@ () } +sub do_exists { + my ($db, $key) = @_; + + utf8::downgrade $key; + my $data; + (table $db)->db_get ($key, $data) == 0 + ? length $data + : () +} + sub do_get { my ($db, $key) = @_; @@ -210,13 +257,13 @@ } sub do_get_tile_id { - my ($hash) = @_; + my ($name) = @_; my $id; my $table = table "facemap"; return $id - if $table->db_get ($hash, $id) == 0; + if $table->db_get ($name, $id) == 0; for (1..100) { my $txn = $DB_ENV->txn_begin; @@ -224,18 +271,80 @@ if ($status == 0 || $status == BerkeleyDB::DB_NOTFOUND) { $id = ($id || 64) + 1; if ($table->db_put (id => $id) == 0 - && $table->db_put ($hash => $id) == 0) { + && $table->db_put ($name => $id) == 0) { $txn->txn_commit; return $id; } } $txn->txn_abort; + select undef, undef, undef, 0.01 * rand; } die "maximum number of transaction retries reached - database problems?"; } +sub do_unlink { + unlink $_[0]; +} + +sub do_read_file { + my ($path) = @_; + + utf8::downgrade $path; + open my $fh, "<:raw", $path + or return; + sysread $fh, my $buf, -s $fh; + + $buf +} + +sub do_write_file { + my ($path, $data) = @_; + + utf8::downgrade $path; + utf8::downgrade $data; + open my $fh, ">:raw", $path + or return; + syswrite $fh, $data; + close $fh; + + 1 +} + +sub do_prefetch_file { + my ($path, $size) = @_; + + utf8::downgrade $path; + open my $fh, "<:raw", $path + or return; + sysread $fh, my $buf, $size; + + 1 +} + +our %LOG_FH; + +sub do_logprint { + my ($path, $line) = @_; + + $LOG_FH{$path} ||= do { + open my $fh, ">>:utf8", $path + or warn "Couldn't open logfile $path: $!"; + + $fh->autoflush (1); + + $fh + }; + + my ($sec, $min, $hour, $mday, $mon, $year) = localtime time; + + my $ts = sprintf "%04d-%02d-%02d %02d:%02d:%02d", + $year + 1900, $mon + 1, $mday, $hour, $min, $sec; + + print { $LOG_FH{$path} } "$ts $line\n" +} + sub run { ($FH, my $fh) = CFPlus::socketpipe; @@ -245,12 +354,14 @@ my $pid = fork; if (defined $pid && !$pid) { + local $SIG{QUIT}; local $SIG{__DIE__}; + local $SIG{__WARN__}; eval { close $FH; unless (eval { open_db }) { - File::Path::rmtree $DB_HOME; + eval { File::Path::rmtree $DB_HOME }; open_db; } @@ -268,7 +379,7 @@ or die "$type: unknown database request type\n"; my $res = pack "N/a*", Storable::freeze [$id, $cb->(@args)]; (syswrite $fh, $res) == length $res - or die; + or die "DB::write: $!"; } }; @@ -281,6 +392,7 @@ Storable::store_fd [die => $error], $fh; }; + $DB_ENV->txn_checkpoint (0, 0, 0); CFPlus::_exit 0; } @@ -289,9 +401,13 @@ $CB{die} = sub { die shift }; - $fh_r_watcher = AnyEvent->io (fh => $FH, poll => 'r', cb => \&fh_read); + $fh_r_watcher = EV::io $FH, EV::READ , \&fh_read; + $fh_w_watcher = EV::io $FH, EV::WRITE, \&fh_write; + $SYNC->again unless $SYNC->is_active; +} - sync_tick; +sub stop { + close $FH; } 1;