ViewVC Help
View File | Revision Log | Show Annotations | Download File
/cvs/deliantra/Deliantra-Client/DC/DB.pm
(Generate patch)

Comparing deliantra/Deliantra-Client/DC/DB.pm (file contents):
Revision 1.29 by root, Wed Dec 26 21:03:21 2007 UTC vs.
Revision 1.33 by root, Tue Mar 25 21:11:51 2008 UTC

1=head1 NAME 1=head1 NAME
2 2
3DC::DB - async. database and filesystem access for cfplus 3DC::DB - async. database and filesystem access for deliantra
4 4
5=head1 SYNOPSIS 5=head1 SYNOPSIS
6 6
7 use DC::DB; 7 use DC::DB;
8 8
22use Config; 22use Config;
23use BDB; 23use BDB;
24 24
25use DC; 25use DC;
26 26
27our $DBDIR = "cfplus-" . BDB::VERSION . "-$Config{archname}"; 27our $ODBDIR = "cfplus-" . BDB::VERSION . "-$Config{archname}";
28our $DBDIR = "client-" . BDB::VERSION . "-$Config{archname}";
28our $DB_HOME = "$Deliantra::VARDIR/$DBDIR"; 29our $DB_HOME = "$Deliantra::VARDIR/$DBDIR";
29 30
31if (!-e $DB_HOME and -e "$Deliantra::VARDIR/$ODBDIR") {
32 rename "$Deliantra::VARDIR/$ODBDIR", $DB_HOME;
33 print STDERR "INFO: moved old database from $Deliantra::VARDIR/$ODBDIR to $DB_HOME\n";
34}
35
30if (!-e $DB_HOME and -e "$Deliantra::OLDDIR/$DBDIR") { 36if (!-e $DB_HOME and -e "$Deliantra::OLDDIR/$ODBDIR") {
31 rename "$Deliantra::OLDDIR/$DBDIR", $DB_HOME; 37 rename "$Deliantra::OLDDIR/$DBDIR", $DB_HOME;
32 print STDERR "INFO: moved old database from $Deliantra::OLDDIR/$DBDIR to $DB_HOME\n"; 38 print STDERR "INFO: moved old database from $Deliantra::OLDDIR/$ODBDIR to $DB_HOME\n";
33} 39}
40
41BDB::max_poll_time 0.03;
42BDB::max_parallel 1;
34 43
35our $DB_ENV; 44our $DB_ENV;
36our $DB_STATE; 45our $DB_STATE;
37our %DB_TABLE; 46our %DB_TABLE;
38 47
39sub open_db { 48sub try_open_db {
40 mkdir $DB_HOME, 0777; 49 mkdir $DB_HOME, 0777;
41 50
42 $DB_ENV = db_env_create; 51 $DB_ENV = db_env_create;
43 52
44 $DB_ENV->set_errfile (\*STDERR); 53 $DB_ENV->set_errfile (\*STDERR);
75 } 84 }
76} 85}
77 86
78############################################################################# 87#############################################################################
79 88
80unless (eval { open_db }) { 89our $WATCHER;
81 warn "$@";#d# 90our $SYNC;
82 eval { File::Path::rmtree $DB_HOME };
83 open_db;
84}
85
86our $WATCHER = EV::io BDB::poll_fileno, EV::READ, \&BDB::poll_cb;
87
88our $SYNC = EV::timer_ns 0, 60, sub {
89 $_[0]->stop;
90 db_env_txn_checkpoint $DB_ENV, 0, 0, 0, sub { };
91};
92
93our $tilemap; 91our $tilemap;
94 92
95sub exists($$$) { 93sub exists($$$) {
96 my ($db, $key, $cb) = @_; 94 my ($db, $key, $cb) = @_;
97 95
166 die "maximum number of transaction retries reached - database problems?"; 164 die "maximum number of transaction retries reached - database problems?";
167} 165}
168 166
169sub get_tile_id_sync($) { 167sub get_tile_id_sync($) {
170 my ($name) = @_; 168 my ($name) = @_;
169
170 $tilemap->{$name} ||= do {
171 my $id;
172 do_get_tile_id $name, sub {
173 $id = $_[0];
174 };
175 BDB::flush;
176 $id
177 }
178}
179
180#############################################################################
181
182sub path_of_res($) {
183 utf8::downgrade $_[0]; # bug in unpack "H*"
184 "$DB_HOME/res-data-" . unpack "H*", $_[0]
185}
186
187sub sync {
188 # for debugging
189 #DC::DB::Server::req (sync => sub { });
190 DC::DB::Server::sync ();
191}
192
193sub unlink($$) {
194 DC::DB::Server::req (unlink => @_);
195}
196
197sub read_file($$) {
198 DC::DB::Server::req (read_file => @_);
199}
200
201sub write_file($$$) {
202 DC::DB::Server::req (write_file => @_);
203}
204
205sub prefetch_file($$$) {
206 DC::DB::Server::req (prefetch_file => @_);
207}
208
209sub logprint($$$) {
210 DC::DB::Server::req (logprint => @_);
211}
212
213#############################################################################
214
215package DC::DB::Server;
216
217use strict;
218
219use EV ();
220use Fcntl;
221
222our %CB;
223our $FH;
224our $ID = "aaa0";
225our ($fh_r_watcher, $fh_w_watcher);
226our $sync_timer;
227our $write_buf;
228our $read_buf;
229
230sub fh_write {
231 my $len = syswrite $FH, $write_buf;
232
233 substr $write_buf, 0, $len, "";
234
235 $fh_w_watcher->stop
236 unless length $write_buf;
237}
238
239sub fh_read {
240 my $status = sysread $FH, $read_buf, 16384, length $read_buf;
241
242 die "FATAL: database process died\n"
243 if $status == 0 && defined $status;
244
245 while () {
246 return if 4 > length $read_buf;
247 my $len = unpack "N", $read_buf;
248
249 return if $len + 4 > length $read_buf;
250
251 substr $read_buf, 0, 4, "";
252 my $res = Storable::thaw substr $read_buf, 0, $len, "";
253
254 my ($id, @args) = @$res;
255 (delete $CB{$id})->(@args);
256 }
257}
258
259sub sync {
260 # biggest mess evarr
261 my $fds; (vec $fds, fileno $FH, 1) = 1;
262
263 while (1 < scalar keys %CB) {
264 my $r = $fds;
265 my $w = length $write_buf ? $fds : undef;
266 select $r, $w, undef, undef;
267
268 fh_write if vec $w, fileno $FH, 1;
269 fh_read if vec $r, fileno $FH, 1;
270 }
271}
272
273sub req {
274 my ($type, @args) = @_;
275 my $cb = pop @args;
276
277 my $id = ++$ID;
278 $write_buf .= pack "N/a*", Storable::freeze [$id, $type, @args];
279 $CB{$id} = $cb;
280
281 $fh_w_watcher->start;
282}
283
284sub do_unlink {
285 unlink $_[0];
286}
287
288sub do_read_file {
289 my ($path) = @_;
290
291 utf8::downgrade $path;
292 open my $fh, "<:raw", $path
293 or return;
294 sysread $fh, my $buf, -s $fh;
295
296 $buf
297}
298
299sub do_write_file {
300 my ($path, $data) = @_;
301
302 utf8::downgrade $path;
303 utf8::downgrade $data;
304 open my $fh, ">:raw", $path
305 or return;
306 syswrite $fh, $data;
307 close $fh;
308
309 1
310}
311
312sub do_prefetch_file {
313 my ($path, $size) = @_;
314
315 utf8::downgrade $path;
316 open my $fh, "<:raw", $path
317 or return;
318 sysread $fh, my $buf, $size;
319
320 1
321}
322
323our %LOG_FH;
324
325sub do_logprint {
326 my ($path, $line) = @_;
327
328 $LOG_FH{$path} ||= do {
329 open my $fh, ">>:utf8", $path
330 or warn "Couldn't open logfile $path: $!";
331
332 $fh->autoflush (1);
333
334 $fh
335 };
336
337 my ($sec, $min, $hour, $mday, $mon, $year) = localtime time;
338
339 my $ts = sprintf "%04d-%02d-%02d %02d:%02d:%02d",
340 $year + 1900, $mon + 1, $mday, $hour, $min, $sec;
341
342 print { $LOG_FH{$path} } "$ts $line\n"
343}
344
345sub run {
346 ($FH, my $fh) = DC::socketpipe;
347
348 my $oldfh = select $FH; $| = 1; select $oldfh;
349 my $oldfh = select $fh; $| = 1; select $oldfh;
350
351 my $pid = fork;
352
353 if (defined $pid && !$pid) {
354 local $SIG{QUIT} = "IGNORE";
355 local $SIG{__DIE__};
356 local $SIG{__WARN__};
357 eval {
358 close $FH;
359
360 while () {
361 4 == read $fh, my $len, 4
362 or last;
363 $len = unpack "N", $len;
364 $len == read $fh, my $req, $len
365 or die "unexpected eof while reading request";
366
367 $req = Storable::thaw $req;
368
369 my ($id, $type, @args) = @$req;
370 my $cb = DC::DB::Server->can ("do_$type")
371 or die "$type: unknown database request type\n";
372 my $res = pack "N/a*", Storable::freeze [$id, $cb->(@args)];
373 (syswrite $fh, $res) == length $res
374 or die "DB::write: $!";
375 }
376 };
377
378 my $error = $@;
379
380 eval {
381 Storable::store_fd [die => $error], $fh;
382 };
383
384 warn $error
385 if $error;
386
387 DC::_exit 0;
388 }
389
390 close $fh;
391 DC::fh_nonblocking $FH, 1;
392
393 $CB{die} = sub { die shift };
394
395 $fh_r_watcher = EV::io $FH, EV::READ , \&fh_read;
396 $fh_w_watcher = EV::io $FH, EV::WRITE, \&fh_write;
397}
398
399sub stop {
400 close $FH;
401}
402
403package DC::DB;
404
405sub open_db {
406 unless (eval { try_open_db }) {
407 warn "$@";#d#
408 eval { File::Path::rmtree $DB_HOME };
409 try_open_db;
410 }
171 411
172 # fetch the full face table first 412 # fetch the full face table first
173 unless ($tilemap) { 413 unless ($tilemap) {
174 do_table facemap => sub { 414 do_table facemap => sub {
175 $tilemap = $_[0]; 415 $tilemap = $_[0];
178 if ((scalar keys %$tilemap) != (scalar keys %maptile)) {#d# 418 if ((scalar keys %$tilemap) != (scalar keys %maptile)) {#d#
179 $tilemap = { };#d# 419 $tilemap = { };#d#
180 DC::error "FATAL: facemap is not a 1:1 mapping, please report this and delete your $DB_HOME directory!\n";#d# 420 DC::error "FATAL: facemap is not a 1:1 mapping, please report this and delete your $DB_HOME directory!\n";#d#
181 }#d# 421 }#d#
182 }; 422 };
183 BDB::flush;
184 }
185
186 $tilemap->{$name} ||= do {
187 my $id;
188 do_get_tile_id $name, sub {
189 $id = $_[0];
190 };
191 BDB::flush;
192 $id
193 }
194}
195
196#############################################################################
197
198sub path_of_res($) {
199 utf8::downgrade $_[0]; # bug in unpack "H*"
200 "$DB_HOME/res-data-" . unpack "H*", $_[0]
201}
202
203sub sync {
204 # for debugging
205 #DC::DB::Server::req (sync => sub { });
206 DC::DB::Server::sync ();
207}
208
209sub unlink($$) {
210 DC::DB::Server::req (unlink => @_);
211}
212
213sub read_file($$) {
214 DC::DB::Server::req (read_file => @_);
215}
216
217sub write_file($$$) {
218 DC::DB::Server::req (write_file => @_);
219}
220
221sub prefetch_file($$$) {
222 DC::DB::Server::req (prefetch_file => @_);
223}
224
225sub logprint($$$) {
226 DC::DB::Server::req (logprint => @_);
227}
228
229package DC::DB::Server;
230
231use strict;
232
233use EV ();
234use Fcntl;
235
236our %CB;
237our $FH;
238our $ID = "aaa0";
239our ($fh_r_watcher, $fh_w_watcher);
240our $sync_timer;
241our $write_buf;
242our $read_buf;
243
244sub fh_write {
245 my $len = syswrite $FH, $write_buf;
246
247 substr $write_buf, 0, $len, "";
248
249 $fh_w_watcher->stop
250 unless length $write_buf;
251}
252
253sub fh_read {
254 my $status = sysread $FH, $read_buf, 16384, length $read_buf;
255
256 die "FATAL: database process died\n"
257 if $status == 0 && defined $status;
258
259 while () {
260 return if 4 > length $read_buf;
261 my $len = unpack "N", $read_buf;
262
263 return if $len + 4 > length $read_buf;
264
265 substr $read_buf, 0, 4, "";
266 my $res = Storable::thaw substr $read_buf, 0, $len, "";
267
268 my ($id, @args) = @$res;
269 (delete $CB{$id})->(@args);
270 }
271}
272
273sub sync {
274 # biggest mess evarr
275 my $fds; (vec $fds, fileno $FH, 1) = 1;
276
277 while (1 < scalar keys %CB) {
278 my $r = $fds;
279 my $w = length $write_buf ? $fds : undef;
280 select $r, $w, undef, undef;
281
282 fh_write if vec $w, fileno $FH, 1;
283 fh_read if vec $r, fileno $FH, 1;
284 }
285}
286
287sub req {
288 my ($type, @args) = @_;
289 my $cb = pop @args;
290
291 my $id = ++$ID;
292 $write_buf .= pack "N/a*", Storable::freeze [$id, $type, @args];
293 $CB{$id} = $cb;
294
295 $fh_w_watcher->start;
296}
297
298sub do_unlink {
299 unlink $_[0];
300}
301
302sub do_read_file {
303 my ($path) = @_;
304
305 utf8::downgrade $path;
306 open my $fh, "<:raw", $path
307 or return;
308 sysread $fh, my $buf, -s $fh;
309
310 $buf
311}
312
313sub do_write_file {
314 my ($path, $data) = @_;
315
316 utf8::downgrade $path;
317 utf8::downgrade $data;
318 open my $fh, ">:raw", $path
319 or return;
320 syswrite $fh, $data;
321 close $fh;
322
323 1 423 }
324}
325 424
326sub do_prefetch_file { 425 $WATCHER = EV::io BDB::poll_fileno, EV::READ, \&BDB::poll_cb;
327 my ($path, $size) = @_; 426 $SYNC = EV::timer_ns 0, 60, sub {
328 427 $_[0]->stop;
329 utf8::downgrade $path; 428 db_env_txn_checkpoint $DB_ENV, 0, 0, 0, sub { };
330 open my $fh, "<:raw", $path
331 or return;
332 sysread $fh, my $buf, $size;
333
334 1
335}
336
337our %LOG_FH;
338
339sub do_logprint {
340 my ($path, $line) = @_;
341
342 $LOG_FH{$path} ||= do {
343 open my $fh, ">>:utf8", $path
344 or warn "Couldn't open logfile $path: $!";
345
346 $fh->autoflush (1);
347
348 $fh
349 }; 429 };
350
351 my ($sec, $min, $hour, $mday, $mon, $year) = localtime time;
352
353 my $ts = sprintf "%04d-%02d-%02d %02d:%02d:%02d",
354 $year + 1900, $mon + 1, $mday, $hour, $min, $sec;
355
356 print { $LOG_FH{$path} } "$ts $line\n"
357} 430}
358 431
359sub run { 432END {
360 ($FH, my $fh) = DC::socketpipe; 433 %DB_TABLE = ();
361 434 undef $DB_ENV;
362 my $oldfh = select $FH; $| = 1; select $oldfh;
363 my $oldfh = select $fh; $| = 1; select $oldfh;
364
365 my $pid = fork;
366
367 if (defined $pid && !$pid) {
368 local $SIG{QUIT};
369 local $SIG{__DIE__};
370 local $SIG{__WARN__};
371 eval {
372 close $FH;
373
374 while () {
375 4 == read $fh, my $len, 4
376 or last;
377 $len = unpack "N", $len;
378 $len == read $fh, my $req, $len
379 or die "unexpected eof while reading request";
380
381 $req = Storable::thaw $req;
382
383 my ($id, $type, @args) = @$req;
384 my $cb = DC::DB::Server->can ("do_$type")
385 or die "$type: unknown database request type\n";
386 my $res = pack "N/a*", Storable::freeze [$id, $cb->(@args)];
387 (syswrite $fh, $res) == length $res
388 or die "DB::write: $!";
389 }
390 };
391
392 my $error = $@;
393
394 eval {
395 Storable::store_fd [die => $error], $fh;
396 };
397
398 warn $error
399 if $error;
400
401 DC::_exit 0;
402 }
403
404 close $fh;
405 DC::fh_nonblocking $FH, 1;
406
407 $CB{die} = sub { die shift };
408
409 $fh_r_watcher = EV::io $FH, EV::READ , \&fh_read;
410 $fh_w_watcher = EV::io $FH, EV::WRITE, \&fh_write;
411}
412
413sub stop {
414 close $FH;
415} 435}
416 436
4171; 4371;
418 438
419=back 439=back

Diff Legend

Removed lines
+ Added lines
< Changed lines
> Changed lines