#!/opt/bin/perl

# Block-level write cache in front of a device, exported over NBD on the
# unix socket /tmp/nbdsock.
#
# Reads return the cached copy of a block if one exists, otherwise the data
# from the original device. Writes always go into the cache. The cache is
# written back ("applied") to the original device when it runs full, or at
# startup when a fifth argument is given ("r" additionally empties the cache
# afterwards).
#
# Cache layout:
#   meta file: one native 32-bit slot index per original block (0 = not cached)
#   data file: cached block contents, slot N at byte offset N * BLKSIZE

use common::sense;

use IO::AIO ();
use Coro::AIO ();
use Linux::NBD;
use Linux::NBD::Server;
use POSIX ();
use IO::Socket::UNIX;

@ARGV >= 4
   or die "Usage: $0 /cache/meta /cache/data size_gb origdevice [apply|r]\n";

my ($cachemeta, $cachedata, $sizegb, $orig, $apply) = @ARGV;

our $SYNC             = 1; # if true, fsync the files at the sync points below
our $NTFS_COMPRESSION = 1; # if true, pad short writeback runs to 0x10000 bytes (see apply)

sub BLKSIZE() { 512 }

# size of the original device in BLKSIZE blocks (blockdev --getsz reports 512-byte sectors)
my $blocks = qx<blockdev --getsz \Q$orig\E> * 512 / BLKSIZE;
$blocks > 0
   or die "$orig: unable to determine device size\n";

sysopen my $orig_fh, $orig, IO::AIO::O_RDWR
   or die "$orig: $!";
sysopen my $meta_fh, $cachemeta, IO::AIO::O_RDWR | IO::AIO::O_CREAT
   or die "$cachemeta: $!";
sysopen my $data_fh, $cachedata, IO::AIO::O_RDWR | IO::AIO::O_CREAT
   or die "$cachedata: $!";

my $cacheblocks = $sizegb * 1024 * 1024 * 1024 / BLKSIZE;

# slot indices are stored with pack "L" (32 bit), so larger caches would wrap
$cacheblocks <= 2 ** 32
   or die "$sizegb: cache size too large for 32-bit slot indices\n";

my $meta_data; # mmap of $cachemeta
my $data_data; # mmap of $cachedata

# Size the cache files for the current device/cache size and map them
# (shared, read/write) into $meta_data and $data_data.
sub mapin {
   Coro::AIO::aio_truncate $meta_fh, $blocks * 4
      and die "$cachemeta: truncate: $!\n";
   Coro::AIO::aio_truncate $data_fh, $cacheblocks * BLKSIZE
      and die "$cachedata: truncate: $!\n";

   IO::AIO::mmap $meta_data, $blocks * 4,
                 IO::AIO::PROT_READ | IO::AIO::PROT_WRITE, IO::AIO::MAP_SHARED, $meta_fh, 0
      or die "$cachemeta: mmap: $!\n";
   IO::AIO::mmap $data_data, $cacheblocks * BLKSIZE,
                 IO::AIO::PROT_READ | IO::AIO::PROT_WRITE, IO::AIO::MAP_SHARED, $data_fh, 0
      or die "$cachedata: mmap: $!\n";

#   IO::AIO::madvise $meta_data, 0, undef, IO::AIO::MADV_RANDOM;
#   IO::AIO::madvise $data_data, 0, undef, IO::AIO::MADV_RANDOM;
}

# best effort: disable copy-on-write and compression on the cache files
system "chattr +C \Q$cachemeta\E >/dev/null 2>&1";
system "chattr +C \Q$cachedata\E >/dev/null 2>&1";
system "btrfs property set \Q$cachemeta\E compression '' >/dev/null 2>&1";
system "btrfs property set \Q$cachedata\E compression '' >/dev/null 2>&1";

mapin;

IO::AIO::mmap my $orig_data, $blocks * BLKSIZE,
              IO::AIO::PROT_READ | IO::AIO::PROT_WRITE, IO::AIO::MAP_SHARED, $orig_fh, 0
   or die "$orig: mmap: $!\n";

package myserver;

use common::sense;

sub BLKSIZE() { 512 }

our @ISA = Linux::NBD::Server::;

my $alloc = 0; # highest cache slot index in use; the next new block gets ++$alloc

#my $need_sync = 0;

# Write every cached block back to the original device.
#
# Consecutive cached blocks are coalesced into runs of at most 64MiB per
# write. If $reset is true, the cache files are afterwards truncated to zero
# and remapped, and the slot allocator starts over. Any failed write or fsync
# of the original device dies *before* the cache is reset, so cached data is
# never discarded unless it reached the original device.
sub apply {
   my ($reset) = @_;

   # msync needs the mapped scalars, not the file names
   Coro::AIO::aio_msync $data_data, 0, undef, IO::AIO::MS_SYNC
      and die "cachedata msync: $!\n";
   Coro::AIO::aio_msync $meta_data, 0, undef, IO::AIO::MS_SYNC
      and die "cachemeta msync: $!\n";

   my ($blk_nxt, $blk_buf, $blk_total); # block following the pending run, run data, bytes written so far

   # write out the pending run $blk_buf, which ends just before block $blk_nxt
   my $blk_flush = sub {
      return unless length $blk_buf;

      $blk_nxt   -= (length $blk_buf) / BLKSIZE; # rewind to the first block of the run
      $blk_total += length $blk_buf;

      if ($NTFS_COMPRESSION and 0x10000 > length $blk_buf) {
         printf "patch back blk %d len %d (total: %.3fGB)\n", $blk_nxt, length $blk_buf, $blk_total / 1e9;

         # extend the run to 0x10000 bytes with the data currently on the
         # original device (less near the end of the device); presumably so
         # whole NTFS compression units get rewritten - confirm
         my $end = $blk_nxt * BLKSIZE + length $blk_buf;
         $blk_buf .= substr $orig_data, $end, 0x10000 - length $blk_buf;
      } else {
         printf "write back blk %d len %d (total: %.3fGB)\n", $blk_nxt, length $blk_buf, $blk_total / 1e9;
      }

      my $len     = length $blk_buf;
      my $written = Coro::AIO::aio_write $orig_fh, $blk_nxt * BLKSIZE, $len, $blk_buf, 0;
      $written == $len
         or die "$orig: writing $len bytes at block $blk_nxt failed: $!\n";
      #substr $orig_data, $blk_nxt * BLKSIZE, length $blk_buf, $blk_buf;

      $blk_buf = "";
   };

   my $nul = (pack "L", 0) x 65536; # metadata for 65536 uncached blocks

   for (my $base = 0; $base * 4 < length $meta_data; $base += 65536) {
      # skip metadata chunks without any cached block
      next if $nul eq substr $meta_data, $base * 4, 65536 * 4;

#      next unless $base >= 3585490256;
#      next unless $base >= 3643254984 - 65536;
#      next unless $base >= 3644405080;

      for my $blk ($base .. $base + 65535) {
         my $idx = unpack "L", substr $meta_data, $blk * 4, 4;
         next unless $idx;

         # start a new run unless this block continues the pending one
         # and the pending one is still below 64MiB
         $blk_flush->()
            if $blk_nxt != $blk or (64 * 1024 * 1024) <= length $blk_buf;

         #warn "apply cache blk $idx to orig blk $blk\n";
         $blk_nxt  = $blk + 1;
         $blk_buf .= substr $data_data, $idx * BLKSIZE, BLKSIZE;
      }
   }

   $blk_flush->();

   print "done, syncing...\n";

   if ($SYNC) {
      Coro::AIO::aio_fsync $orig_fh
         and die "$orig: fsync: $!\n";
   }

   if ($reset) {
      print "resetting\n";

      # release the old mappings before shrinking the files underneath them
      IO::AIO::munmap $meta_data;
      IO::AIO::munmap $data_data;

      Coro::AIO::aio_truncate $meta_fh, 0
         and die "$cachemeta: truncate: $!\n";
      Coro::AIO::aio_truncate $data_fh, 0
         and die "$cachedata: truncate: $!\n";

      ::mapin;

      if ($SYNC) {
         Coro::AIO::aio_fsync $meta_fh;
         Coro::AIO::aio_fsync $data_fh;
      }

      # only an emptied cache may reuse slots from the beginning
      $alloc = 0;
   }
}

# optionally write back the cache before serving; "r" also empties it
if ($apply) {
   apply ($apply eq "r");
   #exit 0; # maybe unsafe?
}

# recover the allocator state: the highest slot index referenced by the metadata
{
   my $nul = "\x00" x 65536;

   for (my $ofs = 0; $ofs < length $meta_data; $ofs += 65536) {
      my $chunk = substr $meta_data, $ofs, 65536;
      next if $chunk eq $nul;

      for my $slot (unpack "L*", $chunk) {
         $alloc = $slot if $slot > $alloc;
      }
   }
}

print "ALLOC is $alloc\n";

# fsync both cache files (only when $SYNC is set)
sub sync {
   Coro::AIO::aio_fsync $data_fh if $SYNC;
   Coro::AIO::aio_fsync $meta_fh if $SYNC;
}

# NBD read: assemble the reply block by block, taking each block from the
# cache if it has a slot, otherwise from the original device.
sub req_read {
   my ($self, $handle, $ofs, $len) = @_;
#   warn "get read $ofs,$len\n";#d#

   die "req_read: unaligned offset $ofs\n" if $ofs & (BLKSIZE - 1);
   die "req_read: unaligned length $len\n" if $len & (BLKSIZE - 1);

   $ofs /= BLKSIZE;
   $len /= BLKSIZE;

   my $buf = "";

   while ($len > 0) {
      my $idx = unpack "L", substr $meta_data, $ofs * 4, 4;

      $buf .= $idx
              ? substr $data_data, $idx * BLKSIZE, BLKSIZE
              : substr $orig_data, $ofs * BLKSIZE, BLKSIZE;

      ++$ofs;
      --$len;
   }

   $self->reply ($handle, 0, $buf);
}

# NBD write: store every block in the cache, overwriting its existing slot
# or allocating a new one. When no slot is left, the whole cache is first
# written back to the original device and emptied.
sub req_write {
   my ($self, $handle, $ofs, $buf) = @_;
#   print "write $ofs, ", length $buf, "\n";

   die "req_write: unaligned offset $ofs\n"                if $ofs & (BLKSIZE - 1);
   die "req_write: unaligned length ${\ length $buf}\n"   if (length $buf) & (BLKSIZE - 1);

   $ofs /= BLKSIZE;

   while (length $buf) {
      my $idx = unpack "L", substr $meta_data, $ofs * 4, 4;

      if ($idx) {
         substr $data_data, $idx * BLKSIZE, BLKSIZE, substr $buf, 0, BLKSIZE;
      } else {
         if ($alloc + 1 >= $cacheblocks) {
            print "cache full, applying...\n";
            #$need_sync = 1;
            sync;
            apply 1;
            print "cache applied\n";
         }

         $idx = ++$alloc;
         substr $data_data, $idx * BLKSIZE, BLKSIZE, substr $buf, 0, BLKSIZE;
         substr $meta_data, $ofs * 4, 4, pack "L", $idx;
      }

      ++$ofs;
      substr $buf, 0, BLKSIZE, "";
   }

   $self->reply ($handle, 0);
}

# NBD flush: acknowledged immediately (syncing happens on disconnect and before writeback)
sub req_flush {
   my ($self, $handle) = @_;

#   sync if $need_sync;
   $self->reply ($handle, 0);
   #print "flushed.\n";
}

package main;

#my $listen = new IO::Socket::INET LocalPort => 10809, Listen => 1;
unlink "/tmp/nbdsock";
my $listen = IO::Socket::UNIX->new (Local => "/tmp/nbdsock", Listen => 1)
   or die "/tmp/nbdsock: $!\n";

while () {
   print "waiting for connections on nbd:unix:/tmp/nbdsock ...\n";

   my $fh = $listen->accept
      or do { warn "accept: $!\n"; next };

   print "accepted.\n";

   # old-style NBD negotiation: magic, export size in bytes, flags, 124 zero bytes
   syswrite $fh, "NBDMAGIC\x00\x00\x42\x02\x81\x86\x12\x53"
                 . (pack "Q> N x124", $blocks * BLKSIZE, 1 + 4); # has_flags + can_flush

   my $server = myserver->new (socket => $fh);
   $server->run;

   myserver::sync ();
}