Enable a multi-level db to smoothly migrate to FIFO via DB::Open (#10348)

Summary:
In theory, FIFO compaction can open a DB created with any compaction style.
However, the current code only allows FIFO compaction to open a DB that has
a single level.

This PR relaxes the limitation of FIFO compaction and allows it to open a
DB with multiple levels.  Below is the read / write / compaction behavior:

* The read behavior is untouched; it works like a regular RocksDB instance.
* The write behavior is untouched as well.  When a FIFO-compacted DB
is opened with multiple levels, all new files will still be placed in level 0, and no files
will be moved to a different level.
* The compaction logic is extended.  It will first identify the bottom-most non-empty level.
Then, it will delete files in that level: the oldest file first if that level is L0, or the
file with the smallest starting key first otherwise (see the sketch below).
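As a minimal sketch (illustrative only; names such as vstorage and
GetTotalFilesSize follow the diff below, not a standalone API), the extended
picker scans all levels to locate the bottom-most non-empty one before
choosing what to delete:

    // Find the bottom-most non-empty level and the total DB size.
    int last_level = 0;
    uint64_t total_size = 0;
    for (int level = 0; level < vstorage->num_levels(); ++level) {
      uint64_t level_size = GetTotalFilesSize(vstorage->LevelFiles(level));
      total_size += level_size;
      if (level_size > 0) {
        last_level = level;  // deepest level seen with files
      }
    }
    // If last_level == 0, regular FIFO applies (delete the oldest L0 files);
    // otherwise, delete files from last_level starting at the smallest key
    // until total_size drops below max_table_files_size.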

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10348

Test Plan:
Added a new test to verify the migration from level compaction to FIFO where the DB has multiple levels.
Extended existing test cases in db_test and db_basic_test to also verify
all entries of a key after reopening the DB with FIFO compaction.
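
For reference, the migration exercised here amounts to reopening an existing
multi-level DB with FIFO options via DB::Open().  A minimal sketch; the path
and the 1 GiB cap are hypothetical:

    #include <rocksdb/db.h>
    #include <rocksdb/options.h>

    rocksdb::Options opts;
    opts.create_if_missing = false;  // the DB already exists
    opts.compaction_style = rocksdb::kCompactionStyleFIFO;
    // Hypothetical cap; FIFO deletes files once the total size exceeds it.
    opts.compaction_options_fifo.max_table_files_size = 1ull << 30;
    rocksdb::DB* db = nullptr;
    // Before this PR, Open() returned InvalidArgument if any file was above
    // L0; with this change it succeeds and compaction drains lower levels.
    rocksdb::Status s = rocksdb::DB::Open(opts, "/path/to/existing_db", &db);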

Reviewed By: jay-zhuang

Differential Revision: D40233744

fbshipit-source-id: 6cc011d6c3467e6bfb9b6a4054b87619e69815e1
Author: Yueh-Hsuan Chiang (committed by Facebook GitHub Bot)
parent e466173d5c
commit e267909ecf
Changed files:
  1. HISTORY.md (3 lines changed)
  2. db/column_family.cc (1 line changed)
  3. db/compaction/compaction_picker_fifo.cc (89 lines changed)
  4. db/db_bloom_filter_test.cc (6 lines changed)
  5. db/db_impl/db_impl_compaction_flush.cc (1 line changed)
  6. db/db_impl/db_impl_open.cc (12 lines changed)
  7. db/db_test.cc (131 lines changed)
  8. db/db_test_util.cc (19 lines changed)
  9. db/db_test_util.h (9 lines changed)

--- a/HISTORY.md
+++ b/HISTORY.md
@@ -4,6 +4,9 @@
 * `DeleteRange()` now supports user-defined timestamp.
 * Provide support for async_io with tailing iterators when ReadOptions.tailing is enabled during scans.
 * Tiered Storage: allow data moving up from the last level to the penultimate level if the input level is penultimate level or above.
+* FIFO compaction now supports migrating from a multi-level DB via DB::Open(). During the migration phase, the FIFO compaction picker will:
+  * pick the sst file with the smallest starting key in the bottom-most non-empty level.
+  * Note that during the migration phase, the file purge order will only be an approximation of "FIFO", as files in a lower level might sometimes contain newer keys than files in an upper level.
 ### Bug Fixes
 * Fix a bug in io_uring_prep_cancel in the AbortIO API for posix, which expects sqe->addr to match the submitted read request; a wrong parameter was being passed.

--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -291,7 +291,6 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
   }
   if (result.compaction_style == kCompactionStyleFIFO) {
-    result.num_levels = 1;
     // since we delete level0 files in FIFO compaction when there are too many
    // of them, these options don't really mean anything
    result.level0_slowdown_writes_trigger = std::numeric_limits<int>::max();

--- a/db/compaction/compaction_picker_fifo.cc
+++ b/db/compaction/compaction_picker_fifo.cc
@@ -121,20 +121,45 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
   return c;
 }
 
+// The size-based compaction picker for FIFO.
+//
+// When the entire column family size exceeds max_table_files_size, FIFO will
+// try to delete the oldest sst file(s) until the resulting column family size
+// is smaller than max_table_files_size.
+//
+// This function also takes care of the case where a DB is migrating from
+// level / universal compaction to FIFO compaction.  During the migration, the
+// column family will also have non-L0 files while FIFO can only create L0
+// files.  In this case, this function will purge the sst files in the bottom-
+// most non-empty level first, and the DB will eventually converge to the
+// regular FIFO case where there are only L0 files.  Note that during the
+// migration case, the purge order will only be an approximation of "FIFO",
+// as entries inside lower-level files might sometimes be newer than some
+// entries inside upper-level files.
 Compaction* FIFOCompactionPicker::PickSizeCompaction(
     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
     const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
     LogBuffer* log_buffer) {
-  const int kLevel0 = 0;
-  const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
-  uint64_t total_size = GetTotalFilesSize(level_files);
+  // compute the total size and identify the last non-empty level
+  int last_level = 0;
+  uint64_t total_size = 0;
+  for (int level = 0; level < vstorage->num_levels(); ++level) {
+    auto level_size = GetTotalFilesSize(vstorage->LevelFiles(level));
+    total_size += level_size;
+    if (level_size > 0) {
+      last_level = level;
+    }
+  }
+  const std::vector<FileMetaData*>& last_level_files =
+      vstorage->LevelFiles(last_level);
 
-  if (total_size <=
-          mutable_cf_options.compaction_options_fifo.max_table_files_size ||
-      level_files.size() == 0) {
-    // total size not exceeded
+  if (last_level == 0 &&
+      total_size <=
+          mutable_cf_options.compaction_options_fifo.max_table_files_size) {
+    // total size not exceeded, try to find intra level 0 compaction if enabled
+    const std::vector<FileMetaData*>& level0_files = vstorage->LevelFiles(0);
     if (mutable_cf_options.compaction_options_fifo.allow_compaction &&
-        level_files.size() > 0) {
+        level0_files.size() > 0) {
       CompactionInputFiles comp_inputs;
       // try to prevent same files from being compacted multiple times, which
       // could produce large files that may never TTL-expire. Achieve this by
@@ -146,7 +171,7 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
               static_cast<uint64_t>(mutable_cf_options.write_buffer_size),
               1.1));
       if (FindIntraL0Compaction(
-              level_files,
+              level0_files,
              mutable_cf_options
                  .level0_file_num_compaction_trigger /* min_files_to_compact */
              ,
@@ -187,9 +212,12 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
   std::vector<CompactionInputFiles> inputs;
   inputs.emplace_back();
-  inputs[0].level = 0;
+  inputs[0].level = last_level;
 
-  for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
+  if (last_level == 0) {
+    // In L0, right-most files are the oldest files.
+    for (auto ritr = last_level_files.rbegin(); ritr != last_level_files.rend();
+         ++ritr) {
      auto f = *ritr;
      total_size -= f->fd.file_size;
      inputs[0].files.push_back(f);
@@ -204,10 +232,38 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction(
         break;
       }
     }
+  } else {
+    // If the last level is non-L0, we actually don't know which file is
+    // logically the oldest since the file creation time only represents
+    // when this file was compacted to this level, which is independent
+    // of when the entries in this file were first inserted.
+    //
+    // As a result, we delete files from the left instead.  This means the sst
+    // file with the smallest key will be deleted first.  This design decision
+    // better serves a major type of FIFO use cases where smaller keys are
+    // associated with older data.
+    for (const auto& f : last_level_files) {
+      total_size -= f->fd.file_size;
+      inputs[0].files.push_back(f);
+      char tmp_fsize[16];
+      AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
+      ROCKS_LOG_BUFFER(log_buffer,
+                       "[%s] FIFO compaction: picking file %" PRIu64
+                       " with size %s for deletion",
+                       cf_name.c_str(), f->fd.GetNumber(), tmp_fsize);
+      if (total_size <=
+          mutable_cf_options.compaction_options_fifo.max_table_files_size) {
+        break;
+      }
+    }
+  }
 
   Compaction* c = new Compaction(
       vstorage, ioptions_, mutable_cf_options, mutable_db_options,
-      std::move(inputs), 0, 0, 0, 0, kNoCompression,
+      std::move(inputs), last_level,
+      /* target_file_size */ 0,
+      /* max_compaction_bytes */ 0,
+      /* output_path_id */ 0, kNoCompression,
       mutable_cf_options.compression_opts, Temperature::kUnknown,
       /* max_subcompactions */ 0, {}, /* is manual */ false,
      /* trim_ts */ "", vstorage->CompactionScore(0),
@@ -224,6 +280,13 @@ Compaction* FIFOCompactionPicker::PickCompactionToWarm(
     return nullptr;
   }
 
+  // PickCompactionToWarm is only triggered if there are no non-L0 files.
+  for (int level = 1; level < vstorage->num_levels(); ++level) {
+    if (GetTotalFilesSize(vstorage->LevelFiles(level)) > 0) {
+      return nullptr;
+    }
+  }
+
   const int kLevel0 = 0;
   const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
@@ -327,8 +390,6 @@ Compaction* FIFOCompactionPicker::PickCompaction(
     const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
     const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
     LogBuffer* log_buffer, SequenceNumber /*earliest_memtable_seqno*/) {
-  assert(vstorage->num_levels() == 1);
-
   Compaction* c = nullptr;
   if (mutable_cf_options.ttl > 0) {
     c = PickTTLCompaction(cf_name, mutable_cf_options, mutable_db_options,

--- a/db/db_bloom_filter_test.cc
+++ b/db/db_bloom_filter_test.cc
@@ -1706,7 +1706,7 @@ TEST_F(DBBloomFilterTest, ContextCustomFilterPolicy) {
   ASSERT_OK(Put(1, Key(maxKey + 55555), Key(maxKey + 55555)));
   Flush(1);
   EXPECT_EQ(policy->DumpTestReport(),
-            fifo ? "cf=abe,s=kCompactionStyleFIFO,n=1,l=0,b=0,r=kFlush\n"
+            fifo ? "cf=abe,s=kCompactionStyleFIFO,n=7,l=0,b=0,r=kFlush\n"
                  : "cf=bob,s=kCompactionStyleLevel,n=7,l=0,b=0,r=kFlush\n");
 
   for (int i = maxKey / 2; i < maxKey; i++) {
@@ -1714,7 +1714,7 @@
   }
   Flush(1);
   EXPECT_EQ(policy->DumpTestReport(),
-            fifo ? "cf=abe,s=kCompactionStyleFIFO,n=1,l=0,b=0,r=kFlush\n"
+            fifo ? "cf=abe,s=kCompactionStyleFIFO,n=7,l=0,b=0,r=kFlush\n"
                  : "cf=bob,s=kCompactionStyleLevel,n=7,l=0,b=0,r=kFlush\n");
 
   // Check that they can be found
@@ -1738,7 +1738,7 @@
       EXPECT_LE(useful_count, maxKey * 2 * (fifo ? 0.9995 : 0.98));
     }
 
-    if (!fifo) {  // FIFO only has L0
+    if (!fifo) {  // FIFO doesn't fully support CompactRange
       // Full compaction
       ASSERT_OK(db_->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
                                   nullptr));

--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -3279,7 +3279,6 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
     TEST_SYNC_POINT_CALLBACK("DBImpl::BackgroundCompaction:BeforeCompaction",
                              c->column_family_data());
     assert(c->num_input_files(1) == 0);
-    assert(c->level() == 0);
     assert(c->column_family_data()->ioptions()->compaction_style ==
            kCompactionStyleFIFO);

--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -1944,18 +1944,6 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
   if (s.ok()) {
     for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
-      if (cfd->ioptions()->compaction_style == kCompactionStyleFIFO) {
-        auto* vstorage = cfd->current()->storage_info();
-        for (int i = 1; i < vstorage->num_levels(); ++i) {
-          int num_files = vstorage->NumLevelFiles(i);
-          if (num_files > 0) {
-            s = Status::InvalidArgument(
-                "Not all files are at level 0. Cannot "
-                "open with FIFO compaction style.");
-            break;
-          }
-        }
-      }
       if (!cfd->mem()->IsSnapshotSupported()) {
         impl->is_snapshot_supported_ = false;
       }

--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -483,6 +483,135 @@ TEST_F(DBTest, LevelLimitReopen) {
 }
 #endif  // ROCKSDB_LITE
 
+#ifndef ROCKSDB_LITE
+TEST_F(DBTest, LevelReopenWithFIFO) {
+  const int kLevelCount = 4;
+  const int kKeyCount = 5;
+  const int kTotalSstFileCount = kLevelCount * kKeyCount;
+  const int kCF = 1;
+
+  Options options = CurrentOptions();
+  // Config level0_file_num_compaction_trigger to prevent L0 files from being
+  // automatically compacted while we are constructing an LSM tree structure
+  // to test multi-level FIFO compaction.
+  options.level0_file_num_compaction_trigger = kKeyCount + 1;
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  // The expected number of files per level after each file creation.
+  const std::string expected_files_per_level[kLevelCount][kKeyCount] = {
+      {"0,0,0,1", "0,0,0,2", "0,0,0,3", "0,0,0,4", "0,0,0,5"},
+      {"0,0,1,5", "0,0,2,5", "0,0,3,5", "0,0,4,5", "0,0,5,5"},
+      {"0,1,5,5", "0,2,5,5", "0,3,5,5", "0,4,5,5", "0,5,5,5"},
+      {"1,5,5,5", "2,5,5,5", "3,5,5,5", "4,5,5,5", "5,5,5,5"},
+  };
+
+  const std::string expected_entries[kKeyCount][kLevelCount + 1] = {
+      {"[ ]", "[ a3 ]", "[ a2, a3 ]", "[ a1, a2, a3 ]", "[ a0, a1, a2, a3 ]"},
+      {"[ ]", "[ b3 ]", "[ b2, b3 ]", "[ b1, b2, b3 ]", "[ b0, b1, b2, b3 ]"},
+      {"[ ]", "[ c3 ]", "[ c2, c3 ]", "[ c1, c2, c3 ]", "[ c0, c1, c2, c3 ]"},
+      {"[ ]", "[ d3 ]", "[ d2, d3 ]", "[ d1, d2, d3 ]", "[ d0, d1, d2, d3 ]"},
+      {"[ ]", "[ e3 ]", "[ e2, e3 ]", "[ e1, e2, e3 ]", "[ e0, e1, e2, e3 ]"},
+  };
+
+  // The loop below creates the following LSM tree, where each (k, v) pair
+  // represents a file that contains that entry.  Each time a file is created,
+  // the db is reopened with FIFO compaction and we verify that the LSM tree
+  // structure is still the same.
+  //
+  // The resulting LSM tree will contain 5 different keys.  Each key has
+  // 4 different versions, located in different levels.
+  //
+  // L0: (e, e0) (d, d0) (c, c0) (b, b0) (a, a0)
+  // L1: (a, a1) (b, b1) (c, c1) (d, d1) (e, e1)
+  // L2: (a, a2) (b, b2) (c, c2) (d, d2) (e, e2)
+  // L3: (a, a3) (b, b3) (c, c3) (d, d3) (e, e3)
+  for (int l = 0; l < kLevelCount; ++l) {
+    int level = kLevelCount - 1 - l;
+    for (int p = 0; p < kKeyCount; ++p) {
+      std::string put_key = std::string(1, char('a' + p));
+      ASSERT_OK(Put(kCF, put_key, put_key + std::to_string(level)));
+      ASSERT_OK(Flush(kCF));
+      ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
+      for (int g = 0; g < kKeyCount; ++g) {
+        int entry_count = (p >= g) ? l + 1 : l;
+        std::string get_key = std::string(1, char('a' + g));
+        CheckAllEntriesWithFifoReopen(expected_entries[g][entry_count],
+                                      get_key, kCF, {"pikachu"}, options);
+      }
+      if (level != 0) {
+        MoveFilesToLevel(level, kCF);
+        for (int g = 0; g < kKeyCount; ++g) {
+          int entry_count = (p >= g) ? l + 1 : l;
+          std::string get_key = std::string(1, char('a' + g));
+          CheckAllEntriesWithFifoReopen(expected_entries[g][entry_count],
+                                        get_key, kCF, {"pikachu"}, options);
+        }
+      }
+      ASSERT_EQ(expected_files_per_level[l][p], FilesPerLevel(kCF));
+    }
+  }
+
+  // The expected number of sst files in each level after each FIFO compaction
+  // that deletes the oldest sst file.
+  const std::string expected_files_per_level_after_fifo[] = {
+      "5,5,5,4", "5,5,5,3", "5,5,5,2", "5,5,5,1", "5,5,5", "5,5,4", "5,5,3",
+      "5,5,2",   "5,5,1",   "5,5",     "5,4",     "5,3",   "5,2",   "5,1",
+      "5",       "4",       "3",       "2",       "1",     "",
+  };
+
+  // The expected value entries of each key after each FIFO compaction.
+  // This verifies whether FIFO removes the file with the smallest key in
+  // non-L0 levels first, then the oldest files in L0.
+  const std::string expected_entries_after_fifo[kKeyCount][kLevelCount + 1] = {
+      {"[ a0, a1, a2, a3 ]", "[ a0, a1, a2 ]", "[ a0, a1 ]", "[ a0 ]", "[ ]"},
+      {"[ b0, b1, b2, b3 ]", "[ b0, b1, b2 ]", "[ b0, b1 ]", "[ b0 ]", "[ ]"},
+      {"[ c0, c1, c2, c3 ]", "[ c0, c1, c2 ]", "[ c0, c1 ]", "[ c0 ]", "[ ]"},
+      {"[ d0, d1, d2, d3 ]", "[ d0, d1, d2 ]", "[ d0, d1 ]", "[ d0 ]", "[ ]"},
+      {"[ e0, e1, e2, e3 ]", "[ e0, e1, e2 ]", "[ e0, e1 ]", "[ e0 ]", "[ ]"},
+  };
+
+  // In the 2nd phase, we reopen the DB with FIFO compaction.  In each reopen,
+  // we configure max_table_files_size so that FIFO will remove exactly one
+  // file at a time upon compaction, and we use this to verify whether the sst
+  // files are deleted in the correct order.
+  for (int i = 0; i < kTotalSstFileCount; ++i) {
+    uint64_t total_sst_files_size = 0;
+    ASSERT_TRUE(dbfull()->GetIntProperty(
+        handles_[1], "rocksdb.total-sst-files-size", &total_sst_files_size));
+    ASSERT_TRUE(total_sst_files_size > 0);
+
+    Options fifo_options(options);
+    fifo_options.compaction_style = kCompactionStyleFIFO;
+    fifo_options.create_if_missing = false;
+    fifo_options.max_open_files = -1;
+    fifo_options.disable_auto_compactions = false;
+    // Config max_table_files_size to be total_sst_files_size - 1 so that
+    // FIFO will delete one file.
+    fifo_options.compaction_options_fifo.max_table_files_size =
+        total_sst_files_size - 1;
+    ASSERT_OK(
+        TryReopenWithColumnFamilies({"default", "pikachu"}, fifo_options));
+
+    // For FIFO to pick a compaction
+    ASSERT_OK(dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]));
+    ASSERT_OK(dbfull()->TEST_WaitForCompact(false));
+    for (int g = 0; g < kKeyCount; ++g) {
+      std::string get_key = std::string(1, char('a' + g));
+      int status_index = i / kKeyCount;
+      if ((i % kKeyCount) >= g) {
+        // If true, then it means the sst file containing the get_key in the
+        // current level has already been deleted, so we need to move the
+        // status_index for checking the expected value.
+        status_index++;
+      }
+      CheckAllEntriesWithFifoReopen(
+          expected_entries_after_fifo[g][status_index], get_key, kCF,
+          {"pikachu"}, options);
+    }
+
+    ASSERT_EQ(expected_files_per_level_after_fifo[i], FilesPerLevel(kCF));
+  }
+}
+#endif  // !ROCKSDB_LITE
+
 TEST_F(DBTest, PutSingleDeleteGet) {
   do {
     CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
@@ -668,7 +797,7 @@ TEST_F(DBTest, SingleDeletePutFlush) {
   ASSERT_OK(Flush(1));
   ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
 
-    // Skip FIFO and universal compaction beccaus they do not apply to the test
+    // Skip FIFO and universal compaction because they do not apply to the test
     // case. Skip MergePut because single delete does not get removed when it
     // encounters a merge.
   } while (ChangeOptions(kSkipFIFOCompaction | kSkipUniversalCompaction |

--- a/db/db_test_util.cc
+++ b/db/db_test_util.cc
@@ -963,6 +963,25 @@ std::string DBTestBase::Contents(int cf) {
   return result;
 }
 
+void DBTestBase::CheckAllEntriesWithFifoReopen(
+    const std::string& expected_value, const Slice& user_key, int cf,
+    const std::vector<std::string>& cfs, const Options& options) {
+  ASSERT_EQ(AllEntriesFor(user_key, cf), expected_value);
+
+  std::vector<std::string> cfs_plus_default = cfs;
+  cfs_plus_default.insert(cfs_plus_default.begin(), kDefaultColumnFamilyName);
+
+  Options fifo_options(options);
+  fifo_options.compaction_style = kCompactionStyleFIFO;
+  fifo_options.max_open_files = -1;
+  fifo_options.disable_auto_compactions = true;
+  ASSERT_OK(TryReopenWithColumnFamilies(cfs_plus_default, fifo_options));
+  ASSERT_EQ(AllEntriesFor(user_key, cf), expected_value);
+
+  ASSERT_OK(TryReopenWithColumnFamilies(cfs_plus_default, options));
+  ASSERT_EQ(AllEntriesFor(user_key, cf), expected_value);
+}
+
 std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) {
   Arena arena;
   auto options = CurrentOptions();

--- a/db/db_test_util.h
+++ b/db/db_test_util.h
@@ -1242,6 +1242,15 @@ class DBTestBase : public testing::Test {
   std::string AllEntriesFor(const Slice& user_key, int cf = 0);
 
+  // Similar to AllEntriesFor, but this function also covers reopening with
+  // FIFO compaction.  Note that test cases with snapshots or entries in the
+  // memtable should simply use AllEntriesFor instead, as snapshots and
+  // memtable entries will not survive a db reopen.
+  void CheckAllEntriesWithFifoReopen(const std::string& expected_value,
+                                     const Slice& user_key, int cf,
+                                     const std::vector<std::string>& cfs,
+                                     const Options& options);
+
 #ifndef ROCKSDB_LITE
   int NumSortedRuns(int cf = 0);
