--- deliantra/server/lib/cf.pm 2007/02/05 18:29:20 1.205 +++ deliantra/server/lib/cf.pm 2007/02/13 19:25:44 1.212 @@ -17,12 +17,16 @@ use Coro::Semaphore; use Coro::AIO; +use BDB (); use Data::Dumper; use Digest::MD5; use Fcntl; use IO::AIO 2.32 (); use YAML::Syck (); use Time::HiRes; +use Compress::LZF; + +Compress::LZF::sfreeze_cr { }; # prime Compress::LZF so it does not use require later use Event; $Event::Eval = 1; # no idea why this is required, but it is @@ -52,6 +56,9 @@ our $NOW; our $USE_FSYNC = 1; # use fsync to write maps - default off +our $BDB_POLL_WATCHER; +our $DB_ENV; + our %CFG; our $UPTIME; $UPTIME ||= time; @@ -61,6 +68,7 @@ our %MAP; # all maps our $LINK_MAP; # the special {link} map, which is always available our $RANDOM_MAPS = cf::localdir . "/random"; +our $BDB_ENV_DIR = cf::localdir . "/db"; our $WAIT_FOR_TICK; $WAIT_FOR_TICK ||= new Coro::Signal; our $WAIT_FOR_TICK_ONE; $WAIT_FOR_TICK_ONE ||= new Coro::Signal; @@ -83,6 +91,7 @@ mkdir cf::localdir . "/" . cf::tmpdir; mkdir cf::localdir . "/" . cf::uniquedir; mkdir $RANDOM_MAPS; +mkdir $BDB_ENV_DIR; our $EMERGENCY_POSITION; @@ -1602,9 +1611,18 @@ warn "resetting map ", $self->path;#d# + $self->in_memory (cf::MAP_SWAPPED); + + # need to save uniques path + unless ($self->{deny_save}) { + my $uniq = $self->uniq_path; utf8::encode $uniq; + + $self->_save_objects ($uniq, cf::IO_UNIQUES) + if $uniq; + } + delete $cf::MAP{$self->path}; - $self->in_memory (cf::MAP_SWAPPED); $self->clear; $_->clear_links_to ($self) for values %cf::MAP; @@ -2178,99 +2196,63 @@ =over 4 -=item $hashref = cf::db_get $family - -Return a hashref for use by the extension C<$family>, which can be -modified. After modifications, you have to call C or -C. - =item $value = cf::db_get $family => $key -Returns a single value from the database - -=item cf::db_put $family => $hashref - -Stores the given family hashref into the database. Updates are delayed, if -you want the data to be synced to disk immediately, use C. +Returns a single value from the database. =item cf::db_put $family => $key => $value -Stores the given C<$value> in the family hash. Updates are delayed, if you -want the data to be synced to disk immediately, use C. - -=item cf::db_dirty - -Marks the database as dirty, to be updated at a later time. - -=item cf::db_sync - -Immediately write the database to disk I. +Stores the given C<$value> in the family. =cut our $DB; -{ - my $path = cf::localdir . "/database.pst"; +sub db_init { + unless ($DB) { + $DB = BDB::db_create $DB_ENV; + + cf::sync_job { + eval { + $DB->set_flags (BDB::CHKSUM); + + BDB::db_open $DB, undef, "db", undef, BDB::BTREE, + BDB::CREATE | BDB::AUTO_COMMIT, 0666; + cf::cleanup "db_open(db): $!" if $!; + }; + cf::cleanup "db_open(db): $@" if $@; + }; - sub db_load() { - $DB = stat $path ? Storable::retrieve $path : { }; - } + my $path = cf::localdir . "/database.pst"; + if (stat $path) { + cf::sync_job { + my $pst = Storable::retrieve $path; - my $pid; + cf::db_put (board => data => $pst->{board}); + cf::db_put (guildrules => data => $pst->{guildrules}); + cf::db_put (rent => balance => $pst->{rent}{balance}); + BDB::db_env_txn_checkpoint $DB_ENV; - sub db_save() { - waitpid $pid, 0 if $pid; - if (0 == ($pid = fork)) { - $DB->{_meta}{version} = 1; - Storable::nstore $DB, "$path~"; - rename "$path~", $path; - cf::_exit 0 if defined $pid; + unlink $path; + }; } } +} - my $dirty; - - sub db_sync() { - db_save if $dirty; - undef $dirty; - } +sub db_get($$) { + my $key = "$_[0]/$_[1]"; - my $idle = Event->idle ( - reentrant => 0, - min => 10, - max => 20, - repeat => 0, - data => WF_AUTOCANCEL, - cb => \&db_sync, - ); - - sub db_dirty() { - $dirty = 1; - $idle->start; - } + cf::sync_job { + BDB::db_get $DB, undef, $key, my $data; - sub db_get($;$) { - @_ >= 2 - ? $DB->{$_[0]}{$_[1]} - : ($DB->{$_[0]} ||= { }) - } - - sub db_put($$;$) { - if (@_ >= 3) { - $DB->{$_[0]}{$_[1]} = $_[2]; - } else { - $DB->{$_[0]} = $_[1]; - } - db_dirty; + $! ? () + : Compress::LZF::sthaw $data } +} - cf::global->attach ( - prio => 10000, - on_cleanup => sub { - db_sync; - }, - ); +sub db_put($$$) { + BDB::dbreq_pri 4; + BDB::db_put $DB, undef, "$_[0]/$_[1]", Compress::LZF::sfreeze_cr $_[2], 0, sub { }; } ############################################################################# @@ -2316,7 +2298,7 @@ }; cfg_load; - db_load; + db_init; load_extensions; $TICK_WATCHER->start; @@ -2365,11 +2347,22 @@ $map->save; } warn "end emergency map save\n"; + + warn "begin emergency database checkpoint\n"; + BDB::db_env_txn_checkpoint $DB_ENV; + warn "end emergency database checkpoint\n"; }; warn "leave emergency perl save\n"; } +sub post_cleanup { + my ($make_core) = @_; + + warn Carp::longmess "post_cleanup backtrace" + if $make_core; +} + sub reload() { # can/must only be called in main if ($Coro::current != $Coro::main) { @@ -2379,16 +2372,17 @@ warn "reloading..."; - warn "cancelling server ticker"; - $TICK_WATCHER->cancel; - - cf::emergency_save; + warn "entering sync_job"; - eval { - # if anything goes wrong in here, we should simply crash as we already saved + sync_job { + write_runtime; + cf::emergency_save; + write_runtime; warn "syncing database to disk"; - cf::db_sync; + BDB::db_env_txn_checkpoint $DB_ENV; + + # if anything goes wrong in here, we should simply crash as we already saved warn "cancelling all WF_AUTOCANCEL watchers"; for (Event::all_watchers) { @@ -2397,9 +2391,10 @@ warn "flushing outstanding aio requests"; for (;;) { + BDB::flush; IO::AIO::flush; Coro::cede; - last unless IO::AIO::nreqs; + last unless IO::AIO::nreqs || BDB::nreqs; warn "iterate..."; } @@ -2464,7 +2459,6 @@ warn "loading config and database again"; cf::cfg_load; - cf::db_load; warn "loading extensions"; cf::load_extensions; @@ -2477,16 +2471,14 @@ warn "loading reloadable resources"; load_resources; - warn "restarting server ticker"; + warn "leaving sync_job"; - $TICK_WATCHER->start; - }; - - if ($@) { + 1 + } or do { warn $@; warn "error while reloading, exiting."; exit 1; - } + }; warn "reloaded"; }; @@ -2540,9 +2532,6 @@ $WAIT_FOR_TICK->broadcast; $WAIT_FOR_TICK_ONE->send if $WAIT_FOR_TICK_ONE->awaited; - Event::sweep; - Coro::cede_notself; - # my $AFTER = Event::time; # warn $AFTER - $NOW;#d# @@ -2554,27 +2543,85 @@ }, ); -IO::AIO::max_poll_time $TICK * 0.1; +{ + BDB::max_poll_time $TICK * 0.1; + $BDB_POLL_WATCHER = Event->io ( + reentrant => 0, + fd => BDB::poll_fileno, + poll => 'r', + prio => 0, + data => WF_AUTOCANCEL, + cb => \&BDB::poll_cb, + ); + BDB::min_parallel 8; -undef $Coro::AIO::WATCHER; -$AIO_POLL_WATCHER = Event->io ( - reentrant => 0, - fd => IO::AIO::poll_fileno, - poll => 'r', - prio => 6, - data => WF_AUTOCANCEL, - cb => \&IO::AIO::poll_cb, -); + BDB::set_sync_prepare { + my $status; + my $current = $Coro::current; + ( + sub { + $status = $!; + $current->ready; undef $current; + }, + sub { + Coro::schedule while defined $current; + $! = $status; + }, + ) + }; + + unless ($DB_ENV) { + $DB_ENV = BDB::db_env_create; + + cf::sync_job { + eval { + BDB::db_env_open + $DB_ENV, + $BDB_ENV_DIR, + BDB::INIT_LOCK | BDB::INIT_LOG | BDB::INIT_MPOOL | BDB::INIT_TXN + | BDB::RECOVER | BDB::REGISTER | BDB::USE_ENVIRON | BDB::CREATE, + 0666; + + cf::cleanup "db_env_open($BDB_ENV_DIR): $!" if $!; + + $DB_ENV->set_flags (BDB::AUTO_COMMIT | BDB::REGION_INIT | BDB::TXN_NOSYNC, 1); + $DB_ENV->set_lk_detect; + }; + + cf::cleanup "db_env_open(db): $@" if $@; + }; + } +} + +{ + IO::AIO::min_parallel 8; + + undef $Coro::AIO::WATCHER; + IO::AIO::max_poll_time $TICK * 0.1; + $AIO_POLL_WATCHER = Event->io ( + reentrant => 0, + fd => IO::AIO::poll_fileno, + poll => 'r', + prio => 6, + data => WF_AUTOCANCEL, + cb => \&IO::AIO::poll_cb, + ); +} $WRITE_RUNTIME_WATCHER = Event->timer ( reentrant => 0, data => WF_AUTOCANCEL, + parked => 1, after => 1, interval => 10, prio => 6, # keep it lowest so it acts like a watchdog - cb => Coro::unblock_sub { - write_runtime - or warn "ERROR: unable to write runtime file: $!"; + cb => sub { + $TICK_WATCHER->is_active or cf::cleanup "mainloop frozen but runtime active", 1; + + Coro::async_pool { + write_runtime + or warn "ERROR: unable to write runtime file: $!"; + }; }, );