diff --git a/db/builder.cc b/db/builder.cc
index 953213610..de9f91e61 100644
--- a/db/builder.cc
+++ b/db/builder.cc
@@ -9,6 +9,7 @@
 
 #include "db/builder.h"
 
+#include <algorithm>
 #include <deque>
 #include <vector>
 
@@ -31,6 +32,28 @@
 
 namespace rocksdb {
 
+namespace {
+inline SequenceNumber EarliestVisibleSnapshot(
+    SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
+    SequenceNumber* prev_snapshot) {
+  if (snapshots.empty()) {
+    *prev_snapshot = 0;  // 0 means no previous snapshot
+    return kMaxSequenceNumber;
+  }
+  SequenceNumber prev = 0;
+  for (const auto cur : snapshots) {
+    assert(prev <= cur);
+    if (cur >= in) {
+      *prev_snapshot = prev;
+      return cur;
+    }
+    prev = cur;  // assignment
+  }
+  *prev_snapshot = prev;
+  return kMaxSequenceNumber;
+}
+}  // namespace
+
 class TableFactory;
 
 TableBuilder* NewTableBuilder(
@@ -53,9 +76,7 @@ Status BuildTable(
     FileMetaData* meta, const InternalKeyComparator& internal_comparator,
     const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
         int_tbl_prop_collector_factories,
-    const SequenceNumber newest_snapshot,
-    const SequenceNumber earliest_seqno_in_memtable,
-    const CompressionType compression,
+    std::vector<SequenceNumber> snapshots, const CompressionType compression,
     const CompressionOptions& compression_opts, bool paranoid_file_checks,
     InternalStats* internal_stats, const Env::IOPriority io_priority,
     TableProperties* table_properties) {
@@ -66,14 +87,6 @@ Status BuildTable(
   meta->smallest_seqno = meta->largest_seqno = 0;
   iter->SeekToFirst();
 
-  // If the sequence number of the smallest entry in the memtable is
-  // smaller than the most recent snapshot, then we do not trigger
-  // removal of duplicate/deleted keys as part of this builder.
-  bool purge = true;
-  if (earliest_seqno_in_memtable <= newest_snapshot) {
-    purge = false;
-  }
-
   std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
                                     meta->fd.GetPathId());
   if (iter->Valid()) {
@@ -107,112 +120,112 @@ Status BuildTable(
         ioptions.min_partial_merge_operands,
         true /* internal key corruption is not ok */);
 
-    if (purge) {
+    IterKey current_user_key;
+    bool has_current_user_key = false;
+    // If has_current_user_key == true, this variable remembers the earliest
+    // snapshot in which this current key already exists. If two internal keys
+    // have the same user key AND the earlier one should be visible in the
+    // snapshot in which we already have a user key, we can drop the earlier
+    // user key.
+    SequenceNumber current_user_key_exists_in_snapshot = kMaxSequenceNumber;
+
+    while (iter->Valid()) {
+      // Get current key
+      ParsedInternalKey ikey;
+      Slice key = iter->key();
+      Slice value = iter->value();
+
+      // In-memory key corruption is not ok;
+      // TODO: find a clean way to treat in memory key corruption
       // Ugly walkaround to avoid compiler error for release build
       bool ok __attribute__((unused)) = true;
+      ok = ParseInternalKey(key, &ikey);
+      assert(ok);
+
+      meta->smallest_seqno = std::min(meta->smallest_seqno, ikey.sequence);
+      meta->largest_seqno = std::max(meta->largest_seqno, ikey.sequence);
+
+      // If the key is the same as the previous key (and it is not the
+      // first key), then we skip it, since it is an older version.
+      // Otherwise we output the key and mark it as the "new" previous key.
+      if (!has_current_user_key ||
+          internal_comparator.user_comparator()->Compare(
+              ikey.user_key, current_user_key.GetKey()) != 0) {
+        // First occurrence of this user key
+        current_user_key.SetKey(ikey.user_key);
+        has_current_user_key = true;
+        current_user_key_exists_in_snapshot = 0;
+      }
 
-      // Will write to builder if current key != prev key
-      ParsedInternalKey prev_ikey;
-      std::string prev_key;
-      bool is_first_key = true;  // Also write if this is the very first key
-
-      while (iter->Valid()) {
-        bool iterator_at_next = false;
-
-        // Get current key
-        ParsedInternalKey this_ikey;
-        Slice key = iter->key();
-        Slice value = iter->value();
-
-        // In-memory key corruption is not ok;
-        // TODO: find a clean way to treat in memory key corruption
-        ok = ParseInternalKey(key, &this_ikey);
-        assert(ok);
-        assert(this_ikey.sequence >= earliest_seqno_in_memtable);
-
-        // If the key is the same as the previous key (and it is not the
-        // first key), then we skip it, since it is an older version.
-        // Otherwise we output the key and mark it as the "new" previous key.
-        if (!is_first_key && !internal_comparator.user_comparator()->Compare(
-                                 prev_ikey.user_key, this_ikey.user_key)) {
-          // seqno within the same key are in decreasing order
-          assert(this_ikey.sequence < prev_ikey.sequence);
-        } else {
-          is_first_key = false;
-
-          if (this_ikey.type == kTypeMerge) {
-            // TODO(tbd): Add a check here to prevent RocksDB from crash when
-            // reopening a DB w/o properly specifying the merge operator. But
-            // currently we observed a memory leak on failing in RocksDB
-            // recovery, so we decide to let it crash instead of causing
-            // memory leak for now before we have identified the real cause
-            // of the memory leak.
-
-            // Handle merge-type keys using the MergeHelper
-            // TODO: pass statistics to MergeUntil
-            merge.MergeUntil(iter, 0 /* don't worry about snapshot */);
-            iterator_at_next = true;
-
-            // Write them out one-by-one. (Proceed back() to front())
-            // If the merge successfully merged the input into
-            // a kTypeValue, the list contains a single element.
-            const std::deque<std::string>& keys = merge.keys();
-            const std::deque<std::string>& values = merge.values();
-            assert(keys.size() == values.size() && keys.size() >= 1);
-            std::deque<std::string>::const_reverse_iterator key_iter;
-            std::deque<std::string>::const_reverse_iterator value_iter;
-            for (key_iter = keys.rbegin(), value_iter = values.rbegin();
-                 key_iter != keys.rend() && value_iter != values.rend();
-                 ++key_iter, ++value_iter) {
-              builder->Add(Slice(*key_iter), Slice(*value_iter));
-            }
-
-            // Sanity check. Both iterators should end at the same time
-            assert(key_iter == keys.rend() && value_iter == values.rend());
-
-            prev_key.assign(keys.front());
-            ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
-            assert(ok);
-          } else {
-            // Handle Put/Delete-type keys by simply writing them
-            builder->Add(key, value);
-            prev_key.assign(key.data(), key.size());
-            ok = ParseInternalKey(Slice(prev_key), &prev_ikey);
-            assert(ok);
-          }
-        }
+      // If there are no snapshots, then this kv affects visibility at tip.
+      // Otherwise, search through all existing snapshots to find
+      // the earliest snapshot that is affected by this kv.
+      SequenceNumber prev_snapshot = 0;  // 0 means no previous snapshot
+      SequenceNumber key_needs_to_exist_in_snapshot =
+          EarliestVisibleSnapshot(ikey.sequence, snapshots, &prev_snapshot);
+
+      if (current_user_key_exists_in_snapshot ==
+          key_needs_to_exist_in_snapshot) {
+        // If this user key already exists in the snapshot in which it needs
+        // to exist, we can drop it.
+        // In other words, if the earliest snapshot in which this key is
+        // visible is the same as the visibility of a previous instance of
+        // the same key, then this kv is not visible in any snapshot.
+        // Hidden by a newer entry for the same user key
+        iter->Next();
+      } else if (ikey.type == kTypeMerge) {
+        meta->largest.DecodeFrom(key);
 
-      if (io_priority == Env::IO_HIGH &&
-          IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
-        ThreadStatusUtil::IncreaseThreadOperationProperty(
-            ThreadStatus::FLUSH_BYTES_WRITTEN,
-            IOSTATS(bytes_written));
-        IOSTATS_RESET(bytes_written);
+        // TODO(tbd): Add a check here to prevent RocksDB from crash when
+        // reopening a DB w/o properly specifying the merge operator. But
+        // currently we observed a memory leak on failing in RocksDB
+        // recovery, so we decide to let it crash instead of causing
+        // memory leak for now before we have identified the real cause
+        // of the memory leak.
+
+        // Handle merge-type keys using the MergeHelper
+        // TODO: pass statistics to MergeUntil
+        merge.MergeUntil(iter, prev_snapshot, false, nullptr, env);
+        // IMPORTANT: Slice key doesn't point to a valid value anymore!!
+
+        const auto& keys = merge.keys();
+        const auto& values = merge.values();
+        assert(!keys.empty());
+        assert(keys.size() == values.size());
+
+        // The largest possible sequence number in a merge queue is already
+        // stored in ikey.sequence.
+        // We additionally have to consider the front of the merge queue,
+        // which might have the smallest sequence number (out of all the
+        // merges with the same key).
+        meta->smallest_seqno =
+            std::min(meta->smallest_seqno, GetInternalKeySeqno(keys.front()));
+
+        // We have a list of keys to write, write all keys in the list.
+        for (auto key_iter = keys.rbegin(), value_iter = values.rbegin();
+             key_iter != keys.rend(); key_iter++, value_iter++) {
+          key = Slice(*key_iter);
+          value = Slice(*value_iter);
+          bool valid_key __attribute__((__unused__)) =
+              ParseInternalKey(key, &ikey);
+          // MergeUntil stops when it encounters a corrupt key and does not
+          // include it in the result, so we expect the keys here to be valid.
+          assert(valid_key);
+          builder->Add(key, value);
+        }
-        if (!iterator_at_next) iter->Next();
+      } else {  // just write out the key-value
+        builder->Add(key, value);
+        meta->largest.DecodeFrom(key);
+        iter->Next();
+      }
 
-      // The last key is the largest key
-      meta->largest.DecodeFrom(Slice(prev_key));
-      SequenceNumber seqno = GetInternalKeySeqno(Slice(prev_key));
-      meta->smallest_seqno = std::min(meta->smallest_seqno, seqno);
-      meta->largest_seqno = std::max(meta->largest_seqno, seqno);
+      current_user_key_exists_in_snapshot = key_needs_to_exist_in_snapshot;
 
-    } else {
-      for (; iter->Valid(); iter->Next()) {
-        Slice key = iter->key();
-        meta->largest.DecodeFrom(key);
-        builder->Add(key, iter->value());
-        SequenceNumber seqno = GetInternalKeySeqno(key);
-        meta->smallest_seqno = std::min(meta->smallest_seqno, seqno);
-        meta->largest_seqno = std::max(meta->largest_seqno, seqno);
-        if (io_priority == Env::IO_HIGH &&
-            IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
-          ThreadStatusUtil::IncreaseThreadOperationProperty(
-              ThreadStatus::FLUSH_BYTES_WRITTEN,
-              IOSTATS(bytes_written));
-          IOSTATS_RESET(bytes_written);
-        }
+      if (io_priority == Env::IO_HIGH &&
+          IOSTATS(bytes_written) >= kReportFlushIOStatsEvery) {
+        ThreadStatusUtil::IncreaseThreadOperationProperty(
+            ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
+        IOSTATS_RESET(bytes_written);
       }
     }
diff --git a/db/builder.h b/db/builder.h
index 9a48227bd..09d81bfe4 100644
--- a/db/builder.h
+++ b/db/builder.h
@@ -52,9 +52,7 @@ extern Status BuildTable(
     FileMetaData* meta, const InternalKeyComparator& internal_comparator,
     const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
         int_tbl_prop_collector_factories,
-    const SequenceNumber newest_snapshot,
-    const SequenceNumber earliest_seqno_in_memtable,
-    const CompressionType compression,
+    std::vector<SequenceNumber> snapshots, const CompressionType compression,
     const CompressionOptions& compression_opts, bool paranoid_file_checks,
     InternalStats* internal_stats,
     const Env::IOPriority io_priority = Env::IO_HIGH,
diff --git a/db/db_impl.cc b/db/db_impl.cc
index f7069c65f..acc093bef 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1274,9 +1274,6 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
   TableProperties table_properties;
   {
     ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
-    const SequenceNumber newest_snapshot = snapshots_.GetNewest();
-    const SequenceNumber earliest_seqno_in_memtable =
-        mem->GetFirstSequenceNumber();
     Log(InfoLogLevel::DEBUG_LEVEL, db_options_.info_log,
         "[%s] [WriteLevel0TableForRecovery]"
         " Level-0 table #%" PRIu64 ": started",
@@ -1290,8 +1287,8 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
     s = BuildTable(
         dbname_, env_, *cfd->ioptions(), env_options_, cfd->table_cache(),
         iter.get(), &meta, cfd->internal_comparator(),
-        cfd->int_tbl_prop_collector_factories(), newest_snapshot,
-        earliest_seqno_in_memtable, GetCompressionFlush(*cfd->ioptions()),
+        cfd->int_tbl_prop_collector_factories(), snapshots_.GetAll(),
+        GetCompressionFlush(*cfd->ioptions()),
         cfd->ioptions()->compression_opts, paranoid_file_checks,
         cfd->internal_stats(), Env::IO_HIGH, &info.table_properties);
     LogFlush(db_options_.info_log);
@@ -1348,7 +1345,7 @@ Status DBImpl::FlushMemTableToOutputFile(
   FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options,
                      env_options_, versions_.get(), &mutex_, &shutting_down_,
-                     snapshots_.GetNewest(), job_context, log_buffer,
+                     snapshots_.GetAll(), job_context, log_buffer,
                      directories_.GetDbDir(), directories_.GetDataDir(0U),
                     GetCompressionFlush(*cfd->ioptions()), stats_,
                     &event_logger_);
diff --git a/db/db_test.cc b/db/db_test.cc
index 6d2bc79f9..1c4c1e08a 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -3280,22 +3280,16 @@ TEST_F(DBTest, CompactBetweenSnapshots) {
     Put(1, "foo", "sixth");
 
     // All entries (including duplicates) exist
-    // before any compaction is triggered.
-    ASSERT_OK(Flush(1));
-    ASSERT_EQ("sixth", Get(1, "foo"));
-    ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
-    ASSERT_EQ("first", Get(1, "foo", snapshot1));
+    // before any compaction or flush is triggered.
     ASSERT_EQ(AllEntriesFor("foo", 1),
               "[ sixth, fifth, fourth, third, second, first ]");
-
-    // After a compaction, "second", "third" and "fifth" should
-    // be removed
-    FillLevels("a", "z", 1);
-    dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
-                           nullptr);
     ASSERT_EQ("sixth", Get(1, "foo"));
     ASSERT_EQ("fourth", Get(1, "foo", snapshot2));
     ASSERT_EQ("first", Get(1, "foo", snapshot1));
+
+    // After a flush, "second", "third" and "fifth" should
+    // be removed
+    ASSERT_OK(Flush(1));
     ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth, fourth, first ]");
 
     // after we release the snapshot1, only two values left
diff --git a/db/flush_job.cc b/db/flush_job.cc
index 2576a44ca..ae986e8f9 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -60,9 +60,9 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
                    const EnvOptions& env_options, VersionSet* versions,
                    InstrumentedMutex* db_mutex,
                    std::atomic<bool>* shutting_down,
-                   SequenceNumber newest_snapshot, JobContext* job_context,
-                   LogBuffer* log_buffer, Directory* db_directory,
-                   Directory* output_file_directory,
+                   std::vector<SequenceNumber> existing_snapshots,
+                   JobContext* job_context, LogBuffer* log_buffer,
+                   Directory* db_directory, Directory* output_file_directory,
                    CompressionType output_compression, Statistics* stats,
                    EventLogger* event_logger)
     : dbname_(dbname),
@@ -73,7 +73,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
       versions_(versions),
       db_mutex_(db_mutex),
       shutting_down_(shutting_down),
-      newest_snapshot_(newest_snapshot),
+      existing_snapshots_(std::move(existing_snapshots)),
      job_context_(job_context),
      log_buffer_(log_buffer),
      db_directory_(db_directory),
@@ -188,8 +188,6 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
   // path 0 for level 0 file.
   meta->fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
-  const SequenceNumber earliest_seqno_in_memtable =
-      mems[0]->GetFirstSequenceNumber();
   Version* base = cfd_->current();
   base->Ref();  // it is likely that we do not need this reference
   Status s;
@@ -234,9 +232,8 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
       s = BuildTable(
           dbname_, db_options_.env, *cfd_->ioptions(), env_options_,
           cfd_->table_cache(), iter.get(), meta, cfd_->internal_comparator(),
-          cfd_->int_tbl_prop_collector_factories(), newest_snapshot_,
-          earliest_seqno_in_memtable, output_compression_,
-          cfd_->ioptions()->compression_opts,
+          cfd_->int_tbl_prop_collector_factories(), existing_snapshots_,
+          output_compression_, cfd_->ioptions()->compression_opts,
           mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
           Env::IO_HIGH, &info.table_properties);
       LogFlush(db_options_.info_log);
diff --git a/db/flush_job.h b/db/flush_job.h
index 180112584..14555ef56 100644
--- a/db/flush_job.h
+++ b/db/flush_job.h
@@ -57,10 +57,11 @@ class FlushJob {
            const MutableCFOptions& mutable_cf_options,
            const EnvOptions& env_options, VersionSet* versions,
            InstrumentedMutex* db_mutex, std::atomic<bool>* shutting_down,
-           SequenceNumber newest_snapshot, JobContext* job_context,
-           LogBuffer* log_buffer, Directory* db_directory,
-           Directory* output_file_directory, CompressionType output_compression,
-           Statistics* stats, EventLogger* event_logger);
+           std::vector<SequenceNumber> existing_snapshots,
+           JobContext* job_context, LogBuffer* log_buffer,
+           Directory* db_directory, Directory* output_file_directory,
+           CompressionType output_compression, Statistics* stats,
+           EventLogger* event_logger);
 
   ~FlushJob();
 
@@ -80,7 +81,7 @@ class FlushJob {
   VersionSet* versions_;
   InstrumentedMutex* db_mutex_;
   std::atomic<bool>* shutting_down_;
-  SequenceNumber newest_snapshot_;
+  std::vector<SequenceNumber> existing_snapshots_;
   JobContext* job_context_;
   LogBuffer* log_buffer_;
   Directory* db_directory_;
diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc
index 085415d92..1c6fe8482 100644
--- a/db/flush_job_test.cc
+++ b/db/flush_job_test.cc
@@ -3,6 +3,7 @@
 // LICENSE file in the root directory of this source tree. An additional grant
 // of patent rights can be found in the PATENTS file in the same directory.
 
+#include <algorithm>
 #include <map>
 #include <string>
 
@@ -91,7 +92,7 @@ TEST_F(FlushJobTest, Empty) {
   FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
                      db_options_, *cfd->GetLatestMutableCFOptions(),
                      env_options_, versions_.get(), &mutex_, &shutting_down_,
-                     SequenceNumber(), &job_context, nullptr, nullptr, nullptr,
+                     {}, &job_context, nullptr, nullptr, nullptr,
                      kNoCompression, nullptr, &event_logger);
   ASSERT_OK(flush_job.Run());
   job_context.Clean();
@@ -104,9 +105,17 @@ TEST_F(FlushJobTest, NonEmpty) {
                                        kMaxSequenceNumber);
   new_mem->Ref();
   mock::MockFileContents inserted_keys;
+  // Test data:
+  //   seqno [    1,    2 ... 8998, 8999, 9000, 9001, 9002 ... 9999 ]
+  //   key   [ 1001, 1002 ... 9998, 9999,    0,    1,    2 ...  999 ]
+  // Expected:
+  //   smallest_key   = "0"
+  //   largest_key    = "9999"
+  //   smallest_seqno = 1
+  //   largest_seqno  = 9999
   for (int i = 1; i < 10000; ++i) {
-    std::string key(ToString(i));
-    std::string value("value" + ToString(i));
+    std::string key(ToString((i + 1000) % 10000));
+    std::string value("value" + key);
     new_mem->Add(SequenceNumber(i), kTypeValue, key, value);
     InternalKey internal_key(key, SequenceNumber(i), kTypeValue);
     inserted_keys.insert({internal_key.Encode().ToString(), value});
@@ -122,7 +131,71 @@ TEST_F(FlushJobTest, NonEmpty) {
   FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
                      db_options_, *cfd->GetLatestMutableCFOptions(),
                      env_options_, versions_.get(), &mutex_, &shutting_down_,
-                     SequenceNumber(), &job_context, nullptr, nullptr, nullptr,
+                     {}, &job_context, nullptr, nullptr, nullptr,
+                     kNoCompression, nullptr, &event_logger);
+  FileMetaData fd;
+  mutex_.Lock();
+  ASSERT_OK(flush_job.Run(&fd));
+  mutex_.Unlock();
+  ASSERT_EQ(ToString(0), fd.smallest.user_key().ToString());
+  ASSERT_EQ(ToString(9999), fd.largest.user_key().ToString());
+  ASSERT_EQ(1, fd.smallest_seqno);
+  ASSERT_EQ(9999, fd.largest_seqno);
+  mock_table_factory_->AssertSingleFile(inserted_keys);
+  job_context.Clean();
+}
+
+TEST_F(FlushJobTest, Snapshots) {
+  JobContext job_context(0);
+  auto cfd = versions_->GetColumnFamilySet()->GetDefault();
+  auto new_mem = cfd->ConstructNewMemtable(*cfd->GetLatestMutableCFOptions(),
+                                           kMaxSequenceNumber);
+
+  std::vector<SequenceNumber> snapshots;
+  std::set<SequenceNumber> snapshots_set;
+  int keys = 10000;
+  int max_inserts_per_keys = 8;
+
+  Random rnd(301);
+  for (int i = 0; i < keys / 2; ++i) {
+    snapshots.push_back(rnd.Uniform(keys * (max_inserts_per_keys / 2)) + 1);
+    snapshots_set.insert(snapshots.back());
+  }
+  std::sort(snapshots.begin(), snapshots.end());
+
+  new_mem->Ref();
+  SequenceNumber current_seqno = 0;
+  mock::MockFileContents inserted_keys;
+  for (int i = 1; i < keys; ++i) {
+    std::string key(ToString(i));
+    int insertions = rnd.Uniform(max_inserts_per_keys);
+    for (int j = 0; j < insertions; ++j) {
+      std::string value(test::RandomHumanReadableString(&rnd, 10));
+      auto seqno = ++current_seqno;
+      new_mem->Add(SequenceNumber(seqno), kTypeValue, key, value);
+      // a key is visible only if:
+      // 1. it's the last one written (j == insertions - 1)
+      // 2. there's a snapshot pointing at it
+      bool visible = (j == insertions - 1) ||
+                     (snapshots_set.find(seqno) != snapshots_set.end());
+      if (visible) {
+        InternalKey internal_key(key, seqno, kTypeValue);
+        inserted_keys.insert({internal_key.Encode().ToString(), value});
+      }
+    }
+  }
+
+  autovector<MemTable*> to_delete;
+  cfd->imm()->Add(new_mem, &to_delete);
+  for (auto& m : to_delete) {
+    delete m;
+  }
+
+  EventLogger event_logger(db_options_.info_log.get());
+  FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(),
+                     db_options_, *cfd->GetLatestMutableCFOptions(),
+                     env_options_, versions_.get(), &mutex_, &shutting_down_,
+                     snapshots, &job_context, nullptr, nullptr, nullptr,
                      kNoCompression, nullptr, &event_logger);
   mutex_.Lock();
   ASSERT_OK(flush_job.Run());
diff --git a/db/repair.cc b/db/repair.cc
index ff2b4bcaa..d1ef6db74 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -292,7 +292,7 @@ class Repairer {
       ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
       status = BuildTable(dbname_, env_, ioptions_, env_options_, table_cache_,
                           iter.get(), &meta, icmp_,
-                          &int_tbl_prop_collector_factories_, 0, 0,
+                          &int_tbl_prop_collector_factories_, {},
                           kNoCompression, CompressionOptions(), false, nullptr);
     }
     delete mem->Unref();
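
Note on the core change: BuildTable previously either purged all older versions (when the whole memtable was newer than the newest snapshot) or kept everything. With the full snapshot list it can drop exactly those versions that neither any snapshot nor a reader at the tip can observe, which is why the db_test expectations above now hold after a plain flush. The following sketch is not part of the patch; it re-declares SequenceNumber, kMaxSequenceNumber, and a local copy of the EarliestVisibleSnapshot helper so it compiles standalone, then walks three versions of one user key past two snapshots to show which version the flush may drop.

#include <cassert>
#include <cstdint>
#include <iostream>
#include <vector>

// Local stand-ins for the rocksdb types so the sketch is self-contained.
using SequenceNumber = uint64_t;
const SequenceNumber kMaxSequenceNumber = UINT64_MAX;

// Copy of the helper added to db/builder.cc: returns the earliest snapshot
// that can see a key written at sequence number `in`, and reports the next
// older snapshot through *prev_snapshot (0 = none).
SequenceNumber EarliestVisibleSnapshot(
    SequenceNumber in, const std::vector<SequenceNumber>& snapshots,
    SequenceNumber* prev_snapshot) {
  SequenceNumber prev = 0;
  for (const auto cur : snapshots) {  // snapshots are sorted ascending
    assert(prev <= cur);
    if (cur >= in) {
      *prev_snapshot = prev;
      return cur;
    }
    prev = cur;
  }
  *prev_snapshot = prev;
  return kMaxSequenceNumber;  // only visible at the tip
}

int main() {
  std::vector<SequenceNumber> snapshots = {10, 20};  // taken at seqno 10, 20

  // Three versions of the same user key, newest first, exactly as a
  // memtable iterator would deliver them during flush.
  std::vector<SequenceNumber> versions = {25, 15, 12};

  // Mirrors current_user_key_exists_in_snapshot: reset to 0 on the first
  // occurrence of a user key, as in the BuildTable loop.
  SequenceNumber current_key_exists_in_snapshot = 0;

  for (SequenceNumber seq : versions) {
    SequenceNumber prev_snapshot = 0;
    SequenceNumber needs_to_exist_in =
        EarliestVisibleSnapshot(seq, snapshots, &prev_snapshot);
    if (needs_to_exist_in == current_key_exists_in_snapshot) {
      // A newer version of this key already serves the same snapshot, so
      // no reader can ever observe this one: flush drops it.
      std::cout << "seqno " << seq << ": dropped\n";
    } else {
      std::cout << "seqno " << seq << ": kept\n";
      current_key_exists_in_snapshot = needs_to_exist_in;
    }
  }
  // Output: seqno 25 kept, seqno 15 kept, seqno 12 dropped.
  // 25 is visible only at the tip, 15 is what snapshot 20 must see, and 12
  // is shadowed by 15 for snapshot 20 while snapshot 10 predates them both.
  return 0;
}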