#
# This file is part of Deliantra, the Roguelike Realtime MMORPG.
#
# Copyright (©) 2006,2007,2008,2009,2010,2011 Marc Alexander Lehmann / Robin Redeker / the Deliantra team
#
# Deliantra is free software: you can redistribute it and/or modify it under
# the terms of the Affero GNU General Public License as published by the
# Free Software Foundation, either version 3 of the License, or (at your
# option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the Affero GNU General Public License
# and the GNU General Public License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
#
# The authors can be reached via e-mail to <support@deliantra.net>
#
package cf;
use common::sense;
use Symbol;
use List::Util;
use Socket;
use EV;
use Opcode;
use Safe;
use Safe::Hole;
use Storable ();
use Carp ();
use Guard ();
use Coro ();
use Coro::State;
use Coro::Handle;
use Coro::EV;
use Coro::AnyEvent;
use Coro::Timer;
use Coro::Signal;
use Coro::Semaphore;
use Coro::SemaphoreSet;
use Coro::AnyEvent;
use Coro::AIO;
use Coro::BDB 1.6;
use Coro::Storable;
use Coro::Util ();
use JSON::XS 2.01 ();
use BDB ();
use Data::Dumper;
use Fcntl;
use YAML::XS ();
use IO::AIO ();
use Time::HiRes;
use Compress::LZF;
use Digest::MD5 ();
# make AnyEvent select its event model right away instead of lazily on first use
AnyEvent::detect;
# configure various modules to our taste
#
$Storable::canonical = 1; # reduce rsync transfers
Coro::State::cctx_stacksize 256000; # 1-2MB stack, for deep recursions in maze generator
$Coro::main->prio (Coro::PRIO_MAX); # run main coroutine ("the server") with very high priority
# make sure c-lzf reinitialises itself
# (the serializer is named explicitly: Storable's net_mstore/mretrieve pair)
Compress::LZF::set_serializer "Storable", "Storable::net_mstore", "Storable::mretrieve";
Compress::LZF::sfreeze_cr { }; # prime Compress::LZF so it does not use require later
# strictly for debugging
# (SIGQUIT only prints a stack trace via Carp::cluck, it does not terminate anything here)
$SIG{QUIT} = sub { Carp::cluck "SIGQUIT" };
# watcher flag constant; the empty prototype makes it inlinable
sub WF_AUTOCANCEL () { 1 } # automatically cancel this watcher on reload
# Package-global server state, reachable from other packages as $cf::NAME etc.
#
# Note the two declaration styles: "our X = VALUE" resets the variable every
# time this file is evaluated, while a bare "our X;" (or "||=") keeps whatever
# value the variable already holds, e.g. across a reload.
our @ORIG_INC;
our %COMMAND = ();
our %COMMAND_TIME = ();
our @EXTS = (); # list of extension package names
our %EXTCMD = ();
our %EXTICMD = ();
our %EXT_CORO = (); # coroutines bound to extensions
our %EXT_MAP = (); # pluggable maps
our $RELOAD; # number of reloads so far, non-zero while in reload
our @EVENT;
our @REFLECT; # set by XS
our %REFLECT; # set by us
# directory layout - confdir, datadir, mapdir, localdir, tmpdir, uniquedir and
# playerdir are not defined in this part of the file (presumably provided by
# the XS side - TODO confirm)
our $CONFDIR = confdir;
our $DATADIR = datadir;
our $LIBDIR = "$DATADIR/ext";
our $PODDIR = "$DATADIR/pod";
our $MAPDIR = "$DATADIR/" . mapdir;
our $LOCALDIR = localdir;
our $TMPDIR = "$LOCALDIR/" . tmpdir;
our $UNIQUEDIR = "$LOCALDIR/" . uniquedir;
our $PLAYERDIR = "$LOCALDIR/" . playerdir;
our $RANDOMDIR = "$LOCALDIR/random";
our $BDBDIR = "$LOCALDIR/db";
our $PIDFILE = "$LOCALDIR/pid";
our $RUNTIMEFILE = "$LOCALDIR/runtime";
our %RESOURCE; # unused
our $OUTPUT_RATE_MIN = 3000;
our $OUTPUT_RATE_MAX = 1000000;
our $MAX_LINKS = 32; # how many chained exits to follow
our $VERBOSE_IO = 1;
# tick length in seconds (see the $cf::TICK documentation), so MAX_TIME is
# presumably given in microseconds - TODO confirm
our $TICK = MAX_TIME * 1e-6; # this is a CONSTANT(!)
our $NEXT_RUNTIME_WRITE; # when should the runtime file be written
our $NEXT_TICK;
our $USE_FSYNC = 1; # use fsync to write maps - default on
our $BDB_DEADLOCK_WATCHER;
our $BDB_CHECKPOINT_WATCHER;
our $BDB_TRICKLE_WATCHER;
our $DB_ENV;
our @EXTRA_MODULES = qw(pod match mapscript incloader);
our %CFG;
# only set once: a value that already exists (re-evaluation of this file) is kept
our $UPTIME; $UPTIME ||= time;
our $RUNTIME = 0;
our $SERVER_TICK = 0;
our $NOW;
our (%PLAYER, %PLAYER_LOADING); # all users
our (%MAP, %MAP_LOADING ); # all maps
our $LINK_MAP; # the special {link} map, which is always available
# used to convert map paths into valid unix filenames by replacing / by ∕
our $PATH_SEP = "∕"; # U+2215, chosen purely for visual reasons
our $LOAD; # a number between 0 (idle) and 1 (too many objects)
our $LOADAVG; # same thing, but with alpha-smoothing
our $JITTER; # average jitter
our $TICK_START; # for load detecting purposes
our @POST_INIT;
our $REATTACH_ON_RELOAD; # set to true to force object reattach on reload (slow)
our $REALLY_UNLOOP; # never set to true, please :)
# signal that coroutines wait on until a tick has been processed (see the POD below)
our $WAIT_FOR_TICK = new Coro::Signal;
our @WAIT_FOR_TICK_BEGIN;
# both standard streams are used in raw (byte) mode
for my $stream (\*STDOUT, \*STDERR) {
   binmode $stream;
}

# read virtual server time, if available: only when we have no runtime yet
# and a runtime file exists
if (!$RUNTIME && -e $RUNTIMEFILE) {
   open my $runtime_fh, "<", $RUNTIMEFILE
      or die "unable to read $RUNTIMEFILE file: $!";

   my $stored = <$runtime_fh>;
   $RUNTIME = $stored + 0.;
}

# compile the tick length into an inlinable constant sub; this has to be a
# string eval because the value is only known at runtime
eval "sub TICK() { $TICK } 1" or die;

# create the local directory hierarchy, failures (e.g. "already exists") are ignored
for my $dir ($LOCALDIR, $TMPDIR, $UNIQUEDIR, $PLAYERDIR, $RANDOMDIR, $BDBDIR) {
   mkdir $dir;
}

our $EMERGENCY_POSITION;

# forward declaration only, the body is not in this part of the file
sub cf::map::normalise;
# Returns true when the currently executing coroutine is the main
# coroutine ($Coro::main), false otherwise.
sub in_main() {
   return $Coro::main == $Coro::current;
}
#############################################################################
# Build %REFLECT from the JSON documents in @REFLECT, indexed by class name.
%REFLECT = ();

for my $json (@REFLECT) {
   my $info = JSON::XS::decode_json ($json);
   $REFLECT{ $info->{class} } = $info;
}

# this is decidedly evil: the object flag names are derived by scanning the
# cf:: symbol table for FLAG_xxx entries
{
   my %flags;

   for my $symbol (keys %{"cf::"}) {
      next unless $symbol =~ /^FLAG_([A-Z0-9_]+)$/;

      my $flag = lc $1;
      next unless $flag; # skip names that are false as a string

      $flags{$flag} = undef;
   }

   $REFLECT{object}{flags} = \%flags;
}
#############################################################################
=head2 GLOBAL VARIABLES
=over 4
=item $cf::UPTIME
The timestamp of the server start (so not actually an "uptime").
=item $cf::SERVER_TICK
An unsigned integer that starts at zero when the server is started and is
incremented on every tick.
=item $cf::NOW
The (real) time of the last (current) server tick - updated before and
after tick processing, so this is useful only as a rough "what time is it
now" estimate.
=item $cf::TICK
The interval between each server tick, in seconds.
=item $cf::RUNTIME
The time this server has run, starts at 0 and is increased by $cf::TICK on
every server tick.
=item $cf::CONFDIR $cf::DATADIR $cf::LIBDIR $cf::PODDIR
$cf::MAPDIR $cf::LOCALDIR $cf::TMPDIR $cf::UNIQUEDIR
$cf::PLAYERDIR $cf::RANDOMDIR $cf::BDBDIR
Various directories - "/etc", read-only install directory, perl-library
directory, pod-directory, read-only maps directory, "/var", "/var/tmp",
unique-items directory, player file directory, random maps directory and
database environment.
=item $cf::LOADAVG
The current CPU load on the server (alpha-smoothed), as a value between 0
(none) and 1 (overloaded), indicating how much time is spent on processing
objects per tick. Healthy values are < 0.5.
=item $cf::LOAD
The raw load value from the last tick.
=item %cf::CFG
Configuration for the server, loaded from C</etc/deliantra-server/config>, or
from wherever your confdir points to.
=item cf::wait_for_tick, cf::wait_for_tick_begin
These are functions that inhibit the current coroutine one tick. cf::wait_for_tick_begin only
returns directly I<after> the tick processing (and consequently, can only wake one thread
per tick), while cf::wait_for_tick wakes up all waiters after tick processing.
Note that cf::wait_for_tick will immediately return when the server is not
ticking, making it suitable for small pauses in threads that need to run
when the server is paused. If that is not applicable (i.e. you I<do>
want to wait), use C<$cf::WAIT_FOR_TICK>.
=item $cf::WAIT_FOR_TICK
Note that C<cf::wait_for_tick> is probably the correct thing to use. This
variable contains a L<Coro::Signal> that is broadcast after every server
tick. Calling C<< ->wait >> on it will suspend the caller until after the
next server tick.
=cut
# forward declarations: they make the empty prototypes known to all code
# compiled after this point; the bodies are not in this part of the file
sub wait_for_tick();
sub wait_for_tick_begin();
=item @cf::INVOKE_RESULTS
This array contains the results of the last C<invoke ()> call. When
C<cf::override> is called C<@cf::INVOKE_RESULTS> is set to the parameters of
that call.
=item %cf::REFLECT
Contains, for each (C++) class name, a hash reference with information
about object members (methods, scalars, arrays and flags) and other
metadata, which is useful for introspection.
=back
=cut
# Logging helpers, one per log level. Each concatenates all of its arguments
# into a single message and hands it to LOG together with the level constant.

sub error(@) {
   my $message = join "", @_;
   LOG llevError, $message
}

sub warn (@) {
   my $message = join "", @_;
   LOG llevWarn, $message
}

sub info (@) {
   my $message = join "", @_;
   LOG llevInfo, $message
}

sub debug(@) {
   my $message = join "", @_;
   LOG llevDebug, $message
}

sub trace(@) {
   my $message = join "", @_;
   LOG llevTrace, $message
}
# Warning hook: sends every perl warning to the server log at warn level.
$Coro::State::WARNHOOK = sub {
   my $text = join "", @_;

   # make sure the message is newline-terminated
   $text =~ /\n$/
      or $text .= "\n";

   # escape control characters (tab and newline are left alone)
   $text =~ s/([\x00-\x08\x0b-\x1f])/sprintf "\\x%02x", ord $1/ge;

   LOG llevWarn, $text;
};
# Die hook: logs the exception with a stack trace and then terminates the
# dying coroutine - unless that coroutine is the main one.
$Coro::State::DIEHOOK = sub {
   # only act when $^S is "0"; this is a string comparison on purpose
   # (the original note reads: "eq", not "==")
   return if $^S ne 0;

   error Carp::longmess $_[0];

   # kill the coroutine, unless it is the main one
   Coro::terminate ()
      unless in_main;

   error "DIEHOOK called in main context, Coro bug?\n";#d#
   return;#d#
};
# Inheritance setup. Every class exists twice, once normally and once
# below the safe:: prefix, and both variants get the same parent.

# classes deriving directly from cf::attachable
for my $class (qw(global object player client map)) {
   @{"safe::cf::$class\::ISA"} = @{"cf::$class\::ISA"} = 'cf::attachable';
}

# classes deriving from cf::object (for object::player: not really true (yet))
for my $class (qw(arch object::player)) {
   @{"safe::cf::$class\::ISA"} = @{"cf::$class\::ISA"} = 'cf::object';
}

# we bless all objects into (empty) derived classes to force a method lookup
# within the Safe compartment.
my @wrapped = qw(
   cf::global cf::attachable
   cf::object cf::object::player
   cf::client cf::player
   cf::arch cf::living
   cf::map cf::mapspace
   cf::party cf::region
);

for my $pkg (@wrapped) {
   @{"safe::$pkg\::wrap::ISA"} = @{"$pkg\::wrap::ISA"} = $pkg;
}
# Exceptions escaping from an EV callback are reported with a stack trace.
$EV::DIED = sub {
   my $details = "@_";
   Carp::cluck "error in event callback: $details";
};
#############################################################################
# forward declarations: decode_yaml and unlzf below call these without
# parentheses before their bodies have been compiled, so the prototypes
# have to be known up front
sub fork_call(&@);
sub get_slot($;$$);
#############################################################################
=head2 UTILITY FUNCTIONS
=over 4
=item dumpval $ref
=cut
# dumpval $value[, $seen]
#
# Returns a human-readable Data::Dumper rendering of $value (terse, indent
# level 2, unquoted hash keys, double-quoted strings). The optional second
# argument is passed on to Data::Dumper's Seen method. Control characters in
# the result are escaped as \xNN. If dumping fails (or yields nothing), a
# placeholder string describing the failure is returned instead.
sub dumpval {
   eval {
      # keep __DIE__ handlers out of the way while dumping
      local $SIG{__DIE__};

      my $dumper = Data::Dumper->new ([$_[0]], ["*var"]);

      $dumper
         ->Terse (1)
         ->Indent (2)
         ->Quotekeys (0)
         ->Useqq (1);
      #$dumper->Bless(...);

      $dumper->Seen ($_[1])
         if @_ > 1;

      my $text = $dumper->Dump;
      $text =~ s/([\x00-\x07\x09\x0b\x0c\x0e-\x1f])/sprintf "\\x%02x", ord($1)/ge;

      $text
   } || "[unable to dump $_[0]: '$@']";
}
=item $scalar = load_file $path
Loads the given file from path and returns its contents. Croaks on error
and can block.
=cut
# $scalar = load_file $path
#
# Reads the whole file at $path via aio_load and returns its contents.
# Croaks with "$path: $!" when aio_load reports a negative result.
sub load_file($) {
   my ($path) = @_;

   my $contents;
   my $status = aio_load ($path, $contents);

   $status >= 0
      or Carp::croak "$path: $!";

   $contents
}
=item $ref = cf::decode_json $json
Converts a JSON string into the corresponding perl data structure.
=item $json = cf::encode_json $ref
Converts a perl data structure into its JSON representation.
=cut
# shared coder used by both helpers below: utf-8 encoded JSON text,
# accept ~1mb max
our $json_coder = do {
   my $coder = JSON::XS->new;
   $coder->utf8;
   $coder->max_size (1e6);
   $coder
};

# $json = cf::encode_json $ref
sub encode_json($) {
   return $json_coder->encode ($_[0]);
}

# $ref = cf::decode_json $json
sub decode_json($) {
   return $json_coder->decode ($_[0]);
}
=item $ref = cf::decode_storable $scalar
Same as Coro::Storable::thaw, so blocks.
=cut
# install Coro::Storable::thaw under the name decode_storable, at compile time
BEGIN { *decode_storable = \&Coro::Storable::thaw }
=item $ref = cf::decode_yaml $scalar
Same as YAML::XS::Load, but doesn't leak, because it forks (and thus blocks).
=cut
# $ref = cf::decode_yaml $scalar
#
# Parses the YAML text with YAML::XS::Load inside a forked child process
# (via fork_call) and returns the result.
sub decode_yaml($) {
   my @yaml = @_;

   fork_call {
      YAML::XS::Load ($_[0])
   } @yaml
}
=item $scalar = cf::unlzf $scalar
Same as Compress::LZF::decompress, but takes server ticks into account, so
blocks.
=cut
# $scalar = cf::unlzf $scalar
#
# Decompresses LZF data, after first reserving a time slot proportional to
# the size of the compressed input.
sub unlzf($) {
   # we assume 100mb/s minimum decompression speed (noncompressible data on a ~2ghz machine)
   my $seconds = (length $_[0]) / 100_000_000;

   cf::get_slot $seconds, 0, "unlzf";

   return Compress::LZF::decompress ($_[0]);
}
=item cf::post_init { BLOCK }
Execute the given codeblock, I<after> all extensions have been (re-)loaded,
but I<before> the server starts ticking again.
The codeblock will have a single boolean argument to indicate whether this
is a reload or not.
=cut
# cf::post_init { BLOCK }
#
# Queues the code block in @POST_INIT; _post_init runs the queued blocks.
sub post_init(&) {
   my ($job) = @_;

   push @POST_INIT, $job;
}
# Runs all queued post_init jobs, each in a coroutine of its own, and waits
# until every one of them has finished. The queue is re-read on every round,
# so jobs that get queued while others are running are picked up as well.
sub _post_init {
   trace "running post_init jobs";

   # run them in parallel...
   my @pending;

   for (;;) {
      # start everything queued so far (each job is called with a single 0 argument)
      for my $job (splice @POST_INIT) {
         push @pending, &Coro::async ($job, 0);
      }

      last unless @pending;

      my $coro = pop @pending;
      $coro->join;
   }
}
=item cf::lock_wait $string
Wait until the given lock is available. See cf::lock_acquire.
=item my $lock = cf::lock_acquire $string
Wait until the given lock is available and then acquires it and returns
a L<Guard> object. If the guard object gets destroyed (goes out of scope,
for example when the coroutine gets canceled), the lock is automatically
returned.
Locks are *not* recursive, locking from the same coro twice results in a
deadlocked coro.
Lock names should begin with a unique identifier (for example, cf::map::find
uses map_find and cf::map::load uses map_load).
=item $locked = cf::lock_active $string
Return true if the lock is currently active, i.e. somebody has locked it.
=cut
# all named locks live in one semaphore set, keyed by lock name
our $LOCKS = Coro::SemaphoreSet->new;

# cf::lock_wait $string - calls ->wait on the named lock
sub lock_wait($) {
   my ($name) = @_;

   $LOCKS->wait ($name);
}
# my $lock = cf::lock_acquire $string - returns the result of ->guard on
# the named lock; the caller has to keep that value around
sub lock_acquire($) {
   my ($name) = @_;

   return $LOCKS->guard ($name);
}
# $locked = cf::lock_active $string - true when the semaphore count of the
# named lock is below one
sub lock_active($) {
   my ($name) = @_;

   return $LOCKS->count ($name) < 1;
}
# Increments the tick inhibit counter and returns a guard object that calls
# tick_inhibit_dec again once it is destroyed.
sub freeze_mainloop {
   my $on_release = \&tick_inhibit_dec;

   tick_inhibit_inc ();

   return &Guard::guard ($on_release);
}
=item cf::periodic $interval, $cb
Like EV::periodic, but randomly selects a starting point so that the actions
get spread over time.
=cut
# cf::periodic $interval, $cb
#
# Creates an EV periodic watcher firing every $interval seconds. The offset
# is chosen randomly, from a range of at most 180 seconds (or $interval, if
# that is shorter).
sub periodic($$) {
   my ($interval, $cb) = @_;

   my $spread = List::Util::min (180, $interval);
   my $offset = rand $spread;

   EV::periodic $offset, $interval, 0, $cb
}
=item cf::get_slot $time[, $priority[, $name]]
Allocate $time seconds of blocking CPU time at priority C<$priority>
(default: 0): This call blocks and returns only when you have at least
C<$time> seconds of cpu time till the next tick. The slot is only valid
till the next cede.
Background jobs should use a priority less than zero, interactive jobs
should use 100 or more.
The optional C<$name> can be used to identify the job to run. It might be
used for statistical purposes and should identify the same time-class.
Useful for short background jobs.
=cut
# @SLOT_QUEUE holds the pending slot requests, each one an array reference
# [$time, $priority, $signal, $name] as pushed by get_slot.
# $SLOT_QUEUE is the coroutine serving that queue.
our @SLOT_QUEUE;
our $SLOT_QUEUE;
# factor applied to the requested time of a job each time it is passed over
our $SLOT_DECAY = 0.9;

# when this file is evaluated again, get rid of the previous manager coroutine first
$SLOT_QUEUE->cancel if $SLOT_QUEUE;

$SLOT_QUEUE = Coro::async {
   $Coro::current->desc ("timeslot manager");

   JOB:
   while () {
      my $avail = cf::till_tick;

      for my $idx (0 .. $#SLOT_QUEUE) {
         if ($SLOT_QUEUE[$idx][0] <= $avail) {
            # the job fits into what is left of this tick: wake it up,
            # let it run and then rescan the queue with a fresh time budget
            my $job = splice @SLOT_QUEUE, $idx, 1, ();
            $job->[2]->send;
            Coro::cede;
            next JOB;
         } else {
            # does not fit: shrink its requirement, so it fits eventually
            $SLOT_QUEUE[$idx][0] *= $SLOT_DECAY;
         }
      }

      if (@SLOT_QUEUE) {
         # we do not use wait_for_tick() as it returns immediately when tick is inactive
         $WAIT_FOR_TICK->wait;
      } else {
         # nothing queued: sleep until get_slot readies us again
         Coro::schedule;
      }
   }
};
# cf::get_slot $time[, $priority[, $name]]
#
# Queues a request for $time seconds (clamped to between 0.01 seconds and 60%
# of a tick) and blocks until the timeslot manager signals it. Returns
# immediately, without waiting, while ticks are inhibited or when called from
# the main coroutine.
sub get_slot($;$$) {
   return if tick_inhibit ();
   return if $Coro::main == $Coro::current;

   my ($time, $pri, $name) = @_;

   my $wakeup = Coro::Signal->new;
   my $entry  = [clamp ($time, 0.01, $TICK * .6), $pri, $wakeup, $name];

   # append the request and keep the queue ordered by descending priority
   @SLOT_QUEUE = sort { $b->[1] <=> $a->[1] } @SLOT_QUEUE, $entry;

   $SLOT_QUEUE->ready;
   $wakeup->wait;
}
=item cf::async { BLOCK }
Currently the same as Coro::async_pool, meaning you cannot use
C<on_destroy>, C<join> or other gimmicks on these coroutines. The only
thing you are allowed to do is call C<prio> on it.
=cut
# alias async to Coro::async_pool at compile time, so that code below
# (e.g. sync_job) can use the "async { ... }" block syntax
BEGIN { *async = \&Coro::async_pool }
=item cf::sync_job { BLOCK }
The design of Deliantra requires that the main coroutine ($Coro::main)
is always able to handle events or runnable, as Deliantra is only
partly reentrant. Thus "blocking" it by e.g. waiting for I/O is not
acceptable.
If it must be done, put the blocking parts into C<sync_job>. This will run
the given BLOCK in another coroutine while waiting for the result. The
server will be frozen during this time, so the block should either finish
fast or be very important.
=cut
# cf::sync_job { BLOCK }
#
# Runs BLOCK to completion and returns its result (in scalar context, the
# first element of the result list).
#
# Outside the main coroutine the block is simply called. Inside the main
# coroutine the block runs in a separate, maximum-priority coroutine while
# the main loop is frozen; the main coroutine keeps ceding/processing events
# until the job is done. An exception thrown by the block is logged, it is
# not propagated to the caller.
sub sync_job(&) {
   my ($job) = @_;

   if (in_main) {
      my $started = AE::time;

      # this is the main coro, too bad, we have to block
      # till the operation succeeds, freezing the server :/

      #LOG llevError, Carp::longmess "sync job";#d#

      # ticks stay inhibited for as long as this guard is alive
      my $freeze_guard = freeze_mainloop;

      my $busy = 1;
      my @res;

      (async {
         $Coro::current->desc ("sync job coro");
         @res = eval { $job->() };
         error $@ if $@;
         undef $busy;
      })->prio (Coro::PRIO_MAX);

      # drive other coroutines and the event loop until the job has finished
      while ($busy) {
         if (Coro::nready) {
            Coro::cede_notself;
         } else {
            EV::loop EV::LOOP_ONESHOT;
         }
      }

      my $elapsed = AE::time - $started;

      $TICK_START += $elapsed; # do not account sync jobs to server load

      wantarray ? @res : $res[0]
   } else {
      # we are in another coroutine, how wonderful, everything just works

      $job->()
   }
}
=item $coro = cf::async_ext { BLOCK }
Like async, but this coro is automatically being canceled when the
extension calling this is being unloaded.
=cut
# $coro = cf::async_ext { BLOCK }
#
# Starts BLOCK in a new coroutine and records it in %EXT_CORO, keyed by the
# numeric value of the coroutine reference. The entry is removed again when
# the coroutine is destroyed. Returns the coroutine.
sub async_ext(&) {
   my ($cb) = @_;

   my $coro = &Coro::async ($cb);

   my $unregister = sub {
      delete $EXT_CORO{$coro+0};
   };

   $coro->on_destroy ($unregister);
   $EXT_CORO{$coro+0} = $coro;

   return $coro;
}
=item fork_call { }, @args
Executes the given code block with the given arguments in a separate
process, returning the results. Everything must be serialisable with
Coro::Storable. May, of course, block. Note that the executed sub may
never block itself or use any form of event handling.
=cut
# Called by fork_call in the child process, before the user callback runs:
# all it does is call reset_signals.
sub post_fork {
   reset_signals ();
}
# fork_call { BLOCK } @args
#
# Runs BLOCK with @args via Coro::Util::fork_eval, after calling
# cf::post_fork first. Returns the complete result list in list context and
# the last result element in scalar context.
sub fork_call(&@) {
   # we seemingly have to make a local copy of the whole thing,
   # otherwise perl prematurely frees the stuff :/
   # TODO: investigate and fix (likely this will be rather laborious)
   my $cb   = shift;
   my @args = @_;

   my @res = Coro::Util::fork_eval {
      cf::post_fork;
      # deliberately called without an argument list: the callback sees
      # the @_ of this block
      &$cb
   } @args;

   return wantarray ? @res : $res[-1];
}
# Returns a flat list of label => value pairs with the object statistics
# reported by the cf::object counter functions.
sub objinfo {
   return (
      "counter value"     => cf::object::object_count (),
      "objects created"   => cf::object::create_count (),
      "objects destroyed" => cf::object::destroy_count (),
      "freelist size"     => cf::object::free_count (),
      "allocated objects" => cf::object::objects_size (),
      "active objects"    => cf::object::actives_size (),
   );
}
=item $coin = coin_from_name $name
=cut
# maps every accepted coin name to the archetype name of that coin
our %coin_alias = (
   silver        => "silvercoin",
   silvercoin    => "silvercoin",
   silvercoins   => "silvercoin",

   gold          => "goldcoin",
   goldcoin      => "goldcoin",
   goldcoins     => "goldcoin",

   platinum      => "platinacoin",
   platinumcoin  => "platinacoin",
   platinumcoins => "platinacoin",
   platina       => "platinacoin",
   platinacoin   => "platinacoin",
   platinacoins  => "platinacoin",

   royalty       => "royalty",
   royalties     => "royalty",
);
# $coin = coin_from_name $name
#
# Looks $name up in %coin_alias and returns whatever cf::arch::find yields
# for the resulting archetype name, or undef for an unknown name.
sub coin_from_name($) {
   my $archname = $coin_alias{$_[0]}
      or return undef;

   cf::arch::find ($archname)
}
=item $value = cf::db_get $family => $key
Returns a single value from the environment database.
=item cf::db_put $family => $key => $value
Stores the given C<$value> in the family. It can currently store binary
data only (use Compress::LZF::sfreeze_cr/sthaw to convert to/from binary).
=item $db = cf::db_table "name"
Create and/or open a new database table. The string must not be "db" and must be unique
within each server.
=cut
# $db = cf::db_table "name"
#
# Creates a database handle inside $DB_ENV and opens (creating it if
# necessary) the btree table $name with checksums enabled. Any failure
# while doing so is passed to cf::cleanup. Returns the database handle.
#
# Logs an error when called from the main coroutine.
sub db_table($) {
   cf::error "db_table called from main context"
      if $Coro::current == $Coro::main;

   my ($name) = @_;
   my $db = BDB::db_create $DB_ENV;

   eval {
      $db->set_flags (BDB::CHKSUM);

      # the table name is handed to BDB as utf-8 encoded octets
      utf8::encode $name;
      BDB::db_open $db, undef, $name, undef, BDB::BTREE,
                   BDB::CREATE | BDB::AUTO_COMMIT, 0666;
      cf::cleanup "db_open($name): $!" if $!;
   };
   cf::cleanup "db_open($name): $@" if $@;

   $db
}
# handle of the default table "db", used by db_get and db_put
our $DB;

# Opens the default table unless $DB is already set; returns $DB.
sub db_init {
   $DB = db_table "db"
      unless $DB;

   $DB
}
# $value = cf::db_get $family => $key
#
# Fetches the value stored under "$family/$key" from $DB. Returns nothing
# (empty list, or undef in scalar context) when $! is set after the lookup.
# Logs an error when called from the main coroutine.
sub db_get($$) {
   my ($family, $name) = @_;
   my $key = join "/", $family, $name;

   cf::error "db_get called from main context"
      if $Coro::current == $Coro::main;

   my $data;
   BDB::db_get ($DB, undef, $key, $data);

   return if $!;

   $data
}
# cf::db_put $family => $key => $value
#
# Stores $value under "$family/$key" in $DB, with BDB request priority 4.
# The request is issued with an empty callback.
sub db_put($$$) {
   my $key = join "/", $_[0], $_[1];

   BDB::dbreq_pri (4);
   BDB::db_put ($DB, undef, $key, $_[2], 0, sub { });
}
=item cf::cache $id => [$paths...], $processversion => $process
Generic caching function that returns the value of the resource $id,
caching and regenerating as required.
This function can block.
=cut
# cf::cache $id => [$paths...], $processversion => $process
#
# Returns the cached result for resource $id, regenerating it when needed.
#
# A cheap fingerprint (process version plus path and two stat fields of every
# source file) is compared first. If it differs, the files are loaded and
# their md5 sums compared. Only if those differ as well is $process called,
# with a reference to the list of file contents, and its result stored.
#
# Croaks when a source file cannot be stat'ed or loaded.
sub cache {
   my ($id, $src, $processversion, $process) = @_;

   # cheap fingerprint, no file contents involved
   my @fingerprint = ($processversion);

   for my $path (@$src) {
      aio_stat ($path)
         and Carp::croak "$path: $!";

      push @fingerprint, $path, (stat _)[7,9];
   }

   my $meta = join "\x00", @fingerprint;

   my $dbmeta = db_get cache => "$id/meta";

   # nothing changed on disk: the cached data is still good
   return db_get cache => "$id/data"
      if $dbmeta eq $meta;

   # changed, we may need to process

   my @data;

   for my $idx (0 .. $#$src) {
      $data[$idx] = load_file ($src->[$idx]);
   }

   # if processing is expensive, check
   # checksum first
   my @sums = ($processversion);

   for my $idx (0 .. $#$src) {
      cf::cede_to_tick ();

      push @sums, $src->[$idx], Digest::MD5::md5_hex ($data[$idx]);
   }

   my $md5 = join "\x00", @sums;

   my $dbmd5 = db_get cache => "$id/md5";

   if ($dbmd5 eq $md5) {
      # same contents, only the fingerprint is out of date
      db_put cache => "$id/meta", $meta;

      return db_get cache => "$id/data";
   }

   my $t1 = Time::HiRes::time;
   my $data = $process->(\@data);
   my $t2 = Time::HiRes::time;

   info "cache: '$id' processed in ", $t2 - $t1, "s\n";

   db_put cache => "$id/data", $data;
   db_put cache => "$id/md5" , $md5;
   db_put cache => "$id/meta", $meta;

   return $data;
}
=item cf::datalog type => key => value, ...
Log a datalog packet of the given type with the given key-value pairs.
=cut
# cf::datalog type => key => value, ...
#
# Writes an info-level log line made of "DATALOG " followed by the ASCII
# JSON encoding of the key-value pairs plus a "type" entry (which takes
# precedence over a "type" key among the pairs).
sub datalog($@) {
   my ($type, %kv) = @_;

   my %packet = (%kv, type => $type);
   my $coder  = JSON::XS->new->ascii;

   info "DATALOG ", $coder->encode (\%packet);
}
=back
=cut
#############################################################################
=head2 ATTACHABLE OBJECTS
Many objects in deliantra are so-called attachable objects. That means you can
attach callbacks/event handlers (a collection of which is called an "attachment")
to it. All such attachable objects support the following methods.
In the following description, CLASS can be any of C, C