diff --git a/HISTORY.md b/HISTORY.md
index 055df7a59..de4d6826f 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -4,6 +4,9 @@
 * `SstFileWriter::DeleteRange()` now returns `Status::InvalidArgument` if the range's end key comes before its start key according to the user comparator. Previously the behavior was undefined.
 * Add `multi_get_for_update` to C API.
+### Behavior changes
+* For level compaction with `level_compaction_dynamic_level_bytes=true`, RocksDB now trivially moves files down to fill the LSM starting from the bottommost level during DB open. See more in comments for option `level_compaction_dynamic_level_bytes`.
+
 ## 8.1.0 (03/18/2023)
 ### Behavior changes
 * Compaction output file cutting logic now considers range tombstone start keys. For example, SST partitioner now may receive ParitionRequest for range tombstone start keys.
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc
index 96b579c24..4f6ca802c 100644
--- a/db/db_compaction_test.cc
+++ b/db/db_compaction_test.cc
@@ -9111,6 +9111,147 @@ TEST_F(DBCompactionTest, BottommostFileCompactionAllowIngestBehind) {
   // ASSERT_OK(dbfull()->TEST_WaitForCompact(true /* wait_unscheduled */));
 }
 
+TEST_F(DBCompactionTest, TurnOnLevelCompactionDynamicLevelBytes) {
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleLevel;
+  options.allow_ingest_behind = false;
+  options.level_compaction_dynamic_level_bytes = false;
+  options.num_levels = 6;
+  options.compression = kNoCompression;
+  DestroyAndReopen(options);
+
+  // put files in L0, L1 and L2
+  WriteOptions write_opts;
+  ASSERT_OK(db_->Put(write_opts, Key(1), "val1"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(2);
+  ASSERT_OK(db_->Put(write_opts, Key(2), "val2"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(2);
+  ASSERT_OK(db_->Put(write_opts, Key(1), "new_val1"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+  ASSERT_OK(db_->Put(write_opts, Key(3), "val3"));
+  ASSERT_OK(Flush());
+  ASSERT_EQ("1,1,2", FilesPerLevel());
+  auto verify_db = [&]() {
+    ASSERT_EQ(Get(Key(1)), "new_val1");
+    ASSERT_EQ(Get(Key(2)), "val2");
+    ASSERT_EQ(Get(Key(3)), "val3");
+  };
+  verify_db();
+
+  options.level_compaction_dynamic_level_bytes = true;
+  Reopen(options);
+  // except for L0, files should be pushed down as much as possible
+  ASSERT_EQ("1,0,0,0,1,2", FilesPerLevel());
+  verify_db();
+
+  // turning the option on and off should be safe
+  options.level_compaction_dynamic_level_bytes = false;
+  Reopen(options);
+  MoveFilesToLevel(1);
+  ASSERT_EQ("0,1,0,0,1,2", FilesPerLevel());
+  verify_db();
+
+  // newly flushed file is also pushed down
+  options.level_compaction_dynamic_level_bytes = true;
+  Reopen(options);
+  ASSERT_EQ("0,0,0,1,1,2", FilesPerLevel());
+  verify_db();
+}
+
+TEST_F(DBCompactionTest, TurnOnLevelCompactionDynamicLevelBytesIngestBehind) {
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleLevel;
+  options.allow_ingest_behind = true;
+  options.level_compaction_dynamic_level_bytes = false;
+  options.num_levels = 6;
+  options.compression = kNoCompression;
+  DestroyAndReopen(options);
+
+  // put files in L0, L1 and L2
+  WriteOptions write_opts;
+  ASSERT_OK(db_->Put(write_opts, Key(1), "val1"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(2);
+  ASSERT_OK(db_->Put(write_opts, Key(2), "val2"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(2);
+  ASSERT_OK(db_->Put(write_opts, Key(1), "new_val1"));
+  ASSERT_OK(Flush());
+  MoveFilesToLevel(1);
+  ASSERT_OK(db_->Put(write_opts, Key(3), "val3"));
+  ASSERT_OK(Flush());
+  ASSERT_EQ("1,1,2", FilesPerLevel());
+  auto verify_db = [&]() {
+    ASSERT_EQ(Get(Key(1)), "new_val1");
+    ASSERT_EQ(Get(Key(2)), "val2");
+    ASSERT_EQ(Get(Key(3)), "val3");
+  };
+  verify_db();
+
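+  // With allow_ingest_behind=true, the reopen below should keep the last
+  // level (L5, since num_levels is 6) empty and fill the LSM starting from
+  // L4.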
+  options.level_compaction_dynamic_level_bytes = true;
+  Reopen(options);
+  // note that the last level (L5) should be empty
+  ASSERT_EQ("1,0,0,1,2", FilesPerLevel());
+  verify_db();
+
+  // turning the option on and off should both be safe
+  options.level_compaction_dynamic_level_bytes = false;
+  Reopen(options);
+  MoveFilesToLevel(1);
+  ASSERT_EQ("0,1,0,1,2", FilesPerLevel());
+  verify_db();
+
+  // newly flushed file is also pushed down
+  options.level_compaction_dynamic_level_bytes = true;
+  Reopen(options);
+  ASSERT_EQ("0,0,1,1,2", FilesPerLevel());
+  verify_db();
+
+  // files will be pushed down to the last level (L5)
+  options.allow_ingest_behind = false;
+  Reopen(options);
+  ASSERT_EQ("0,0,0,1,1,2", FilesPerLevel());
+  verify_db();
+}
+
+TEST_F(DBCompactionTest, TurnOnLevelCompactionDynamicLevelBytesUCToLC) {
+  // Basic test for migrating from UC to LC.
+  // DB has a non-empty L1 that should be pushed down to the last level (L49).
+  Options options = CurrentOptions();
+  options.compaction_style = CompactionStyle::kCompactionStyleUniversal;
+  options.allow_ingest_behind = false;
+  options.level_compaction_dynamic_level_bytes = false;
+  options.num_levels = 50;
+  CreateAndReopenWithCF({"pikachu"}, options);
+
+  Random rnd(33);
+  for (int f = 0; f < 10; ++f) {
+    ASSERT_OK(Put(1, Key(f), rnd.RandomString(1000)));
+    ASSERT_OK(Flush(1));
+  }
+  CompactRangeOptions compact_options;
+  compact_options.change_level = true;
+  compact_options.target_level = 1;
+  ASSERT_OK(db_->CompactRange(compact_options, handles_[1], nullptr, nullptr));
+  ASSERT_EQ("0,1", FilesPerLevel(1));
+
+  options.compaction_style = CompactionStyle::kCompactionStyleLevel;
+  options.level_compaction_dynamic_level_bytes = true;
+  ReopenWithColumnFamilies({"default", "pikachu"}, options);
+  std::string expected_lsm = "";
+  for (int i = 0; i < 49; ++i) {
+    expected_lsm += "0,";
+  }
+  expected_lsm += "1";
+  ASSERT_EQ(expected_lsm, FilesPerLevel(1));
+
+  // Tests that the entries written to MANIFEST for the trivial moves are valid
+  ReopenWithColumnFamilies({"default", "pikachu"}, options);
+  ASSERT_EQ(expected_lsm, FilesPerLevel(1));
+}
 }  // namespace ROCKSDB_NAMESPACE
diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index 94f36e862..478384c91 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -533,6 +533,100 @@ Status DBImpl::Recover(
   if (!s.ok()) {
     return s;
   }
+  if (s.ok() && !read_only) {
+    for (auto cfd : *versions_->GetColumnFamilySet()) {
+      // Try to trivially move files down the LSM tree to start from the
+      // bottommost level when level_compaction_dynamic_level_bytes is enabled.
+      // This should only be useful when a user is migrating to turning on this
+      // option. If a user is migrating from Level Compaction with a smaller
+      // level multiplier or from Universal Compaction, there may be too many
+      // non-empty levels and the trivial moves here are not sufficient for
+      // migration. Additional compactions are needed to drain unnecessary
+      // levels.
+      //
+      // Note that this step moves files down the LSM without consulting
+      // SSTPartitioner. Further compactions are still needed if
+      // the user wants to partition SST files.
+      // Note that files moved in this step may not respect the compression
+      // option in the target level.
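+      // Also note that the trivial moves below are attempted only when auto
+      // compactions are not disabled for this column family.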
+      if (cfd->ioptions()->compaction_style ==
+              CompactionStyle::kCompactionStyleLevel &&
+          cfd->ioptions()->level_compaction_dynamic_level_bytes &&
+          !cfd->GetLatestMutableCFOptions()->disable_auto_compactions) {
+        int to_level = cfd->ioptions()->num_levels - 1;
+        // last level is reserved
+        if (cfd->ioptions()->allow_ingest_behind ||
+            cfd->ioptions()->preclude_last_level_data_seconds > 0) {
+          to_level -= 1;
+        }
+        // Whether this column family has a level trivially moved
+        bool moved = false;
+        // Fill the LSM starting from to_level and going up one level at a
+        // time. Some loop invariants (when the last level is not reserved):
+        // - levels in (from_level, to_level] are empty, and
+        // - levels in (to_level, last_level] are non-empty.
+        for (int from_level = to_level; from_level >= 0; --from_level) {
+          const std::vector<FileMetaData*>& level_files =
+              cfd->current()->storage_info()->LevelFiles(from_level);
+          if (level_files.empty() || from_level == 0) {
+            continue;
+          }
+          assert(from_level <= to_level);
+          // Trivially move files from `from_level` to `to_level`
+          if (from_level < to_level) {
+            if (!moved) {
+              // lsm_state will look like "[1,2,3,4,5,6,0]" for an LSM with
+              // 7 levels
+              std::string lsm_state = "[";
+              for (int i = 0; i < cfd->ioptions()->num_levels; ++i) {
+                lsm_state += std::to_string(
+                    cfd->current()->storage_info()->NumLevelFiles(i));
+                if (i < cfd->ioptions()->num_levels - 1) {
+                  lsm_state += ",";
+                }
+              }
+              lsm_state += "]";
+              ROCKS_LOG_WARN(immutable_db_options_.info_log,
+                             "[%s] Trivially move files down the LSM when open "
+                             "with level_compaction_dynamic_level_bytes=true,"
+                             " lsm_state: %s (Files are moved only if DB "
+                             "Recovery is successful).",
+                             cfd->GetName().c_str(), lsm_state.c_str());
+              moved = true;
+            }
+            ROCKS_LOG_WARN(
+                immutable_db_options_.info_log,
+                "[%s] Moving %zu files from level-%d to level-%d",
+                cfd->GetName().c_str(), level_files.size(), from_level,
+                to_level);
+            VersionEdit edit;
+            edit.SetColumnFamily(cfd->GetID());
+            for (const FileMetaData* f : level_files) {
+              edit.DeleteFile(from_level, f->fd.GetNumber());
+              edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
+                           f->fd.GetFileSize(), f->smallest, f->largest,
+                           f->fd.smallest_seqno, f->fd.largest_seqno,
+                           f->marked_for_compaction,
+                           f->temperature,  // this can be different from
+                                            // `last_level_temperature`
+                           f->oldest_blob_file_number, f->oldest_ancester_time,
+                           f->file_creation_time, f->epoch_number,
+                           f->file_checksum, f->file_checksum_func_name,
+                           f->unique_id, f->compensated_range_deletion_size);
+              ROCKS_LOG_WARN(immutable_db_options_.info_log,
+                             "[%s] Moving #%" PRIu64
+                             " from level-%d to level-%d %" PRIu64 " bytes\n",
+                             cfd->GetName().c_str(), f->fd.GetNumber(),
+                             from_level, to_level, f->fd.GetFileSize());
+            }
+            recovery_ctx->UpdateVersionEdits(cfd, edit);
+          }
+          --to_level;
+        }
+      }
+    }
+  }
   s = SetupDBId(read_only, recovery_ctx);
   ROCKS_LOG_INFO(immutable_db_options_.info_log, "DB ID: %s\n", db_id_.c_str());
   if (s.ok() && !read_only) {
@@ -1828,7 +1922,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
 
   // Handles create_if_missing, error_if_exists
   uint64_t recovered_seq(kMaxSequenceNumber);
-  s = impl->Recover(column_families, false, false, false, &recovered_seq,
+  s = impl->Recover(column_families, false /* read_only */,
+                    false /* error_if_wal_file_exists */,
+                    false /* error_if_data_exists_in_wals */, &recovered_seq,
                     &recovery_ctx);
   if (s.ok()) {
     uint64_t new_log_number = impl->versions_->NewFileNumber();
diff --git a/db/version_set.cc b/db/version_set.cc
index 9075a58ac..d7500ff08 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -3262,7 +3262,7 @@ void VersionStorageInfo::ComputeCompactionScore(
       // the level's target size, and 1.0 is the threshold for triggering
       // compaction. Higher score means higher prioritization.
       // Now we keep the compaction triggering condition, but consider more
-      // factors for priorization, while still keeping the 1.0 threshold.
+      // factors for prioritization, while still keeping the 1.0 threshold.
       // In order to provide flexibility for reducing score while still
       // maintaining it to be over 1.0, we scale the original score by 10x
       // if it is larger than 1.0.
@@ -3295,7 +3295,7 @@ void VersionStorageInfo::ComputeCompactionScore(
        // compaction score for the whole DB. Adding other levels as if
        // they are L0 files.
        for (int i = 1; i < num_levels(); i++) {
-         // Its possible that a subset of the files in a level may be in a
+         // It's possible that a subset of the files in a level may be in a
          // compaction, due to delete triggered compaction or trivial move.
          // In that case, the below check may not catch a level being
          // compacted as it only checks the first file. The worst that can
@@ -3344,7 +3344,7 @@ void VersionStorageInfo::ComputeCompactionScore(
       // When calculating estimated_compaction_needed_bytes, we assume
       // L0 is qualified as pending compactions. We will need to make
       // sure that it qualifies for compaction.
-      // It might be guafanteed by logic below anyway, but we are
+      // It might be guaranteed by logic below anyway, but we are
       // explicit here to make sure we don't stop writes with no
       // compaction scheduled.
       score = std::max(score, 1.01);
diff --git a/db/version_set.h b/db/version_set.h
index ef7e69fc7..bc5b4177b 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -1204,7 +1204,7 @@ class VersionSet {
                        uint64_t* manifest_file_number);
   void WakeUpWaitingManifestWriters();
 
-  // Recover the last saved descriptor from persistent storage.
+  // Recover the last saved descriptor (MANIFEST) from persistent storage.
   // If read_only == true, Recover() will not complain if some column families
   // are not opened
   Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families,
diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h
index 9e74a259b..07261bd06 100644
--- a/include/rocksdb/advanced_options.h
+++ b/include/rocksdb/advanced_options.h
@@ -642,8 +642,31 @@ struct AdvancedColumnFamilyOptions {
   //
   // max_bytes_for_level_multiplier_additional is ignored with this flag on.
   //
-  // Turning this feature on or off for an existing DB can cause unexpected
-  // LSM tree structure so it's not recommended.
+  // To make the migration easier, when turning this feature on, files in the
+  // LSM will be trivially moved down to fill the LSM starting from the
+  // bottommost level during DB open. For example, if the LSM looks like:
+  // L0: f0, f1
+  // L1: f2, f3
+  // L2: f4
+  // L3:
+  // L4: f5
+  // and the DB is opened with num_levels = 7 with this feature turned on, the
+  // new LSM after DB open looks like the following:
+  // L0: f0, f1, (and possibly data flushed from WAL)
+  // L4: f2, f3
+  // L5: f4
+  // L6: f5
+  //
+  // If `allow_ingest_behind=true` or `preclude_last_level_data_seconds > 0`,
+  // then the last level is reserved, and we will start filling the LSM from
+  // the second last level (L5 in the above example).
+  //
+  // Note that there may be excessive levels (where the target level size is 0
+  // when computed based on this feature) in the LSM after a user migrates to
+  // turn this feature on. This is especially likely when a user migrates from
+  // leveled compaction with a smaller multiplier or from universal compaction.
+  // A full manual compaction is needed to drain these levels explicitly.
+  //
   // Default: false
   bool level_compaction_dynamic_level_bytes = false;
diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py
index 729f221a2..e8786a375 100644
--- a/tools/db_crashtest.py
+++ b/tools/db_crashtest.py
@@ -165,7 +165,7 @@ default_params = {
     "max_write_batch_group_size_bytes": lambda: random.choice(
         [16, 64, 1024 * 1024, 16 * 1024 * 1024]
     ),
-    "level_compaction_dynamic_level_bytes": True,
+    "level_compaction_dynamic_level_bytes": lambda: random.randint(0, 1),
     "verify_checksum_one_in": 1000000,
     "verify_db_one_in": 100000,
     "continuous_verification_interval": 0,
@@ -322,7 +322,7 @@ simple_default_params = {
     "target_file_size_multiplier": 1,
     "test_batches_snapshots": 0,
     "write_buffer_size": 32 * 1024 * 1024,
-    "level_compaction_dynamic_level_bytes": False,
+    "level_compaction_dynamic_level_bytes": lambda: random.randint(0, 1),
     "paranoid_file_checks": lambda: random.choice([0, 1, 1, 1]),
     "verify_iterator_with_expected_state_one_in": 5,  # this locks a range of keys
 }
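
For illustration only (not part of the patch above): a minimal sketch of how an application might exercise this migration path, assuming an existing leveled-compaction DB at the hypothetical path /tmp/rocksdb_migration_demo and num_levels = 7 to mirror the example in advanced_options.h. During DB::Open, existing files are expected to be trivially moved down toward the bottommost level; as noted in the option comment, a full manual compaction may still be needed afterwards to drain excess levels.

// Sketch: reopen an existing DB with level_compaction_dynamic_level_bytes on.
// Path and num_levels below are illustrative assumptions.
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = false;  // the DB is assumed to already exist
  options.compaction_style = rocksdb::kCompactionStyleLevel;
  options.num_levels = 7;
  // Turning this on makes DB open trivially move existing files down the LSM,
  // starting from the bottommost (or second-to-last, if reserved) level.
  options.level_compaction_dynamic_level_bytes = true;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/rocksdb_migration_demo", &db);
  assert(s.ok());

  // Optional: a full manual compaction to drain any levels left unused after
  // the migration (see the comment added to advanced_options.h).
  rocksdb::CompactRangeOptions cro;
  s = db->CompactRange(cro, nullptr, nullptr);
  assert(s.ok());

  delete db;
  return 0;
}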