--- deliantra/server/lib/cf.pm 2007/11/08 19:43:25 1.394 +++ deliantra/server/lib/cf.pm 2007/12/17 08:27:44 1.406 @@ -6,24 +6,25 @@ use Symbol; use List::Util; use Socket; -use Event; +use EV 1.86; use Opcode; use Safe; use Safe::Hole; use Storable (); -use Coro 4.1 (); +use Coro 4.32 (); use Coro::State; use Coro::Handle; -use Coro::Event; +use Coro::EV; use Coro::Timer; use Coro::Signal; use Coro::Semaphore; use Coro::AIO; +use Coro::BDB; use Coro::Storable; use Coro::Util (); -use JSON::XS (); +use JSON::XS 2.01 (); use BDB (); use Data::Dumper; use Digest::MD5; @@ -40,8 +41,6 @@ Coro::State::cctx_stacksize 256000; # 1-2MB stack, for deep recursions in maze generator Compress::LZF::sfreeze_cr { }; # prime Compress::LZF so it does not use require later -$Event::Eval = 1; # no idea why this is required, but it is - # work around bug in YAML::Syck - bad news for perl6, will it be as broken wrt. unicode? $YAML::Syck::ImplicitUnicode = 1; @@ -72,13 +71,13 @@ our $PLAYERDIR = "$LOCALDIR/" . playerdir; our $RANDOMDIR = "$LOCALDIR/random"; our $BDBDIR = "$LOCALDIR/db"; +our %RESOURCE; our $TICK = MAX_TIME * 1e-6; # this is a CONSTANT(!) our $TICK_WATCHER; our $AIO_POLL_WATCHER; our $NEXT_RUNTIME_WRITE; # when should the runtime file be written our $NEXT_TICK; -our $NOW; our $USE_FSYNC = 1; # use fsync to write maps - default off our $BDB_POLL_WATCHER; @@ -91,6 +90,7 @@ our $UPTIME; $UPTIME ||= time; our $RUNTIME; +our $NOW; our (%PLAYER, %PLAYER_LOADING); # all users our (%MAP, %MAP_LOADING ); # all maps @@ -164,7 +164,7 @@ =item %cf::CFG -Configuration for the server, loaded from C, or +Configuration for the server, loaded from C, or from wherever your confdir points to. =item cf::wait_for_tick, cf::wait_for_tick_begin @@ -217,7 +217,7 @@ @{"safe::$pkg\::wrap::ISA"} = @{"$pkg\::wrap::ISA"} = $pkg; } -$Event::DIED = sub { +$EV::DIED = sub { warn "error in event callback: @_"; }; @@ -250,11 +250,11 @@ } || "[unable to dump $_[0]: '$@']"; } -=item $ref = cf::from_json $json +=item $ref = cf::decode_json $json Converts a JSON string into the corresponding perl data structure. -=item $json = cf::to_json $ref +=item $json = cf::encode_json $ref Converts a perl data structure into its JSON representation. @@ -262,8 +262,8 @@ our $json_coder = JSON::XS->new->utf8->max_size (1e6); # accept ~1mb max -sub to_json ($) { $json_coder->encode ($_[0]) } -sub from_json ($) { $json_coder->decode ($_[0]) } +sub encode_json($) { $json_coder->encode ($_[0]) } +sub decode_json($) { $json_coder->decode ($_[0]) } =item cf::lock_wait $string @@ -338,6 +338,21 @@ $guard } +=item cf::periodic $interval, $cb + +Like EV::periodic, but randomly selects a starting point so that the actions +get spread over timer. + +=cut + +sub periodic($$) { + my ($interval, $cb) = @_; + + my $start = rand List::Util::min 180, $interval; + + EV::periodic $start, $interval, 0, $cb +} + =item cf::get_slot $time[, $priority[, $name]] Allocate $time seconds of blocking CPU time at priority C<$priority>: @@ -424,7 +439,7 @@ my ($job) = @_; if ($Coro::current == $Coro::main) { - my $time = Event::time; + my $time = EV::time; # this is the main coro, too bad, we have to block # till the operation succeeds, freezing the server :/ @@ -449,11 +464,11 @@ if (Coro::nready) { Coro::cede_notself; } else { - Event::one_event; + EV::loop EV::LOOP_ONESHOT; } } - $time = Event::time - $time; + $time = EV::time - $time; LOG llevError | logBacktrace, Carp::longmess "long sync job" if $time > $TICK * 0.5 && $TICK_WATCHER->is_active; @@ -493,7 +508,7 @@ Executes the given code block with the given arguments in a seperate process, returning the results. Everything must be serialisable with Coro::Storable. May, of course, block. Note that the executed sub may -never block itself or use any form of Event handling. +never block itself or use any form of event handling. =cut @@ -956,7 +971,7 @@ 0 } -=item $bool = cf::global::invoke (EVENT_CLASS_XXX, ...) +=item $bool = cf::global->invoke (EVENT_CLASS_XXX, ...) =item $bool = $attachable->invoke (EVENT_CLASS_XXX, ...) @@ -1044,7 +1059,7 @@ on_instantiate => sub { my ($obj, $data) = @_; - $data = from_json $data; + $data = decode_json $data; for (@$data) { my ($name, $args) = @$_; @@ -2627,7 +2642,7 @@ $rmp->{random_seed} ||= $exit->random_seed; - my $data = cf::to_json $rmp; + my $data = cf::encode_json $rmp; my $md5 = Digest::MD5::md5_hex $data; my $meta = "$RANDOMDIR/$md5.meta"; @@ -3137,6 +3152,7 @@ while (my ($face, $info) = each %$faces) { my $idx = (cf::face::find $face) || cf::face::alloc $face; + cf::face::set_visibility $idx, $info->{visibility}; cf::face::set_magicmap $idx, $info->{magicmap}; cf::face::set_data $idx, 0, $info->{data32}, Digest::MD5::md5 $info->{data32}; @@ -3147,8 +3163,10 @@ while (my ($face, $info) = each %$faces) { next unless $info->{smooth}; + my $idx = cf::face::find $face or next; + if (my $smooth = cf::face::find $info->{smooth}) { cf::face::set_smooth $idx, $smooth; cf::face::set_smoothlevel $idx, $info->{smoothlevel}; @@ -3176,52 +3194,58 @@ # that gcfclient doesn't grok a >10000 face index. my $res = $facedata->{resource}; - my $soundconf = delete $res->{"res/sound.conf"}; - while (my ($name, $info) = each %$res) { - my $idx = (cf::face::find $name) || cf::face::alloc $name; - my $data; - - if ($info->{type} & 1) { - # prepend meta info + if (defined $info->{type}) { + my $idx = (cf::face::find $name) || cf::face::alloc $name; + my $data; + + if ($info->{type} & 1) { + # prepend meta info + + my $meta = $enc->encode ({ + name => $name, + %{ $info->{meta} || {} }, + }); - my $meta = $enc->encode ({ - name => $name, - %{ $info->{meta} || {} }, - }); + $data = pack "(w/a*)*", $meta, $info->{data}; + } else { + $data = $info->{data}; + } - $data = pack "(w/a*)*", $meta, $info->{data}; + cf::face::set_data $idx, 0, $data, Digest::MD5::md5 $data; + cf::face::set_type $idx, $info->{type}; } else { - $data = $info->{data}; + $RESOURCE{$name} = $info; } - cf::face::set_data $idx, 0, $data, Digest::MD5::md5 $data; - cf::face::set_type $idx, $info->{type}; - cf::cede_to_tick; } + } - if ($soundconf) { - $soundconf = $enc->decode (delete $soundconf->{data}); + cf::global->invoke (EVENT_GLOBAL_RESOURCE_UPDATE); - for (0 .. SOUND_CAST_SPELL_0 - 1) { - my $sound = $soundconf->{compat}[$_] - or next; + 1 +} - my $face = cf::face::find "sound/$sound->[1]"; - cf::sound::set $sound->[0] => $face; - cf::sound::old_sound_index $_, $face; # gcfclient-compat - } +cf::global->attach (on_resource_update => sub { + if (my $soundconf = $RESOURCE{"res/sound.conf"}) { + $soundconf = JSON::XS->new->utf8->relaxed->decode ($soundconf->{data}); - while (my ($k, $v) = each %{$soundconf->{event}}) { - my $face = cf::face::find "sound/$v"; - cf::sound::set $k => $face; - } + for (0 .. SOUND_CAST_SPELL_0 - 1) { + my $sound = $soundconf->{compat}[$_] + or next; + + my $face = cf::face::find "sound/$sound->[1]"; + cf::sound::set $sound->[0] => $face; + cf::sound::old_sound_index $_, $face; # gcfclient-compat } - } - 1 -} + while (my ($k, $v) = each %{$soundconf->{event}}) { + my $face = cf::face::find "sound/$v"; + cf::sound::set $k => $face; + } + } +}); register_exticmd fx_want => sub { my ($ns, $want) = @_; @@ -3308,7 +3332,7 @@ Carp::cluck "FATAL: Coro::idle was called, major BUG, use cf::sync_job!\n";#d# (async { $Coro::current->{desc} = "IDLE BUG HANDLER"; - Event::one_event; + EV::loop EV::LOOP_ONESHOT; })->prio (Coro::PRIO_MAX); }; @@ -3317,7 +3341,8 @@ load_extensions; $TICK_WATCHER->start; - Event::loop; + $Coro::current->prio (Coro::PRIO_MAX); # give the main loop max. priority + EV::loop; } ############################################################################# @@ -3325,16 +3350,11 @@ # install some emergency cleanup handlers BEGIN { + our %SIGWATCHER = (); for my $signal (qw(INT HUP TERM)) { - Event->signal ( - reentrant => 0, - data => WF_AUTOCANCEL, - signal => $signal, - prio => 0, - cb => sub { - cf::cleanup "SIG$signal"; - }, - ); + $SIGWATCHER{$signal} = EV::signal $signal, sub { + cf::cleanup "SIG$signal"; + }; } } @@ -3438,11 +3458,6 @@ # if anything goes wrong in here, we should simply crash as we already saved - warn "cancelling all WF_AUTOCANCEL watchers"; - for (Event::all_watchers) { - $_->cancel if $_->data & WF_AUTOCANCEL; - } - warn "flushing outstanding aio requests"; for (;;) { BDB::flush; @@ -3546,15 +3561,10 @@ # doing reload synchronously and two reloads happen back-to-back, # coro crashes during coro_state_free->destroy here. - $RELOAD_WATCHER ||= Event->timer ( - reentrant => 0, - after => 0, - data => WF_AUTOCANCEL, - cb => sub { - do_reload_perl; - undef $RELOAD_WATCHER; - }, - ); + $RELOAD_WATCHER ||= EV::timer 0, 0, sub { + undef $RELOAD_WATCHER; + do_reload_perl; + }; } register_command "reload" => sub { @@ -3594,83 +3604,49 @@ $signal->wait; } -$TICK_WATCHER = Event->timer ( - reentrant => 0, - parked => 1, - prio => 0, - at => $NEXT_TICK || $TICK, - data => WF_AUTOCANCEL, - cb => sub { - if ($Coro::current != $Coro::main) { - Carp::cluck "major BUG: server tick called outside of main coro, skipping it" - unless ++$bug_warning > 10; - return; - } - - $NOW = $tick_start = Event::time; - - cf::server_tick; # one server iteration - - $RUNTIME += $TICK; - $NEXT_TICK += $TICK; - - if ($NOW >= $NEXT_RUNTIME_WRITE) { - $NEXT_RUNTIME_WRITE = $NOW + 10; - Coro::async_pool { - $Coro::current->{desc} = "runtime saver"; - write_runtime - or warn "ERROR: unable to write runtime file: $!"; - }; - } +$TICK_WATCHER = EV::periodic_ns 0, $TICK, 0, sub { + if ($Coro::current != $Coro::main) { + Carp::cluck "major BUG: server tick called outside of main coro, skipping it" + unless ++$bug_warning > 10; + return; + } - if (my $sig = shift @WAIT_FOR_TICK_BEGIN) { - $sig->send; - } - while (my $sig = shift @WAIT_FOR_TICK) { - $sig->send; - } + $NOW = $tick_start = EV::now; - $NOW = Event::time; + cf::server_tick; # one server iteration - # if we are delayed by four ticks or more, skip them all - $NEXT_TICK = $NOW if $NOW >= $NEXT_TICK + $TICK * 4; + $RUNTIME += $TICK; + $NEXT_TICK = $_[0]->at; - $TICK_WATCHER->at ($NEXT_TICK); - $TICK_WATCHER->start; + if ($NOW >= $NEXT_RUNTIME_WRITE) { + $NEXT_RUNTIME_WRITE = List::Util::max $NEXT_RUNTIME_WRITE + 10, $NOW + 5.; + Coro::async_pool { + $Coro::current->{desc} = "runtime saver"; + write_runtime + or warn "ERROR: unable to write runtime file: $!"; + }; + } + + if (my $sig = shift @WAIT_FOR_TICK_BEGIN) { + $sig->send; + } + while (my $sig = shift @WAIT_FOR_TICK) { + $sig->send; + } - $LOAD = ($NOW - $tick_start) / $TICK; - $LOADAVG = $LOADAVG * 0.75 + $LOAD * 0.25; + $LOAD = ($NOW - $tick_start) / $TICK; + $LOADAVG = $LOADAVG * 0.75 + $LOAD * 0.25; - _post_tick; - }, -); + _post_tick; +}; +$TICK_WATCHER->priority (EV::MAXPRI); { - BDB::min_parallel 8; - BDB::max_poll_time $TICK * 0.1; - $BDB_POLL_WATCHER = Event->io ( - reentrant => 0, - fd => BDB::poll_fileno, - poll => 'r', - prio => 0, - data => WF_AUTOCANCEL, - cb => \&BDB::poll_cb, - ); + # configure BDB - BDB::set_sync_prepare { - my $status; - my $current = $Coro::current; - ( - sub { - $status = $!; - $current->ready; undef $current; - }, - sub { - Coro::schedule while defined $current; - $! = $status; - }, - ) - }; + BDB::min_parallel 8; + BDB::max_poll_reqs $TICK * 0.1; + $Coro::BDB::WATCHER->priority (1); unless ($DB_ENV) { $DB_ENV = BDB::db_env_create; @@ -3695,51 +3671,23 @@ }; } - $BDB_DEADLOCK_WATCHER = Event->timer ( - after => 3, - interval => 1, - hard => 1, - prio => 0, - data => WF_AUTOCANCEL, - cb => sub { - BDB::db_env_lock_detect $DB_ENV, 0, BDB::LOCK_DEFAULT, 0, sub { }; - }, - ); - $BDB_CHECKPOINT_WATCHER = Event->timer ( - after => 11, - interval => 60, - hard => 1, - prio => 0, - data => WF_AUTOCANCEL, - cb => sub { - BDB::db_env_txn_checkpoint $DB_ENV, 0, 0, 0, sub { }; - }, - ); - $BDB_TRICKLE_WATCHER = Event->timer ( - after => 5, - interval => 10, - hard => 1, - prio => 0, - data => WF_AUTOCANCEL, - cb => sub { - BDB::db_env_memp_trickle $DB_ENV, 20, 0, sub { }; - }, - ); + $BDB_DEADLOCK_WATCHER = EV::periodic 0, 3, 0, sub { + BDB::db_env_lock_detect $DB_ENV, 0, BDB::LOCK_DEFAULT, 0, sub { }; + }; + $BDB_CHECKPOINT_WATCHER = EV::periodic 0, 60, 0, sub { + BDB::db_env_txn_checkpoint $DB_ENV, 0, 0, 0, sub { }; + }; + $BDB_TRICKLE_WATCHER = EV::periodic 0, 10, 0, sub { + BDB::db_env_memp_trickle $DB_ENV, 20, 0, sub { }; + }; } { - IO::AIO::min_parallel 8; + # configure IO::AIO - undef $Coro::AIO::WATCHER; + IO::AIO::min_parallel 8; IO::AIO::max_poll_time $TICK * 0.1; - $AIO_POLL_WATCHER = Event->io ( - reentrant => 0, - data => WF_AUTOCANCEL, - fd => IO::AIO::poll_fileno, - poll => 'r', - prio => 0, - cb => \&IO::AIO::poll_cb, - ); + $Coro::AIO::WATCHER->priority (1); } my $_log_backtrace;