diff --git a/HISTORY.md b/HISTORY.md index c05f98dec..4ee3ace97 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,5 +1,8 @@ # Rocksdb Change Log ## Unreleased +### Bug Fixes +* Fix data corruption caused by the output of intra-L0 compaction on an ingested file not being placed in the correct order in L0. + ### Public API Change * TTL Compactions in Level compaction style now initiate successive cascading compactions on a key range so that it reaches the bottom level quickly on TTL expiry. `creation_time` table property for compaction output files is now set to the minimum of the creation times of all compaction inputs. * Changed the default value of periodic_compaction_seconds to `UINT64_MAX` which allows RocksDB to auto-tune periodic compaction scheduling. When using the default value, periodic compactions are now auto-enabled if a compaction filter is used. A value of `0` will turn off the feature completely. diff --git a/db/column_family.cc b/db/column_family.cc index f6a012d8f..c9f4123e6 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -954,8 +954,12 @@ bool ColumnFamilyData::NeedsCompaction() const { Compaction* ColumnFamilyData::PickCompaction( const MutableCFOptions& mutable_options, LogBuffer* log_buffer) { + SequenceNumber earliest_mem_seqno = + std::min(mem_->GetEarliestSequenceNumber(), + imm_.current()->GetEarliestSequenceNumber(false)); auto* result = compaction_picker_->PickCompaction( - GetName(), mutable_options, current_->storage_info(), log_buffer); + GetName(), mutable_options, current_->storage_info(), log_buffer, + earliest_mem_seqno); if (result != nullptr) { result->SetInputVersion(current_); } diff --git a/db/compaction/compaction_picker.cc b/db/compaction/compaction_picker.cc index 3357e0631..361549ad1 100644 --- a/db/compaction/compaction_picker.cc +++ b/db/compaction/compaction_picker.cc @@ -39,20 +39,40 @@ bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files, size_t min_files_to_compact, uint64_t max_compact_bytes_per_del_file, 
uint64_t max_compaction_bytes, - CompactionInputFiles* comp_inputs) { - size_t compact_bytes = static_cast<size_t>(level_files[0]->fd.file_size); - uint64_t compensated_compact_bytes = level_files[0]->compensated_file_size; + CompactionInputFiles* comp_inputs, + SequenceNumber earliest_mem_seqno) { + // Do not pick an ingested file when there is at least one memtable not + // flushed whose seqno overlaps with the sst. + size_t start = 0; + for (; start < level_files.size(); start++) { + if (level_files[start]->being_compacted) { + return false; + } + // If there is no data in the memtable, the earliest sequence number would + // be the largest sequence number in the last memtable. + // Because all files are sorted in descending order by largest_seqno, we + // only need to check the first one. + if (level_files[start]->fd.largest_seqno <= earliest_mem_seqno) { + break; + } + } + if (start >= level_files.size()) { + return false; + } + size_t compact_bytes = static_cast<size_t>(level_files[start]->fd.file_size); + uint64_t compensated_compact_bytes = + level_files[start]->compensated_file_size; size_t compact_bytes_per_del_file = port::kMaxSizet; - // Compaction range will be [0, span_len). - size_t span_len; + // Compaction range will be [start, limit). + size_t limit; // Pull in files until the amount of compaction work per deleted file begins // increasing or maximum total compaction size is reached. 
size_t new_compact_bytes_per_del_file = 0; - for (span_len = 1; span_len < level_files.size(); ++span_len) { - compact_bytes += static_cast<size_t>(level_files[span_len]->fd.file_size); - compensated_compact_bytes += level_files[span_len]->compensated_file_size; - new_compact_bytes_per_del_file = compact_bytes / span_len; - if (level_files[span_len]->being_compacted || + for (limit = start + 1; limit < level_files.size(); ++limit) { + compact_bytes += static_cast<size_t>(level_files[limit]->fd.file_size); + compensated_compact_bytes += level_files[limit]->compensated_file_size; + new_compact_bytes_per_del_file = compact_bytes / (limit - start); + if (level_files[limit]->being_compacted || new_compact_bytes_per_del_file > compact_bytes_per_del_file || compensated_compact_bytes > max_compaction_bytes) { break; @@ -60,11 +80,11 @@ bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files, compact_bytes_per_del_file = new_compact_bytes_per_del_file; } - if (span_len >= min_files_to_compact && + if ((limit - start) >= min_files_to_compact && compact_bytes_per_del_file < max_compact_bytes_per_del_file) { assert(comp_inputs != nullptr); comp_inputs->level = 0; - for (size_t i = 0; i < span_len; ++i) { + for (size_t i = start; i < limit; ++i) { comp_inputs->files.push_back(level_files[i]); } return true; diff --git a/db/compaction/compaction_picker.h b/db/compaction/compaction_picker.h index 53477014c..ae158059a 100644 --- a/db/compaction/compaction_picker.h +++ b/db/compaction/compaction_picker.h @@ -54,10 +54,10 @@ class CompactionPicker { // Returns nullptr if there is no compaction to be done. // Otherwise returns a pointer to a heap-allocated object that // describes the compaction. Caller should delete the result. 
- virtual Compaction* PickCompaction(const std::string& cf_name, - const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, - LogBuffer* log_buffer) = 0; + virtual Compaction* PickCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer, + SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) = 0; // Return a compaction object for compacting the range [begin,end] in // the specified level. Returns nullptr if there is nothing in that @@ -247,10 +247,11 @@ class NullCompactionPicker : public CompactionPicker { virtual ~NullCompactionPicker() {} // Always return "nullptr" - Compaction* PickCompaction(const std::string& /*cf_name*/, - const MutableCFOptions& /*mutable_cf_options*/, - VersionStorageInfo* /*vstorage*/, - LogBuffer* /*log_buffer*/) override { + Compaction* PickCompaction( + const std::string& /*cf_name*/, + const MutableCFOptions& /*mutable_cf_options*/, + VersionStorageInfo* /*vstorage*/, LogBuffer* /* log_buffer */, + SequenceNumber /* earliest_memtable_seqno */) override { return nullptr; } @@ -292,11 +293,11 @@ class NullCompactionPicker : public CompactionPicker { // files. Cannot be nullptr. // // @return true iff compaction was found. 
-bool FindIntraL0Compaction(const std::vector<FileMetaData*>& level_files, - size_t min_files_to_compact, - uint64_t max_compact_bytes_per_del_file, - uint64_t max_compaction_bytes, - CompactionInputFiles* comp_inputs); +bool FindIntraL0Compaction( + const std::vector<FileMetaData*>& level_files, size_t min_files_to_compact, + uint64_t max_compact_bytes_per_del_file, uint64_t max_compaction_bytes, + CompactionInputFiles* comp_inputs, + SequenceNumber earliest_mem_seqno = kMaxSequenceNumber); CompressionType GetCompressionType(const ImmutableCFOptions& ioptions, const VersionStorageInfo* vstorage, diff --git a/db/compaction/compaction_picker_fifo.cc b/db/compaction/compaction_picker_fifo.cc index cacb33c22..cdf5e46da 100644 --- a/db/compaction/compaction_picker_fifo.cc +++ b/db/compaction/compaction_picker_fifo.cc @@ -202,7 +202,8 @@ Compaction* FIFOCompactionPicker::PickSizeCompaction( Compaction* FIFOCompactionPicker::PickCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, LogBuffer* log_buffer) { + VersionStorageInfo* vstorage, LogBuffer* log_buffer, + SequenceNumber /*earliest_memtable_seqno*/) { assert(vstorage->num_levels() == 1); Compaction* c = nullptr; diff --git a/db/compaction/compaction_picker_fifo.h b/db/compaction/compaction_picker_fifo.h index a4e63803c..065faef13 100644 --- a/db/compaction/compaction_picker_fifo.h +++ b/db/compaction/compaction_picker_fifo.h @@ -19,10 +19,10 @@ class FIFOCompactionPicker : public CompactionPicker { const InternalKeyComparator* icmp) : CompactionPicker(ioptions, icmp) {} - virtual Compaction* PickCompaction(const std::string& cf_name, - const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* version, - LogBuffer* log_buffer) override; + virtual Compaction* PickCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* version, LogBuffer* log_buffer, + SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override; 
virtual Compaction* CompactRange( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, diff --git a/db/compaction/compaction_picker_level.cc b/db/compaction/compaction_picker_level.cc index cc0f19b81..4c2afa667 100644 --- a/db/compaction/compaction_picker_level.cc +++ b/db/compaction/compaction_picker_level.cc @@ -45,12 +45,14 @@ class LevelCompactionBuilder { public: LevelCompactionBuilder(const std::string& cf_name, VersionStorageInfo* vstorage, + SequenceNumber earliest_mem_seqno, CompactionPicker* compaction_picker, LogBuffer* log_buffer, const MutableCFOptions& mutable_cf_options, const ImmutableCFOptions& ioptions) : cf_name_(cf_name), vstorage_(vstorage), + earliest_mem_seqno_(earliest_mem_seqno), compaction_picker_(compaction_picker), log_buffer_(log_buffer), mutable_cf_options_(mutable_cf_options), @@ -97,6 +99,7 @@ class LevelCompactionBuilder { const std::string& cf_name_; VersionStorageInfo* vstorage_; + SequenceNumber earliest_mem_seqno_; CompactionPicker* compaction_picker_; LogBuffer* log_buffer_; int start_level_ = -1; @@ -537,17 +540,19 @@ bool LevelCompactionBuilder::PickIntraL0Compaction() { // resort to L0->L0 compaction yet. 
return false; } - return FindIntraL0Compaction( - level_files, kMinFilesForIntraL0Compaction, port::kMaxUint64, - mutable_cf_options_.max_compaction_bytes, &start_level_inputs_); + return FindIntraL0Compaction(level_files, kMinFilesForIntraL0Compaction, + port::kMaxUint64, + mutable_cf_options_.max_compaction_bytes, + &start_level_inputs_, earliest_mem_seqno_); } } // namespace Compaction* LevelCompactionPicker::PickCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, LogBuffer* log_buffer) { - LevelCompactionBuilder builder(cf_name, vstorage, this, log_buffer, - mutable_cf_options, ioptions_); + VersionStorageInfo* vstorage, LogBuffer* log_buffer, + SequenceNumber earliest_mem_seqno) { + LevelCompactionBuilder builder(cf_name, vstorage, earliest_mem_seqno, this, + log_buffer, mutable_cf_options, ioptions_); return builder.PickCompaction(); } } // namespace rocksdb diff --git a/db/compaction/compaction_picker_level.h b/db/compaction/compaction_picker_level.h index 9fc196698..c8d905ef9 100644 --- a/db/compaction/compaction_picker_level.h +++ b/db/compaction/compaction_picker_level.h @@ -20,10 +20,10 @@ class LevelCompactionPicker : public CompactionPicker { LevelCompactionPicker(const ImmutableCFOptions& ioptions, const InternalKeyComparator* icmp) : CompactionPicker(ioptions, icmp) {} - virtual Compaction* PickCompaction(const std::string& cf_name, - const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, - LogBuffer* log_buffer) override; + virtual Compaction* PickCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer, + SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override; virtual bool NeedsCompaction( const VersionStorageInfo* vstorage) const override; diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc index 793261bfc..5dddd2c91 100644 
--- a/db/compaction/compaction_picker_test.cc +++ b/db/compaction/compaction_picker_test.cc @@ -1648,12 +1648,12 @@ TEST_F(CompactionPickerTest, IntraL0MaxCompactionBytesNotHit) { // All 5 L0 files will be picked for intra L0 compaction. The one L1 file // spans entire L0 key range and is marked as being compacted to avoid // L0->L1 compaction. - Add(0, 1U, "100", "150", 200000U); - Add(0, 2U, "151", "200", 200000U); - Add(0, 3U, "201", "250", 200000U); - Add(0, 4U, "251", "300", 200000U); - Add(0, 5U, "301", "350", 200000U); - Add(1, 6U, "100", "350", 200000U); + Add(0, 1U, "100", "150", 200000U, 0, 100, 101); + Add(0, 2U, "151", "200", 200000U, 0, 102, 103); + Add(0, 3U, "201", "250", 200000U, 0, 104, 105); + Add(0, 4U, "251", "300", 200000U, 0, 106, 107); + Add(0, 5U, "301", "350", 200000U, 0, 108, 109); + Add(1, 6U, "100", "350", 200000U, 0, 110, 111); vstorage_->LevelFiles(1)[0]->being_compacted = true; UpdateVersionStorageInfo(); @@ -1678,12 +1678,12 @@ TEST_F(CompactionPickerTest, IntraL0MaxCompactionBytesHit) { // max_compaction_bytes limit (the minimum number of files for triggering // intra L0 compaction is 4). The one L1 file spans entire L0 key range and // is marked as being compacted to avoid L0->L1 compaction. 
- Add(0, 1U, "100", "150", 200000U); - Add(0, 2U, "151", "200", 200000U); - Add(0, 3U, "201", "250", 200000U); - Add(0, 4U, "251", "300", 200000U); - Add(0, 5U, "301", "350", 200000U); - Add(1, 6U, "100", "350", 200000U); + Add(0, 1U, "100", "150", 200000U, 0, 100, 101); + Add(0, 2U, "151", "200", 200000U, 0, 102, 103); + Add(0, 3U, "201", "250", 200000U, 0, 104, 105); + Add(0, 4U, "251", "300", 200000U, 0, 106, 107); + Add(0, 5U, "301", "350", 200000U, 0, 108, 109); + Add(1, 6U, "100", "350", 200000U, 0, 109, 110); vstorage_->LevelFiles(1)[0]->being_compacted = true; UpdateVersionStorageInfo(); @@ -1697,6 +1697,38 @@ TEST_F(CompactionPickerTest, IntraL0MaxCompactionBytesHit) { ASSERT_EQ(0, compaction->output_level()); } +TEST_F(CompactionPickerTest, IntraL0ForEarliestSeqno) { + // Intra L0 compaction triggers only if there are at least + // level0_file_num_compaction_trigger + 2 L0 files. + mutable_cf_options_.level0_file_num_compaction_trigger = 3; + mutable_cf_options_.max_compaction_bytes = 999999u; + NewVersionStorage(6, kCompactionStyleLevel); + + // 4 out of 6 L0 files will be picked for intra L0 compaction due to + // being_compact limit. And the latest one L0 will be skipped due to earliest + // seqno. The one L1 file spans entire L0 key range and is marked as being + // compacted to avoid L0->L1 compaction. 
+ Add(1, 1U, "100", "350", 200000U, 0, 110, 111); + Add(0, 2U, "301", "350", 1U, 0, 108, 109); + Add(0, 3U, "251", "300", 1U, 0, 106, 107); + Add(0, 4U, "201", "250", 1U, 0, 104, 105); + Add(0, 5U, "151", "200", 1U, 0, 102, 103); + Add(0, 6U, "100", "150", 1U, 0, 100, 101); + Add(0, 7U, "100", "100", 1U, 0, 99, 100); + vstorage_->LevelFiles(0)[5]->being_compacted = true; + vstorage_->LevelFiles(1)[0]->being_compacted = true; + UpdateVersionStorageInfo(); + + std::unique_ptr compaction(level_compaction_picker.PickCompaction( + cf_name_, mutable_cf_options_, vstorage_.get(), &log_buffer_, 107)); + ASSERT_TRUE(compaction.get() != nullptr); + ASSERT_EQ(1U, compaction->num_input_levels()); + ASSERT_EQ(4U, compaction->num_input_files(0)); + ASSERT_EQ(CompactionReason::kLevelL0FilesNum, + compaction->compaction_reason()); + ASSERT_EQ(0, compaction->output_level()); +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/compaction/compaction_picker_universal.cc b/db/compaction/compaction_picker_universal.cc index 4115af395..473a480cb 100644 --- a/db/compaction/compaction_picker_universal.cc +++ b/db/compaction/compaction_picker_universal.cc @@ -277,7 +277,8 @@ bool UniversalCompactionPicker::NeedsCompaction( Compaction* UniversalCompactionPicker::PickCompaction( const std::string& cf_name, const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, LogBuffer* log_buffer) { + VersionStorageInfo* vstorage, LogBuffer* log_buffer, + SequenceNumber /* earliest_memtable_seqno */) { UniversalCompactionBuilder builder(ioptions_, icmp_, cf_name, mutable_cf_options, vstorage, this, log_buffer); diff --git a/db/compaction/compaction_picker_universal.h b/db/compaction/compaction_picker_universal.h index 28f3a63cd..150b6bd79 100644 --- a/db/compaction/compaction_picker_universal.h +++ b/db/compaction/compaction_picker_universal.h @@ -18,11 +18,10 @@ class UniversalCompactionPicker : public CompactionPicker { UniversalCompactionPicker(const 
ImmutableCFOptions& ioptions, const InternalKeyComparator* icmp) : CompactionPicker(ioptions, icmp) {} - virtual Compaction* PickCompaction(const std::string& cf_name, - const MutableCFOptions& mutable_cf_options, - VersionStorageInfo* vstorage, - LogBuffer* log_buffer) override; - + virtual Compaction* PickCompaction( + const std::string& cf_name, const MutableCFOptions& mutable_cf_options, + VersionStorageInfo* vstorage, LogBuffer* log_buffer, + SequenceNumber earliest_memtable_seqno = kMaxSequenceNumber) override; virtual int MaxOutputLevel() const override { return NumberLevels() - 1; } virtual bool NeedsCompaction( diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index f8c25e896..b40b8917d 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -12,6 +12,7 @@ #include "port/stack_trace.h" #include "rocksdb/concurrent_task_limiter.h" #include "rocksdb/experimental.h" +#include "rocksdb/sst_file_writer.h" #include "rocksdb/utilities/convenience.h" #include "test_util/fault_injection_test_env.h" #include "test_util/sync_point.h" @@ -4830,6 +4831,7 @@ TEST_P(DBCompactionTestWithParam, FixFileIngestionCompactionDeadlock) { ASSERT_OK(dbfull()->TEST_WaitForCompact()); Close(); } + TEST_F(DBCompactionTest, ConsistencyFailTest) { Options options = CurrentOptions(); DestroyAndReopen(options); @@ -4855,6 +4857,162 @@ TEST_F(DBCompactionTest, ConsistencyFailTest) { ASSERT_NOK(Put("foo", "bar")); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } + +void IngestOneKeyValue(DBImpl* db, const std::string& key, + const std::string& value, const Options& options) { + ExternalSstFileInfo info; + std::string f = test::PerThreadDBPath("sst_file" + key); + EnvOptions env; + rocksdb::SstFileWriter writer(env, options); + auto s = writer.Open(f); + ASSERT_OK(s); + // ASSERT_OK(writer.Put(Key(), "")); + ASSERT_OK(writer.Put(key, value)); + + ASSERT_OK(writer.Finish(&info)); + IngestExternalFileOptions ingest_opt; + + 
ASSERT_OK(db->IngestExternalFile({info.file_path}, ingest_opt)); +} + +TEST_P(DBCompactionTestWithParam, + FlushAfterL0IntraCompactionCheckConsistencyFail) { + Options options = CurrentOptions(); + options.force_consistency_checks = true; + options.compression = kNoCompression; + options.level0_file_num_compaction_trigger = 5; + options.max_background_compactions = 2; + options.max_subcompactions = max_subcompactions_; + DestroyAndReopen(options); + + const size_t kValueSize = 1 << 20; + Random rnd(301); + std::string value(RandomString(&rnd, kValueSize)); + + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"LevelCompactionPicker::PickCompactionBySize:0", + "CompactionJob::Run():Start"}}); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + // prevents trivial move + for (int i = 0; i < 10; ++i) { + ASSERT_OK(Put(Key(i), "")); // prevents trivial move + } + ASSERT_OK(Flush()); + Compact("", Key(99)); + ASSERT_EQ(0, NumTableFilesAtLevel(0)); + + // Flush 5 L0 sst. + for (int i = 0; i < 5; ++i) { + ASSERT_OK(Put(Key(i + 1), value)); + ASSERT_OK(Flush()); + } + ASSERT_EQ(5, NumTableFilesAtLevel(0)); + + // Put one key, to make smallest log sequence number in this memtable is less + // than sst which would be ingested in next step. + ASSERT_OK(Put(Key(0), "a")); + + ASSERT_EQ(5, NumTableFilesAtLevel(0)); + + // Ingest 5 L0 sst. And this files would trigger PickIntraL0Compaction. + for (int i = 5; i < 10; i++) { + IngestOneKeyValue(dbfull(), Key(i), value, options); + ASSERT_EQ(i + 1, NumTableFilesAtLevel(0)); + } + + // Put one key, to make biggest log sequence number in this memtable is bigger + // than sst which would be ingested in next step. 
+ ASSERT_OK(Put(Key(2), "b")); + ASSERT_EQ(10, NumTableFilesAtLevel(0)); + dbfull()->TEST_WaitForCompact(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + std::vector> level_to_files; + dbfull()->TEST_GetFilesMetaData(dbfull()->DefaultColumnFamily(), + &level_to_files); + ASSERT_GT(level_to_files[0].size(), 0); + + ASSERT_OK(Flush()); +} + +TEST_P(DBCompactionTestWithParam, + IntraL0CompactionAfterFlushCheckConsistencyFail) { + Options options = CurrentOptions(); + options.force_consistency_checks = true; + options.compression = kNoCompression; + options.level0_file_num_compaction_trigger = 5; + options.max_background_compactions = 2; + options.max_subcompactions = max_subcompactions_; + options.write_buffer_size = 2 << 20; + options.max_write_buffer_number = 6; + DestroyAndReopen(options); + + const size_t kValueSize = 1 << 20; + Random rnd(301); + std::string value(RandomString(&rnd, kValueSize)); + std::string value2(RandomString(&rnd, kValueSize)); + std::string bigvalue = value + value; + + // prevents trivial move + for (int i = 0; i < 10; ++i) { + ASSERT_OK(Put(Key(i), "")); // prevents trivial move + } + ASSERT_OK(Flush()); + Compact("", Key(99)); + ASSERT_EQ(0, NumTableFilesAtLevel(0)); + rocksdb::SyncPoint::GetInstance()->LoadDependency( + {{"LevelCompactionPicker::PickCompactionBySize:0", + "CompactionJob::Run():Start"}}); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + // Make 6 L0 sst. 
+ for (int i = 0; i < 6; ++i) { + if (i % 2 == 0) { + IngestOneKeyValue(dbfull(), Key(i), value, options); + } else { + ASSERT_OK(Put(Key(i), value)); + ASSERT_OK(Flush()); + } + } + + ASSERT_EQ(6, NumTableFilesAtLevel(0)); + + // Stop run flush job + env_->SetBackgroundThreads(1, Env::HIGH); + test::SleepingBackgroundTask sleeping_tasks; + env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_tasks, + Env::Priority::HIGH); + sleeping_tasks.WaitUntilSleeping(); + + // Put many keys to make memtable request to flush + for (int i = 0; i < 6; ++i) { + ASSERT_OK(Put(Key(i), bigvalue)); + } + + ASSERT_EQ(6, NumTableFilesAtLevel(0)); + // ingest file to trigger IntraL0Compaction + for (int i = 6; i < 10; ++i) { + ASSERT_EQ(i, NumTableFilesAtLevel(0)); + IngestOneKeyValue(dbfull(), Key(i), value2, options); + } + ASSERT_EQ(10, NumTableFilesAtLevel(0)); + + // Wake up flush job + sleeping_tasks.WakeUp(); + sleeping_tasks.WaitUntilDone(); + dbfull()->TEST_WaitForCompact(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + + uint64_t error_count = 0; + db_->GetIntProperty("rocksdb.background-errors", &error_count); + ASSERT_EQ(error_count, 0); + for (int i = 0; i < 6; ++i) { + ASSERT_EQ(bigvalue, Get(Key(i))); + } + for (int i = 6; i < 10; ++i) { + ASSERT_EQ(value2, Get(Key(i))); + } +} + #endif // !defined(ROCKSDB_LITE) } // namespace rocksdb