From 9ee68871dc97b5394e1e2c21e12dd2ac191567be Mon Sep 17 00:00:00 2001 From: Haobo Xu Date: Sat, 29 Jun 2013 23:21:36 -0700 Subject: [PATCH 01/15] [RocksDB] Enable manual compaction to move files back to an appropriate level. Summary: As title. This diff added an option reduce_level to CompactRange. When set to true, it will try to move the files back to the minimum level sufficient to hold the data set. Note that the default is set to true now, just to excerise it in all existing tests. Will set the default to false before check-in, for backward compatibility. Test Plan: make check; Reviewers: dhruba, emayanke CC: leveldb Differential Revision: https://reviews.facebook.net/D11553 --- db/db_impl.cc | 83 +++++++++++++++++++++++++++++++++++++++-- db/db_impl.h | 17 ++++++++- db/db_impl_readonly.h | 3 +- db/db_test.cc | 3 +- db/version_set.h | 1 + include/leveldb/db.h | 9 ++++- utilities/ttl/db_ttl.cc | 5 ++- utilities/ttl/db_ttl.h | 3 +- 8 files changed, 114 insertions(+), 10 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 1bdd61d39..03d88708a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -182,7 +182,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) stats_(options.num_levels), delayed_writes_(0), last_flushed_sequence_(0), - storage_options_(options) { + storage_options_(options), + bg_work_gate_closed_(false), + refitting_level_(false) { mem_->Ref(); @@ -900,7 +902,8 @@ Status DBImpl::CompactMemTable(bool* madeProgress) { return s; } -void DBImpl::CompactRange(const Slice* begin, const Slice* end) { +void DBImpl::CompactRange(const Slice* begin, const Slice* end, + bool reduce_level) { int max_level_with_files = 1; { MutexLock l(&mutex_); @@ -915,6 +918,78 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) { for (int level = 0; level < max_level_with_files; level++) { TEST_CompactRange(level, begin, end); } + + if (reduce_level) { + ReFitLevel(max_level_with_files); + } +} + +// return the same level if it cannot be moved +int DBImpl::FindMinimumEmptyLevelFitting(int level) { + mutex_.AssertHeld(); + int minimum_level = level; + for (int i = level - 1; i > 0; ++i) { + // stop if level i is not empty + if (versions_->NumLevelFiles(i) > 0) break; + + // stop if level i is too small (cannot fit the level files) + if (versions_->MaxBytesForLevel(i) < versions_->NumLevelBytes(level)) break; + + minimum_level = i; + } + return minimum_level; +} + +void DBImpl::ReFitLevel(int level) { + assert(level < NumberLevels()); + + MutexLock l(&mutex_); + + // only allow one thread refitting + if (refitting_level_) { + Log(options_.info_log, "ReFitLevel: another thread is refitting"); + return; + } + refitting_level_ = true; + + // wait for all background threads to stop + bg_work_gate_closed_ = true; + while (bg_compaction_scheduled_ > 0) { + Log(options_.info_log, + "RefitLevel: waiting for background threads to stop: %d", + bg_compaction_scheduled_); + bg_cv_.Wait(); + } + + // move to a smaller level + int to_level = FindMinimumEmptyLevelFitting(level); + + assert(to_level <= level); + + if (to_level < level) { + Log(options_.info_log, "Before refitting:\n%s", + versions_->current()->DebugString().data()); + + VersionEdit edit(NumberLevels()); + for (const auto& f : versions_->current()->files_[level]) { + edit.DeleteFile(level, f->number); + edit.AddFile(to_level, f->number, f->file_size, f->smallest, f->largest); + } + Log(options_.info_log, "Apply version edit:\n%s", + edit.DebugString().data()); + + auto status = versions_->LogAndApply(&edit, &mutex_); + + Log(options_.info_log, "LogAndApply: %s\n", status.ToString().data()); + + if (status.ok()) { + Log(options_.info_log, "After refitting:\n%s", + versions_->current()->DebugString().data()); + } + } + + refitting_level_ = false; + bg_work_gate_closed_ = false; } int DBImpl::NumberLevels() { @@ -1238,7 +1313,9 @@ Status DBImpl::TEST_WaitForCompact() { void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); - if (bg_compaction_scheduled_ >= options_.max_background_compactions) { + if (bg_work_gate_closed_) { + // gate closed for backgrond work + } else if (bg_compaction_scheduled_ >= options_.max_background_compactions) { // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions diff --git a/db/db_impl.h b/db/db_impl.h index 5f09035f2..157acd9b8 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -57,7 +57,8 @@ class DBImpl : public DB { virtual void ReleaseSnapshot(const Snapshot* snapshot); virtual bool GetProperty(const Slice& property, std::string* value); virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes); - virtual void CompactRange(const Slice* begin, const Slice* end); + virtual void CompactRange(const Slice* begin, const Slice* end, + bool reduce_level = false); virtual int NumberLevels(); virtual int MaxMemCompactionLevel(); virtual int Level0StopWriteTrigger(); @@ -221,6 +222,14 @@ class DBImpl : public DB { // dump leveldb.stats to LOG void MaybeDumpStats(); + // Return the minimum empty level that could hold the total data in the + // input level. Return the input level, if such level could not be found. + int FindMinimumEmptyLevelFitting(int level); + + // Move the files in the input level to the minimum level that could hold + // the data set. + void ReFitLevel(int level); + // Constant after construction const InternalFilterPolicy internal_filter_policy_; bool owns_info_log_; @@ -370,6 +379,12 @@ class DBImpl : public DB { // The options to access storage files const EnvOptions storage_options_; + // A value of true temporarily disables scheduling of background work + bool bg_work_gate_closed_; + + // Guard against multiple concurrent refitting + bool refitting_level_; + // No copying allowed DBImpl(const DBImpl&); void operator=(const DBImpl&); diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index 317d290d0..6199b5e7b 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -47,7 +47,8 @@ public: virtual Status Write(const WriteOptions& options, WriteBatch* updates) { return Status::NotSupported("Not supported operation in read only mode."); } - virtual void CompactRange(const Slice* begin, const Slice* end) { + virtual void CompactRange(const Slice* begin, const Slice* end, + bool reduce_level = false) { } virtual Status DisableFileDeletions() { return Status::NotSupported("Not supported operation in read only mode."); diff --git a/db/db_test.cc b/db/db_test.cc index 52c6ad794..acdcd41e2 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3058,7 +3058,8 @@ class ModelDB: public DB { sizes[i] = 0; } } - virtual void CompactRange(const Slice* start, const Slice* end) { + virtual void CompactRange(const Slice* start, const Slice* end, + bool reduce_level ) { } virtual int NumberLevels() diff --git a/db/version_set.h b/db/version_set.h index 2369bcc1e..ec7a2750e 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -136,6 +136,7 @@ class Version { private: friend class Compaction; friend class VersionSet; + friend class DBImpl; class LevelFileNumIterator; Iterator* NewConcatenatingIterator(const ReadOptions&, diff --git a/include/leveldb/db.h b/include/leveldb/db.h index 056920d9e..b7efeb54b 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -180,7 +180,14 @@ class DB { // end==nullptr is treated as a key after all keys in the database. // Therefore the following call will compact the entire database: // db->CompactRange(nullptr, nullptr); - virtual void CompactRange(const Slice* begin, const Slice* end) = 0; + // Note that after the entire database is compacted, all data are pushed + // down to the last level containing any data. If the total data size + // after compaction is reduced, that level might not be appropriate for + // hosting all the files. In this case, client could set reduce_level + // to true, to move the files back to the minimum level capable of holding + // the data set. + virtual void CompactRange(const Slice* begin, const Slice* end, + bool reduce_level = false) = 0; // Number of levels used for this DB. virtual int NumberLevels() = 0; diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index eff675340..7dfda7207 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -223,8 +223,9 @@ void DBWithTTL::GetApproximateSizes(const Range* r, int n, uint64_t* sizes) { db_->GetApproximateSizes(r, n, sizes); } -void DBWithTTL::CompactRange(const Slice* begin, const Slice* end) { - db_->CompactRange(begin, end); +void DBWithTTL::CompactRange(const Slice* begin, const Slice* end, + bool reduce_level) { + db_->CompactRange(begin, end, reduce_level); } int DBWithTTL::NumberLevels() { diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index d24efbe48..d66e396ca 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -54,7 +54,8 @@ class DBWithTTL : public DB, CompactionFilter { virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes); - virtual void CompactRange(const Slice* begin, const Slice* end); + virtual void CompactRange(const Slice* begin, const Slice* end, + bool reduce_level = false); virtual int NumberLevels(); From d364eea1fc615917133c49b5fed2acf1e884fbec Mon Sep 17 00:00:00 2001 From: Haobo Xu Date: Mon, 22 Jul 2013 12:19:46 -0700 Subject: [PATCH 02/15] [RocksDB] Fix FindMinimumEmptyLevelFitting Summary: as title Test Plan: make check; Reviewers: xjin CC: leveldb Differential Revision: https://reviews.facebook.net/D11751 --- db/db_impl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 03d88708a..326b61e83 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -928,7 +928,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end, int DBImpl::FindMinimumEmptyLevelFitting(int level) { mutex_.AssertHeld(); int minimum_level = level; - for (int i = level - 1; i > 0; ++i) { + for (int i = level - 1; i > 0; --i) { // stop if level i is not empty if (versions_->NumLevelFiles(i) > 0) break; From bf66c10b13eb828a7a771cd869a5ae296caa3dea Mon Sep 17 00:00:00 2001 From: Mayank Agarwal Date: Fri, 12 Jul 2013 16:56:52 -0700 Subject: [PATCH 03/15] Use KeyMayExist for WriteBatch-Deletes Summary: Introduced KeyMayExist checking during writebatch-delete and removed from Outer Delete API because it uses writebatch-delete. Added code to skip getting Table from disk if not already present in table_cache. Some renaming of variables. Introduced KeyMayExistImpl which allows checking since specified sequence number in GetImpl useful to check partially written writebatch. Changed KeyMayExist to not be pure virtual and provided a default implementation. Expanded unit-tests in db_test to check appropriately. Ran db_stress for 1 hour with ./db_stress --max_key=100000 --ops_per_thread=10000000 --delpercent=50 --filter_deletes=1 --statistics=1. Test Plan: db_stress;make check Reviewers: dhruba, haobo Reviewed By: dhruba CC: leveldb, xjin Differential Revision: https://reviews.facebook.net/D11745 --- db/db_bench.cc | 8 ++++---- db/db_impl.cc | 32 +++++++++++++++++------------ db/db_impl.h | 13 ++++++++++-- db/db_test.cc | 43 ++++++++++++++++++++++++++++++++++++++- db/table_cache.cc | 22 +++++++++++++------- db/table_cache.h | 10 ++++----- db/version_set.cc | 6 +++--- db/version_set.h | 2 +- db/write_batch.cc | 34 +++++++++++++++++++++++++------ db/write_batch_internal.h | 9 +++++++- include/leveldb/db.h | 8 ++++++-- include/leveldb/options.h | 2 +- table/table.cc | 6 +++--- table/table.h | 2 +- tools/db_stress.cc | 10 ++++----- util/options.cc | 6 +++--- 16 files changed, 155 insertions(+), 58 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 253a64bcf..a5c95732e 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -328,7 +328,7 @@ static auto FLAGS_bytes_per_sync = leveldb::Options().bytes_per_sync; // On true, deletes use bloom-filter and drop the delete if key not present -static bool FLAGS_deletes_check_filter_first = false; +static bool FLAGS_filter_deletes = false; namespace leveldb { @@ -1114,7 +1114,7 @@ unique_ptr GenerateKeyFromInt(int v, const char* suffix = "") options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base; options.max_bytes_for_level_multiplier = FLAGS_max_bytes_for_level_multiplier; - options.deletes_check_filter_first = FLAGS_deletes_check_filter_first; + options.filter_deletes = FLAGS_filter_deletes; if (FLAGS_max_bytes_for_level_multiplier_additional.size() > 0) { if (FLAGS_max_bytes_for_level_multiplier_additional.size() != (unsigned int)FLAGS_num_levels) { @@ -2220,9 +2220,9 @@ int main(int argc, char** argv) { FLAGS_keys_per_multiget = n; } else if (sscanf(argv[i], "--bytes_per_sync=%ld%c", &l, &junk) == 1) { FLAGS_bytes_per_sync = l; - } else if (sscanf(argv[i], "--deletes_check_filter_first=%d%c", &n, &junk) + } else if (sscanf(argv[i], "--filter_deletes=%d%c", &n, &junk) == 1 && (n == 0 || n ==1 )) { - FLAGS_deletes_check_filter_first = n; + FLAGS_filter_deletes = n; } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); exit(1); diff --git a/db/db_impl.cc b/db/db_impl.cc index 326b61e83..3b4e77d65 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -691,7 +691,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, mem = new MemTable(internal_comparator_, NumberLevels()); mem->Ref(); } - status = WriteBatchInternal::InsertInto(&batch, mem); + status = WriteBatchInternal::InsertInto(&batch, mem, &options_); MaybeIgnoreError(&status); if (!status.ok()) { break; @@ -2078,13 +2078,13 @@ Status DBImpl::Get(const ReadOptions& options, return GetImpl(options, key, value); } -// If no_IO is true, then returns Status::NotFound if key is not in memtable, +// If no_io is true, then returns Status::NotFound if key is not in memtable, // immutable-memtable and bloom-filters can guarantee that key is not in db, -// "value" is garbage string if no_IO is true +// "value" is garbage string if no_io is true Status DBImpl::GetImpl(const ReadOptions& options, const Slice& key, std::string* value, - const bool no_IO) { + const bool no_io) { Status s; StopWatch sw(env_, options_.statistics, DB_GET); @@ -2113,12 +2113,12 @@ Status DBImpl::GetImpl(const ReadOptions& options, // s is both in/out. When in, s could either be OK or MergeInProgress. // value will contain the current merge operand in the latter case. LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s, options_, no_IO)) { + if (mem->Get(lkey, value, &s, options_, no_io)) { // Done - } else if (imm.Get(lkey, value, &s, options_, no_IO)) { + } else if (imm.Get(lkey, value, &s, options_, no_io)) { // Done } else { - current->Get(options, lkey, value, &s, &stats, options_, no_IO); + current->Get(options, lkey, value, &s, &stats, options_, no_io); have_stat_update = true; } mutex_.Lock(); @@ -2209,8 +2209,17 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, } bool DBImpl::KeyMayExist(const Slice& key) { + return KeyMayExistImpl(key, versions_->LastSequence()); +} + +bool DBImpl::KeyMayExistImpl(const Slice& key, + const SequenceNumber read_from_seq) { std::string value; - const Status s = GetImpl(ReadOptions(), key, &value, true); + SnapshotImpl read_from_snapshot; + read_from_snapshot.number_ = read_from_seq; + ReadOptions ropts; + ropts.snapshot = &read_from_snapshot; + const Status s = GetImpl(ropts, key, &value, true); return !s.IsNotFound(); } @@ -2249,10 +2258,6 @@ Status DBImpl::Merge(const WriteOptions& o, const Slice& key, } Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { - if (options_.deletes_check_filter_first && !KeyMayExist(key)) { - RecordTick(options_.statistics, NUMBER_FILTERED_DELETES); - return Status::OK(); - } return DB::Delete(options, key); } @@ -2311,7 +2316,8 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { } } if (status.ok()) { - status = WriteBatchInternal::InsertInto(updates, mem_); + status = WriteBatchInternal::InsertInto(updates, mem_, &options_, this, + options_.filter_deletes); if (!status.ok()) { // Panic for in-memory corruptions // Note that existing logic was not sound. Any partial failure writing diff --git a/db/db_impl.h b/db/db_impl.h index 157acd9b8..5229cf3e2 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -103,6 +103,15 @@ class DBImpl : public DB { // Trigger's a background call for testing. void TEST_PurgeObsoleteteWAL(); + // KeyMayExist's internal function, but can be called internally from rocksdb + // to check memtable from sequence_number=read_from_seq. This is useful to + // check presence of key in db when key's existence is to be also checked in + // an incompletely written WriteBatch in memtable. eg. Database doesn't have + // key A and WriteBatch=[PutA,B; DelA]. A KeyMayExist called from DelA also + // needs to check itself for any PutA to be sure to not drop the delete. + bool KeyMayExistImpl(const Slice& key, + const SequenceNumber read_from_seq); + protected: Env* const env_; const std::string dbname_; @@ -399,11 +408,11 @@ class DBImpl : public DB { std::vector& snapshots, SequenceNumber* prev_snapshot); - // Function that Get and KeyMayExist call with no_IO true or false + // Function that Get and KeyMayExistImpl call with no_io true or false Status GetImpl(const ReadOptions& options, const Slice& key, std::string* value, - const bool no_IO = false); + const bool no_io = false); }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/db_test.cc b/db/db_test.cc index acdcd41e2..d25fe8b3e 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -291,7 +291,7 @@ class DBTest { // TODO -- test more options break; case kDeletesFilterFirst: - options.deletes_check_filter_first = true; + options.filter_deletes = true; break; default: break; @@ -805,6 +805,44 @@ TEST(DBTest, KeyMayExist) { } while (ChangeOptions()); } +// A delete is skipped for key if KeyMayExist(key) returns False +// Tests Writebatch consistency and proper delete behaviour +TEST(DBTest, FilterDeletes) { + Options options = CurrentOptions(); + options.filter_policy = NewBloomFilterPolicy(20); + options.filter_deletes = true; + Reopen(&options); + WriteBatch batch; + + batch.Delete("a"); + dbfull()->Write(WriteOptions(), &batch); + ASSERT_EQ(AllEntriesFor("a"), "[ ]"); // Delete skipped + batch.Clear(); + + batch.Put("a", "b"); + batch.Delete("a"); + dbfull()->Write(WriteOptions(), &batch); + ASSERT_EQ(Get("a"), "NOT_FOUND"); + ASSERT_EQ(AllEntriesFor("a"), "[ DEL, b ]"); // Delete issued + batch.Clear(); + + batch.Delete("c"); + batch.Put("c", "d"); + dbfull()->Write(WriteOptions(), &batch); + ASSERT_EQ(Get("c"), "d"); + ASSERT_EQ(AllEntriesFor("c"), "[ d ]"); // Delete skipped + batch.Clear(); + + dbfull()->Flush(FlushOptions()); // A stray Flush + + batch.Delete("c"); + dbfull()->Write(WriteOptions(), &batch); + ASSERT_EQ(AllEntriesFor("c"), "[ DEL, d ]"); // Delete issued + batch.Clear(); + + delete options.filter_policy; +} + TEST(DBTest, IterEmpty) { Iterator* iter = db_->NewIterator(ReadOptions()); @@ -3192,6 +3230,9 @@ static bool CompareIterators(int step, TEST(DBTest, Randomized) { Random rnd(test::RandomSeed()); do { + if (CurrentOptions().filter_deletes) { + ChangeOptions(); // DBTest.Randomized not suited for filter_deletes + } ModelDB model(CurrentOptions()); const int N = 10000; const Snapshot* model_snap = nullptr; diff --git a/db/table_cache.cc b/db/table_cache.cc index 4cc105afe..02408d95c 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -39,15 +39,19 @@ TableCache::~TableCache() { Status TableCache::FindTable(const EnvOptions& toptions, uint64_t file_number, uint64_t file_size, - Cache::Handle** handle, bool* tableIO) { + Cache::Handle** handle, bool* table_io, + const bool no_io) { Status s; char buf[sizeof(file_number)]; EncodeFixed64(buf, file_number); Slice key(buf, sizeof(buf)); *handle = cache_->Lookup(key); if (*handle == nullptr) { - if (tableIO != nullptr) { - *tableIO = true; // we had to do IO from storage + if (no_io) { // Dont do IO and return a not-found status + return Status::NotFound("Table not found in table_cache, no_io is set"); + } + if (table_io != nullptr) { + *table_io = true; // we had to do IO from storage } std::string fname = TableFileName(dbname_, file_number); unique_ptr file; @@ -112,17 +116,21 @@ Status TableCache::Get(const ReadOptions& options, const Slice& k, void* arg, bool (*saver)(void*, const Slice&, const Slice&, bool), - bool* tableIO, + bool* table_io, void (*mark_key_may_exist)(void*), - const bool no_IO) { + const bool no_io) { Cache::Handle* handle = nullptr; Status s = FindTable(storage_options_, file_number, file_size, - &handle, tableIO); + &handle, table_io, no_io); if (s.ok()) { Table* t = reinterpret_cast(cache_->Value(handle)); - s = t->InternalGet(options, k, arg, saver, mark_key_may_exist, no_IO); + s = t->InternalGet(options, k, arg, saver, mark_key_may_exist, no_io); cache_->Release(handle); + } else if (no_io && s.IsNotFound()) { + // Couldnt find Table in cache but treat as kFound if no_io set + (*mark_key_may_exist)(arg); + return Status::OK(); } return s; } diff --git a/db/table_cache.h b/db/table_cache.h index 737e53c9e..2f3787609 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -48,9 +48,9 @@ class TableCache { const Slice& k, void* arg, bool (*handle_result)(void*, const Slice&, const Slice&, bool), - bool* tableIO, + bool* table_io, void (*mark_key_may_exist)(void*) = nullptr, - const bool no_IO = false); + const bool no_io = false); // Evict any entry for the specified file number void Evict(uint64_t file_number); @@ -62,9 +62,9 @@ class TableCache { const EnvOptions& storage_options_; std::shared_ptr cache_; - Status FindTable(const EnvOptions& toptions, - uint64_t file_number, uint64_t file_size, Cache::Handle**, - bool* tableIO = nullptr); + Status FindTable(const EnvOptions& toptions, uint64_t file_number, + uint64_t file_size, Cache::Handle**, bool* table_io=nullptr, + const bool no_io = false); }; } // namespace leveldb diff --git a/db/version_set.cc b/db/version_set.cc index 19dc022f4..15ff1330f 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -339,7 +339,7 @@ void Version::Get(const ReadOptions& options, Status *status, GetStats* stats, const Options& db_options, - const bool no_IO) { + const bool no_io) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); const Comparator* ucmp = vset_->icmp_.user_comparator(); @@ -348,7 +348,7 @@ void Version::Get(const ReadOptions& options, auto logger = db_options.info_log; assert(status->ok() || status->IsMergeInProgress()); - if (no_IO) { + if (no_io) { assert(status->ok()); } Saver saver; @@ -419,7 +419,7 @@ void Version::Get(const ReadOptions& options, bool tableIO = false; *status = vset_->table_cache_->Get(options, f->number, f->file_size, ikey, &saver, SaveValue, &tableIO, - MarkKeyMayExist, no_IO); + MarkKeyMayExist, no_io); // TODO: examine the behavior for corrupted key if (!status->ok()) { return; diff --git a/db/version_set.h b/db/version_set.h index ec7a2750e..11bfb9961 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -75,7 +75,7 @@ class Version { }; void Get(const ReadOptions&, const LookupKey& key, std::string* val, Status* status, GetStats* stats, const Options& db_option, - const bool no_IO = false); + const bool no_io = false); // Adds "stats" into the current state. Returns true if a new // compaction may need to be triggered, false otherwise. diff --git a/db/write_batch.cc b/db/write_batch.cc index 4ca4819fd..9845f49ad 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -16,8 +16,9 @@ #include "leveldb/write_batch.h" -#include "leveldb/db.h" +#include "leveldb/statistics.h" #include "db/dbformat.h" +#include "db/db_impl.h" #include "db/memtable.h" #include "db/write_batch_internal.h" #include "util/coding.h" @@ -139,6 +140,23 @@ class MemTableInserter : public WriteBatch::Handler { public: SequenceNumber sequence_; MemTable* mem_; + const Options* options_; + DBImpl* db_; + const bool filter_deletes_; + + MemTableInserter(SequenceNumber sequence, MemTable* mem, const Options* opts, + DB* db, const bool filter_deletes) + : sequence_(sequence), + mem_(mem), + options_(opts), + db_(reinterpret_cast(db)), + filter_deletes_(filter_deletes) { + assert(mem_); + if (filter_deletes_) { + assert(options_); + assert(db_); + } + } virtual void Put(const Slice& key, const Slice& value) { mem_->Add(sequence_, kTypeValue, key, value); @@ -149,17 +167,21 @@ class MemTableInserter : public WriteBatch::Handler { sequence_++; } virtual void Delete(const Slice& key) { + if (filter_deletes_ && !db_->KeyMayExistImpl(key, sequence_)) { + RecordTick(options_->statistics, NUMBER_FILTERED_DELETES); + return; + } mem_->Add(sequence_, kTypeDeletion, key, Slice()); sequence_++; } }; } // namespace -Status WriteBatchInternal::InsertInto(const WriteBatch* b, - MemTable* memtable) { - MemTableInserter inserter; - inserter.sequence_ = WriteBatchInternal::Sequence(b); - inserter.mem_ = memtable; +Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* mem, + const Options* opts, DB* db, + const bool filter_deletes) { + MemTableInserter inserter(WriteBatchInternal::Sequence(b), mem, opts, db, + filter_deletes); return b->Iterate(&inserter); } diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index eb37733c2..649752ce6 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -7,6 +7,8 @@ #include "leveldb/types.h" #include "leveldb/write_batch.h" +#include "leveldb/db.h" +#include "leveldb/options.h" namespace leveldb { @@ -39,7 +41,12 @@ class WriteBatchInternal { static void SetContents(WriteBatch* batch, const Slice& contents); - static Status InsertInto(const WriteBatch* batch, MemTable* memtable); + // Inserts batch entries into memtable + // Drops deletes in batch if filter_del is set to true and + // db->KeyMayExist returns false + static Status InsertInto(const WriteBatch* batch, MemTable* memtable, + const Options* opts = nullptr, DB* db = nullptr, + const bool filter_del = false); static void Append(WriteBatch* dst, const WriteBatch* src); }; diff --git a/include/leveldb/db.h b/include/leveldb/db.h index b7efeb54b..e331e7f67 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -122,8 +122,12 @@ class DB { // If the key definitely does not exist in the database, then this method // returns false. Otherwise return true. This check is potentially - // lighter-weight than invoking DB::Get(). No IO is performed - virtual bool KeyMayExist(const Slice& key) = 0; + // lighter-weight than invoking DB::Get(). One way to make this lighter weight + // is to avoid doing any IOs + // Default implementation here returns true + virtual bool KeyMayExist(const Slice& key) { + return true; + } // Return a heap-allocated iterator over the contents of the database. // The result of NewIterator() is initially invalid (caller must diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 3341b72a2..c178ddeee 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -472,7 +472,7 @@ struct Options { // only incurs in-memory look up. This optimization avoids writing the delete // to storage when appropriate. // Default: false - bool deletes_check_filter_first; + bool filter_deletes; }; diff --git a/table/table.cc b/table/table.cc index f7b664a4f..c51a0aa44 100644 --- a/table/table.cc +++ b/table/table.cc @@ -324,7 +324,7 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, bool (*saver)(void*, const Slice&, const Slice&, bool), void (*mark_key_may_exist)(void*), - const bool no_IO) { + const bool no_io) { Status s; Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator); bool done = false; @@ -340,9 +340,9 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, // cross one data block, we should be fine. RecordTick(rep_->options.statistics, BLOOM_FILTER_USEFUL); break; - } else if (no_IO) { + } else if (no_io) { // Update Saver.state to Found because we are only looking for whether - // bloom-filter can guarantee the key is not there when "no_IO" + // bloom-filter can guarantee the key is not there when "no_io" (*mark_key_may_exist)(arg); done = true; } else { diff --git a/table/table.h b/table/table.h index 4674e262b..0be95f368 100644 --- a/table/table.h +++ b/table/table.h @@ -88,7 +88,7 @@ class Table { void* arg, bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool), void (*mark_key_may_exist)(void*) = nullptr, - const bool no_IO = false); + const bool no_io = false); void ReadMeta(const Footer& footer); diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 8d27c1a68..fcd5dc269 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -181,7 +181,7 @@ static uint32_t FLAGS_log2_keys_per_lock = 2; // implies 2^2 keys per lock static uint32_t FLAGS_purge_redundant_percent = 50; // On true, deletes use bloom-filter and drop the delete if key not present -static bool FLAGS_deletes_check_filter_first = false; +static bool FLAGS_filter_deletes = false; // Level0 compaction start trigger static int FLAGS_level0_file_num_compaction_trigger = 0; @@ -904,7 +904,7 @@ class StressTest { fprintf(stdout, "Purge redundant %% : %d\n", FLAGS_purge_redundant_percent); fprintf(stdout, "Deletes use filter : %d\n", - FLAGS_deletes_check_filter_first); + FLAGS_filter_deletes); fprintf(stdout, "Num keys per lock : %d\n", 1 << FLAGS_log2_keys_per_lock); @@ -960,7 +960,7 @@ class StressTest { options.delete_obsolete_files_period_micros = FLAGS_delete_obsolete_files_period_micros; options.max_manifest_file_size = 1024; - options.deletes_check_filter_first = FLAGS_deletes_check_filter_first; + options.filter_deletes = FLAGS_filter_deletes; static Random purge_percent(1000); // no benefit from non-determinism here if (purge_percent.Uniform(100) < FLAGS_purge_redundant_percent - 1) { options.purge_redundant_kvs_while_flush = false; @@ -1160,9 +1160,9 @@ int main(int argc, char** argv) { } else if (sscanf(argv[i], "--purge_redundant_percent=%d%c", &n, &junk) == 1 && (n >= 0 && n <= 100)) { FLAGS_purge_redundant_percent = n; - } else if (sscanf(argv[i], "--deletes_check_filter_first=%d%c", &n, &junk) + } else if (sscanf(argv[i], "--filter_deletes=%d%c", &n, &junk) == 1 && (n == 0 || n == 1)) { - FLAGS_deletes_check_filter_first = n; + FLAGS_filter_deletes = n; } else { fprintf(stderr, "Invalid flag '%s'\n", argv[i]); exit(1); diff --git a/util/options.cc b/util/options.cc index 1ac25845a..e884d87d7 100644 --- a/util/options.cc +++ b/util/options.cc @@ -75,7 +75,7 @@ Options::Options() access_hint_on_compaction_start(NORMAL), use_adaptive_mutex(false), bytes_per_sync(0), - deletes_check_filter_first(false) { + filter_deletes(false) { } static const char* const access_hints[] = { @@ -209,8 +209,8 @@ Options::Dump(Logger* log) const use_adaptive_mutex); Log(log," Options.bytes_per_sync: %ld", bytes_per_sync); - Log(log," Options.deletes_check_filter_first: %d", - deletes_check_filter_first); + Log(log," Options.filter_deletes: %d", + filter_deletes); } // Options::Dump // From f3baeecd44910ce95588bcdac202c17adec9faca Mon Sep 17 00:00:00 2001 From: Mayank Agarwal Date: Tue, 23 Jul 2013 13:46:23 -0700 Subject: [PATCH 04/15] Adding filter_deletes to crash_tests run in jenkins Summary: filter_deletes options introduced in db_stress makes it drop Deletes on key if KeyMayExist(key) returns false on the key. code change was simple and tested so not wasting reviewer's time. Test Plan: maek crash_test; python tools/db_crashtest[1|2].py CC: dhruba, vamsi Differential Revision: https://reviews.facebook.net/D11769 --- tools/db_crashtest.py | 3 ++- tools/db_crashtest2.py | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index 0f5803713..187f45995 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -79,7 +79,8 @@ def main(argv): ' --target_file_size_multiplier=2 ' + \ ' --max_write_buffer_number=3 ' + \ ' --max_background_compactions=20 ' + \ - ' --max_bytes_for_level_base=10485760' + ' --max_bytes_for_level_base=10485760 ' + \ + ' --filter_deletes=' + str(random.randint(0, 1)) killtime = time.time() + interval child = subprocess.Popen(['./db_stress \ --test_batches_snapshots=1 \ diff --git a/tools/db_crashtest2.py b/tools/db_crashtest2.py index de599f1b5..2398efd91 100644 --- a/tools/db_crashtest2.py +++ b/tools/db_crashtest2.py @@ -84,7 +84,8 @@ def main(argv): ' --target_file_size_multiplier=2 ' + \ ' --max_write_buffer_number=3 ' + \ ' --max_background_compactions=20 ' + \ - ' --max_bytes_for_level_base=10485760' + ' --max_bytes_for_level_base=10485760 ' + \ + ' --filter_deletes=' + str(random.randint(0, 1)) print ("Running db_stress with additional options=\n" + additional_opts + "\n") From 6fbe4e981a3d74270a0160445bd993c464c23d76 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Tue, 23 Jul 2013 12:18:57 -0700 Subject: [PATCH 05/15] If disable wal is set, then batch commits are avoided. Summary: rocksdb uses batch commit to write to transaction log. But if disable wal is set, then writes to transaction log are anyways avoided. In this case, there is not much value-add to batch things, batching can cause unnecessary delays to Puts(). This patch avoids batching when disableWal is set. Test Plan: make check. I am running db_stress now. Reviewers: haobo Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D11763 --- db/db_impl.cc | 59 +++++++++++++++++++++++++++++---------------------- 1 file changed, 34 insertions(+), 25 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 3b4e77d65..b765c0f4b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2270,20 +2270,26 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { StopWatch sw(env_, options_.statistics, DB_WRITE); MutexLock l(&mutex_); - writers_.push_back(&w); - while (!w.done && &w != writers_.front()) { - w.cv.Wait(); - } - if (w.done) { - return w.status; + + // If WAL is disabled, we avoid any queueing. + if (!options.disableWAL) { + writers_.push_back(&w); + while (!w.done && &w != writers_.front()) { + w.cv.Wait(); + } + if (w.done) { + return w.status; + } } // May temporarily unlock and wait. Status status = MakeRoomForWrite(my_batch == nullptr); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; + if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions - WriteBatch* updates = BuildBatchGroup(&last_writer); + WriteBatch* updates = options.disableWAL ? my_batch : + BuildBatchGroup(&last_writer); const SequenceNumber current_sequence = last_sequence + 1; WriteBatchInternal::SetSequence(updates, current_sequence); int my_batch_count = WriteBatchInternal::Count(updates); @@ -2298,12 +2304,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // and protects against concurrent loggers and concurrent writes // into mem_. { - mutex_.Unlock(); if (options.disableWAL) { + // If WAL is disabled, then we do not drop the mutex. We keep the + // mutex to protect concurrent insertions into the memtable. flush_on_destroy_ = true; - } - - if (!options.disableWAL) { + } else { + mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(updates)); if (status.ok() && options.sync) { if (options_.use_fsync) { @@ -2328,25 +2334,29 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { versions_->SetLastSequence(last_sequence); last_flushed_sequence_ = current_sequence; } - mutex_.Lock(); + if (!options.disableWAL) { + mutex_.Lock(); + } } if (updates == &tmp_batch_) tmp_batch_.Clear(); } - while (true) { - Writer* ready = writers_.front(); - writers_.pop_front(); - if (ready != &w) { - ready->status = status; - ready->done = true; - ready->cv.Signal(); + if (!options.disableWAL) { + while (true) { + Writer* ready = writers_.front(); + writers_.pop_front(); + if (ready != &w) { + ready->status = status; + ready->done = true; + ready->cv.Signal(); + } + if (ready == last_writer) break; } - if (ready == last_writer) break; - } - // Notify new head of write queue - if (!writers_.empty()) { - writers_.front()->cv.Signal(); + // Notify new head of write queue + if (!writers_.empty()) { + writers_.front()->cv.Signal(); + } } return status; } @@ -2410,7 +2420,6 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { // REQUIRES: this thread is currently at the front of the writer queue Status DBImpl::MakeRoomForWrite(bool force) { mutex_.AssertHeld(); - assert(!writers_.empty()); bool allow_delay = !force; bool allow_rate_limit_delay = !force; uint64_t rate_limit_delay_millis = 0; From 52d7ecfc786a1c5433d48a056b5a17da6f43f33d Mon Sep 17 00:00:00 2001 From: Jim Paton Date: Tue, 23 Jul 2013 14:42:27 -0700 Subject: [PATCH 06/15] Virtualize SkipList Interface Summary: This diff virtualizes the skiplist interface so that users can provide their own implementation of a backing store for MemTables. Eventually, the backing store will be responsible for its own synchronization, allowing users (and us) to experiment with different lockless implementations. Test Plan: make clean make -j32 check ./db_stress Reviewers: dhruba, emayanke, haobo Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D11739 --- db/db_impl.cc | 13 +++-- db/db_impl.h | 2 + db/memtable.cc | 58 +++++++++++-------- db/memtable.h | 21 ++++--- db/repair.cc | 3 +- db/skiplistrep.h | 106 ++++++++++++++++++++++++++++++++++ db/write_batch_test.cc | 5 +- include/leveldb/memtablerep.h | 91 +++++++++++++++++++++++++++++ include/leveldb/options.h | 6 ++ table/table_test.cc | 13 +++-- util/options.cc | 5 +- 11 files changed, 276 insertions(+), 47 deletions(-) create mode 100644 db/skiplistrep.h create mode 100644 include/leveldb/memtablerep.h diff --git a/db/db_impl.cc b/db/db_impl.cc index b765c0f4b..17c4466a3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -163,7 +163,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) mutex_(options.use_adaptive_mutex), shutting_down_(nullptr), bg_cv_(&mutex_), - mem_(new MemTable(internal_comparator_, NumberLevels())), + mem_rep_factory_(options_.memtable_factory), + mem_(new MemTable(internal_comparator_, + mem_rep_factory_, NumberLevels())), logfile_number_(0), tmp_batch_(), bg_compaction_scheduled_(0), @@ -688,7 +690,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, WriteBatchInternal::SetContents(&batch, record); if (mem == nullptr) { - mem = new MemTable(internal_comparator_, NumberLevels()); + mem = new MemTable(internal_comparator_, + mem_rep_factory_, NumberLevels()); mem->Ref(); } status = WriteBatchInternal::InsertInto(&batch, mem, &options_); @@ -2528,7 +2531,8 @@ Status DBImpl::MakeRoomForWrite(bool force) { log_.reset(new log::Writer(std::move(lfile))); mem_->SetLogNumber(logfile_number_); imm_.Add(mem_); - mem_ = new MemTable(internal_comparator_, NumberLevels()); + mem_ = new MemTable(internal_comparator_, + mem_rep_factory_, NumberLevels()); mem_->Ref(); force = false; // Do not force another compaction if have room MaybeScheduleCompaction(); @@ -2782,8 +2786,7 @@ Status DB::Merge(const WriteOptions& opt, const Slice& key, DB::~DB() { } -Status DB::Open(const Options& options, const std::string& dbname, - DB** dbptr) { +Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { *dbptr = nullptr; EnvOptions soptions; diff --git a/db/db_impl.h b/db/db_impl.h index 5229cf3e2..b7f48dcac 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -18,6 +18,7 @@ #include "port/port.h" #include "util/stats_logger.h" #include "memtablelist.h" +#include "leveldb/memtablerep.h" #ifdef USE_SCRIBE #include "scribe/scribe_logger.h" @@ -253,6 +254,7 @@ class DBImpl : public DB { port::Mutex mutex_; port::AtomicPointer shutting_down_; port::CondVar bg_cv_; // Signalled when background work finishes + std::shared_ptr mem_rep_factory_; MemTable* mem_; MemTableList imm_; // Memtable that are not changing uint64_t logfile_number_; diff --git a/db/memtable.cc b/db/memtable.cc index cfd2bed04..8ccf49df0 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -3,6 +3,9 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/memtable.h" + +#include + #include "db/dbformat.h" #include "leveldb/comparator.h" #include "leveldb/env.h" @@ -19,23 +22,28 @@ static Slice GetLengthPrefixedSlice(const char* data) { return Slice(p, len); } -MemTable::MemTable(const InternalKeyComparator& cmp, int numlevel) +MemTable::MemTable(const InternalKeyComparator& cmp, + std::shared_ptr table_factory, + int numlevel) : comparator_(cmp), refs_(0), - table_(comparator_, &arena_), + table_(table_factory->CreateMemTableRep(comparator_)), flush_in_progress_(false), flush_completed_(false), file_number_(0), edit_(numlevel), first_seqno_(0), - mem_logfile_number_(0) { -} + mem_logfile_number_(0) { } MemTable::~MemTable() { assert(refs_ == 0); } -size_t MemTable::ApproximateMemoryUsage() { return arena_.MemoryUsage(); } +size_t MemTable::ApproximateMemoryUsage() { + // The first term is the amount of memory used by the memtable and + // the second term is the amount of memory used by the backing store + return arena_.MemoryUsage() + table_->ApproximateMemoryUsage(); +} int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr) const { @@ -57,24 +65,27 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) { class MemTableIterator: public Iterator { public: - explicit MemTableIterator(MemTable::Table* table) : iter_(table) { } - - virtual bool Valid() const { return iter_.Valid(); } - virtual void Seek(const Slice& k) { iter_.Seek(EncodeKey(&tmp_, k)); } - virtual void SeekToFirst() { iter_.SeekToFirst(); } - virtual void SeekToLast() { iter_.SeekToLast(); } - virtual void Next() { iter_.Next(); } - virtual void Prev() { iter_.Prev(); } - virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); } + explicit MemTableIterator(MemTableRep* table) + : iter_(table->GetIterator()) { } + + virtual bool Valid() const { return iter_->Valid(); } + virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); } + virtual void SeekToFirst() { iter_->SeekToFirst(); } + virtual void SeekToLast() { iter_->SeekToLast(); } + virtual void Next() { iter_->Next(); } + virtual void Prev() { iter_->Prev(); } + virtual Slice key() const { + return GetLengthPrefixedSlice(iter_->key()); + } virtual Slice value() const { - Slice key_slice = GetLengthPrefixedSlice(iter_.key()); + Slice key_slice = GetLengthPrefixedSlice(iter_->key()); return GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); } virtual Status status() const { return Status::OK(); } private: - MemTable::Table::Iterator iter_; + std::shared_ptr iter_; std::string tmp_; // For passing to EncodeKey // No copying allowed @@ -83,7 +94,7 @@ class MemTableIterator: public Iterator { }; Iterator* MemTable::NewIterator() { - return new MemTableIterator(&table_); + return new MemTableIterator(table_.get()); } void MemTable::Add(SequenceNumber s, ValueType type, @@ -109,7 +120,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); assert((p + val_size) - buf == (unsigned)encoded_len); - table_.Insert(buf); + table_->Insert(buf); // The first sequence number inserted into the memtable assert(first_seqno_ == 0 || s > first_seqno_); @@ -119,10 +130,10 @@ void MemTable::Add(SequenceNumber s, ValueType type, } bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, - const Options& options, const bool check_presence_only) { + const Options& options, const bool check_presence_only) { Slice memkey = key.memtable_key(); - Table::Iterator iter(&table_); - iter.Seek(memkey.data()); + std::shared_ptr iter(table_.get()->GetIterator()); + iter->Seek(memkey.data()); bool merge_in_progress = false; std::string operand; @@ -131,10 +142,9 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, merge_in_progress = true; } - auto merge_operator = options.merge_operator; auto logger = options.info_log; - for (; iter.Valid(); iter.Next()) { + for (; iter->Valid(); iter->Next()) { // entry format is: // klength varint32 // userkey char[klength-8] @@ -144,7 +154,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, // Check that it belongs to same user key. We do not check the // sequence number since the Seek() call above should have skipped // all entries with overly large sequence numbers. - const char* entry = iter.key(); + const char* entry = iter->key(); uint32_t key_length; const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length); if (comparator_.comparator.user_comparator()->Compare( diff --git a/db/memtable.h b/db/memtable.h index def3a5d3d..2ffe4b913 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -6,24 +6,31 @@ #define STORAGE_LEVELDB_DB_MEMTABLE_H_ #include +#include #include "leveldb/db.h" #include "db/dbformat.h" #include "db/skiplist.h" #include "db/version_set.h" #include "util/arena.h" +#include "leveldb/memtablerep.h" namespace leveldb { -class InternalKeyComparator; class Mutex; class MemTableIterator; class MemTable { public: + struct KeyComparator : public MemTableRep::KeyComparator { + const InternalKeyComparator comparator; + explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { } + virtual int operator()(const char* a, const char* b) const; + }; + // MemTables are reference counted. The initial reference count // is zero and the caller must call Ref() at least once. explicit MemTable(const InternalKeyComparator& comparator, - int numlevel = 7); + std::shared_ptr table_factory, int numlevel = 7); // Increase reference count. void Ref() { ++refs_; } @@ -88,22 +95,14 @@ class MemTable { private: ~MemTable(); // Private since only Unref() should be used to delete it - - struct KeyComparator { - const InternalKeyComparator comparator; - explicit KeyComparator(const InternalKeyComparator& c) : comparator(c) { } - int operator()(const char* a, const char* b) const; - }; friend class MemTableIterator; friend class MemTableBackwardIterator; friend class MemTableList; - typedef SkipList Table; - KeyComparator comparator_; int refs_; Arena arena_; - Table table_; + shared_ptr table_; // These are used to manage memtable flushes to storage bool flush_in_progress_; // started the flush diff --git a/db/repair.cc b/db/repair.cc index d1c0c4525..049aabb3d 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -191,7 +191,8 @@ class Repairer { std::string scratch; Slice record; WriteBatch batch; - MemTable* mem = new MemTable(icmp_, options_.num_levels); + MemTable* mem = new MemTable(icmp_, options_.memtable_factory, + options_.num_levels); mem->Ref(); int counter = 0; while (reader.ReadRecord(&record, &scratch)) { diff --git a/db/skiplistrep.h b/db/skiplistrep.h new file mode 100644 index 000000000..0f7523b6e --- /dev/null +++ b/db/skiplistrep.h @@ -0,0 +1,106 @@ +#ifndef STORAGE_LEVELDB_DB_SKIPLISTREP_H_ +#define STORAGE_LEVELDB_DB_SKIPLISTREP_H_ + +#include "leveldb/memtablerep.h" +#include "db/memtable.h" +#include "db/skiplist.h" + +namespace leveldb { + +class Arena; + +class SkipListRep : public MemTableRep { + Arena arena_; + SkipList skip_list_; +public: + explicit SkipListRep(MemTableRep::KeyComparator& compare) + : skip_list_(compare, &arena_) { } + + // Insert key into the list. + // REQUIRES: nothing that compares equal to key is currently in the list. + virtual void Insert(const char* key) { + skip_list_.Insert(key); + } + + // Returns true iff an entry that compares equal to key is in the list. + virtual bool Contains(const char* key) const { + return skip_list_.Contains(key); + } + + virtual size_t ApproximateMemoryUsage() { + return arena_.MemoryUsage(); + } + + virtual ~SkipListRep() { } + + // Iteration over the contents of a skip list + class Iterator : public MemTableRep::Iterator { + SkipList::Iterator iter_; + public: + // Initialize an iterator over the specified list. + // The returned iterator is not valid. + explicit Iterator( + const SkipList* list + ) : iter_(list) { } + + virtual ~Iterator() { } + + // Returns true iff the iterator is positioned at a valid node. + virtual bool Valid() const { + return iter_.Valid(); + } + + // Returns the key at the current position. + // REQUIRES: Valid() + virtual const char* key() const { + return iter_.key(); + } + + // Advances to the next position. + // REQUIRES: Valid() + virtual void Next() { + iter_.Next(); + } + + // Advances to the previous position. + // REQUIRES: Valid() + virtual void Prev() { + iter_.Prev(); + } + + // Advance to the first entry with a key >= target + virtual void Seek(const char* target) { + iter_.Seek(target); + } + + // Position at the first entry in list. + // Final state of iterator is Valid() iff list is not empty. + virtual void SeekToFirst() { + iter_.SeekToFirst(); + } + + // Position at the last entry in list. + // Final state of iterator is Valid() iff list is not empty. + virtual void SeekToLast() { + iter_.SeekToLast(); + } + }; + + virtual std::shared_ptr GetIterator() { + return std::shared_ptr( + new SkipListRep::Iterator(&skip_list_) + ); + } +}; + +class SkipListFactory : public MemTableRepFactory { +public: + virtual std::shared_ptr CreateMemTableRep ( + MemTableRep::KeyComparator& compare) { + return std::shared_ptr(new SkipListRep(compare)); + } +}; + +} + +#endif // STORAGE_LEVELDB_DB_SKIPLISTREP_H_ diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index d17a08e8e..945ef16bd 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -4,6 +4,8 @@ #include "leveldb/db.h" +#include +#include "db/skiplistrep.h" #include "db/memtable.h" #include "db/write_batch_internal.h" #include "leveldb/env.h" @@ -14,7 +16,8 @@ namespace leveldb { static std::string PrintContents(WriteBatch* b) { InternalKeyComparator cmp(BytewiseComparator()); - MemTable* mem = new MemTable(cmp); + auto factory = std::make_shared(); + MemTable* mem = new MemTable(cmp, factory); mem->Ref(); std::string state; Status s = WriteBatchInternal::InsertInto(b, mem); diff --git a/include/leveldb/memtablerep.h b/include/leveldb/memtablerep.h new file mode 100644 index 000000000..cb5a6ed35 --- /dev/null +++ b/include/leveldb/memtablerep.h @@ -0,0 +1,91 @@ +// This file contains the interface that must be implemented by any collection +// to be used as the backing store for a MemTable. Such a collection must +// satisfy the following properties: +// (1) It does not store duplicate items. +// (2) It uses MemTableRep::KeyComparator to compare items for iteration and +// equality. +// (3) It can be accessed concurrently by multiple readers but need not support +// concurrent writes. +// (4) Items are never deleted. +// The liberal use of assertions is encouraged to enforce (1). + +#ifndef STORAGE_LEVELDB_DB_TABLE_H_ +#define STORAGE_LEVELDB_DB_TABLE_H_ + +#include + +namespace leveldb { + +class MemTableRep { + public: + // KeyComparator(a, b) returns a negative value if a is less than b, 0 if they + // are equal, and a positive value if b is greater than a + class KeyComparator { + public: + virtual int operator()(const char* a, const char* b) const = 0; + virtual ~KeyComparator() { } + }; + + // Insert key into the collection. (The caller will pack key and value into a + // single buffer and pass that in as the parameter to Insert) + // REQUIRES: nothing that compares equal to key is currently in the + // collection. + virtual void Insert(const char* key) = 0; + + // Returns true iff an entry that compares equal to key is in the collection. + virtual bool Contains(const char* key) const = 0; + + // Returns an estimate of the number of bytes of data in use by this + // data structure. + virtual size_t ApproximateMemoryUsage() = 0; + + virtual ~MemTableRep() { } + + // Iteration over the contents of a skip collection + class Iterator { + public: + // Initialize an iterator over the specified collection. + // The returned iterator is not valid. + // explicit Iterator(const MemTableRep* collection); + virtual ~Iterator() { }; + + // Returns true iff the iterator is positioned at a valid node. + virtual bool Valid() const = 0; + + // Returns the key at the current position. + // REQUIRES: Valid() + virtual const char* key() const = 0; + + // Advances to the next position. + // REQUIRES: Valid() + virtual void Next() = 0; + + // Advances to the previous position. + // REQUIRES: Valid() + virtual void Prev() = 0; + + // Advance to the first entry with a key >= target + virtual void Seek(const char* target) = 0; + + // Position at the first entry in collection. + // Final state of iterator is Valid() iff collection is not empty. + virtual void SeekToFirst() = 0; + + // Position at the last entry in collection. + // Final state of iterator is Valid() iff collection is not empty. + virtual void SeekToLast() = 0; + }; + + virtual std::shared_ptr GetIterator() = 0; +}; + +class MemTableRepFactory { + public: + virtual ~MemTableRepFactory() { }; + virtual std::shared_ptr CreateMemTableRep( + MemTableRep::KeyComparator&) = 0; +}; + +} + +#endif // STORAGE_LEVELDB_DB_TABLE_H_ diff --git a/include/leveldb/options.h b/include/leveldb/options.h index c178ddeee..f754fd3bf 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -12,6 +12,7 @@ #include #include "leveldb/slice.h" #include "leveldb/statistics.h" +#include "leveldb/memtablerep.h" namespace leveldb { @@ -474,6 +475,11 @@ struct Options { // Default: false bool filter_deletes; + // This is a factory that provides MemTableRep objects. + // Default: a factory that provides a skip-list-based implementation of + // MemTableRep. + std::shared_ptr memtable_factory; + }; // Options that control read operations diff --git a/table/table_test.cc b/table/table_test.cc index a2bba940d..118ffa232 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -3,8 +3,10 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include #include +#include #include "db/dbformat.h" #include "db/memtable.h" +#include "db/skiplistrep.h" #include "db/write_batch_internal.h" #include "leveldb/db.h" #include "leveldb/env.h" @@ -342,8 +344,9 @@ class MemTableConstructor: public Constructor { public: explicit MemTableConstructor(const Comparator* cmp) : Constructor(cmp), - internal_comparator_(cmp) { - memtable_ = new MemTable(internal_comparator_); + internal_comparator_(cmp), + table_factory_(new SkipListFactory) { + memtable_ = new MemTable(internal_comparator_, table_factory_); memtable_->Ref(); } ~MemTableConstructor() { @@ -351,7 +354,7 @@ class MemTableConstructor: public Constructor { } virtual Status FinishImpl(const Options& options, const KVMap& data) { memtable_->Unref(); - memtable_ = new MemTable(internal_comparator_); + memtable_ = new MemTable(internal_comparator_, table_factory_); memtable_->Ref(); int seq = 1; for (KVMap::const_iterator it = data.begin(); @@ -369,6 +372,7 @@ class MemTableConstructor: public Constructor { private: InternalKeyComparator internal_comparator_; MemTable* memtable_; + std::shared_ptr table_factory_; }; class DBConstructor: public Constructor { @@ -805,7 +809,8 @@ class MemTableTest { }; TEST(MemTableTest, Simple) { InternalKeyComparator cmp(BytewiseComparator()); - MemTable* memtable = new MemTable(cmp); + auto table_factory = std::make_shared(); + MemTable* memtable = new MemTable(cmp, table_factory); memtable->Ref(); WriteBatch batch; WriteBatchInternal::SetSequence(&batch, 100); diff --git a/util/options.cc b/util/options.cc index e884d87d7..912c53923 100644 --- a/util/options.cc +++ b/util/options.cc @@ -12,6 +12,7 @@ #include "leveldb/env.h" #include "leveldb/filter_policy.h" #include "leveldb/merge_operator.h" +#include "db/skiplistrep.h" namespace leveldb { @@ -75,7 +76,9 @@ Options::Options() access_hint_on_compaction_start(NORMAL), use_adaptive_mutex(false), bytes_per_sync(0), - filter_deletes(false) { + filter_deletes(false), + memtable_factory(std::shared_ptr(new SkipListFactory)) { + assert(memtable_factory.get() != nullptr); } static const char* const access_hints[] = { From d7ba5bce37063d2a0d2b6715157d9d0fc5fbe801 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Wed, 24 Jul 2013 10:01:13 -0700 Subject: [PATCH 07/15] Revert 6fbe4e981a3d74270a0160445bd993c464c23d76: If disable wal is set, then batch commits are avoided Summary: Revert "If disable wal is set, then batch commits are avoided" because keeping the mutex while inserting into the skiplist means that readers and writes are all serialized on the mutex. Test Plan: Reviewers: CC: Task ID: # Blame Rev: --- db/db_impl.cc | 59 ++++++++++++++++++++++----------------------------- 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 17c4466a3..0587774fb 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2273,26 +2273,20 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { StopWatch sw(env_, options_.statistics, DB_WRITE); MutexLock l(&mutex_); - - // If WAL is disabled, we avoid any queueing. - if (!options.disableWAL) { - writers_.push_back(&w); - while (!w.done && &w != writers_.front()) { - w.cv.Wait(); - } - if (w.done) { - return w.status; - } + writers_.push_back(&w); + while (!w.done && &w != writers_.front()) { + w.cv.Wait(); + } + if (w.done) { + return w.status; } // May temporarily unlock and wait. Status status = MakeRoomForWrite(my_batch == nullptr); uint64_t last_sequence = versions_->LastSequence(); Writer* last_writer = &w; - if (status.ok() && my_batch != nullptr) { // nullptr batch is for compactions - WriteBatch* updates = options.disableWAL ? my_batch : - BuildBatchGroup(&last_writer); + WriteBatch* updates = BuildBatchGroup(&last_writer); const SequenceNumber current_sequence = last_sequence + 1; WriteBatchInternal::SetSequence(updates, current_sequence); int my_batch_count = WriteBatchInternal::Count(updates); @@ -2307,12 +2301,12 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { // and protects against concurrent loggers and concurrent writes // into mem_. { + mutex_.Unlock(); if (options.disableWAL) { - // If WAL is disabled, then we do not drop the mutex. We keep the - // mutex to protect concurrent insertions into the memtable. flush_on_destroy_ = true; - } else { - mutex_.Unlock(); + } + + if (!options.disableWAL) { status = log_->AddRecord(WriteBatchInternal::Contents(updates)); if (status.ok() && options.sync) { if (options_.use_fsync) { @@ -2337,29 +2331,25 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { versions_->SetLastSequence(last_sequence); last_flushed_sequence_ = current_sequence; } - if (!options.disableWAL) { - mutex_.Lock(); - } + mutex_.Lock(); } if (updates == &tmp_batch_) tmp_batch_.Clear(); } - if (!options.disableWAL) { - while (true) { - Writer* ready = writers_.front(); - writers_.pop_front(); - if (ready != &w) { - ready->status = status; - ready->done = true; - ready->cv.Signal(); - } - if (ready == last_writer) break; + while (true) { + Writer* ready = writers_.front(); + writers_.pop_front(); + if (ready != &w) { + ready->status = status; + ready->done = true; + ready->cv.Signal(); } + if (ready == last_writer) break; + } - // Notify new head of write queue - if (!writers_.empty()) { - writers_.front()->cv.Signal(); - } + // Notify new head of write queue + if (!writers_.empty()) { + writers_.front()->cv.Signal(); } return status; } @@ -2423,6 +2413,7 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { // REQUIRES: this thread is currently at the front of the writer queue Status DBImpl::MakeRoomForWrite(bool force) { mutex_.AssertHeld(); + assert(!writers_.empty()); bool allow_delay = !force; bool allow_rate_limit_delay = !force; uint64_t rate_limit_delay_millis = 0; From 18afff2e631698c312daa3c6c44bb540542fe3ab Mon Sep 17 00:00:00 2001 From: Jim Paton Date: Mon, 29 Jul 2013 10:34:23 -0700 Subject: [PATCH 08/15] Add stall counts to statistics Summary: Previously, statistics are kept on how much time is spent on stalls of different types. This patch adds support for keeping number of stalls of each type. For example, instead of just reporting how many microseconds are spent waiting for memtables to be compacted, it will also report how many times a write stalled for that to occur. Test Plan: make -j32 check ./db_stress # Not really sure what else should be done... Reviewers: dhruba, MarkCallaghan, haobo Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D11841 --- db/db_impl.cc | 53 ++++++++++++++++++++++++++++++------ db/db_impl.h | 4 +++ include/leveldb/statistics.h | 15 ++++++++-- 3 files changed, 60 insertions(+), 12 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 0587774fb..4ba270e91 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -179,6 +179,9 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) stall_level0_slowdown_(0), stall_memtable_compaction_(0), stall_level0_num_files_(0), + stall_level0_slowdown_count_(0), + stall_memtable_compaction_count_(0), + stall_level0_num_files_count_(0), started_at_(options.env->NowMicros()), flush_on_destroy_(false), stats_(options.num_levels), @@ -193,8 +196,11 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) env_->GetAbsolutePath(dbname, &db_absolute_path_); stall_leveln_slowdown_.resize(options.num_levels); - for (int i = 0; i < options.num_levels; ++i) + stall_leveln_slowdown_count_.resize(options.num_levels); + for (int i = 0; i < options.num_levels; ++i) { stall_leveln_slowdown_[i] = 0; + stall_leveln_slowdown_count_[i] = 0; + } // Reserve ten files or so for other uses and give the rest to TableCache. const int table_cache_size = options_.max_open_files - 10; @@ -2437,10 +2443,14 @@ Status DBImpl::MakeRoomForWrite(bool force) { // case it is sharing the same core as the writer. mutex_.Unlock(); uint64_t t1 = env_->NowMicros(); - env_->SleepForMicroseconds(1000); + { + StopWatch sw(env_, options_.statistics, STALL_L0_SLOWDOWN_COUNT); + env_->SleepForMicroseconds(1000); + } uint64_t delayed = env_->NowMicros() - t1; RecordTick(options_.statistics, STALL_L0_SLOWDOWN_MICROS, delayed); stall_level0_slowdown_ += delayed; + stall_level0_slowdown_count_++; allow_delay = false; // Do not delay a single write more than once //Log(options_.info_log, // "delaying write %llu usecs for level0_slowdown_writes_trigger\n", @@ -2460,20 +2470,29 @@ Status DBImpl::MakeRoomForWrite(bool force) { DelayLoggingAndReset(); Log(options_.info_log, "wait for memtable compaction...\n"); uint64_t t1 = env_->NowMicros(); - bg_cv_.Wait(); + { + StopWatch sw(env_, options_.statistics, + STALL_MEMTABLE_COMPACTION_COUNT); + bg_cv_.Wait(); + } const uint64_t stall = env_->NowMicros() -t1; RecordTick(options_.statistics, STALL_MEMTABLE_COMPACTION_MICROS, stall); stall_memtable_compaction_ += stall; + stall_memtable_compaction_count_++; } else if (versions_->NumLevelFiles(0) >= options_.level0_stop_writes_trigger) { // There are too many level-0 files. DelayLoggingAndReset(); uint64_t t1 = env_->NowMicros(); Log(options_.info_log, "wait for fewer level0 files...\n"); - bg_cv_.Wait(); + { + StopWatch sw(env_, options_.statistics, STALL_L0_NUM_FILES_COUNT); + bg_cv_.Wait(); + } const uint64_t stall = env_->NowMicros() - t1; RecordTick(options_.statistics, STALL_L0_NUM_FILES_MICROS, stall); stall_level0_num_files_ += stall; + stall_level0_num_files_count_++; } else if ( allow_rate_limit_delay && options_.rate_limit > 1.0 && @@ -2482,9 +2501,13 @@ Status DBImpl::MakeRoomForWrite(bool force) { int max_level = versions_->MaxCompactionScoreLevel(); mutex_.Unlock(); uint64_t t1 = env_->NowMicros(); - env_->SleepForMicroseconds(1000); + { + StopWatch sw(env_, options_.statistics, RATE_LIMIT_DELAY_COUNT); + env_->SleepForMicroseconds(1000); + } uint64_t delayed = env_->NowMicros() - t1; stall_leveln_slowdown_[max_level] += delayed; + stall_leveln_slowdown_count_[max_level]++; // Make sure the following value doesn't round to zero. uint64_t rate_limit = std::max((delayed / 1000), (uint64_t) 1); rate_limit_delay_millis += rate_limit; @@ -2579,6 +2602,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { // Add "+1" to make sure seconds_up is > 0 and avoid NaN later double seconds_up = (micros_up + 1) / 1000000.0; uint64_t total_slowdown = 0; + uint64_t total_slowdown_count = 0; uint64_t interval_bytes_written = 0; uint64_t interval_bytes_read = 0; uint64_t interval_bytes_new = 0; @@ -2587,8 +2611,8 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { // Pardon the long line but I think it is easier to read this way. snprintf(buf, sizeof(buf), " Compactions\n" - "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count Ln-stall\n" - "----------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" + "Level Files Size(MB) Score Time(sec) Read(MB) Write(MB) Rn(MB) Rnp1(MB) Wnew(MB) Amplify Read(MB/s) Write(MB/s) Rn Rnp1 Wnp1 NewW Count Ln-stall Stall-cnt\n" + "--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------\n" ); value->append(buf); for (int level = 0; level < NumberLevels(); level++) { @@ -2608,7 +2632,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { snprintf( buf, sizeof(buf), - "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %7.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f\n", + "%3d %8d %8.0f %5.1f %9.0f %9.0f %9.0f %9.0f %9.0f %9.0f %7.1f %9.1f %11.1f %8d %8d %8d %8d %8d %9.1f %9lu\n", level, files, versions_->NumLevelBytes(level) / 1048576.0, @@ -2630,8 +2654,10 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { stats_[level].files_out_levelnp1, stats_[level].files_out_levelnp1 - stats_[level].files_in_levelnp1, stats_[level].count, - stall_leveln_slowdown_[level] / 1000000.0); + stall_leveln_slowdown_[level] / 1000000.0, + (unsigned long) stall_leveln_slowdown_count_[level]); total_slowdown += stall_leveln_slowdown_[level]; + total_slowdown_count += stall_leveln_slowdown_count_[level]; value->append(buf); } } @@ -2707,6 +2733,15 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { total_slowdown / 1000000.0); value->append(buf); + snprintf(buf, sizeof(buf), + "Stalls(count): %lu level0_slowdown, %lu level0_numfiles, " + "%lu memtable_compaction, %lu leveln_slowdown\n", + (unsigned long) stall_level0_slowdown_count_, + (unsigned long) stall_level0_num_files_count_, + (unsigned long) stall_memtable_compaction_count_, + (unsigned long) total_slowdown_count); + value->append(buf); + last_stats_.bytes_read_ = total_bytes_read; last_stats_.bytes_written_ = total_bytes_written; last_stats_.bytes_new_ = stats_[0].bytes_written; diff --git a/db/db_impl.h b/db/db_impl.h index b7f48dcac..fa20fe2b1 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -313,6 +313,10 @@ class DBImpl : public DB { uint64_t stall_memtable_compaction_; uint64_t stall_level0_num_files_; std::vector stall_leveln_slowdown_; + uint64_t stall_level0_slowdown_count_; + uint64_t stall_memtable_compaction_count_; + uint64_t stall_level0_num_files_count_; + std::vector stall_leveln_slowdown_count_; // Time at which this instance was started. const uint64_t started_at_; diff --git a/include/leveldb/statistics.h b/include/leveldb/statistics.h index 4ce4f6d1b..6bd4a05fa 100644 --- a/include/leveldb/statistics.h +++ b/include/leveldb/statistics.h @@ -80,7 +80,7 @@ const std::vector> TickersNameMap = { { STALL_L0_SLOWDOWN_MICROS, "rocksdb.l0.slowdown.micros" }, { STALL_MEMTABLE_COMPACTION_MICROS, "rocksdb.memtable.compaction.micros" }, { STALL_L0_NUM_FILES_MICROS, "rocksdb.l0.num.files.stall.micros" }, - { RATE_LIMIT_DELAY_MILLIS, "rocksdb.rate.limit.dleay.millis" }, + { RATE_LIMIT_DELAY_MILLIS, "rocksdb.rate.limit.delay.millis" }, { NO_ITERATORS, "rocksdb.num.iterators" }, { NUMBER_MULTIGET_CALLS, "rocksdb.number.multiget.get" }, { NUMBER_MULTIGET_KEYS_READ, "rocksdb.number.multiget.keys.read" }, @@ -109,7 +109,12 @@ enum Histograms { READ_BLOCK_COMPACTION_MICROS = 9, READ_BLOCK_GET_MICROS = 10, WRITE_RAW_BLOCK_MICROS = 11, - HISTOGRAM_ENUM_MAX = 12 + + STALL_L0_SLOWDOWN_COUNT = 12, + STALL_MEMTABLE_COMPACTION_COUNT = 13, + STALL_L0_NUM_FILES_COUNT = 14, + RATE_LIMIT_DELAY_COUNT = 15, + HISTOGRAM_ENUM_MAX = 16 }; const std::vector> HistogramsNameMap = { @@ -124,7 +129,11 @@ const std::vector> HistogramsNameMap = { { DB_MULTIGET, "rocksdb.db.multiget.micros" }, { READ_BLOCK_COMPACTION_MICROS, "rocksdb.read.block.compaction.micros" }, { READ_BLOCK_GET_MICROS, "rocksdb.read.block.get.micros" }, - { WRITE_RAW_BLOCK_MICROS, "rocksdb.write.raw.block.micros" } + { WRITE_RAW_BLOCK_MICROS, "rocksdb.write.raw.block.micros" }, + { STALL_L0_SLOWDOWN_COUNT, "rocksdb.l0.slowdown.count"}, + { STALL_MEMTABLE_COMPACTION_COUNT, "rocksdb.memtable.compaction.count"}, + { STALL_L0_NUM_FILES_COUNT, "rocksdb.num.files.stall.count"}, + { RATE_LIMIT_DELAY_COUNT, "rocksdb.rate.limit.delay.count"} }; struct HistogramData { From abc90b067ccceee1f58580ee8ca578c97c14d452 Mon Sep 17 00:00:00 2001 From: Jim Paton Date: Mon, 29 Jul 2013 13:26:38 -0700 Subject: [PATCH 09/15] Use specific DB name in merge_test Summary: Currently, merge_test uses /tmp/testdb for the test database. It should really use something more specific to merge_test. Most of the other tests use test::TmpDir() + "/db". This patch implements such behavior for merge_test; it makes merge_test use test::TmpDir() + "/merge_testdb" Test Plan: make clean make -j32 merge_test ./merge_test Reviewers: dhruba, haobo Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D11877 --- Makefile | 4 ++-- db/merge_test.cc | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/Makefile b/Makefile index 9b1d05fcf..765e52b52 100644 --- a/Makefile +++ b/Makefile @@ -240,8 +240,8 @@ reduce_levels_test: tools/reduce_levels_test.o $(LIBOBJECTS) $(TESTHARNESS) write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) -merge_test: db/merge_test.o $(LIBOBJECTS) - $(CXX) db/merge_test.o $(LIBOBJECTS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) +merge_test: db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) db/merge_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(MEMENVLIBRARY) : $(MEMENVOBJECTS) rm -f $@ diff --git a/db/merge_test.cc b/db/merge_test.cc index 2d2f6514f..47fad025d 100644 --- a/db/merge_test.cc +++ b/db/merge_test.cc @@ -9,6 +9,7 @@ #include "leveldb/merge_operator.h" #include "db/dbformat.h" #include "utilities/merge_operators.h" +#include "util/testharness.h" using namespace std; using namespace leveldb; @@ -20,7 +21,7 @@ std::shared_ptr OpenDb() { Options options; options.create_if_missing = true; options.merge_operator = mergeOperator.get(); - Status s = DB::Open(options, "/tmp/testdb", &db); + Status s = DB::Open(options, test::TmpDir() + "/merge_testdb", &db); if (!s.ok()) { cerr << s.ToString() << endl; assert(false); @@ -45,7 +46,7 @@ class Counters { uint64_t default_; public: - Counters(std::shared_ptr db, uint64_t defaultCount = 0) + explicit Counters(std::shared_ptr db, uint64_t defaultCount = 0) : db_(db), put_option_(), get_option_(), @@ -143,7 +144,7 @@ class MergeBasedCounters : public Counters { WriteOptions merge_option_; // for merge public: - MergeBasedCounters(std::shared_ptr db, uint64_t defaultCount = 0) + explicit MergeBasedCounters(std::shared_ptr db, uint64_t defaultCount = 0) : Counters(db, defaultCount), merge_option_() { } From 6db52b525ab57983d2791dfc8a73df24d3995770 Mon Sep 17 00:00:00 2001 From: Jim Paton Date: Mon, 29 Jul 2013 15:46:36 -0700 Subject: [PATCH 10/15] Don't use redundant Env::NowMicros() calls Summary: After my patch for stall histograms, there are redundant calls to NowMicros() by both the stop watches and DBImpl::MakeRoomForWrites. So I removed the redundant calls such that the information is gotten from the stopwatch. Test Plan: make clean make -j32 check Reviewers: dhruba, haobo, MarkCallaghan Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D11883 --- db/db_impl.cc | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 4ba270e91..dbc6f71d3 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2442,12 +2442,12 @@ Status DBImpl::MakeRoomForWrite(bool force) { // this delay hands over some CPU to the compaction thread in // case it is sharing the same core as the writer. mutex_.Unlock(); - uint64_t t1 = env_->NowMicros(); + uint64_t delayed; { StopWatch sw(env_, options_.statistics, STALL_L0_SLOWDOWN_COUNT); env_->SleepForMicroseconds(1000); + delayed = sw.ElapsedMicros(); } - uint64_t delayed = env_->NowMicros() - t1; RecordTick(options_.statistics, STALL_L0_SLOWDOWN_MICROS, delayed); stall_level0_slowdown_ += delayed; stall_level0_slowdown_count_++; @@ -2469,13 +2469,13 @@ Status DBImpl::MakeRoomForWrite(bool force) { // ones are still being compacted, so we wait. DelayLoggingAndReset(); Log(options_.info_log, "wait for memtable compaction...\n"); - uint64_t t1 = env_->NowMicros(); + uint64_t stall; { StopWatch sw(env_, options_.statistics, STALL_MEMTABLE_COMPACTION_COUNT); bg_cv_.Wait(); + stall = sw.ElapsedMicros(); } - const uint64_t stall = env_->NowMicros() -t1; RecordTick(options_.statistics, STALL_MEMTABLE_COMPACTION_MICROS, stall); stall_memtable_compaction_ += stall; stall_memtable_compaction_count_++; @@ -2483,13 +2483,13 @@ Status DBImpl::MakeRoomForWrite(bool force) { options_.level0_stop_writes_trigger) { // There are too many level-0 files. DelayLoggingAndReset(); - uint64_t t1 = env_->NowMicros(); Log(options_.info_log, "wait for fewer level0 files...\n"); + uint64_t stall; { StopWatch sw(env_, options_.statistics, STALL_L0_NUM_FILES_COUNT); bg_cv_.Wait(); + stall = sw.ElapsedMicros(); } - const uint64_t stall = env_->NowMicros() - t1; RecordTick(options_.statistics, STALL_L0_NUM_FILES_MICROS, stall); stall_level0_num_files_ += stall; stall_level0_num_files_count_++; @@ -2500,12 +2500,12 @@ Status DBImpl::MakeRoomForWrite(bool force) { // Delay a write when the compaction score for any level is too large. int max_level = versions_->MaxCompactionScoreLevel(); mutex_.Unlock(); - uint64_t t1 = env_->NowMicros(); + uint64_t delayed; { StopWatch sw(env_, options_.statistics, RATE_LIMIT_DELAY_COUNT); env_->SleepForMicroseconds(1000); + delayed = sw.ElapsedMicros(); } - uint64_t delayed = env_->NowMicros() - t1; stall_leveln_slowdown_[max_level] += delayed; stall_leveln_slowdown_count_[max_level]++; // Make sure the following value doesn't round to zero. From 542cc10b1943f7bd425841ad413bb38cd9bca94d Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Tue, 30 Jul 2013 08:30:13 -0700 Subject: [PATCH 11/15] Fix README contents. Summary: Fix README contents. Test Plan: Reviewers: CC: Task ID: # Blame Rev: --- README | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/README b/README index 4ac8f93cb..4d605f5bc 100644 --- a/README +++ b/README @@ -1,5 +1,7 @@ rocksdb: A persistent key-value store for flash storage -Authors: The Facebook Database Engineering Team +Authors: * The Facebook Database Engineering Team + * Build on earlier work on leveldb by Sanjay Ghemawat + (sanjay@google.com) and Jeff Dean (jeff@google.com) This code is a library that forms the core building block for a fast key value server, especially suited for storing data on flash drives. @@ -56,6 +58,25 @@ include/env.h Abstraction of the OS environment. A posix implementation of this interface is in util/env_posix.cc -include/table.h include/table_builder.h Lower-level modules that most clients probably won't use directly + +include/cache.h + An API for the block cache. + +include/compaction_filter.h + An API for a application filter invoked on every compaction. + +include/filter_policy.h + An API for configuring a bloom filter. + +include/memtablerep.h + An API for implementing a memtable. + +include/statistics.h + An API to retrieve various database statistics. + +include/transaction_log_iterator.h + An API to retrieve transaction logs from a database. + + From 0f0a24e2981ba428cd5971bb960c8f80498aaa0a Mon Sep 17 00:00:00 2001 From: Xing Jin Date: Wed, 31 Jul 2013 12:42:23 -0700 Subject: [PATCH 12/15] Make arena block size configurable Summary: Add an option for arena block size, default value 4096 bytes. Arena will allocate blocks with such size. I am not sure about passing parameter to skiplist in the new virtualized framework, though I talked to Jim a bit. So add Jim as reviewer. Test Plan: new unit test, I am running db_test. For passing paramter from configured option to Arena, I tried tests like: TEST(DBTest, Arena_Option) { std::string dbname = test::TmpDir() + "/db_arena_option_test"; DestroyDB(dbname, Options()); DB* db = nullptr; Options opts; opts.create_if_missing = true; opts.arena_block_size = 1000000; // tested 99, 999999 Status s = DB::Open(opts, dbname, &db); db->Put(WriteOptions(), "a", "123"); } and printed some debug info. The results look good. Any suggestion for such a unit-test? Reviewers: haobo, dhruba, emayanke, jpaton Reviewed By: dhruba CC: leveldb, zshao Differential Revision: https://reviews.facebook.net/D11799 --- db/corruption_test.cc | 1 + db/db_impl.cc | 18 ++++++---- db/memtable.cc | 12 +++---- db/memtable.h | 11 +++--- db/memtablelist.h | 1 - db/skiplist.h | 3 -- db/skiplist_test.cc | 14 ++++---- db/skiplistrep.h | 14 +++----- include/leveldb/arena.h | 38 ++++++++++++++++++++ include/leveldb/memtablerep.h | 7 ++-- include/leveldb/options.h | 7 ++++ util/{arena.cc => arena_impl.cc} | 27 +++++++++------ util/{arena.h => arena_impl.h} | 43 +++++++++++++++-------- util/arena_test.cc | 59 ++++++++++++++++++++++++++------ util/options.cc | 3 ++ 15 files changed, 181 insertions(+), 77 deletions(-) create mode 100644 include/leveldb/arena.h rename util/{arena.cc => arena_impl.cc} (74%) rename util/{arena.h => arena_impl.h} (55%) diff --git a/db/corruption_test.cc b/db/corruption_test.cc index 7d9b4b068..4ed04c0d1 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -57,6 +57,7 @@ class CorruptionTest { opt.env = &env_; opt.block_cache = tiny_cache_; opt.block_size_deviation = 0; + opt.arena_block_size = 4096; return DB::Open(opt, dbname_, &db_); } diff --git a/db/db_impl.cc b/db/db_impl.cc index dbc6f71d3..7bc46c2b4 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -128,6 +128,12 @@ Options SanitizeOptions(const std::string& dbname, ((size_t)64)<<30); ClipToRange(&result.block_size, 1<<10, 4<<20); + // if user sets arena_block_size, we trust user to use this value. Otherwise, + // calculate a proper value from writer_buffer_size; + if (result.arena_block_size <= 0) { + result.arena_block_size = result.write_buffer_size / 10; + } + result.min_write_buffer_number_to_merge = std::min( result.min_write_buffer_number_to_merge, result.max_write_buffer_number-1); if (result.info_log == nullptr) { @@ -164,8 +170,8 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) shutting_down_(nullptr), bg_cv_(&mutex_), mem_rep_factory_(options_.memtable_factory), - mem_(new MemTable(internal_comparator_, - mem_rep_factory_, NumberLevels())), + mem_(new MemTable(internal_comparator_, mem_rep_factory_, + NumberLevels(), options_)), logfile_number_(0), tmp_batch_(), bg_compaction_scheduled_(0), @@ -696,8 +702,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, WriteBatchInternal::SetContents(&batch, record); if (mem == nullptr) { - mem = new MemTable(internal_comparator_, - mem_rep_factory_, NumberLevels()); + mem = new MemTable(internal_comparator_, mem_rep_factory_, + NumberLevels(), options_); mem->Ref(); } status = WriteBatchInternal::InsertInto(&batch, mem, &options_); @@ -2545,8 +2551,8 @@ Status DBImpl::MakeRoomForWrite(bool force) { log_.reset(new log::Writer(std::move(lfile))); mem_->SetLogNumber(logfile_number_); imm_.Add(mem_); - mem_ = new MemTable(internal_comparator_, - mem_rep_factory_, NumberLevels()); + mem_ = new MemTable(internal_comparator_, mem_rep_factory_, + NumberLevels(), options_); mem_->Ref(); force = false; // Do not force another compaction if have room MaybeScheduleCompaction(); diff --git a/db/memtable.cc b/db/memtable.cc index 8ccf49df0..fb3f4f1f7 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -24,10 +24,12 @@ static Slice GetLengthPrefixedSlice(const char* data) { MemTable::MemTable(const InternalKeyComparator& cmp, std::shared_ptr table_factory, - int numlevel) + int numlevel, + const Options& options) : comparator_(cmp), refs_(0), - table_(table_factory->CreateMemTableRep(comparator_)), + arena_impl_(options.arena_block_size), + table_(table_factory->CreateMemTableRep(comparator_, &arena_impl_)), flush_in_progress_(false), flush_completed_(false), file_number_(0), @@ -40,9 +42,7 @@ MemTable::~MemTable() { } size_t MemTable::ApproximateMemoryUsage() { - // The first term is the amount of memory used by the memtable and - // the second term is the amount of memory used by the backing store - return arena_.MemoryUsage() + table_->ApproximateMemoryUsage(); + return arena_impl_.ApproximateMemoryUsage(); } int MemTable::KeyComparator::operator()(const char* aptr, const char* bptr) @@ -111,7 +111,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, const size_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size; - char* buf = arena_.Allocate(encoded_len); + char* buf = arena_impl_.Allocate(encoded_len); char* p = EncodeVarint32(buf, internal_key_size); memcpy(p, key.data(), key_size); p += key_size; diff --git a/db/memtable.h b/db/memtable.h index 2ffe4b913..6a3c7fcfd 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -11,8 +11,8 @@ #include "db/dbformat.h" #include "db/skiplist.h" #include "db/version_set.h" -#include "util/arena.h" #include "leveldb/memtablerep.h" +#include "util/arena_impl.h" namespace leveldb { @@ -29,8 +29,11 @@ class MemTable { // MemTables are reference counted. The initial reference count // is zero and the caller must call Ref() at least once. - explicit MemTable(const InternalKeyComparator& comparator, - std::shared_ptr table_factory, int numlevel = 7); + explicit MemTable( + const InternalKeyComparator& comparator, + std::shared_ptr table_factory, + int numlevel = 7, + const Options& options = Options()); // Increase reference count. void Ref() { ++refs_; } @@ -101,7 +104,7 @@ class MemTable { KeyComparator comparator_; int refs_; - Arena arena_; + ArenaImpl arena_impl_; shared_ptr table_; // These are used to manage memtable flushes to storage diff --git a/db/memtablelist.h b/db/memtablelist.h index 40419e56f..d741a6630 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -8,7 +8,6 @@ #include "leveldb/db.h" #include "db/dbformat.h" #include "db/skiplist.h" -#include "util/arena.h" #include "memtable.h" namespace leveldb { diff --git a/db/skiplist.h b/db/skiplist.h index 1c7b4dd71..a3fe05dbb 100644 --- a/db/skiplist.h +++ b/db/skiplist.h @@ -31,13 +31,10 @@ #include #include #include "port/port.h" -#include "util/arena.h" #include "util/random.h" namespace leveldb { -class Arena; - template class SkipList { private: diff --git a/db/skiplist_test.cc b/db/skiplist_test.cc index fa8a21a31..3542183f1 100644 --- a/db/skiplist_test.cc +++ b/db/skiplist_test.cc @@ -5,7 +5,7 @@ #include "db/skiplist.h" #include #include "leveldb/env.h" -#include "util/arena.h" +#include "util/arena_impl.h" #include "util/hash.h" #include "util/random.h" #include "util/testharness.h" @@ -29,9 +29,9 @@ struct TestComparator { class SkipTest { }; TEST(SkipTest, Empty) { - Arena arena; + ArenaImpl arena_impl; TestComparator cmp; - SkipList list(cmp, &arena); + SkipList list(cmp, &arena_impl); ASSERT_TRUE(!list.Contains(10)); SkipList::Iterator iter(&list); @@ -49,9 +49,9 @@ TEST(SkipTest, InsertAndLookup) { const int R = 5000; Random rnd(1000); std::set keys; - Arena arena; + ArenaImpl arena_impl; TestComparator cmp; - SkipList list(cmp, &arena); + SkipList list(cmp, &arena_impl); for (int i = 0; i < N; i++) { Key key = rnd.Next() % R; if (keys.insert(key).second) { @@ -204,14 +204,14 @@ class ConcurrentTest { // Current state of the test State current_; - Arena arena_; + ArenaImpl arena_impl_; // SkipList is not protected by mu_. We just use a single writer // thread to modify it. SkipList list_; public: - ConcurrentTest() : list_(TestComparator(), &arena_) { } + ConcurrentTest() : list_(TestComparator(), &arena_impl_) { } // REQUIRES: External synchronization void WriteStep(Random* rnd) { diff --git a/db/skiplistrep.h b/db/skiplistrep.h index 0f7523b6e..d22768f4d 100644 --- a/db/skiplistrep.h +++ b/db/skiplistrep.h @@ -10,11 +10,11 @@ namespace leveldb { class Arena; class SkipListRep : public MemTableRep { - Arena arena_; SkipList skip_list_; public: - explicit SkipListRep(MemTableRep::KeyComparator& compare) - : skip_list_(compare, &arena_) { } + explicit SkipListRep(MemTableRep::KeyComparator& compare, Arena* arena) + : skip_list_(compare, arena) { +} // Insert key into the list. // REQUIRES: nothing that compares equal to key is currently in the list. @@ -27,10 +27,6 @@ public: return skip_list_.Contains(key); } - virtual size_t ApproximateMemoryUsage() { - return arena_.MemoryUsage(); - } - virtual ~SkipListRep() { } // Iteration over the contents of a skip list @@ -96,8 +92,8 @@ public: class SkipListFactory : public MemTableRepFactory { public: virtual std::shared_ptr CreateMemTableRep ( - MemTableRep::KeyComparator& compare) { - return std::shared_ptr(new SkipListRep(compare)); + MemTableRep::KeyComparator& compare, Arena* arena) { + return std::shared_ptr(new SkipListRep(compare, arena)); } }; diff --git a/include/leveldb/arena.h b/include/leveldb/arena.h new file mode 100644 index 000000000..6e3a1f00b --- /dev/null +++ b/include/leveldb/arena.h @@ -0,0 +1,38 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// Arena class defines memory allocation methods. It's used by memtable and +// skiplist. + +#ifndef STORAGE_LEVELDB_INCLUDE_ARENA_H_ +#define STORAGE_LEVELDB_INCLUDE_ARENA_H_ + +namespace leveldb { + +class Arena { + public: + Arena() {}; + virtual ~Arena() {}; + + // Return a pointer to a newly allocated memory block of "bytes" bytes. + virtual char* Allocate(size_t bytes) = 0; + + // Allocate memory with the normal alignment guarantees provided by malloc. + virtual char* AllocateAligned(size_t bytes) = 0; + + // Returns an estimate of the total memory used by arena. + virtual const size_t ApproximateMemoryUsage() = 0; + + // Returns the total number of bytes in all blocks allocated so far. + virtual const size_t MemoryAllocatedBytes() = 0; + + private: + // No copying allowed + Arena(const Arena&); + void operator=(const Arena&); +}; + +} // namespace leveldb + +#endif // STORAGE_LEVELDB_INCLUDE_ARENA_H_ diff --git a/include/leveldb/memtablerep.h b/include/leveldb/memtablerep.h index cb5a6ed35..bf769f543 100644 --- a/include/leveldb/memtablerep.h +++ b/include/leveldb/memtablerep.h @@ -13,6 +13,7 @@ #define STORAGE_LEVELDB_DB_TABLE_H_ #include +#include "leveldb/arena.h" namespace leveldb { @@ -35,10 +36,6 @@ class MemTableRep { // Returns true iff an entry that compares equal to key is in the collection. virtual bool Contains(const char* key) const = 0; - // Returns an estimate of the number of bytes of data in use by this - // data structure. - virtual size_t ApproximateMemoryUsage() = 0; - virtual ~MemTableRep() { } // Iteration over the contents of a skip collection @@ -83,7 +80,7 @@ class MemTableRepFactory { public: virtual ~MemTableRepFactory() { }; virtual std::shared_ptr CreateMemTableRep( - MemTableRep::KeyComparator&) = 0; + MemTableRep::KeyComparator&, Arena* arena) = 0; }; } diff --git a/include/leveldb/options.h b/include/leveldb/options.h index f754fd3bf..6252a1eb0 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -375,6 +375,13 @@ struct Options { // Number of shards used for table cache. int table_cache_numshardbits; + // size of one block in arena memory allocation. + // If <= 0, a proper value is automatically calculated (usually 1/10 of + // writer_buffer_size). + // + // Default: 0 + size_t arena_block_size; + // Create an Options object with default values for all fields. Options(); diff --git a/util/arena.cc b/util/arena_impl.cc similarity index 74% rename from util/arena.cc rename to util/arena_impl.cc index a339f4055..6d39a80d7 100644 --- a/util/arena.cc +++ b/util/arena_impl.cc @@ -2,27 +2,32 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "util/arena.h" -#include +#include "util/arena_impl.h" namespace leveldb { -static const int kBlockSize = 4096; +ArenaImpl::ArenaImpl(size_t block_size) { + if (block_size < kMinBlockSize) { + block_size_ = kMinBlockSize; + } else if (block_size > kMaxBlockSize) { + block_size_ = kMaxBlockSize; + } else { + block_size_ = block_size; + } -Arena::Arena() { blocks_memory_ = 0; alloc_ptr_ = nullptr; // First allocation will allocate a block alloc_bytes_remaining_ = 0; } -Arena::~Arena() { +ArenaImpl::~ArenaImpl() { for (size_t i = 0; i < blocks_.size(); i++) { delete[] blocks_[i]; } } -char* Arena::AllocateFallback(size_t bytes) { - if (bytes > kBlockSize / 4) { +char* ArenaImpl::AllocateFallback(size_t bytes) { + if (bytes > block_size_ / 4) { // Object is more than a quarter of our block size. Allocate it separately // to avoid wasting too much space in leftover bytes. char* result = AllocateNewBlock(bytes); @@ -30,8 +35,8 @@ char* Arena::AllocateFallback(size_t bytes) { } // We waste the remaining space in the current block. - alloc_ptr_ = AllocateNewBlock(kBlockSize); - alloc_bytes_remaining_ = kBlockSize; + alloc_ptr_ = AllocateNewBlock(block_size_); + alloc_bytes_remaining_ = block_size_; char* result = alloc_ptr_; alloc_ptr_ += bytes; @@ -39,7 +44,7 @@ char* Arena::AllocateFallback(size_t bytes) { return result; } -char* Arena::AllocateAligned(size_t bytes) { +char* ArenaImpl::AllocateAligned(size_t bytes) { const int align = sizeof(void*); // We'll align to pointer size assert((align & (align-1)) == 0); // Pointer size should be a power of 2 size_t current_mod = reinterpret_cast(alloc_ptr_) & (align-1); @@ -58,7 +63,7 @@ char* Arena::AllocateAligned(size_t bytes) { return result; } -char* Arena::AllocateNewBlock(size_t block_bytes) { +char* ArenaImpl::AllocateNewBlock(size_t block_bytes) { char* result = new char[block_bytes]; blocks_memory_ += block_bytes; blocks_.push_back(result); diff --git a/util/arena.h b/util/arena_impl.h similarity index 55% rename from util/arena.h rename to util/arena_impl.h index 8f7dde226..a5425e87a 100644 --- a/util/arena.h +++ b/util/arena_impl.h @@ -2,38 +2,53 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#ifndef STORAGE_LEVELDB_UTIL_ARENA_H_ -#define STORAGE_LEVELDB_UTIL_ARENA_H_ +// ArenaImpl is an implementation of Arena class. For a request of small size, +// it allocates a block with pre-defined block size. For a request of big +// size, it uses malloc to directly get the requested size. + +#ifndef STORAGE_LEVELDB_UTIL_ARENA_IMPL_H_ +#define STORAGE_LEVELDB_UTIL_ARENA_IMPL_H_ #include #include #include #include +#include "leveldb/arena.h" namespace leveldb { -class Arena { +class ArenaImpl : public Arena { public: - Arena(); - ~Arena(); + explicit ArenaImpl(size_t block_size = kMinBlockSize); + virtual ~ArenaImpl(); - // Return a pointer to a newly allocated memory block of "bytes" bytes. - char* Allocate(size_t bytes); + virtual char* Allocate(size_t bytes); - // Allocate memory with the normal alignment guarantees provided by malloc - char* AllocateAligned(size_t bytes); + virtual char* AllocateAligned(size_t bytes); // Returns an estimate of the total memory usage of data allocated // by the arena (including space allocated but not yet used for user // allocations). - size_t MemoryUsage() const { + // + // TODO: Do we need to exclude space allocated but not used? + virtual const size_t ApproximateMemoryUsage() { return blocks_memory_ + blocks_.capacity() * sizeof(char*); } + virtual const size_t MemoryAllocatedBytes() { + return blocks_memory_; + } + private: char* AllocateFallback(size_t bytes); char* AllocateNewBlock(size_t block_bytes); + static const size_t kMinBlockSize = 4096; + static const size_t kMaxBlockSize = 2 << 30; + + // Number of bytes allocated in one block + size_t block_size_; + // Allocation state char* alloc_ptr_; size_t alloc_bytes_remaining_; @@ -45,11 +60,11 @@ class Arena { size_t blocks_memory_; // No copying allowed - Arena(const Arena&); - void operator=(const Arena&); + ArenaImpl(const ArenaImpl&); + void operator=(const ArenaImpl&); }; -inline char* Arena::Allocate(size_t bytes) { +inline char* ArenaImpl::Allocate(size_t bytes) { // The semantics of what to return are a bit messy if we allow // 0-byte allocations, so we disallow them here (we don't need // them for our internal use). @@ -65,4 +80,4 @@ inline char* Arena::Allocate(size_t bytes) { } // namespace leveldb -#endif // STORAGE_LEVELDB_UTIL_ARENA_H_ +#endif // STORAGE_LEVELDB_UTIL_ARENA_IMPL_H_ diff --git a/util/arena_test.cc b/util/arena_test.cc index d5c33d75b..13c6e9391 100644 --- a/util/arena_test.cc +++ b/util/arena_test.cc @@ -2,22 +2,59 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "util/arena.h" - +#include "util/arena_impl.h" #include "util/random.h" #include "util/testharness.h" namespace leveldb { -class ArenaTest { }; +class ArenaImplTest { }; + +TEST(ArenaImplTest, Empty) { + ArenaImpl arena0; +} + +TEST(ArenaImplTest, MemoryAllocatedBytes) { + const int N = 17; + size_t req_sz; //requested size + size_t bsz = 8192; // block size + size_t expected_memory_allocated; -TEST(ArenaTest, Empty) { - Arena arena; + ArenaImpl arena_impl(bsz); + + // requested size > quarter of a block: + // allocate requested size separately + req_sz = 3001; + for (int i = 0; i < N; i++) { + arena_impl.Allocate(req_sz); + } + expected_memory_allocated = req_sz * N; + ASSERT_EQ(arena_impl.MemoryAllocatedBytes(), expected_memory_allocated); + + // requested size < quarter of a block: + // allocate a block with the default size, then try to use unused part + // of the block. So one new block will be allocated for the first + // Allocate(99) call. All the remaining calls won't lead to new allocation. + req_sz = 99; + for (int i = 0; i < N; i++) { + arena_impl.Allocate(req_sz); + } + expected_memory_allocated += bsz; + ASSERT_EQ(arena_impl.MemoryAllocatedBytes(), expected_memory_allocated); + + // requested size > quarter of a block: + // allocate requested size separately + req_sz = 99999999; + for (int i = 0; i < N; i++) { + arena_impl.Allocate(req_sz); + } + expected_memory_allocated += req_sz * N; + ASSERT_EQ(arena_impl.MemoryAllocatedBytes(), expected_memory_allocated); } -TEST(ArenaTest, Simple) { +TEST(ArenaImplTest, Simple) { std::vector > allocated; - Arena arena; + ArenaImpl arena_impl; const int N = 100000; size_t bytes = 0; Random rnd(301); @@ -35,9 +72,9 @@ TEST(ArenaTest, Simple) { } char* r; if (rnd.OneIn(10)) { - r = arena.AllocateAligned(s); + r = arena_impl.AllocateAligned(s); } else { - r = arena.Allocate(s); + r = arena_impl.Allocate(s); } for (unsigned int b = 0; b < s; b++) { @@ -46,9 +83,9 @@ TEST(ArenaTest, Simple) { } bytes += s; allocated.push_back(std::make_pair(s, r)); - ASSERT_GE(arena.MemoryUsage(), bytes); + ASSERT_GE(arena_impl.ApproximateMemoryUsage(), bytes); if (i > N/10) { - ASSERT_LE(arena.MemoryUsage(), bytes * 1.10); + ASSERT_LE(arena_impl.ApproximateMemoryUsage(), bytes * 1.10); } } for (unsigned int i = 0; i < allocated.size(); i++) { diff --git a/util/options.cc b/util/options.cc index 912c53923..99cd1b094 100644 --- a/util/options.cc +++ b/util/options.cc @@ -61,6 +61,7 @@ Options::Options() max_manifest_file_size(std::numeric_limits::max()), no_block_cache(false), table_cache_numshardbits(4), + arena_block_size(0), disable_auto_compactions(false), WAL_ttl_seconds(0), manifest_preallocation_size(4 * 1024 * 1024), @@ -174,6 +175,8 @@ Options::Dump(Logger* log) const no_block_cache); Log(log," Options.table_cache_numshardbits: %d", table_cache_numshardbits); + Log(log," Options.arena_block_size: %ld", + arena_block_size); Log(log," Options.delete_obsolete_files_period_micros: %ld", delete_obsolete_files_period_micros); Log(log," Options.max_background_compactions: %d", From 9700677a2b83b3a9aa9820cd5e6953b393db85e6 Mon Sep 17 00:00:00 2001 From: Jim Paton Date: Wed, 31 Jul 2013 16:20:48 -0700 Subject: [PATCH 13/15] Slow down writes gradually rather than suddenly Summary: Currently, when a certain number of level0 files (level0_slowdown_writes_trigger) are present, RocksDB will slow down each write by 1ms. There is a second limit of level0 files at which RocksDB will stop writes altogether (level0_stop_writes_trigger). This patch enables the user to supply a third parameter specifying the number of files at which Rocks will start slowing down writes (level0_start_slowdown_writes). When this number is reached, Rocks will slow down writes as a quadratic function of level0_slowdown_writes_trigger - num_level0_files. For some workloads, this improves latency and throughput. I will post some stats momentarily in https://our.intern.facebook.com/intern/tasks/?t=2613384. Test Plan: make -j32 check ./db_stress ./db_bench Reviewers: dhruba, haobo, MarkCallaghan, xjin Reviewed By: xjin CC: leveldb, xjin, zshao Differential Revision: https://reviews.facebook.net/D11859 --- db/db_impl.cc | 38 ++++++++++++++++++++++++++++++++++++-- db/db_impl.h | 1 + include/leveldb/options.h | 6 +++--- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 7bc46c2b4..9cff6991d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2421,6 +2421,40 @@ WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { return result; } +// This function computes the amount of time in microseconds by which a write +// should be delayed based on the number of level-0 files according to the +// following formula: +// if num_level_files < level0_slowdown_writes_trigger, return 0; +// if num_level_files >= level0_stop_writes_trigger, return 1000; +// otherwise, let r = (num_level_files - level0_slowdown) / +// (level0_stop - level0_slowdown) +// and return r^2 * 1000. +// The goal of this formula is to gradually increase the rate at which writes +// are slowed. We also tried linear delay (r * 1000), but it seemed to do +// slightly worse. There is no other particular reason for choosing quadratic. +uint64_t DBImpl::SlowdownAmount(int num_level0_files) { + uint64_t delay; + int stop_trigger = options_.level0_stop_writes_trigger; + int slowdown_trigger = options_.level0_slowdown_writes_trigger; + if (num_level0_files >= stop_trigger) { + delay = 1000; + } + else if (num_level0_files < slowdown_trigger) { + delay = 0; + } + else { + // If we are here, we know that: + // slowdown_trigger <= num_level0_files < stop_trigger + // since the previous two conditions are false. + float how_much = + (float) (num_level0_files - slowdown_trigger) / + (stop_trigger - slowdown_trigger); + delay = how_much * how_much * 1000; + } + assert(delay <= 1000); + return delay; +} + // REQUIRES: mutex_ is held // REQUIRES: this thread is currently at the front of the writer queue Status DBImpl::MakeRoomForWrite(bool force) { @@ -2444,14 +2478,14 @@ Status DBImpl::MakeRoomForWrite(bool force) { // We are getting close to hitting a hard limit on the number of // L0 files. Rather than delaying a single write by several // seconds when we hit the hard limit, start delaying each - // individual write by 1ms to reduce latency variance. Also, + // individual write by 0-1ms to reduce latency variance. Also, // this delay hands over some CPU to the compaction thread in // case it is sharing the same core as the writer. mutex_.Unlock(); uint64_t delayed; { StopWatch sw(env_, options_.statistics, STALL_L0_SLOWDOWN_COUNT); - env_->SleepForMicroseconds(1000); + env_->SleepForMicroseconds(SlowdownAmount(versions_->NumLevelFiles(0))); delayed = sw.ElapsedMicros(); } RecordTick(options_.statistics, STALL_L0_SLOWDOWN_MICROS, delayed); diff --git a/db/db_impl.h b/db/db_impl.h index fa20fe2b1..dedfd9d7e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -170,6 +170,7 @@ class DBImpl : public DB { Status WriteLevel0Table(std::vector &mems, VersionEdit* edit, uint64_t* filenumber); + uint64_t SlowdownAmount(int num_level0_files); Status MakeRoomForWrite(bool force /* compact even if there is room? */); WriteBatch* BuildBatchGroup(Writer** last_writer); diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 6252a1eb0..c591ad515 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -219,9 +219,9 @@ struct Options { // level-0 compaction will not be triggered by number of files at all. int level0_file_num_compaction_trigger; - // Soft limit on number of level-0 files. We slow down writes at this point. - // A value <0 means that no writing slow down will be triggered by number - // of files in level-0. + // Soft limit on number of level-0 files. We start slowing down writes at this + // point. A value <0 means that no writing slow down will be triggered by + // number of files in level-0. int level0_slowdown_writes_trigger; // Maximum number of level-0 files. We stop writes at this point. From 59d0b02f8b7135051001d2ef377621cb6bfe7556 Mon Sep 17 00:00:00 2001 From: Mayank Agarwal Date: Fri, 26 Jul 2013 12:57:01 -0700 Subject: [PATCH 14/15] Expand KeyMayExist to return the proper value if it can be found in memory and also check block_cache Summary: Removed KeyMayExistImpl because KeyMayExist demanded Get like semantics now. Removed no_io from memtable and imm because we need the proper value now and shouldn't just stop when we see Merge in memtable. Added checks to block_cache. Updated documentation and unit-test Test Plan: make all check;db_stress for 1 hour Reviewers: dhruba, haobo Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D11853 --- db/db_impl.cc | 33 +++++++++++++-------------------- db/db_impl.h | 26 ++++++++++++-------------- db/db_test.cc | 37 +++++++++++++++++++++++++------------ db/memtable.cc | 6 +----- db/memtable.h | 5 ++--- db/memtablelist.cc | 4 ++-- db/memtablelist.h | 2 +- db/version_set.cc | 17 ++++++++++++----- db/version_set.h | 2 +- db/write_batch.cc | 15 ++++++++++++--- include/leveldb/db.h | 21 +++++++++++++++------ include/leveldb/options.h | 10 ++++------ table/table.cc | 22 ++++++++++++++-------- table/table.h | 3 ++- tools/db_stress.cc | 2 +- utilities/ttl/db_ttl.cc | 7 +++++-- utilities/ttl/db_ttl.h | 5 ++++- 17 files changed, 126 insertions(+), 91 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 9cff6991d..8be77d57e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2093,13 +2093,11 @@ Status DBImpl::Get(const ReadOptions& options, return GetImpl(options, key, value); } -// If no_io is true, then returns Status::NotFound if key is not in memtable, -// immutable-memtable and bloom-filters can guarantee that key is not in db, -// "value" is garbage string if no_io is true Status DBImpl::GetImpl(const ReadOptions& options, const Slice& key, std::string* value, - const bool no_io) { + const bool no_io, + bool* value_found) { Status s; StopWatch sw(env_, options_.statistics, DB_GET); @@ -2128,12 +2126,12 @@ Status DBImpl::GetImpl(const ReadOptions& options, // s is both in/out. When in, s could either be OK or MergeInProgress. // value will contain the current merge operand in the latter case. LookupKey lkey(key, snapshot); - if (mem->Get(lkey, value, &s, options_, no_io)) { + if (mem->Get(lkey, value, &s, options_)) { // Done - } else if (imm.Get(lkey, value, &s, options_, no_io)) { + } else if (imm.Get(lkey, value, &s, options_)) { // Done } else { - current->Get(options, lkey, value, &s, &stats, options_, no_io); + current->Get(options, lkey, value, &s, &stats, options_, no_io,value_found); have_stat_update = true; } mutex_.Lock(); @@ -2223,19 +2221,14 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, return statList; } -bool DBImpl::KeyMayExist(const Slice& key) { - return KeyMayExistImpl(key, versions_->LastSequence()); -} - -bool DBImpl::KeyMayExistImpl(const Slice& key, - const SequenceNumber read_from_seq) { - std::string value; - SnapshotImpl read_from_snapshot; - read_from_snapshot.number_ = read_from_seq; - ReadOptions ropts; - ropts.snapshot = &read_from_snapshot; - const Status s = GetImpl(ropts, key, &value, true); - return !s.IsNotFound(); +bool DBImpl::KeyMayExist(const ReadOptions& options, + const Slice& key, + std::string* value, + bool* value_found) { + if (value_found != nullptr) { + *value_found = true; // falsify later if key-may-exist but can't fetch value + } + return GetImpl(options, key, value, true, value_found).ok(); } Iterator* DBImpl::NewIterator(const ReadOptions& options) { diff --git a/db/db_impl.h b/db/db_impl.h index dedfd9d7e..333a86867 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -50,9 +50,14 @@ class DBImpl : public DB { const std::vector& keys, std::vector* values); - // Returns false if key can't exist- based on memtable, immutable-memtable and - // bloom-filters; true otherwise. No IO is performed - virtual bool KeyMayExist(const Slice& key); + // Returns false if key doesn't exist in the database and true if it may. + // If value_found is not passed in as null, then return the value if found in + // memory. On return, if value was found, then value_found will be set to true + // , otherwise false. + virtual bool KeyMayExist(const ReadOptions& options, + const Slice& key, + std::string* value, + bool* value_found = nullptr); virtual Iterator* NewIterator(const ReadOptions&); virtual const Snapshot* GetSnapshot(); virtual void ReleaseSnapshot(const Snapshot* snapshot); @@ -104,15 +109,6 @@ class DBImpl : public DB { // Trigger's a background call for testing. void TEST_PurgeObsoleteteWAL(); - // KeyMayExist's internal function, but can be called internally from rocksdb - // to check memtable from sequence_number=read_from_seq. This is useful to - // check presence of key in db when key's existence is to be also checked in - // an incompletely written WriteBatch in memtable. eg. Database doesn't have - // key A and WriteBatch=[PutA,B; DelA]. A KeyMayExist called from DelA also - // needs to check itself for any PutA to be sure to not drop the delete. - bool KeyMayExistImpl(const Slice& key, - const SequenceNumber read_from_seq); - protected: Env* const env_; const std::string dbname_; @@ -415,11 +411,13 @@ class DBImpl : public DB { std::vector& snapshots, SequenceNumber* prev_snapshot); - // Function that Get and KeyMayExistImpl call with no_io true or false + // Function that Get and KeyMayExist call with no_io true or false + // Note: 'value_found' from KeyMayExist propagates here Status GetImpl(const ReadOptions& options, const Slice& key, std::string* value, - const bool no_io = false); + const bool no_io = false, + bool* value_found = nullptr); }; // Sanitize db options. The caller should delete result.info_log if diff --git a/db/db_test.cc b/db/db_test.cc index d25fe8b3e..9a7bb2eb0 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -772,34 +772,41 @@ TEST(DBTest, GetEncountersEmptyLevel) { } while (ChangeOptions()); } -// KeyMayExist-API returns false if memtable(s) and in-memory bloom-filters can -// guarantee that the key doesn't exist in the db, else true. This can lead to -// a few false positives, but not false negatives. To make test deterministic, -// use a much larger number of bits per key-20 than bits in the key, so -// that false positives are eliminated +// KeyMayExist can lead to a few false positives, but not false negatives. +// To make test deterministic, use a much larger number of bits per key-20 than +// bits in the key, so that false positives are eliminated TEST(DBTest, KeyMayExist) { do { + ReadOptions ropts; + std::string value; Options options = CurrentOptions(); options.filter_policy = NewBloomFilterPolicy(20); Reopen(&options); - ASSERT_TRUE(!db_->KeyMayExist("a")); + ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value)); ASSERT_OK(db_->Put(WriteOptions(), "a", "b")); - ASSERT_TRUE(db_->KeyMayExist("a")); + bool value_found = false; + ASSERT_TRUE(db_->KeyMayExist(ropts, "a", &value, &value_found)); + ASSERT_TRUE(value_found); + ASSERT_EQ("b", value); dbfull()->Flush(FlushOptions()); - ASSERT_TRUE(db_->KeyMayExist("a")); + value.clear(); + value_found = false; + ASSERT_TRUE(db_->KeyMayExist(ropts, "a", &value, &value_found)); + ASSERT_TRUE(value_found); + ASSERT_EQ("b", value); ASSERT_OK(db_->Delete(WriteOptions(), "a")); - ASSERT_TRUE(!db_->KeyMayExist("a")); + ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value)); dbfull()->Flush(FlushOptions()); dbfull()->CompactRange(nullptr, nullptr); - ASSERT_TRUE(!db_->KeyMayExist("a")); + ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value)); ASSERT_OK(db_->Delete(WriteOptions(), "c")); - ASSERT_TRUE(!db_->KeyMayExist("c")); + ASSERT_TRUE(!db_->KeyMayExist(ropts, "c", &value)); delete options.filter_policy; } while (ChangeOptions()); @@ -3045,7 +3052,13 @@ class ModelDB: public DB { Status::NotSupported("Not implemented.")); return s; } - virtual bool KeyMayExist(const Slice& key) { + virtual bool KeyMayExist(const ReadOptions& options, + const Slice& key, + std::string* value, + bool* value_found = nullptr) { + if (value_found != nullptr) { + *value_found = false; + } return true; // Not Supported directly } virtual Iterator* NewIterator(const ReadOptions& options) { diff --git a/db/memtable.cc b/db/memtable.cc index fb3f4f1f7..622058804 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -130,7 +130,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, } bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, - const Options& options, const bool check_presence_only) { + const Options& options) { Slice memkey = key.memtable_key(); std::shared_ptr iter(table_.get()->GetIterator()); iter->Seek(memkey.data()); @@ -174,10 +174,6 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, return true; } case kTypeMerge: { - if (check_presence_only) { - *s = Status::OK(); - return true; - } Slice v = GetLengthPrefixedSlice(key_ptr + key_length); if (merge_in_progress) { merge_operator->Merge(key.user_key(), &v, operand, diff --git a/db/memtable.h b/db/memtable.h index 6a3c7fcfd..73d32fc4c 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -73,13 +73,12 @@ class MemTable { // If memtable contains a deletion for key, store a NotFound() error // in *status and return true. // If memtable contains Merge operation as the most recent entry for a key, - // and if check_presence_only is set, return true with Status::OK, - // else if the merge process does not stop (not reaching a value or delete), + // and the merge process does not stop (not reaching a value or delete), // store the current merged result in value and MergeInProgress in s. // return false // Else, return false. bool Get(const LookupKey& key, std::string* value, Status* s, - const Options& options, const bool check_presence_only = false); + const Options& options); // Returns the edits area that is needed for flushing the memtable VersionEdit* GetEdits() { return &edit_; } diff --git a/db/memtablelist.cc b/db/memtablelist.cc index ac89d1043..9d8b675bf 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -194,10 +194,10 @@ size_t MemTableList::ApproximateMemoryUsage() { // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s, - const Options& options, const bool check_presence_only) { + const Options& options) { for (list::iterator it = memlist_.begin(); it != memlist_.end(); ++it) { - if ((*it)->Get(key, value, s, options, check_presence_only)) { + if ((*it)->Get(key, value, s, options)) { return true; } } diff --git a/db/memtablelist.h b/db/memtablelist.h index d741a6630..b30089cf6 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -70,7 +70,7 @@ class MemTableList { // Search all the memtables starting from the most recent one. // Return the most recent value found, if any. bool Get(const LookupKey& key, std::string* value, Status* s, - const Options& options, const bool check_presence_only = false); + const Options& options); // Returns the list of underlying memtables. void GetMemTables(std::vector* list); diff --git a/db/version_set.cc b/db/version_set.cc index 15ff1330f..48588c712 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -238,6 +238,7 @@ struct Saver { SaverState state; const Comparator* ucmp; Slice user_key; + bool* value_found; // Is value set correctly? Used by KeyMayExist std::string* value; const MergeOperator* merge_operator; Logger* logger; @@ -245,13 +246,17 @@ struct Saver { }; } -// Called from TableCache::Get when bloom-filters can't guarantee that key does -// not exist and Get is not permitted to do IO to read the data-block and be -// certain. -// Set the key as Found and let the caller know that key-may-exist +// Called from TableCache::Get and InternalGet when file/block in which key may +// exist are not there in TableCache/BlockCache respectively. In this case we +// can't guarantee that key does not exist and are not permitted to do IO to be +// certain.Set the status=kFound and value_found=false to let the caller know +// that key may exist but is not there in memory static void MarkKeyMayExist(void* arg) { Saver* s = reinterpret_cast(arg); s->state = kFound; + if (s->value_found != nullptr) { + *(s->value_found) = false; + } } static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ @@ -339,7 +344,8 @@ void Version::Get(const ReadOptions& options, Status *status, GetStats* stats, const Options& db_options, - const bool no_io) { + const bool no_io, + bool* value_found) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); const Comparator* ucmp = vset_->icmp_.user_comparator(); @@ -355,6 +361,7 @@ void Version::Get(const ReadOptions& options, saver.state = status->ok()? kNotFound : kMerge; saver.ucmp = ucmp; saver.user_key = user_key; + saver.value_found = value_found; saver.value = value; saver.merge_operator = merge_operator; saver.logger = logger.get(); diff --git a/db/version_set.h b/db/version_set.h index 11bfb9961..320ff8e68 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -75,7 +75,7 @@ class Version { }; void Get(const ReadOptions&, const LookupKey& key, std::string* val, Status* status, GetStats* stats, const Options& db_option, - const bool no_io = false); + const bool no_io = false, bool* value_found = nullptr); // Adds "stats" into the current state. Returns true if a new // compaction may need to be triggered, false otherwise. diff --git a/db/write_batch.cc b/db/write_batch.cc index 9845f49ad..a4213cd97 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -16,10 +16,12 @@ #include "leveldb/write_batch.h" +#include "leveldb/options.h" #include "leveldb/statistics.h" #include "db/dbformat.h" #include "db/db_impl.h" #include "db/memtable.h" +#include "db/snapshot.h" #include "db/write_batch_internal.h" #include "util/coding.h" #include @@ -167,9 +169,16 @@ class MemTableInserter : public WriteBatch::Handler { sequence_++; } virtual void Delete(const Slice& key) { - if (filter_deletes_ && !db_->KeyMayExistImpl(key, sequence_)) { - RecordTick(options_->statistics, NUMBER_FILTERED_DELETES); - return; + if (filter_deletes_) { + SnapshotImpl read_from_snapshot; + read_from_snapshot.number_ = sequence_; + ReadOptions ropts; + ropts.snapshot = &read_from_snapshot; + std::string value; + if (!db_->KeyMayExist(ropts, key, &value)) { + RecordTick(options_->statistics, NUMBER_FILTERED_DELETES); + return; + } } mem_->Add(sequence_, kTypeDeletion, key, Slice()); sequence_++; diff --git a/include/leveldb/db.h b/include/leveldb/db.h index e331e7f67..0c056c362 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -104,7 +104,8 @@ class DB { // // May return some other Status on an error. virtual Status Get(const ReadOptions& options, - const Slice& key, std::string* value) = 0; + const Slice& key, + std::string* value) = 0; // If keys[i] does not exist in the database, then the i'th returned // status will be one for which Status::IsNotFound() is true, and @@ -121,11 +122,19 @@ class DB { std::vector* values) = 0; // If the key definitely does not exist in the database, then this method - // returns false. Otherwise return true. This check is potentially - // lighter-weight than invoking DB::Get(). One way to make this lighter weight - // is to avoid doing any IOs - // Default implementation here returns true - virtual bool KeyMayExist(const Slice& key) { + // returns false, else true. If the caller wants to obtain value when the key + // is found in memory, a bool for 'value_found' must be passed. 'value_found' + // will be true on return if value has been set properly. + // This check is potentially lighter-weight than invoking DB::Get(). One way + // to make this lighter weight is to avoid doing any IOs. + // Default implementation here returns true and sets 'value_found' to false + virtual bool KeyMayExist(const ReadOptions& options, + const Slice& key, + std::string* value, + bool* value_found = nullptr) { + if (value_found != nullptr) { + *value_found = false; + } return true; } diff --git a/include/leveldb/options.h b/include/leveldb/options.h index c591ad515..1bf0f6dda 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -473,12 +473,10 @@ struct Options { // Default: 0 uint64_t bytes_per_sync; - // Use bloom-filter for deletes when this is true. - // db->Delete first calls KeyMayExist which checks memtable,immutable-memtable - // and bloom-filters to determine if the key does not exist in the database. - // If the key definitely does not exist, then the delete is a noop.KeyMayExist - // only incurs in-memory look up. This optimization avoids writing the delete - // to storage when appropriate. + // Use KeyMayExist API to filter deletes when this is true. + // If KeyMayExist returns false, i.e. the key definitely does not exist, then + // the delete is a noop. KeyMayExist only incurs in-memory look up. + // This optimization avoids writing the delete to storage when appropriate. // Default: false bool filter_deletes; diff --git a/table/table.cc b/table/table.cc index c51a0aa44..80c5ef491 100644 --- a/table/table.cc +++ b/table/table.cc @@ -235,7 +235,8 @@ Iterator* Table::BlockReader(void* arg, const ReadOptions& options, const Slice& index_value, bool* didIO, - bool for_compaction) { + bool for_compaction, + const bool no_io) { Table* table = reinterpret_cast(arg); Cache* block_cache = table->rep_->options.block_cache.get(); std::shared_ptr statistics = table->rep_->options.statistics; @@ -264,6 +265,8 @@ Iterator* Table::BlockReader(void* arg, block = reinterpret_cast(block_cache->Value(cache_handle)); RecordTick(statistics, BLOCK_CACHE_HIT); + } else if (no_io) { + return nullptr; // Did not find in block_cache and can't do IO } else { Histograms histogram = for_compaction ? READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS; @@ -286,7 +289,9 @@ Iterator* Table::BlockReader(void* arg, RecordTick(statistics, BLOCK_CACHE_MISS); } - } else { + } else if (no_io) { + return nullptr; // Could not read from block_cache and can't do IO + }else { s = ReadBlock(table->rep_->file.get(), options, handle, &block, didIO); } } @@ -340,16 +345,17 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, // cross one data block, we should be fine. RecordTick(rep_->options.statistics, BLOOM_FILTER_USEFUL); break; - } else if (no_io) { - // Update Saver.state to Found because we are only looking for whether - // bloom-filter can guarantee the key is not there when "no_io" - (*mark_key_may_exist)(arg); - done = true; } else { bool didIO = false; Iterator* block_iter = BlockReader(this, options, iiter->value(), - &didIO); + &didIO, no_io); + if (no_io && !block_iter) { // couldn't get block from block_cache + // Update Saver.state to Found because we are only looking for whether + // we can guarantee the key is not there when "no_io" is set + (*mark_key_may_exist)(arg); + break; + } for (block_iter->Seek(k); block_iter->Valid(); block_iter->Next()) { if (!(*saver)(arg, block_iter->key(), block_iter->value(), didIO)) { done = true; diff --git a/table/table.h b/table/table.h index 0be95f368..b39a5c186 100644 --- a/table/table.h +++ b/table/table.h @@ -77,7 +77,8 @@ class Table { const EnvOptions& soptions, const Slice&, bool for_compaction); static Iterator* BlockReader(void*, const ReadOptions&, const Slice&, - bool* didIO, bool for_compaction = false); + bool* didIO, bool for_compaction = false, + const bool no_io = false); // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found // after a call to Seek(key), until handle_result returns false. diff --git a/tools/db_stress.cc b/tools/db_stress.cc index fcd5dc269..ca6602373 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -180,7 +180,7 @@ static uint32_t FLAGS_log2_keys_per_lock = 2; // implies 2^2 keys per lock // Percentage of times we want to purge redundant keys in memory before flushing static uint32_t FLAGS_purge_redundant_percent = 50; -// On true, deletes use bloom-filter and drop the delete if key not present +// On true, deletes use KeyMayExist to drop the delete if key not present static bool FLAGS_filter_deletes = false; // Level0 compaction start trigger diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index 7dfda7207..90cc1ec4c 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -158,8 +158,11 @@ std::vector DBWithTTL::MultiGet(const ReadOptions& options, supported with TTL")); } -bool DBWithTTL::KeyMayExist(const Slice& key) { - return db_->KeyMayExist(key); +bool DBWithTTL::KeyMayExist(ReadOptions& options, + const Slice& key, + std::string* value, + bool* value_found) { + return db_->KeyMayExist(options, key, value, value_found); } Status DBWithTTL::Delete(const WriteOptions& wopts, const Slice& key) { diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index d66e396ca..078a069ba 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -33,7 +33,10 @@ class DBWithTTL : public DB, CompactionFilter { const std::vector& keys, std::vector* values); - virtual bool KeyMayExist(const Slice& key); + virtual bool KeyMayExist(ReadOptions& options, + const Slice& key, + std::string* value, + bool* value_found = nullptr); virtual Status Delete(const WriteOptions& wopts, const Slice& key); From c42485f67cb4d0bc6def79e90397c5171e47c1b5 Mon Sep 17 00:00:00 2001 From: Mayank Agarwal Date: Mon, 22 Jul 2013 16:49:55 -0700 Subject: [PATCH 15/15] Merge operator for ttl Summary: Implemented a TtlMergeOperator class which inherits from MergeOperator and is TTL aware. It strips out timestamp from existing_value and attaches timestamp to new_value, calling user-provided-Merge in between. Test Plan: make all check Reviewers: haobo, dhruba Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D11775 --- db/merge_test.cc | 25 ++++++++++++++---- utilities/ttl/db_ttl.cc | 30 +++++++++++++-------- utilities/ttl/db_ttl.h | 58 +++++++++++++++++++++++++++++++++++++++-- 3 files changed, 95 insertions(+), 18 deletions(-) diff --git a/db/merge_test.cc b/db/merge_test.cc index 47fad025d..903a824a5 100644 --- a/db/merge_test.cc +++ b/db/merge_test.cc @@ -8,20 +8,29 @@ #include "leveldb/env.h" #include "leveldb/merge_operator.h" #include "db/dbformat.h" +#include "db/db_impl.h" #include "utilities/merge_operators.h" #include "util/testharness.h" +#include "utilities/utility_db.h" using namespace std; using namespace leveldb; auto mergeOperator = MergeOperators::CreateUInt64AddOperator(); -std::shared_ptr OpenDb() { +std::shared_ptr OpenDb(const string& dbname, const bool ttl = false) { DB* db; Options options; options.create_if_missing = true; options.merge_operator = mergeOperator.get(); - Status s = DB::Open(options, test::TmpDir() + "/merge_testdb", &db); + Status s; + DestroyDB(dbname, Options()); + if (ttl) { + cout << "Opening database with TTL\n"; + s = UtilityDB::OpenTtlDB(options, test::TmpDir() + "/merge_testdbttl", &db); + } else { + s = DB::Open(options, test::TmpDir() + "/merge_testdb", &db); + } if (!s.ok()) { cerr << s.ToString() << endl; assert(false); @@ -228,9 +237,8 @@ void testCounters(Counters& counters, DB* db, bool test_compaction) { } } -int main(int argc, char *argv[]) { - - auto db = OpenDb(); +void runTest(int argc, const string& dbname, const bool use_ttl = false) { + auto db = OpenDb(dbname, use_ttl); { cout << "Test read-modify-write counters... \n"; @@ -250,5 +258,12 @@ int main(int argc, char *argv[]) { testCounters(counters, db.get(), compact); } + DestroyDB(dbname, Options()); +} + +int main(int argc, char *argv[]) { + //TODO: Make this test like a general rocksdb unit-test + runTest(argc, "/tmp/testdb"); + runTest(argc, "/tmp/testdbttl", true); // Run test on TTL database return 0; } diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index 90cc1ec4c..a4a7134de 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -21,6 +21,10 @@ DBWithTTL::DBWithTTL(const int32_t ttl, assert(options.compaction_filter == nullptr); Options options_to_open = options; options_to_open.compaction_filter = this; + if (options.merge_operator) { + ttl_merge_op_.reset(new TtlMergeOperator(options.merge_operator)); + options_to_open.merge_operator = ttl_merge_op_.get(); + } if (read_only) { st = DB::OpenForReadOnly(options_to_open, dbname, &db_); } else { @@ -125,15 +129,12 @@ Status DBWithTTL::StripTS(std::string* str) { } Status DBWithTTL::Put( - const WriteOptions& o, + const WriteOptions& opt, const Slice& key, const Slice& val) { - std::string value_with_ts; - Status st = AppendTS(val, value_with_ts); - if (!st.ok()) { - return st; - } - return db_->Put(o, key, value_with_ts); + WriteBatch batch; + batch.Put(key, val); + return Write(opt, &batch); } Status DBWithTTL::Get(const ReadOptions& options, @@ -169,10 +170,12 @@ Status DBWithTTL::Delete(const WriteOptions& wopts, const Slice& key) { return db_->Delete(wopts, key); } -Status DBWithTTL::Merge(const WriteOptions& options, +Status DBWithTTL::Merge(const WriteOptions& opt, const Slice& key, const Slice& value) { - return Status::NotSupported("Merge operation not supported."); + WriteBatch batch; + batch.Merge(key, value); + return Write(opt, &batch); } Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { @@ -190,8 +193,13 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { } } virtual void Merge(const Slice& key, const Slice& value) { - // TTL doesn't support merge operation - batch_rewrite_status = Status::NotSupported("TTL doesn't support Merge"); + std::string value_with_ts; + Status st = AppendTS(value, value_with_ts); + if (!st.ok()) { + batch_rewrite_status = st; + } else { + updates_ttl.Merge(key, value_with_ts); + } } virtual void Delete(const Slice& key) { updates_ttl.Delete(key); diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index 078a069ba..3b8ba8e95 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -5,8 +5,10 @@ #ifndef LEVELDB_UTILITIES_TTL_DB_TTL_H_ #define LEVELDB_UTILITIES_TTL_DB_TTL_H_ -#include "include/leveldb/db.h" -#include "include/leveldb/compaction_filter.h" +#include "leveldb/db.h" +#include "leveldb/env.h" +#include "leveldb/compaction_filter.h" +#include "leveldb/merge_operator.h" #include "db/db_impl.h" namespace leveldb { @@ -110,6 +112,7 @@ class DBWithTTL : public DB, CompactionFilter { private: DB* db_; int32_t ttl_; + unique_ptr ttl_merge_op_; }; class TtlIterator : public Iterator { @@ -173,5 +176,56 @@ class TtlIterator : public Iterator { Iterator* iter_; }; +class TtlMergeOperator : public MergeOperator { + + public: + explicit TtlMergeOperator(const MergeOperator* merge_op) + : user_merge_op_(merge_op) { + assert(merge_op); + } + + virtual void Merge(const Slice& key, + const Slice* existing_value, + const Slice& value, + std::string* new_value, + Logger* logger) const { + const uint32_t& ts_len = DBWithTTL::kTSLength; + if ((existing_value && existing_value->size() < ts_len) || + value.size() < ts_len) { + Log(logger, "Error: Could not remove timestamp correctly from value."); + assert(false); + //TODO: Remove assert and make this function return false. + //TODO: Change Merge semantics and add a counter here + } + Slice value_without_ts(value.data(), value.size() - ts_len); + if (existing_value) { + Slice existing_value_without_ts(existing_value->data(), + existing_value->size() - ts_len); + user_merge_op_->Merge(key, &existing_value_without_ts, value_without_ts, + new_value, logger); + } else { + user_merge_op_->Merge(key, nullptr, value_without_ts, new_value, logger); + } + int32_t curtime; + if (!DBWithTTL::GetCurrentTime(curtime).ok()) { + Log(logger, "Error: Could not get current time to be attached internally " + "to the new value."); + assert(false); + //TODO: Remove assert and make this function return false. + } else { + char ts_string[ts_len]; + EncodeFixed32(ts_string, curtime); + new_value->append(ts_string, ts_len); + } + } + + virtual const char* Name() const { + return "Merge By TTL"; + } + + private: + const MergeOperator* user_merge_op_; +}; + } #endif // LEVELDB_UTILITIES_TTL_DB_TTL_H_