--- deliantra/server/lib/cf.pm 2007/01/28 00:20:02 1.197 +++ deliantra/server/lib/cf.pm 2007/02/12 01:32:49 1.210 @@ -17,12 +17,16 @@ use Coro::Semaphore; use Coro::AIO; +use BDB (); use Data::Dumper; use Digest::MD5; use Fcntl; use IO::AIO 2.32 (); use YAML::Syck (); use Time::HiRes; +use Compress::LZF; + +Compress::LZF::sfreeze_cr { }; # prime Compress::LZF so it does not use require later use Event; $Event::Eval = 1; # no idea why this is required, but it is @@ -50,6 +54,10 @@ our $WRITE_RUNTIME_WATCHER; our $NEXT_TICK; our $NOW; +our $USE_FSYNC = 1; # use fsync to write maps - default off + +our $BDB_POLL_WATCHER; +our $DB_ENV; our %CFG; @@ -60,6 +68,7 @@ our %MAP; # all maps our $LINK_MAP; # the special {link} map, which is always available our $RANDOM_MAPS = cf::localdir . "/random"; +our $BDB_ENV_DIR = cf::localdir . "/db"; our $WAIT_FOR_TICK; $WAIT_FOR_TICK ||= new Coro::Signal; our $WAIT_FOR_TICK_ONE; $WAIT_FOR_TICK_ONE ||= new Coro::Signal; @@ -82,9 +91,12 @@ mkdir cf::localdir . "/" . cf::tmpdir; mkdir cf::localdir . "/" . cf::uniquedir; mkdir $RANDOM_MAPS; +mkdir $BDB_ENV_DIR; our $EMERGENCY_POSITION; +sub cf::map::normalise; + ############################################################################# =head2 GLOBAL VARIABLES @@ -358,6 +370,7 @@ (aio_write $fh, 0, (length $value), $value, 0) <= 0 and return; + # always fsync - this file is important aio_fsync $fh and return; @@ -752,7 +765,7 @@ if (my $fh = aio_open "$filename~", O_WRONLY | O_CREAT, 0600) { chmod SAVE_MODE, $fh; aio_write $fh, 0, (length $$rdata), $$rdata, 0; - aio_fsync $fh; + aio_fsync $fh if $cf::USE_FSYNC; close $fh; if (@$objs) { @@ -760,7 +773,7 @@ chmod SAVE_MODE, $fh; my $data = Storable::nfreeze { version => 1, objs => $objs }; aio_write $fh, 0, (length $data), $data, 0; - aio_fsync $fh; + aio_fsync $fh if $cf::USE_FSYNC; close $fh; aio_rename "$filename.pst~", "$filename.pst"; } @@ -776,6 +789,12 @@ aio_unlink $filename; aio_unlink "$filename.pst"; } + + #d##TODO# nuke non .map-files if exist + if ($filename =~ s/\.map$//) { + aio_unlink $filename; + aio_unlink "$filename.pst"; + } } } @@ -792,6 +811,9 @@ my ($data, $av); + #d#TODO remove .map if file does not exist + aio_stat $filename and $filename =~ s/\.map$//; + (aio_load $filename, $data) >= 0 or return; @@ -1070,6 +1092,8 @@ sub maps($) { my ($pl) = @_; + $pl = ref $pl ? $pl->ob->name : $pl; + my $files = aio_readdir playerdir $pl or return; @@ -1080,8 +1104,7 @@ next if /\.(?:pl|pst)$/; next unless /^$PATH_SEP/o; - s/\.map$//; - push @paths, "~" . $pl->ob->name . "/" . $_; + push @paths, cf::map::normalise "~$pl/$_"; } \@paths @@ -1180,6 +1203,8 @@ $path = "$path"; # make sure its a string + $path =~ s/\.map$//; + # map plan: # # /! non-realised random map exit (special hack!) @@ -1250,7 +1275,7 @@ sub load_path { my ($self) = @_; - sprintf "%s/%s/%s", cf::datadir, cf::mapdir, $self->{path} + sprintf "%s/%s/%s.map", cf::datadir, cf::mapdir, $self->{path} } # the temporary/swap location @@ -1258,7 +1283,7 @@ my ($self) = @_; (my $path = $_[0]{path}) =~ s/\//$PATH_SEP/g; - sprintf "%s/%s/%s", cf::localdir, cf::tmpdir, $path + sprintf "%s/%s/%s.map", cf::localdir, cf::tmpdir, $path } # the unique path, undef == no special unique path @@ -1284,14 +1309,19 @@ utf8::encode (my $save = $self->save_path); IO::AIO::aioreq_pri 4; Coro::AIO::aio_unlink $save; IO::AIO::aioreq_pri 4; Coro::AIO::aio_unlink "$save.pst"; + + #d#TODO remove .map and also nuke + $save =~ s/\.map// or return;#d# + IO::AIO::aioreq_pri 4; Coro::AIO::aio_unlink $save;#d# + IO::AIO::aioreq_pri 4; Coro::AIO::aio_unlink "$save.pst";#d# } sub load_header_from($) { my ($self, $path) = @_; utf8::encode $path; - aio_open $path, O_RDONLY, 0 - or return; + #aio_open $path, O_RDONLY, 0 + # or return; $self->_load_header ($path) or return; @@ -1581,9 +1611,18 @@ warn "resetting map ", $self->path;#d# + $self->in_memory (cf::MAP_SWAPPED); + + # need to save uniques path + unless ($self->{deny_save}) { + my $uniq = $self->uniq_path; utf8::encode $uniq; + + $self->_save_objects ($uniq, cf::IO_UNIQUES) + if $uniq; + } + delete $cf::MAP{$self->path}; - $self->in_memory (cf::MAP_SWAPPED); $self->clear; $_->clear_links_to ($self) for values %cf::MAP; @@ -1630,8 +1669,7 @@ next if /\.pst$/; next unless /^$PATH_SEP/o; - s/\.map$//; - push @paths, $_; + push @paths, cf::map::normalise $_; } \@paths @@ -1875,7 +1913,7 @@ my $rmp = parse_random_map_params $exit->msg; if ($exit->map) { - $rmp->{region} = $exit->map->region_name; + $rmp->{region} = $exit->region->name; $rmp->{origin_map} = $exit->map->path; $rmp->{origin_x} = $exit->x; $rmp->{origin_y} = $exit->y; @@ -1987,6 +2025,7 @@ or return; # be conservative, not sure how that can happen, but we saw a crash here (shift @$queue)->[1]->($msg); + return unless $ns->valid; # temporary(?) workaround for callback destroying socket push @{ $ns->{query_queue} }, @$queue; @@ -2157,103 +2196,72 @@ =over 4 -=item $hashref = cf::db_get $family - -Return a hashref for use by the extension C<$family>, which can be -modified. After modifications, you have to call C or -C. - =item $value = cf::db_get $family => $key -Returns a single value from the database - -=item cf::db_put $family => $hashref - -Stores the given family hashref into the database. Updates are delayed, if -you want the data to be synced to disk immediately, use C. +Returns a single value from the database. =item cf::db_put $family => $key => $value -Stores the given C<$value> in the family hash. Updates are delayed, if you -want the data to be synced to disk immediately, use C. - -=item cf::db_dirty - -Marks the database as dirty, to be updated at a later time. - -=item cf::db_sync - -Immediately write the database to disk I. +Stores the given C<$value> in the family. =cut our $DB; -{ - my $path = cf::localdir . "/database.pst"; +sub db_init { + unless ($DB) { + $DB = BDB::db_create $DB_ENV; + + cf::sync_job { + eval { + $DB->set_flags (BDB::CHKSUM); + + BDB::db_open $DB, undef, "db", undef, BDB::BTREE, + BDB::CREATE | BDB::AUTO_COMMIT, 0666; + cf::cleanup "db_open(db): $!" if $!; + }; + cf::cleanup "db_open(db): $@" if $@; + }; - sub db_load() { - $DB = stat $path ? Storable::retrieve $path : { }; - } + my $path = cf::localdir . "/database.pst"; + if (stat $path) { + cf::sync_job { + my $pst = Storable::retrieve $path; - my $pid; + cf::db_put (board => data => $pst->{board}); + cf::db_put (guildrules => data => $pst->{guildrules}); + cf::db_put (rent => balance => $pst->{rent}{balance}); + BDB::db_env_txn_checkpoint $DB_ENV; - sub db_save() { - waitpid $pid, 0 if $pid; - if (0 == ($pid = fork)) { - $DB->{_meta}{version} = 1; - Storable::nstore $DB, "$path~"; - rename "$path~", $path; - cf::_exit 0 if defined $pid; + unlink $path; + }; } } +} - my $dirty; - - sub db_sync() { - db_save if $dirty; - undef $dirty; - } - - my $idle = Event->idle ( - reentrant => 0, - min => 10, - max => 20, - repeat => 0, - data => WF_AUTOCANCEL, - cb => \&db_sync, - ); +sub db_get($$) { + my $key = "$_[0]/$_[1]"; - sub db_dirty() { - $dirty = 1; - $idle->start; - } + cf::sync_job { + BDB::db_get $DB, undef, $key, my $data; - sub db_get($;$) { - @_ >= 2 - ? $DB->{$_[0]}{$_[1]} - : ($DB->{$_[0]} ||= { }) - } - - sub db_put($$;$) { - if (@_ >= 3) { - $DB->{$_[0]}{$_[1]} = $_[2]; - } else { - $DB->{$_[0]} = $_[1]; - } - db_dirty; + $! ? () + : Compress::LZF::sthaw $data } +} - cf::global->attach ( - prio => 10000, - on_cleanup => sub { - db_sync; - }, - ); +sub db_put($$$) { + BDB::dbreq_pri 4; + BDB::db_put $DB, undef, "$_[0]/$_[1]", Compress::LZF::sfreeze_cr $_[2], 0, sub { }; } ############################################################################# -# the server's main() +# the server's init and main functions + +sub load_resources { + load_regions sprintf "%s/%s/regions", cf::datadir, cf::mapdir + or die "unable to load regions file\n";#d# +} sub cfg_load { open my $fh, "<:utf8", cf::confdir . "/config" @@ -2276,6 +2284,10 @@ } } +sub init { + load_resources; +} + sub main { # we must not ever block the main coroutine local $Coro::idle = sub { @@ -2286,7 +2298,7 @@ }; cfg_load; - db_load; + db_init; load_extensions; $TICK_WATCHER->start; @@ -2335,6 +2347,10 @@ $map->save; } warn "end emergency map save\n"; + + warn "begin emergency database checkpoint\n"; + BDB::db_env_txn_checkpoint $DB_ENV; + warn "end emergency database checkpoint\n"; }; warn "leave emergency perl save\n"; @@ -2357,19 +2373,20 @@ eval { # if anything goes wrong in here, we should simply crash as we already saved - warn "syncing database to disk"; - cf::db_sync; - warn "cancelling all WF_AUTOCANCEL watchers"; for (Event::all_watchers) { $_->cancel if $_->data & WF_AUTOCANCEL; } + warn "syncing database to disk"; + BDB::db_env_txn_checkpoint $DB_ENV, 0, 0, 0, sub { }; + warn "flushing outstanding aio requests"; for (;;) { + BDB::flush; IO::AIO::flush; Coro::cede; - last unless IO::AIO::nreqs; + last unless IO::AIO::nreqs || BDB::nreqs; warn "iterate..."; } @@ -2434,7 +2451,6 @@ warn "loading config and database again"; cf::cfg_load; - cf::db_load; warn "loading extensions"; cf::load_extensions; @@ -2444,6 +2460,9 @@ warn "reattaching attachments to maps"; reattach $_ for values %MAP; + warn "loading reloadable resources"; + load_resources; + warn "restarting server ticker"; $TICK_WATCHER->start; @@ -2521,17 +2540,70 @@ }, ); -IO::AIO::max_poll_time $TICK * 0.1; +{ + BDB::max_poll_time $TICK * 0.1; + $BDB_POLL_WATCHER = Event->io ( + reentrant => 0, + fd => BDB::poll_fileno, + poll => 'r', + prio => 0, + data => WF_AUTOCANCEL, + cb => \&BDB::poll_cb, + ); + BDB::min_parallel 8; -undef $Coro::AIO::WATCHER; -$AIO_POLL_WATCHER = Event->io ( - reentrant => 0, - fd => IO::AIO::poll_fileno, - poll => 'r', - prio => 6, - data => WF_AUTOCANCEL, - cb => \&IO::AIO::poll_cb, -); + BDB::set_sync_prepare { + my $status; + my $current = $Coro::current; + ( + sub { + $status = $!; + $current->ready; undef $current; + }, + sub { + Coro::schedule while defined $current; + $! = $status; + }, + ) + }; + + unless ($DB_ENV) { + $DB_ENV = BDB::db_env_create; + + cf::sync_job { + eval { + BDB::db_env_open + $DB_ENV, + $BDB_ENV_DIR, + BDB::INIT_LOCK | BDB::INIT_LOG | BDB::INIT_MPOOL | BDB::INIT_TXN + | BDB::RECOVER | BDB::REGISTER | BDB::USE_ENVIRON | BDB::CREATE, + 0666; + + cf::cleanup "db_env_open($BDB_ENV_DIR): $!" if $!; + + $DB_ENV->set_flags (BDB::AUTO_COMMIT | BDB::REGION_INIT | BDB::TXN_NOSYNC, 1); + $DB_ENV->set_lk_detect; + }; + + cf::cleanup "db_env_open(db): $@" if $@; + }; + } +} + +{ + IO::AIO::min_parallel 8; + + undef $Coro::AIO::WATCHER; + IO::AIO::max_poll_time $TICK * 0.1; + $AIO_POLL_WATCHER = Event->io ( + reentrant => 0, + fd => IO::AIO::poll_fileno, + poll => 'r', + prio => 6, + data => WF_AUTOCANCEL, + cb => \&IO::AIO::poll_cb, + ); +} $WRITE_RUNTIME_WATCHER = Event->timer ( reentrant => 0,