--- deliantra/server/lib/cf.pm 2007/06/30 03:00:54 1.289
+++ deliantra/server/lib/cf.pm 2009/01/08 19:23:44 1.468
@@ -1,50 +1,79 @@
+#
+# This file is part of Deliantra, the Roguelike Realtime MMORPG.
+#
+# Copyright (©) 2006,2007,2008 Marc Alexander Lehmann / Robin Redeker / the Deliantra team
+#
+# Deliantra is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+# The authors can be reached via e-mail to
+#
+
package cf;
+use 5.10.0;
use utf8;
-use strict;
+use strict qw(vars subs);
use Symbol;
use List::Util;
use Socket;
-use Storable;
-use Event;
+use EV;
use Opcode;
use Safe;
use Safe::Hole;
+use Storable ();
-use Coro 3.61 ();
+use Guard ();
+use Coro ();
use Coro::State;
use Coro::Handle;
-use Coro::Event;
+use Coro::EV;
+use Coro::AnyEvent;
use Coro::Timer;
use Coro::Signal;
use Coro::Semaphore;
+use Coro::SemaphoreSet;
+use Coro::AnyEvent;
use Coro::AIO;
+use Coro::BDB 1.6;
use Coro::Storable;
+use Coro::Util ();
-use JSON::XS 1.4 ();
+use JSON::XS 2.01 ();
use BDB ();
use Data::Dumper;
use Digest::MD5;
use Fcntl;
-use YAML::Syck ();
-use IO::AIO 2.32 ();
+use YAML ();
+use IO::AIO ();
use Time::HiRes;
use Compress::LZF;
+use Digest::MD5 ();
+
+AnyEvent::detect;
# configure various modules to our taste
#
$Storable::canonical = 1; # reduce rsync transfers
Coro::State::cctx_stacksize 256000; # 1-2MB stack, for deep recursions in maze generator
-Compress::LZF::sfreeze_cr { }; # prime Compress::LZF so it does not use require later
-
-$Event::Eval = 1; # no idea why this is required, but it is
-
-# work around bug in YAML::Syck - bad news for perl6, will it be as broken wrt. unicode?
-$YAML::Syck::ImplicitUnicode = 1;
$Coro::main->prio (Coro::PRIO_MAX); # run main coroutine ("the server") with very high priority
+# make sure c-lzf reinitialises itself
+Compress::LZF::set_serializer "Storable", "Storable::net_mstore", "Storable::mretrieve";
+Compress::LZF::sfreeze_cr { }; # prime Compress::LZF so it does not use require later
+
sub WF_AUTOCANCEL () { 1 } # automatically cancel this watcher on reload
our %COMMAND = ();
@@ -56,39 +85,45 @@
our %EXT_CORO = (); # coroutines bound to extensions
our %EXT_MAP = (); # pluggable maps
-our $RELOAD; # number of reloads so far
+our $RELOAD; # number of reloads so far, non-zero while in reload
our @EVENT;
-our $CONFDIR = confdir;
-our $DATADIR = datadir;
-our $LIBDIR = "$DATADIR/ext";
-our $PODDIR = "$DATADIR/pod";
-our $MAPDIR = "$DATADIR/" . mapdir;
-our $LOCALDIR = localdir;
-our $TMPDIR = "$LOCALDIR/" . tmpdir;
-our $UNIQUEDIR = "$LOCALDIR/" . uniquedir;
-our $PLAYERDIR = "$LOCALDIR/" . playerdir;
-our $RANDOMDIR = "$LOCALDIR/random";
-our $BDBDIR = "$LOCALDIR/db";
+our $CONFDIR = confdir;
+our $DATADIR = datadir;
+our $LIBDIR = "$DATADIR/ext";
+our $PODDIR = "$DATADIR/pod";
+our $MAPDIR = "$DATADIR/" . mapdir;
+our $LOCALDIR = localdir;
+our $TMPDIR = "$LOCALDIR/" . tmpdir;
+our $UNIQUEDIR = "$LOCALDIR/" . uniquedir;
+our $PLAYERDIR = "$LOCALDIR/" . playerdir;
+our $RANDOMDIR = "$LOCALDIR/random";
+our $BDBDIR = "$LOCALDIR/db";
+our $PIDFILE = "$LOCALDIR/pid";
+our $RUNTIMEFILE = "$LOCALDIR/runtime";
+
+our %RESOURCE;
our $TICK = MAX_TIME * 1e-6; # this is a CONSTANT(!)
-our $TICK_WATCHER;
-our $AIO_POLL_WATCHER;
our $NEXT_RUNTIME_WRITE; # when should the runtime file be written
our $NEXT_TICK;
-our $NOW;
our $USE_FSYNC = 1; # use fsync to write maps - default off
-our $BDB_POLL_WATCHER;
+our $BDB_DEADLOCK_WATCHER;
+our $BDB_CHECKPOINT_WATCHER;
+our $BDB_TRICKLE_WATCHER;
our $DB_ENV;
+our @EXTRA_MODULES = qw(pod mapscript);
+
our %CFG;
our $UPTIME; $UPTIME ||= time;
our $RUNTIME;
+our $NOW;
-our %PLAYER; # all users
-our %MAP; # all maps
+our (%PLAYER, %PLAYER_LOADING); # all users
+our (%MAP, %MAP_LOADING ); # all maps
our $LINK_MAP; # the special {link} map, which is always available
# used to convert map paths into valid unix filenames by replacing / by ∕
@@ -96,18 +131,25 @@
our $LOAD; # a number between 0 (idle) and 1 (too many objects)
our $LOADAVG; # same thing, but with alpha-smoothing
-our $tick_start; # for load detecting purposes
+our $JITTER; # average jitter
+our $TICK_START; # for load detecting purposes
+
+our @POST_INIT;
+
+our $REATTACH_ON_RELOAD; # ste to true to force object reattach on reload (slow)
binmode STDOUT;
binmode STDERR;
# read virtual server time, if available
-unless ($RUNTIME || !-e "$LOCALDIR/runtime") {
- open my $fh, "<", "$LOCALDIR/runtime"
- or die "unable to read runtime file: $!";
+unless ($RUNTIME || !-e $RUNTIMEFILE) {
+ open my $fh, "<", $RUNTIMEFILE
+ or die "unable to read $RUNTIMEFILE file: $!";
$RUNTIME = <$fh> + 0.;
}
+eval "sub TICK() { $TICK } 1" or die;
+
mkdir $_
for $LOCALDIR, $TMPDIR, $UNIQUEDIR, $PLAYERDIR, $RANDOMDIR, $BDBDIR;
@@ -159,7 +201,7 @@
=item %cf::CFG
-Configuration for the server, loaded from C, or
+Configuration for the server, loaded from C, or
from wherever your confdir points to.
=item cf::wait_for_tick, cf::wait_for_tick_begin
@@ -168,6 +210,12 @@
returns directly I the tick processing (and consequently, can only wake one process
per tick), while cf::wait_for_tick wakes up all waiters after tick processing.
+=item @cf::INVOKE_RESULTS
+
+This array contains the results of the last C call. When
+C is called C<@cf::INVOKE_RESULTS> is set to the parameters of
+that call.
+
=back
=cut
@@ -181,11 +229,25 @@
$msg =~ s/([\x00-\x08\x0b-\x1f])/sprintf "\\x%02x", ord $1/ge;
- utf8::encode $msg;
LOG llevError, $msg;
};
}
+$Coro::State::DIEHOOK = sub {
+ return unless $^S eq 0; # "eq", not "=="
+
+ if ($Coro::current == $Coro::main) {#d#
+ warn "DIEHOOK called in main context, Coro bug?\n";#d#
+ return;#d#
+ }#d#
+
+ # kill coroutine otherwise
+ warn Carp::longmess $_[0];
+ Coro::terminate
+};
+
+$SIG{__DIE__} = sub { }; #d#?
+
@safe::cf::global::ISA = @cf::global::ISA = 'cf::attachable';
@safe::cf::object::ISA = @cf::object::ISA = 'cf::attachable';
@safe::cf::player::ISA = @cf::player::ISA = 'cf::attachable';
@@ -201,13 +263,13 @@
cf::object cf::object::player
cf::client cf::player
cf::arch cf::living
- cf::map cf::party cf::region
+ cf::map cf::mapspace
+ cf::party cf::region
)) {
- no strict 'refs';
@{"safe::$pkg\::wrap::ISA"} = @{"$pkg\::wrap::ISA"} = $pkg;
}
-$Event::DIED = sub {
+$EV::DIED = sub {
warn "error in event callback: @_";
};
@@ -240,20 +302,34 @@
} || "[unable to dump $_[0]: '$@']";
}
-=item $ref = cf::from_json $json
+=item $ref = cf::decode_json $json
Converts a JSON string into the corresponding perl data structure.
-=item $json = cf::to_json $ref
+=item $json = cf::encode_json $ref
Converts a perl data structure into its JSON representation.
=cut
-our $json_coder = JSON::XS->new->convert_blessed->utf8->max_size (1e6); # accept ~1mb max
+our $json_coder = JSON::XS->new->utf8->max_size (1e6); # accept ~1mb max
+
+sub encode_json($) { $json_coder->encode ($_[0]) }
+sub decode_json($) { $json_coder->decode ($_[0]) }
+
+=item cf::post_init { BLOCK }
-sub to_json ($) { $json_coder->encode ($_[0]) }
-sub from_json ($) { $json_coder->decode ($_[0]) }
+Execute the given codeblock, I all extensions have been (re-)loaded,
+but I the server starts ticking again.
+
+The cdoeblock will have a single boolean argument to indicate whether this
+is a reload or not.
+
+=cut
+
+sub post_init(&) {
+ push @POST_INIT, shift;
+}
=item cf::lock_wait $string
@@ -262,10 +338,13 @@
=item my $lock = cf::lock_acquire $string
Wait until the given lock is available and then acquires it and returns
-a Coro::guard object. If the guard object gets destroyed (goes out of scope,
+a L object. If the guard object gets destroyed (goes out of scope,
for example when the coroutine gets canceled), the lock is automatically
returned.
+Locks are *not* recursive, locking from the same coro twice results in a
+deadlocked coro.
+
Lock names should begin with a unique identifier (for example, cf::map::find
uses map_find and cf::map::load uses map_load).
@@ -275,46 +354,99 @@
=cut
-our %LOCK;
+our $LOCKS = new Coro::SemaphoreSet;
sub lock_wait($) {
- my ($key) = @_;
-
- # wait for lock, if any
- while ($LOCK{$key}) {
- push @{ $LOCK{$key} }, $Coro::current;
- Coro::schedule;
- }
+ $LOCKS->wait ($_[0]);
}
sub lock_acquire($) {
- my ($key) = @_;
+ $LOCKS->guard ($_[0])
+}
- # wait, to be sure we are not locked
- lock_wait $key;
+sub lock_active($) {
+ $LOCKS->count ($_[0]) < 1
+}
- $LOCK{$key} = [];
+sub freeze_mainloop {
+ tick_inhibit_inc;
- Coro::guard {
- # wake up all waiters, to be on the safe side
- $_->ready for @{ delete $LOCK{$key} };
- }
+ &Guard::guard (\&tick_inhibit_dec);
}
-sub lock_active($) {
- my ($key) = @_;
+=item cf::periodic $interval, $cb
+
+Like EV::periodic, but randomly selects a starting point so that the actions
+get spread over timer.
- ! ! $LOCK{$key}
+=cut
+
+sub periodic($$) {
+ my ($interval, $cb) = @_;
+
+ my $start = rand List::Util::min 180, $interval;
+
+ EV::periodic $start, $interval, 0, $cb
}
-sub freeze_mainloop {
- return unless $TICK_WATCHER->is_active;
+=item cf::get_slot $time[, $priority[, $name]]
- my $guard = Coro::guard {
- $TICK_WATCHER->start;
- };
- $TICK_WATCHER->stop;
- $guard
+Allocate $time seconds of blocking CPU time at priority C<$priority>:
+This call blocks and returns only when you have at least C<$time> seconds
+of cpu time till the next tick. The slot is only valid till the next cede.
+
+The optional C<$name> can be used to identify the job to run. It might be
+used for statistical purposes and should identify the same time-class.
+
+Useful for short background jobs.
+
+=cut
+
+our @SLOT_QUEUE;
+our $SLOT_QUEUE;
+
+$SLOT_QUEUE->cancel if $SLOT_QUEUE;
+$SLOT_QUEUE = Coro::async {
+ $Coro::current->desc ("timeslot manager");
+
+ my $signal = new Coro::Signal;
+
+ while () {
+ next_job:
+ my $avail = cf::till_tick;
+ if ($avail > 0.01) {
+ for (0 .. $#SLOT_QUEUE) {
+ if ($SLOT_QUEUE[$_][0] < $avail) {
+ my $job = splice @SLOT_QUEUE, $_, 1, ();
+ $job->[2]->send;
+ Coro::cede;
+ goto next_job;
+ }
+ }
+ }
+
+ if (@SLOT_QUEUE) {
+ # we do not use wait_for_tick() as it returns immediately when tick is inactive
+ push @cf::WAIT_FOR_TICK, $signal;
+ $signal->wait;
+ } else {
+ Coro::schedule;
+ }
+ }
+};
+
+sub get_slot($;$$) {
+ return if tick_inhibit || $Coro::current == $Coro::main;
+
+ my ($time, $pri, $name) = @_;
+
+ $time = $TICK * .6 if $time > $TICK * .6;
+ my $sig = new Coro::Signal;
+
+ push @SLOT_QUEUE, [$time, $pri, $sig, $name];
+ @SLOT_QUEUE = sort { $b->[1] <=> $a->[1] } @SLOT_QUEUE;
+ $SLOT_QUEUE->ready;
+ $sig->wait;
}
=item cf::async { BLOCK }
@@ -329,8 +461,8 @@
=item cf::sync_job { BLOCK }
-The design of Crossfire TRT requires that the main coroutine ($Coro::main)
-is always able to handle events or runnable, as Crossfire TRT is only
+The design of Deliantra requires that the main coroutine ($Coro::main)
+is always able to handle events or runnable, as Deliantra is only
partly reentrant. Thus "blocking" it by e.g. waiting for I/O is not
acceptable.
@@ -345,34 +477,36 @@
my ($job) = @_;
if ($Coro::current == $Coro::main) {
- my $time = Event::time;
+ my $time = EV::time;
# this is the main coro, too bad, we have to block
# till the operation succeeds, freezing the server :/
- # TODO: use suspend/resume instead
- # (but this is cancel-safe)
+ LOG llevError, Carp::longmess "sync job";#d#
+
my $freeze_guard = freeze_mainloop;
my $busy = 1;
my @res;
(async {
+ $Coro::current->desc ("sync job coro");
@res = eval { $job->() };
warn $@ if $@;
undef $busy;
})->prio (Coro::PRIO_MAX);
while ($busy) {
- Coro::cede or Event::one_event;
+ if (Coro::nready) {
+ Coro::cede_notself;
+ } else {
+ EV::loop EV::LOOP_ONESHOT;
+ }
}
- $time = Event::time - $time;
+ my $time = EV::time - $time;
- LOG llevError | logBacktrace, Carp::longmess "long sync job"
- if $time > $TICK * 0.5 && $TICK_WATCHER->is_active;
-
- $tick_start += $time; # do not account sync jobs to server load
+ $TICK_START += $time; # do not account sync jobs to server load
wantarray ? @res : $res[0]
} else {
@@ -407,43 +541,50 @@
Executes the given code block with the given arguments in a seperate
process, returning the results. Everything must be serialisable with
Coro::Storable. May, of course, block. Note that the executed sub may
-never block itself or use any form of Event handling.
+never block itself or use any form of event handling.
=cut
sub fork_call(&@) {
my ($cb, @args) = @_;
-# socketpair my $fh1, my $fh2, Socket::AF_UNIX, Socket::SOCK_STREAM, Socket::PF_UNSPEC
-# or die "socketpair: $!";
- pipe my $fh1, my $fh2
- or die "pipe: $!";
-
- if (my $pid = fork) {
- close $fh2;
+ # we seemingly have to make a local copy of the whole thing,
+ # otherwise perl prematurely frees the stuff :/
+ # TODO: investigate and fix (likely this will be rather laborious)
- my $res = (Coro::Handle::unblock $fh1)->readline (undef);
- $res = Coro::Storable::thaw $res;
+ my @res = Coro::Util::fork_eval {
+ reset_signals;
+ &$cb
+ }, @args;
- waitpid $pid, 0; # should not block anymore, we expect the child to simply behave
+ wantarray ? @res : $res[-1]
+}
- die $$res unless "ARRAY" eq ref $res;
+=item $coin = coin_from_name $name
- return wantarray ? @$res : $res->[-1];
- } else {
- reset_signals;
- local $SIG{__WARN__};
- local $SIG{__DIE__};
- eval {
- close $fh1;
+=cut
- my @res = eval { $cb->(@args) };
- syswrite $fh2, Coro::Storable::freeze +($@ ? \"$@" : \@res);
- };
+our %coin_alias = (
+ "silver" => "silvercoin",
+ "silvercoin" => "silvercoin",
+ "silvercoins" => "silvercoin",
+ "gold" => "goldcoin",
+ "goldcoin" => "goldcoin",
+ "goldcoins" => "goldcoin",
+ "platinum" => "platinacoin",
+ "platinumcoin" => "platinacoin",
+ "platinumcoins" => "platinacoin",
+ "platina" => "platinacoin",
+ "platinacoin" => "platinacoin",
+ "platinacoins" => "platinacoin",
+ "royalty" => "royalty",
+ "royalties" => "royalty",
+);
- warn $@ if $@;
- _exit 0;
- }
+sub coin_from_name($) {
+ $coin_alias{$_[0]}
+ ? cf::arch::find $coin_alias{$_[0]}
+ : undef
}
=item $value = cf::db_get $family => $key
@@ -455,25 +596,36 @@
Stores the given C<$value> in the family. It can currently store binary
data only (use Compress::LZF::sfreeze_cr/sthaw to convert to/from binary).
+=item $db = cf::db_table "name"
+
+Create and/or open a new database table. The string must not be "db" and must be unique
+within each server.
+
=cut
-our $DB;
+sub db_table($) {
+ my ($name) = @_;
+ my $db = BDB::db_create $DB_ENV;
-sub db_init {
- unless ($DB) {
- $DB = BDB::db_create $DB_ENV;
+ eval {
+ $db->set_flags (BDB::CHKSUM);
- cf::sync_job {
- eval {
- $DB->set_flags (BDB::CHKSUM);
+ utf8::encode $name;
+ BDB::db_open $db, undef, $name, undef, BDB::BTREE,
+ BDB::CREATE | BDB::AUTO_COMMIT, 0666;
+ cf::cleanup "db_open(db): $!" if $!;
+ };
+ cf::cleanup "db_open(db): $@" if $@;
- BDB::db_open $DB, undef, "db", undef, BDB::BTREE,
- BDB::CREATE | BDB::AUTO_COMMIT, 0666;
- cf::cleanup "db_open(db): $!" if $!;
- };
- cf::cleanup "db_open(db): $@" if $@;
- };
- }
+ $db
+}
+
+our $DB;
+
+sub db_init {
+ cf::sync_job {
+ $DB ||= db_table "db";
+ };
}
sub db_get($$) {
@@ -533,7 +685,7 @@
join "\x00",
$processversion,
map {
- Coro::cede;
+ cf::cede_to_tick;
($src->[$_], Digest::MD5::md5_hex $data[$_])
} 0.. $#$src;
@@ -581,13 +733,13 @@
=head2 ATTACHABLE OBJECTS
-Many objects in crossfire are so-called attachable objects. That means you can
+Many objects in deliantra are so-called attachable objects. That means you can
attach callbacks/event handlers (a collection of which is called an "attachment")
to it. All such attachable objects support the following methods.
In the following description, CLASS can be any of C, C