diff --git a/db/builder.cc b/db/builder.cc index 6c4fe337d..1de062e5b 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -19,11 +19,22 @@ Status BuildTable(const std::string& dbname, const Options& options, TableCache* table_cache, Iterator* iter, - FileMetaData* meta) { + FileMetaData* meta, + const Comparator* user_comparator, + const SequenceNumber newest_snapshot, + const SequenceNumber earliest_seqno_in_memtable) { Status s; meta->file_size = 0; iter->SeekToFirst(); + // If the earliest sequence number in the memtable is smaller than + // or equal to the most recent snapshot, then we do not trigger + // removal of duplicate/deleted keys as part of this builder. + bool purge = options.purge_redundant_kvs_while_flush; + if (earliest_seqno_in_memtable <= newest_snapshot) { + purge = false; + } + std::string fname = TableFileName(dbname, meta->number); if (iter->Valid()) { unique_ptr file; @@ -31,13 +42,53 @@ Status BuildTable(const std::string& dbname, if (!s.ok()) { return s; } - TableBuilder* builder = new TableBuilder(options, file.get(), 0); - meta->smallest.DecodeFrom(iter->key()); - for (; iter->Valid(); iter->Next()) { - Slice key = iter->key(); - meta->largest.DecodeFrom(key); - builder->Add(key, iter->value()); + + // the first key is the smallest key + Slice key = iter->key(); + meta->smallest.DecodeFrom(key); + + if (purge) { + ParsedInternalKey prev_ikey; + std::string prev_value; + std::string prev_key; + + // store first key-value + prev_key.assign(key.data(), key.size()); + prev_value.assign(iter->value().data(), iter->value().size()); + ParseInternalKey(Slice(prev_key), &prev_ikey); + assert(prev_ikey.sequence >= earliest_seqno_in_memtable); + + for (iter->Next(); iter->Valid(); iter->Next()) { + ParsedInternalKey this_ikey; + Slice key = iter->key(); + ParseInternalKey(key, &this_ikey); + assert(this_ikey.sequence >= earliest_seqno_in_memtable); + + if (user_comparator->Compare(prev_ikey.user_key, this_ikey.user_key)) { + // This key is different 
from previous key. + // Output prev key and remember current key + builder->Add(Slice(prev_key), Slice(prev_value)); + prev_key.assign(key.data(), key.size()); + prev_value.assign(iter->value().data(), iter->value().size()); + ParseInternalKey(Slice(prev_key), &prev_ikey); + } else { + // seqno within the same key are in decreasing order + assert(this_ikey.sequence < prev_ikey.sequence); + // This key is an earlier version of the same key in prev_key. + // Skip current key. + } + } + // output last key + builder->Add(Slice(prev_key), Slice(prev_value)); + meta->largest.DecodeFrom(Slice(prev_key)); + + } else { + for (; iter->Valid(); iter->Next()) { + Slice key = iter->key(); + meta->largest.DecodeFrom(key); + builder->Add(key, iter->value()); + } } // Finish and check for builder errors diff --git a/db/builder.h b/db/builder.h index 62431fcf4..e4abc7eaa 100644 --- a/db/builder.h +++ b/db/builder.h @@ -5,7 +5,9 @@ #ifndef STORAGE_LEVELDB_DB_BUILDER_H_ #define STORAGE_LEVELDB_DB_BUILDER_H_ +#include "leveldb/comparator.h" #include "leveldb/status.h" +#include "leveldb/types.h" namespace leveldb { @@ -27,7 +29,10 @@ extern Status BuildTable(const std::string& dbname, const Options& options, TableCache* table_cache, Iterator* iter, - FileMetaData* meta); + FileMetaData* meta, + const Comparator* user_comparator, + const SequenceNumber newest_snapshot, + const SequenceNumber earliest_seqno_in_memtable); } // namespace leveldb diff --git a/db/db_impl.cc b/db/db_impl.cc index d96db23ef..5cbe08236 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -666,13 +666,18 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { meta.number = versions_->NewFileNumber(); pending_outputs_.insert(meta.number); Iterator* iter = mem->NewIterator(); + const SequenceNumber newest_snapshot = snapshots_.GetNewest(); + const SequenceNumber earliest_seqno_in_memtable = + mem->GetFirstSequenceNumber(); Log(options_.info_log, "Level-0 table #%llu: started", (unsigned 
long long) meta.number); Status s; { mutex_.Unlock(); - s = BuildTable(dbname_, env_, options_, table_cache_.get(), iter, &meta); + s = BuildTable(dbname_, env_, options_, table_cache_.get(), iter, &meta, + user_comparator(), newest_snapshot, + earliest_seqno_in_memtable); mutex_.Lock(); } @@ -710,6 +715,9 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, *filenumber = meta.number; pending_outputs_.insert(meta.number); Iterator* iter = mem->NewIterator(); + const SequenceNumber newest_snapshot = snapshots_.GetNewest(); + const SequenceNumber earliest_seqno_in_memtable = + mem->GetFirstSequenceNumber(); Log(options_.info_log, "Level-0 flush table #%llu: started", (unsigned long long) meta.number); @@ -718,7 +726,9 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, Status s; { mutex_.Unlock(); - s = BuildTable(dbname_, env_, options_, table_cache_.get(), iter, &meta); + s = BuildTable(dbname_, env_, options_, table_cache_.get(), iter, &meta, + user_comparator(), newest_snapshot, + earliest_seqno_in_memtable); mutex_.Lock(); } base->Unref(); diff --git a/db/db_test.cc b/db/db_test.cc index 12d8424fc..180666909 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -211,6 +211,7 @@ class DBTest { kNumLevel_3, kDBLogDir, kManifestFileSize, + kCompactOnFlush, kEnd }; int option_config_; @@ -268,6 +269,8 @@ class DBTest { break; case kManifestFileSize: options.max_manifest_file_size = 50; // 50 bytes + case kCompactOnFlush: + options.purge_redundant_kvs_while_flush = !options.purge_redundant_kvs_while_flush; default: break; } @@ -1817,7 +1820,11 @@ TEST(DBTest, DeletionMarkers1) { Put("foo", "v2"); ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]"); ASSERT_OK(dbfull()->TEST_CompactMemTable()); // Moves to level last-2 - ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]"); + if (CurrentOptions().purge_redundant_kvs_while_flush) { + ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]"); + } else { + ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]"); 
+ } Slice z("z"); dbfull()->TEST_CompactRange(last-2, nullptr, &z); // DEL eliminated, but v1 remains because we aren't compacting that level @@ -2420,6 +2427,90 @@ TEST(DBTest, SnapshotFiles) { dbfull()->DisableFileDeletions(); } +TEST(DBTest, CompactOnFlush) { + Options options = CurrentOptions(); + options.purge_redundant_kvs_while_flush = true; + options.disable_auto_compactions = true; + Reopen(&options); + + Put("foo", "v1"); + ASSERT_OK(dbfull()->TEST_CompactMemTable()); + ASSERT_EQ(AllEntriesFor("foo"), "[ v1 ]"); + + // Write two new keys + Put("a", "begin"); + Put("z", "end"); + dbfull()->TEST_CompactMemTable(); + + // Case1: Delete followed by a put + Delete("foo"); + Put("foo", "v2"); + ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]"); + + // After the current memtable is flushed, the DEL should + // have been removed + ASSERT_OK(dbfull()->TEST_CompactMemTable()); + ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]"); + + dbfull()->CompactRange(nullptr, nullptr); + ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]"); + + // Case 2: Delete followed by another delete + Delete("foo"); + Delete("foo"); + ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, DEL, v2 ]"); + ASSERT_OK(dbfull()->TEST_CompactMemTable()); + ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v2 ]"); + dbfull()->CompactRange(nullptr, nullptr); + ASSERT_EQ(AllEntriesFor("foo"), "[ ]"); + + // Case 3: Put followed by a delete + Put("foo", "v3"); + Delete("foo"); + ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v3 ]"); + ASSERT_OK(dbfull()->TEST_CompactMemTable()); + ASSERT_EQ(AllEntriesFor("foo"), "[ DEL ]"); + dbfull()->CompactRange(nullptr, nullptr); + ASSERT_EQ(AllEntriesFor("foo"), "[ ]"); + + // Case 4: Put followed by another Put + Put("foo", "v4"); + Put("foo", "v5"); + ASSERT_EQ(AllEntriesFor("foo"), "[ v5, v4 ]"); + ASSERT_OK(dbfull()->TEST_CompactMemTable()); + ASSERT_EQ(AllEntriesFor("foo"), "[ v5 ]"); + dbfull()->CompactRange(nullptr, nullptr); + ASSERT_EQ(AllEntriesFor("foo"), "[ v5 ]"); + + // clear database + 
Delete("foo"); + dbfull()->CompactRange(nullptr, nullptr); + ASSERT_EQ(AllEntriesFor("foo"), "[ ]"); + + // Case 5: Put followed by snapshot followed by another Put + // Both puts should remain. + Put("foo", "v6"); + const Snapshot* snapshot = db_->GetSnapshot(); + Put("foo", "v7"); + ASSERT_OK(dbfull()->TEST_CompactMemTable()); + ASSERT_EQ(AllEntriesFor("foo"), "[ v7, v6 ]"); + db_->ReleaseSnapshot(snapshot); + + // clear database + Delete("foo"); + dbfull()->CompactRange(nullptr, nullptr); + ASSERT_EQ(AllEntriesFor("foo"), "[ ]"); + + // Case 6: snapshot followed by a put followed by another Put + // Only the last put should remain. + const Snapshot* snapshot1 = db_->GetSnapshot(); + Put("foo", "v8"); + Put("foo", "v9"); + ASSERT_OK(dbfull()->TEST_CompactMemTable()); + ASSERT_EQ(AllEntriesFor("foo"), "[ v9 ]"); + db_->ReleaseSnapshot(snapshot1); +} + void ListLogFiles(Env* env, const std::string& path, std::vector* logFiles) { @@ -2898,7 +2989,6 @@ static bool CompareIterators(int step, ok = false; } } - fprintf(stderr, "%d entries compared: ok=%d\n", count, ok); delete miter; delete dbiter; return ok; @@ -2913,9 +3003,6 @@ TEST(DBTest, Randomized) { const Snapshot* db_snap = nullptr; std::string k, v; for (int step = 0; step < N; step++) { - if (step % 100 == 0) { - fprintf(stderr, "Step %d of %d\n", step, N); - } // TODO(sanjay): Test Get() works int p = rnd.Uniform(100); if (p < 45) { // Put diff --git a/db/memtable.cc b/db/memtable.cc index 1f6a9d266..efe383fe7 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -25,7 +25,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, int numlevel) flush_in_progress_(false), flush_completed_(false), file_number_(0), - edit_(numlevel) { + edit_(numlevel), + first_seqno_(0) { } MemTable::~MemTable() { @@ -107,6 +108,12 @@ void MemTable::Add(SequenceNumber s, ValueType type, memcpy(p, value.data(), val_size); assert((p + val_size) - buf == (unsigned)encoded_len); table_.Insert(buf); + + // The first sequence number 
inserted into the memtable + assert(first_seqno_ == 0 || s > first_seqno_); + if (first_seqno_ == 0) { + first_seqno_ = s; + } } bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) { diff --git a/db/memtable.h b/db/memtable.h index 31e2aef1d..61aa29205 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -68,6 +68,10 @@ class MemTable { // Returns the edits area that is needed for flushing the memtable VersionEdit* GetEdits() { return &edit_; } + // Returns the sequence number of the first element that was inserted + // into the memtable + SequenceNumber GetFirstSequenceNumber() { return first_seqno_; } + private: ~MemTable(); // Private since only Unref() should be used to delete it @@ -96,6 +100,9 @@ class MemTable { // memtable is flushed to storage. VersionEdit edit_; + // The sequence number of the kv that was inserted first + SequenceNumber first_seqno_; + // No copying allowed MemTable(const MemTable&); void operator=(const MemTable&); diff --git a/db/repair.cc b/db/repair.cc index 8e41c43c6..b991f833e 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -216,7 +216,8 @@ class Repairer { FileMetaData meta; meta.number = next_file_number_++; Iterator* iter = mem->NewIterator(); - status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); + status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, + icmp_.user_comparator(), 0, 0); delete iter; mem->Unref(); mem = nullptr; diff --git a/db/snapshot.h b/db/snapshot.h index 1b9a57847..96cf203a3 100644 --- a/db/snapshot.h +++ b/db/snapshot.h @@ -70,6 +70,14 @@ class SnapshotList { } } + // get the sequence number of the most recent snapshot + const SequenceNumber GetNewest() { + if (empty()) { + return 0; + } + return newest()->number_; + } + private: // Dummy head of doubly-linked list of snapshots SnapshotImpl list_; diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 119dde4d5..2a44f7e5d 100644 --- a/include/leveldb/options.h +++ 
b/include/leveldb/options.h @@ -380,6 +380,10 @@ struct Options { // as well as prevent overallocation for mounts that preallocate // large amounts of data (such as xfs's allocsize option). size_t manifest_preallocation_size; + + // Purge duplicate/deleted keys when a memtable is flushed to storage. + // Default: true + bool purge_redundant_kvs_while_flush; }; // Options that control read operations diff --git a/util/options.cc b/util/options.cc index 3551aa0db..b68b2758c 100644 --- a/util/options.cc +++ b/util/options.cc @@ -58,7 +58,8 @@ Options::Options() CompactionFilter(nullptr), disable_auto_compactions(false), WAL_ttl_seconds(0), - manifest_preallocation_size(4 * 1024 * 1024) { + manifest_preallocation_size(4 * 1024 * 1024), + purge_redundant_kvs_while_flush(true) { } @@ -101,6 +102,8 @@ Options::Dump(Logger* log) const Log(log," Options.keep_log_file_num: %ld", keep_log_file_num); Log(log," Options.db_stats_log_interval: %d", db_stats_log_interval); + Log(log," Options.purge_redundant_kvs_while_flush: %d", + purge_redundant_kvs_while_flush); Log(log," Options.compression_opts.window_bits: %d", compression_opts.window_bits); Log(log," Options.compression_opts.level: %d",