--- deliantra/server/lib/cf.pm 2007/10/04 23:59:07 1.379 +++ deliantra/server/lib/cf.pm 2008/04/11 21:09:53 1.418 @@ -1,3 +1,24 @@ +# +# This file is part of Deliantra, the Roguelike Realtime MMORPG. +# +# Copyright (©) 2006,2007,2008 Marc Alexander Lehmann / Robin Redeker / the Deliantra team +# +# Deliantra is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# +# The authors can be reached via e-mail to +# + package cf; use utf8; @@ -6,30 +27,31 @@ use Symbol; use List::Util; use Socket; -use Storable; -use Event; +use EV 3.2; use Opcode; use Safe; use Safe::Hole; +use Storable (); -use Coro 3.64 (); +use Coro 4.50 (); use Coro::State; use Coro::Handle; -use Coro::Event; +use Coro::EV; use Coro::Timer; use Coro::Signal; use Coro::Semaphore; use Coro::AIO; +use Coro::BDB; use Coro::Storable; use Coro::Util (); -use JSON::XS (); +use JSON::XS 2.01 (); use BDB (); use Data::Dumper; use Digest::MD5; use Fcntl; -use YAML::Syck (); -use IO::AIO 2.32 (); +use YAML (); +use IO::AIO 2.51 (); use Time::HiRes; use Compress::LZF; use Digest::MD5 (); @@ -40,11 +62,6 @@ Coro::State::cctx_stacksize 256000; # 1-2MB stack, for deep recursions in maze generator Compress::LZF::sfreeze_cr { }; # prime Compress::LZF so it does not use require later -$Event::Eval = 1; # no idea why this is required, but it is - -# work around bug in YAML::Syck - bad news for perl6, will it be as broken wrt. unicode? -$YAML::Syck::ImplicitUnicode = 1; - $Coro::main->prio (Coro::PRIO_MAX); # run main coroutine ("the server") with very high priority sub WF_AUTOCANCEL () { 1 } # automatically cancel this watcher on reload @@ -72,13 +89,12 @@ our $PLAYERDIR = "$LOCALDIR/" . playerdir; our $RANDOMDIR = "$LOCALDIR/random"; our $BDBDIR = "$LOCALDIR/db"; +our %RESOURCE; our $TICK = MAX_TIME * 1e-6; # this is a CONSTANT(!) -our $TICK_WATCHER; our $AIO_POLL_WATCHER; our $NEXT_RUNTIME_WRITE; # when should the runtime file be written our $NEXT_TICK; -our $NOW; our $USE_FSYNC = 1; # use fsync to write maps - default off our $BDB_POLL_WATCHER; @@ -91,6 +107,7 @@ our $UPTIME; $UPTIME ||= time; our $RUNTIME; +our $NOW; our (%PLAYER, %PLAYER_LOADING); # all users our (%MAP, %MAP_LOADING ); # all maps @@ -101,7 +118,8 @@ our $LOAD; # a number between 0 (idle) and 1 (too many objects) our $LOADAVG; # same thing, but with alpha-smoothing -our $tick_start; # for load detecting purposes +our $JITTER; # average jitter +our $TICK_START; # for load detecting purposes binmode STDOUT; binmode STDERR; @@ -164,7 +182,7 @@ =item %cf::CFG -Configuration for the server, loaded from C, or +Configuration for the server, loaded from C, or from wherever your confdir points to. =item cf::wait_for_tick, cf::wait_for_tick_begin @@ -196,6 +214,21 @@ }; } +$Coro::State::DIEHOOK = sub { + return unless $^S eq 0; # "eq", not "==" + + if ($Coro::current == $Coro::main) {#d# + warn "DIEHOOK called in main context, Coro bug?\n";#d# + return;#d# + }#d# + + # kill coroutine otherwise + warn Carp::longmess $_[0]; + Coro::terminate +}; + +$SIG{__DIE__} = sub { }; #d#? + @safe::cf::global::ISA = @cf::global::ISA = 'cf::attachable'; @safe::cf::object::ISA = @cf::object::ISA = 'cf::attachable'; @safe::cf::player::ISA = @cf::player::ISA = 'cf::attachable'; @@ -217,7 +250,7 @@ @{"safe::$pkg\::wrap::ISA"} = @{"$pkg\::wrap::ISA"} = $pkg; } -$Event::DIED = sub { +$EV::DIED = sub { warn "error in event callback: @_"; }; @@ -250,11 +283,11 @@ } || "[unable to dump $_[0]: '$@']"; } -=item $ref = cf::from_json $json +=item $ref = cf::decode_json $json Converts a JSON string into the corresponding perl data structure. -=item $json = cf::to_json $ref +=item $json = cf::encode_json $ref Converts a perl data structure into its JSON representation. @@ -262,8 +295,8 @@ our $json_coder = JSON::XS->new->utf8->max_size (1e6); # accept ~1mb max -sub to_json ($) { $json_coder->encode ($_[0]) } -sub from_json ($) { $json_coder->decode ($_[0]) } +sub encode_json($) { $json_coder->encode ($_[0]) } +sub decode_json($) { $json_coder->decode ($_[0]) } =item cf::lock_wait $string @@ -329,13 +362,24 @@ } sub freeze_mainloop { - return unless $TICK_WATCHER->is_active; + tick_inhibit_inc; - my $guard = Coro::guard { - $TICK_WATCHER->start; - }; - $TICK_WATCHER->stop; - $guard + Coro::guard \&tick_inhibit_dec; +} + +=item cf::periodic $interval, $cb + +Like EV::periodic, but randomly selects a starting point so that the actions +get spread over timer. + +=cut + +sub periodic($$) { + my ($interval, $cb) = @_; + + my $start = rand List::Util::min 180, $interval; + + EV::periodic $start, $interval, 0, $cb } =item cf::get_slot $time[, $priority[, $name]] @@ -375,7 +419,7 @@ } if (@SLOT_QUEUE) { - # we do not use wait_For_tick() as it returns immediately when tick is inactive + # we do not use wait_for_tick() as it returns immediately when tick is inactive push @cf::WAIT_FOR_TICK, $signal; $signal->wait; } else { @@ -408,8 +452,8 @@ =item cf::sync_job { BLOCK } -The design of Crossfire TRT requires that the main coroutine ($Coro::main) -is always able to handle events or runnable, as Crossfire TRT is only +The design of Deliantra requires that the main coroutine ($Coro::main) +is always able to handle events or runnable, as Deliantra is only partly reentrant. Thus "blocking" it by e.g. waiting for I/O is not acceptable. @@ -424,15 +468,13 @@ my ($job) = @_; if ($Coro::current == $Coro::main) { - my $time = Event::time; + my $time = EV::time; # this is the main coro, too bad, we have to block # till the operation succeeds, freezing the server :/ - LOG llevError | logBacktrace, Carp::longmess "sync job";#d# + LOG llevError, Carp::longmess "sync job";#d# - # TODO: use suspend/resume instead - # (but this is cancel-safe) my $freeze_guard = freeze_mainloop; my $busy = 1; @@ -446,15 +488,16 @@ })->prio (Coro::PRIO_MAX); while ($busy) { - Coro::cede or Event::one_event; + if (Coro::nready) { + Coro::cede_notself; + } else { + EV::loop EV::LOOP_ONESHOT; + } } - $time = Event::time - $time; - - LOG llevError | logBacktrace, Carp::longmess "long sync job" - if $time > $TICK * 0.5 && $TICK_WATCHER->is_active; + my $time = EV::time - $time; - $tick_start += $time; # do not account sync jobs to server load + $TICK_START += $time; # do not account sync jobs to server load wantarray ? @res : $res[0] } else { @@ -489,7 +532,7 @@ Executes the given code block with the given arguments in a seperate process, returning the results. Everything must be serialisable with Coro::Storable. May, of course, block. Note that the executed sub may -never block itself or use any form of Event handling. +never block itself or use any form of event handling. =cut @@ -508,6 +551,33 @@ wantarray ? @res : $res[-1] } +=item $coin = coin_from_name $name + +=cut + +our %coin_alias = ( + "silver" => "silvercoin", + "silvercoin" => "silvercoin", + "silvercoins" => "silvercoin", + "gold" => "goldcoin", + "goldcoin" => "goldcoin", + "goldcoins" => "goldcoin", + "platinum" => "platinacoin", + "platinumcoin" => "platinacoin", + "platinumcoins" => "platinacoin", + "platina" => "platinacoin", + "platinacoin" => "platinacoin", + "platinacoins" => "platinacoin", + "royalty" => "royalty", + "royalties" => "royalty", +); + +sub coin_from_name($) { + $coin_alias{$_[0]} + ? cf::arch::find $coin_alias{$_[0]} + : undef +} + =item $value = cf::db_get $family => $key Returns a single value from the environment database. @@ -660,7 +730,7 @@ In the following description, CLASS can be any of C, C C, C or C (i.e. the attachable objects in -Crossfire TRT). +Deliantra). =over 4 @@ -952,7 +1022,7 @@ 0 } -=item $bool = cf::global::invoke (EVENT_CLASS_XXX, ...) +=item $bool = cf::global->invoke (EVENT_CLASS_XXX, ...) =item $bool = $attachable->invoke (EVENT_CLASS_XXX, ...) @@ -968,16 +1038,47 @@ ############################################################################# # object support -# +sub _object_equal($$); +sub _object_equal($$) { + my ($a, $b) = @_; + + return 0 unless (ref $a) eq (ref $b); + + if ("HASH" eq ref $a) { + my @ka = keys %$a; + my @kb = keys %$b; + + return 0 if @ka != @kb; + + for (0 .. $#ka) { + return 0 unless $ka[$_] eq $kb[$_]; + return 0 unless _object_equal $a->{$ka[$_]}, $b->{$kb[$_]}; + } + + } elsif ("ARRAY" eq ref $a) { + + return 0 if @$a != @$b; + + for (0 .. $#$a) { + return 0 unless _object_equal $a->[$_], $b->[$_]; + } + + } elsif ($a ne $b) { + return 0; + } + + 1 +} + +our $SLOW_MERGES;#d# sub _can_merge { my ($ob1, $ob2) = @_; - local $Storable::canonical = 1; - my $fob1 = Storable::freeze $ob1; - my $fob2 = Storable::freeze $ob2; + ++$SLOW_MERGES;#d# - $fob1 eq $fob2 + # we do the slow way here + return _object_equal $ob1, $ob2 } sub reattach { @@ -1009,7 +1110,7 @@ on_instantiate => sub { my ($obj, $data) = @_; - $data = from_json $data; + $data = decode_json $data; for (@$data) { my ($name, $args) = @$_; @@ -1048,7 +1149,7 @@ if (@$objs) { if (my $fh = aio_open "$filename.pst~", O_WRONLY | O_CREAT, 0600) { chmod SAVE_MODE, $fh; - my $data = Storable::nfreeze { version => 1, objs => $objs }; + my $data = Coro::Storable::nfreeze { version => 1, objs => $objs }; aio_write $fh, 0, (length $data), $data, 0; aio_fsync $fh if $cf::USE_FSYNC; close $fh; @@ -1089,7 +1190,8 @@ (aio_load "$filename.pst", $av) >= 0 or return; - $av = eval { (Storable::thaw $av)->{objs} }; + my $st = eval { Coro::Storable::thaw $av }; + $av = $st->{objs}; } utf8::decode (my $decname = $filename); @@ -1255,8 +1357,7 @@ if (exists $v->{meta}{mandatory}) { warn $msg; - warn "mandatory extension failed to load, exiting.\n"; - exit 1; + cf::cleanup "mandatory extension failed to load, exiting."; } warn $msg; @@ -1514,46 +1615,88 @@ =cut +use re 'eval'; + +my $group; +my $interior; $interior = qr{ + # match a pod interior sequence sans C<< >> + (?: + \ (.*?)\ (?{ $group = $^N }) + | < (??{$interior}) > + ) +}x; + sub expand_cfpod { - ((my $self), (local $_)) = @_; + my ($self, $pod) = @_; + + my $xml; - # escape & and < - s/&/&/g; - s/(?, I<>, U<> etc. - s/B<([^\>]*)>/$1<\/b>/ - || s/I<([^\>]*)>/$1<\/i>/ - || s/U<([^\>]*)>/$1<\/u>/ - || s/T<([^\>]*)>/$1<\/b><\/big>/ - # replace G tags - || s{G<([^>|]*)\|([^>]*)>}{ - $self->gender ? $2 : $1 - }ge - # replace H - || s{H<([^\>]*)>} - { - ("[$1 (Use hintmode to suppress hints)]", - "[Hint suppressed, see hintmode]", - "") - [$self->{hintmode}] - }ge; - - # create single paragraphs (very hackish) - s/(?<=\S)\n(?=\w)/ /g; - - # compress some whitespace - s/\s+\n/\n/g; # ws line-ends - s/\n\n+/\n/g; # double lines - s/^\n+//; # beginning lines - s/\n+$//; # ending lines + while () { + if ($pod =~ /\G( (?: [^BCGHITU]+ | .(?!<) )+ )/xgcs) { + $group = $1; - $_ + $group =~ s/&/&/g; + $group =~ s/]*) (?{ $group = $^N }) + | < $interior > + ) + > + %gcsx + ) { + my ($code, $data) = ($1, $group); + + if ($code eq "B") { + $xml .= "" . expand_cfpod ($self, $data) . ""; + } elsif ($code eq "I") { + $xml .= "" . expand_cfpod ($self, $data) . ""; + } elsif ($code eq "U") { + $xml .= "" . expand_cfpod ($self, $data) . ""; + } elsif ($code eq "C") { + $xml .= "" . expand_cfpod ($self, $data) . ""; + } elsif ($code eq "T") { + $xml .= "" . expand_cfpod ($self, $data) . ""; + } elsif ($code eq "G") { + my ($male, $female) = split /\|/, $data; + $data = $self->gender ? $female : $male; + $xml .= expand_cfpod ($self, $data); + } elsif ($code eq "H") { + $xml .= ("[" . expand_cfpod ($self, $data) . " (Use hintmode to suppress hints)]", + "[Hint suppressed, see hintmode]", + "") + [$self->{hintmode}]; + } else { + $xml .= "error processing '$code($data)' directive"; + } + } else { + if ($pod =~ /\G(.+)/) { + warn "parse error while expanding $pod (at $1)"; + } + last; + } + } + + for ($xml) { + # create single paragraphs (very hackish) + s/(?<=\S)\n(?=\w)/ /g; + + # compress some whitespace + s/\s+\n/\n/g; # ws line-ends + s/\n\n+/\n/g; # double lines + s/^\n+//; # beginning lines + s/\n+$//; # ending lines + } + + $xml } +no re 'eval'; + sub hintmode { $_[0]{hintmode} = $_[1] if @_ > 1; $_[0]{hintmode} @@ -1634,6 +1777,9 @@ sub generate_random_map { my ($self, $rmp) = @_; + + my $lock = cf::lock_acquire "generate_random_map"; # the random map generator is NOT reentrant ATM + # mit "rum" bekleckern, nicht $self->_create_random_map ( $rmp->{wallstyle}, $rmp->{wall_name}, $rmp->{floorstyle}, $rmp->{monsterstyle}, @@ -2236,8 +2382,6 @@ ] } -package cf; - =back =head3 cf::object @@ -2549,7 +2693,7 @@ $rmp->{random_seed} ||= $exit->random_seed; - my $data = cf::to_json $rmp; + my $data = cf::encode_json $rmp; my $md5 = Digest::MD5::md5_hex $data; my $meta = "$RANDOMDIR/$md5.meta"; @@ -2632,6 +2776,7 @@ =cut +# non-persistent channels (usually the info channel) our %CHANNEL = ( "c/identify" => { id => "infobox", @@ -2645,12 +2790,42 @@ reply => undef, tooltip => "Signs and other items you examined", }, + "c/book" => { + id => "infobox", + title => "Book", + reply => undef, + tooltip => "The contents of a note or book", + }, "c/lookat" => { id => "infobox", title => "Look", reply => undef, tooltip => "What you saw there", }, + "c/who" => { + id => "infobox", + title => "Players", + reply => undef, + tooltip => "Shows players who are currently online", + }, + "c/body" => { + id => "infobox", + title => "Body Parts", + reply => undef, + tooltip => "Shows which body parts you posess and are available", + }, + "c/uptime" => { + id => "infobox", + title => "Uptime", + reply => undef, + tooltip => "How long the server has been running since last restart", + }, + "c/mapinfo" => { + id => "infobox", + title => "Map Info", + reply => undef, + tooltip => "Information related to the maps", + }, ); sub cf::client::send_msg { @@ -2896,7 +3071,7 @@ The following functions and methods are available within a safe environment: cf::object - contr pay_amount pay_player map x y force_find force_add + contr pay_amount pay_player map x y force_find force_add destroy insert remove name archname title slaying race decrease_ob_nr cf::object::player @@ -2913,7 +3088,7 @@ for ( ["cf::object" => qw(contr pay_amount pay_player map force_find force_add x y insert remove inv name archname title slaying race - decrease_ob_nr)], + decrease_ob_nr destroy)], ["cf::object::player" => qw(player)], ["cf::player" => qw(peaceful)], ["cf::map" => qw(trigger)], @@ -3028,6 +3203,7 @@ while (my ($face, $info) = each %$faces) { my $idx = (cf::face::find $face) || cf::face::alloc $face; + cf::face::set_visibility $idx, $info->{visibility}; cf::face::set_magicmap $idx, $info->{magicmap}; cf::face::set_data $idx, 0, $info->{data32}, Digest::MD5::md5 $info->{data32}; @@ -3038,8 +3214,10 @@ while (my ($face, $info) = each %$faces) { next unless $info->{smooth}; + my $idx = cf::face::find $face or next; + if (my $smooth = cf::face::find $info->{smooth}) { cf::face::set_smooth $idx, $smooth; cf::face::set_smoothlevel $idx, $info->{smoothlevel}; @@ -3067,52 +3245,58 @@ # that gcfclient doesn't grok a >10000 face index. my $res = $facedata->{resource}; - my $soundconf = delete $res->{"res/sound.conf"}; - while (my ($name, $info) = each %$res) { - my $idx = (cf::face::find $name) || cf::face::alloc $name; - my $data; - - if ($info->{type} & 1) { - # prepend meta info + if (defined $info->{type}) { + my $idx = (cf::face::find $name) || cf::face::alloc $name; + my $data; + + if ($info->{type} & 1) { + # prepend meta info + + my $meta = $enc->encode ({ + name => $name, + %{ $info->{meta} || {} }, + }); - my $meta = $enc->encode ({ - name => $name, - %{ $info->{meta} || {} }, - }); + $data = pack "(w/a*)*", $meta, $info->{data}; + } else { + $data = $info->{data}; + } - $data = pack "(w/a*)*", $meta, $info->{data}; + cf::face::set_data $idx, 0, $data, Digest::MD5::md5 $data; + cf::face::set_type $idx, $info->{type}; } else { - $data = $info->{data}; + $RESOURCE{$name} = $info; } - cf::face::set_data $idx, 0, $data, Digest::MD5::md5 $data; - cf::face::set_type $idx, $info->{type}; - cf::cede_to_tick; } + } - if ($soundconf) { - $soundconf = $enc->decode (delete $soundconf->{data}); + cf::global->invoke (EVENT_GLOBAL_RESOURCE_UPDATE); - for (0 .. SOUND_CAST_SPELL_0 - 1) { - my $sound = $soundconf->{compat}[$_] - or next; + 1 +} - my $face = cf::face::find "sound/$sound->[1]"; - cf::sound::set $sound->[0] => $face; - cf::sound::old_sound_index $_, $face; # gcfclient-compat - } +cf::global->attach (on_resource_update => sub { + if (my $soundconf = $RESOURCE{"res/sound.conf"}) { + $soundconf = JSON::XS->new->utf8->relaxed->decode ($soundconf->{data}); - while (my ($k, $v) = each %{$soundconf->{event}}) { - my $face = cf::face::find "sound/$v"; - cf::sound::set $k => $face; - } + for (0 .. SOUND_CAST_SPELL_0 - 1) { + my $sound = $soundconf->{compat}[$_] + or next; + + my $face = cf::face::find "sound/$sound->[1]"; + cf::sound::set $sound->[0] => $face; + cf::sound::old_sound_index $_, $face; # gcfclient-compat } - } - 1 -} + while (my ($k, $v) = each %{$soundconf->{event}}) { + my $face = cf::face::find "sound/$v"; + cf::sound::set $k => $face; + } + } +}); register_exticmd fx_want => sub { my ($ns, $want) = @_; @@ -3177,7 +3361,7 @@ or return; local $/; - *CFG = YAML::Syck::Load <$fh>; + *CFG = YAML::Load <$fh>; $EMERGENCY_POSITION = $CFG{emergency_position} || ["/world/world_105_115", 5, 37]; @@ -3199,7 +3383,7 @@ Carp::cluck "FATAL: Coro::idle was called, major BUG, use cf::sync_job!\n";#d# (async { $Coro::current->{desc} = "IDLE BUG HANDLER"; - Event::one_event; + EV::loop EV::LOOP_ONESHOT; })->prio (Coro::PRIO_MAX); }; @@ -3207,8 +3391,9 @@ db_init; load_extensions; - $TICK_WATCHER->start; - Event::loop; + $Coro::current->prio (Coro::PRIO_MAX); # give the main loop max. priority + evthread_start IO::AIO::poll_fileno; + EV::loop; } ############################################################################# @@ -3216,20 +3401,15 @@ # install some emergency cleanup handlers BEGIN { + our %SIGWATCHER = (); for my $signal (qw(INT HUP TERM)) { - Event->signal ( - reentrant => 0, - data => WF_AUTOCANCEL, - signal => $signal, - prio => 0, - cb => sub { - cf::cleanup "SIG$signal"; - }, - ); + $SIGWATCHER{$signal} = EV::signal $signal, sub { + cf::cleanup "SIG$signal"; + }; } } -sub write_runtime { +sub write_runtime_sync { my $runtime = "$LOCALDIR/runtime"; # first touch the runtime file to show we are still running: @@ -3267,6 +3447,49 @@ 1 } +our $uuid_lock; +our $uuid_skip; + +sub write_uuid_sync($) { + $uuid_skip ||= $_[0]; + + return if $uuid_lock; + local $uuid_lock = 1; + + my $uuid = "$LOCALDIR/uuid"; + + my $fh = aio_open "$uuid~", O_WRONLY | O_CREAT, 0644 + or return; + + my $value = uuid_str $uuid_skip + uuid_seq uuid_cur; + $uuid_skip = 0; + + (aio_write $fh, 0, (length $value), $value, 0) <= 0 + and return; + + # always fsync - this file is important + aio_fsync $fh + and return; + + close $fh + or return; + + aio_rename "$uuid~", $uuid + and return; + + warn "uuid file written ($value).\n"; + + 1 + +} + +sub write_uuid($$) { + my ($skip, $sync) = @_; + + $sync ? write_uuid_sync $skip + : async { write_uuid_sync $skip }; +} + sub emergency_save() { my $freeze_guard = cf::freeze_mainloop; @@ -3280,6 +3503,7 @@ for my $login (keys %cf::PLAYER) { my $pl = $cf::PLAYER{$login} or next; $pl->valid or next; + delete $pl->{unclean_save}; # not strictly necessary, but cannot hurt $pl->save; } warn "end emergency player save\n"; @@ -3295,6 +3519,10 @@ warn "begin emergency database checkpoint\n"; BDB::db_env_txn_checkpoint $DB_ENV; warn "end emergency database checkpoint\n"; + + warn "begin write uuid\n"; + write_uuid_sync 1; + warn "end write uuid\n"; }; warn "leave emergency perl save\n"; @@ -3319,25 +3547,20 @@ warn "entering sync_job"; cf::sync_job { - cf::write_runtime; # external watchdog should not bark + cf::write_runtime_sync; # external watchdog should not bark cf::emergency_save; - cf::write_runtime; # external watchdog should not bark + cf::write_runtime_sync; # external watchdog should not bark warn "syncing database to disk"; BDB::db_env_txn_checkpoint $DB_ENV; # if anything goes wrong in here, we should simply crash as we already saved - warn "cancelling all WF_AUTOCANCEL watchers"; - for (Event::all_watchers) { - $_->cancel if $_->data & WF_AUTOCANCEL; - } - warn "flushing outstanding aio requests"; for (;;) { BDB::flush; IO::AIO::flush; - Coro::cede; + Coro::cede_notself; last unless IO::AIO::nreqs || BDB::nreqs; warn "iterate..."; } @@ -3423,8 +3646,7 @@ 1 } or do { warn $@; - warn "error while reloading, exiting."; - exit 1; + cf::cleanup "error while reloading, exiting."; }; warn "reloaded"; @@ -3436,15 +3658,10 @@ # doing reload synchronously and two reloads happen back-to-back, # coro crashes during coro_state_free->destroy here. - $RELOAD_WATCHER ||= Event->timer ( - reentrant => 0, - after => 0, - data => WF_AUTOCANCEL, - cb => sub { - do_reload_perl; - undef $RELOAD_WATCHER; - }, - ); + $RELOAD_WATCHER ||= EV::timer 0, 0, sub { + do_reload_perl; + undef $RELOAD_WATCHER; + }; } register_command "reload" => sub { @@ -3467,7 +3684,7 @@ our @WAIT_FOR_TICK_BEGIN; sub wait_for_tick { - return unless $TICK_WATCHER->is_active; + return if tick_inhibit; return if $Coro::current == $Coro::main; my $signal = new Coro::Signal; @@ -3476,7 +3693,7 @@ } sub wait_for_tick_begin { - return unless $TICK_WATCHER->is_active; + return if tick_inhibit; return if $Coro::current == $Coro::main; my $signal = new Coro::Signal; @@ -3484,83 +3701,49 @@ $signal->wait; } -$TICK_WATCHER = Event->timer ( - reentrant => 0, - parked => 1, - prio => 0, - at => $NEXT_TICK || $TICK, - data => WF_AUTOCANCEL, - cb => sub { - if ($Coro::current != $Coro::main) { - Carp::cluck "major BUG: server tick called outside of main coro, skipping it" - unless ++$bug_warning > 10; - return; - } - - $NOW = $tick_start = Event::time; - - cf::server_tick; # one server iteration - - $RUNTIME += $TICK; - $NEXT_TICK += $TICK; - - if ($NOW >= $NEXT_RUNTIME_WRITE) { - $NEXT_RUNTIME_WRITE = $NOW + 10; - Coro::async_pool { - $Coro::current->{desc} = "runtime saver"; - write_runtime - or warn "ERROR: unable to write runtime file: $!"; - }; - } - - if (my $sig = shift @WAIT_FOR_TICK_BEGIN) { - $sig->send; - } - while (my $sig = shift @WAIT_FOR_TICK) { - $sig->send; - } +sub tick { + if ($Coro::current != $Coro::main) { + Carp::cluck "major BUG: server tick called outside of main coro, skipping it" + unless ++$bug_warning > 10; + return; + } - $NOW = Event::time; + cf::server_tick; # one server iteration - # if we are delayed by four ticks or more, skip them all - $NEXT_TICK = $NOW if $NOW >= $NEXT_TICK + $TICK * 4; + if ($NOW >= $NEXT_RUNTIME_WRITE) { + $NEXT_RUNTIME_WRITE = List::Util::max $NEXT_RUNTIME_WRITE + 10, $NOW + 5.; + Coro::async_pool { + $Coro::current->{desc} = "runtime saver"; + write_runtime_sync + or warn "ERROR: unable to write runtime file: $!"; + }; + } - $TICK_WATCHER->at ($NEXT_TICK); - $TICK_WATCHER->start; + if (my $sig = shift @WAIT_FOR_TICK_BEGIN) { + $sig->send; + } + while (my $sig = shift @WAIT_FOR_TICK) { + $sig->send; + } - $LOAD = ($NOW - $tick_start) / $TICK; - $LOADAVG = $LOADAVG * 0.75 + $LOAD * 0.25; + $LOAD = ($NOW - $TICK_START) / $TICK; + $LOADAVG = $LOADAVG * 0.75 + $LOAD * 0.25; - _post_tick; - }, -); + if (0) { + if ($NEXT_TICK) { + my $jitter = $TICK_START - $NEXT_TICK; + $JITTER = $JITTER * 0.75 + $jitter * 0.25; + warn "jitter $JITTER\n";#d# + } + } +} { - BDB::min_parallel 8; - BDB::max_poll_time $TICK * 0.1; - $BDB_POLL_WATCHER = Event->io ( - reentrant => 0, - fd => BDB::poll_fileno, - poll => 'r', - prio => 0, - data => WF_AUTOCANCEL, - cb => \&BDB::poll_cb, - ); + # configure BDB - BDB::set_sync_prepare { - my $status; - my $current = $Coro::current; - ( - sub { - $status = $!; - $current->ready; undef $current; - }, - sub { - Coro::schedule while defined $current; - $! = $status; - }, - ) - }; + BDB::min_parallel 8; + BDB::max_poll_reqs $TICK * 0.1; + $Coro::BDB::WATCHER->priority (1); unless ($DB_ENV) { $DB_ENV = BDB::db_env_create; @@ -3585,51 +3768,23 @@ }; } - $BDB_DEADLOCK_WATCHER = Event->timer ( - after => 3, - interval => 1, - hard => 1, - prio => 0, - data => WF_AUTOCANCEL, - cb => sub { - BDB::db_env_lock_detect $DB_ENV, 0, BDB::LOCK_DEFAULT, 0, sub { }; - }, - ); - $BDB_CHECKPOINT_WATCHER = Event->timer ( - after => 11, - interval => 60, - hard => 1, - prio => 0, - data => WF_AUTOCANCEL, - cb => sub { - BDB::db_env_txn_checkpoint $DB_ENV, 0, 0, 0, sub { }; - }, - ); - $BDB_TRICKLE_WATCHER = Event->timer ( - after => 5, - interval => 10, - hard => 1, - prio => 0, - data => WF_AUTOCANCEL, - cb => sub { - BDB::db_env_memp_trickle $DB_ENV, 20, 0, sub { }; - }, - ); + $BDB_DEADLOCK_WATCHER = EV::periodic 0, 3, 0, sub { + BDB::db_env_lock_detect $DB_ENV, 0, BDB::LOCK_DEFAULT, 0, sub { }; + }; + $BDB_CHECKPOINT_WATCHER = EV::periodic 0, 60, 0, sub { + BDB::db_env_txn_checkpoint $DB_ENV, 0, 0, 0, sub { }; + }; + $BDB_TRICKLE_WATCHER = EV::periodic 0, 10, 0, sub { + BDB::db_env_memp_trickle $DB_ENV, 20, 0, sub { }; + }; } { - IO::AIO::min_parallel 8; + # configure IO::AIO - undef $Coro::AIO::WATCHER; + IO::AIO::min_parallel 8; IO::AIO::max_poll_time $TICK * 0.1; - $AIO_POLL_WATCHER = Event->io ( - reentrant => 0, - data => WF_AUTOCANCEL, - fd => IO::AIO::poll_fileno, - poll => 'r', - prio => 0, - cb => \&IO::AIO::poll_cb, - ); + $Coro::AIO::WATCHER->priority (1); } my $_log_backtrace;