--- deliantra/server/lib/cf.pm 2007/01/29 18:15:23 1.201 +++ deliantra/server/lib/cf.pm 2007/02/12 01:32:49 1.210 @@ -17,12 +17,16 @@ use Coro::Semaphore; use Coro::AIO; +use BDB (); use Data::Dumper; use Digest::MD5; use Fcntl; use IO::AIO 2.32 (); use YAML::Syck (); use Time::HiRes; +use Compress::LZF; + +Compress::LZF::sfreeze_cr { }; # prime Compress::LZF so it does not use require later use Event; $Event::Eval = 1; # no idea why this is required, but it is @@ -50,6 +54,10 @@ our $WRITE_RUNTIME_WATCHER; our $NEXT_TICK; our $NOW; +our $USE_FSYNC = 1; # use fsync to write maps - default off + +our $BDB_POLL_WATCHER; +our $DB_ENV; our %CFG; @@ -60,6 +68,7 @@ our %MAP; # all maps our $LINK_MAP; # the special {link} map, which is always available our $RANDOM_MAPS = cf::localdir . "/random"; +our $BDB_ENV_DIR = cf::localdir . "/db"; our $WAIT_FOR_TICK; $WAIT_FOR_TICK ||= new Coro::Signal; our $WAIT_FOR_TICK_ONE; $WAIT_FOR_TICK_ONE ||= new Coro::Signal; @@ -82,6 +91,7 @@ mkdir cf::localdir . "/" . cf::tmpdir; mkdir cf::localdir . "/" . cf::uniquedir; mkdir $RANDOM_MAPS; +mkdir $BDB_ENV_DIR; our $EMERGENCY_POSITION; @@ -360,6 +370,7 @@ (aio_write $fh, 0, (length $value), $value, 0) <= 0 and return; + # always fsync - this file is important aio_fsync $fh and return; @@ -754,7 +765,7 @@ if (my $fh = aio_open "$filename~", O_WRONLY | O_CREAT, 0600) { chmod SAVE_MODE, $fh; aio_write $fh, 0, (length $$rdata), $$rdata, 0; - aio_fsync $fh; + aio_fsync $fh if $cf::USE_FSYNC; close $fh; if (@$objs) { @@ -762,7 +773,7 @@ chmod SAVE_MODE, $fh; my $data = Storable::nfreeze { version => 1, objs => $objs }; aio_write $fh, 0, (length $data), $data, 0; - aio_fsync $fh; + aio_fsync $fh if $cf::USE_FSYNC; close $fh; aio_rename "$filename.pst~", "$filename.pst"; } @@ -1600,9 +1611,18 @@ warn "resetting map ", $self->path;#d# + $self->in_memory (cf::MAP_SWAPPED); + + # need to save uniques path + unless ($self->{deny_save}) { + my $uniq = $self->uniq_path; utf8::encode $uniq; + + $self->_save_objects ($uniq, cf::IO_UNIQUES) + if $uniq; + } + delete $cf::MAP{$self->path}; - $self->in_memory (cf::MAP_SWAPPED); $self->clear; $_->clear_links_to ($self) for values %cf::MAP; @@ -2005,6 +2025,7 @@ or return; # be conservative, not sure how that can happen, but we saw a crash here (shift @$queue)->[1]->($msg); + return unless $ns->valid; # temporary(?) workaround for callback destroying socket push @{ $ns->{query_queue} }, @$queue; @@ -2175,103 +2196,72 @@ =over 4 -=item $hashref = cf::db_get $family - -Return a hashref for use by the extension C<$family>, which can be -modified. After modifications, you have to call C or -C. - =item $value = cf::db_get $family => $key -Returns a single value from the database - -=item cf::db_put $family => $hashref - -Stores the given family hashref into the database. Updates are delayed, if -you want the data to be synced to disk immediately, use C. +Returns a single value from the database. =item cf::db_put $family => $key => $value -Stores the given C<$value> in the family hash. Updates are delayed, if you -want the data to be synced to disk immediately, use C. - -=item cf::db_dirty - -Marks the database as dirty, to be updated at a later time. - -=item cf::db_sync - -Immediately write the database to disk I. +Stores the given C<$value> in the family. =cut our $DB; -{ - my $path = cf::localdir . "/database.pst"; +sub db_init { + unless ($DB) { + $DB = BDB::db_create $DB_ENV; + + cf::sync_job { + eval { + $DB->set_flags (BDB::CHKSUM); + + BDB::db_open $DB, undef, "db", undef, BDB::BTREE, + BDB::CREATE | BDB::AUTO_COMMIT, 0666; + cf::cleanup "db_open(db): $!" if $!; + }; + cf::cleanup "db_open(db): $@" if $@; + }; - sub db_load() { - $DB = stat $path ? Storable::retrieve $path : { }; - } + my $path = cf::localdir . "/database.pst"; + if (stat $path) { + cf::sync_job { + my $pst = Storable::retrieve $path; - my $pid; + cf::db_put (board => data => $pst->{board}); + cf::db_put (guildrules => data => $pst->{guildrules}); + cf::db_put (rent => balance => $pst->{rent}{balance}); + BDB::db_env_txn_checkpoint $DB_ENV; - sub db_save() { - waitpid $pid, 0 if $pid; - if (0 == ($pid = fork)) { - $DB->{_meta}{version} = 1; - Storable::nstore $DB, "$path~"; - rename "$path~", $path; - cf::_exit 0 if defined $pid; + unlink $path; + }; } } +} - my $dirty; - - sub db_sync() { - db_save if $dirty; - undef $dirty; - } - - my $idle = Event->idle ( - reentrant => 0, - min => 10, - max => 20, - repeat => 0, - data => WF_AUTOCANCEL, - cb => \&db_sync, - ); - - sub db_dirty() { - $dirty = 1; - $idle->start; - } +sub db_get($$) { + my $key = "$_[0]/$_[1]"; - sub db_get($;$) { - @_ >= 2 - ? $DB->{$_[0]}{$_[1]} - : ($DB->{$_[0]} ||= { }) - } + cf::sync_job { + BDB::db_get $DB, undef, $key, my $data; - sub db_put($$;$) { - if (@_ >= 3) { - $DB->{$_[0]}{$_[1]} = $_[2]; - } else { - $DB->{$_[0]} = $_[1]; - } - db_dirty; + $! ? () + : Compress::LZF::sthaw $data } +} - cf::global->attach ( - prio => 10000, - on_cleanup => sub { - db_sync; - }, - ); +sub db_put($$$) { + BDB::dbreq_pri 4; + BDB::db_put $DB, undef, "$_[0]/$_[1]", Compress::LZF::sfreeze_cr $_[2], 0, sub { }; } ############################################################################# -# the server's main() +# the server's init and main functions + +sub load_resources { + load_regions sprintf "%s/%s/regions", cf::datadir, cf::mapdir + or die "unable to load regions file\n";#d# +} sub cfg_load { open my $fh, "<:utf8", cf::confdir . "/config" @@ -2294,6 +2284,10 @@ } } +sub init { + load_resources; +} + sub main { # we must not ever block the main coroutine local $Coro::idle = sub { @@ -2304,7 +2298,7 @@ }; cfg_load; - db_load; + db_init; load_extensions; $TICK_WATCHER->start; @@ -2353,6 +2347,10 @@ $map->save; } warn "end emergency map save\n"; + + warn "begin emergency database checkpoint\n"; + BDB::db_env_txn_checkpoint $DB_ENV; + warn "end emergency database checkpoint\n"; }; warn "leave emergency perl save\n"; @@ -2375,19 +2373,20 @@ eval { # if anything goes wrong in here, we should simply crash as we already saved - warn "syncing database to disk"; - cf::db_sync; - warn "cancelling all WF_AUTOCANCEL watchers"; for (Event::all_watchers) { $_->cancel if $_->data & WF_AUTOCANCEL; } + warn "syncing database to disk"; + BDB::db_env_txn_checkpoint $DB_ENV, 0, 0, 0, sub { }; + warn "flushing outstanding aio requests"; for (;;) { + BDB::flush; IO::AIO::flush; Coro::cede; - last unless IO::AIO::nreqs; + last unless IO::AIO::nreqs || BDB::nreqs; warn "iterate..."; } @@ -2452,7 +2451,6 @@ warn "loading config and database again"; cf::cfg_load; - cf::db_load; warn "loading extensions"; cf::load_extensions; @@ -2462,6 +2460,9 @@ warn "reattaching attachments to maps"; reattach $_ for values %MAP; + warn "loading reloadable resources"; + load_resources; + warn "restarting server ticker"; $TICK_WATCHER->start; @@ -2539,17 +2540,70 @@ }, ); -IO::AIO::max_poll_time $TICK * 0.1; +{ + BDB::max_poll_time $TICK * 0.1; + $BDB_POLL_WATCHER = Event->io ( + reentrant => 0, + fd => BDB::poll_fileno, + poll => 'r', + prio => 0, + data => WF_AUTOCANCEL, + cb => \&BDB::poll_cb, + ); + BDB::min_parallel 8; -undef $Coro::AIO::WATCHER; -$AIO_POLL_WATCHER = Event->io ( - reentrant => 0, - fd => IO::AIO::poll_fileno, - poll => 'r', - prio => 6, - data => WF_AUTOCANCEL, - cb => \&IO::AIO::poll_cb, -); + BDB::set_sync_prepare { + my $status; + my $current = $Coro::current; + ( + sub { + $status = $!; + $current->ready; undef $current; + }, + sub { + Coro::schedule while defined $current; + $! = $status; + }, + ) + }; + + unless ($DB_ENV) { + $DB_ENV = BDB::db_env_create; + + cf::sync_job { + eval { + BDB::db_env_open + $DB_ENV, + $BDB_ENV_DIR, + BDB::INIT_LOCK | BDB::INIT_LOG | BDB::INIT_MPOOL | BDB::INIT_TXN + | BDB::RECOVER | BDB::REGISTER | BDB::USE_ENVIRON | BDB::CREATE, + 0666; + + cf::cleanup "db_env_open($BDB_ENV_DIR): $!" if $!; + + $DB_ENV->set_flags (BDB::AUTO_COMMIT | BDB::REGION_INIT | BDB::TXN_NOSYNC, 1); + $DB_ENV->set_lk_detect; + }; + + cf::cleanup "db_env_open(db): $@" if $@; + }; + } +} + +{ + IO::AIO::min_parallel 8; + + undef $Coro::AIO::WATCHER; + IO::AIO::max_poll_time $TICK * 0.1; + $AIO_POLL_WATCHER = Event->io ( + reentrant => 0, + fd => IO::AIO::poll_fileno, + poll => 'r', + prio => 6, + data => WF_AUTOCANCEL, + cb => \&IO::AIO::poll_cb, + ); +} $WRITE_RUNTIME_WATCHER = Event->timer ( reentrant => 0,