From 35ddf18367f386261517109052d12de01cfe47ba Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 27 Nov 2013 13:32:56 -0800 Subject: [PATCH 01/17] Don't do compression tests if we don't have compression libs Summary: These tests fail if compression libraries are not installed. Test Plan: Manually disabled snappy, observed tests not ran. Reviewers: dhruba, kailiu Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D14379 --- db/db_test.cc | 176 ++++++++++++++++++++++++-------------------------- 1 file changed, 85 insertions(+), 91 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index 1e85ab1fa..c698d4bde 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1815,94 +1815,6 @@ TEST(DBTest, CompactionsGenerateMultipleFiles) { } } -// TODO(kailiu) disable the in non-linux platforms to temporarily solve -// the unit test failure. -#ifdef OS_LINUX -TEST(DBTest, CompressedCache) { - int num_iter = 80; - - // Run this test three iterations. - // Iteration 1: only a uncompressed block cache - // Iteration 2: only a compressed block cache - // Iteration 3: both block cache and compressed cache - for (int iter = 0; iter < 3; iter++) { - Options options = CurrentOptions(); - options.write_buffer_size = 64*1024; // small write buffer - options.statistics = rocksdb::CreateDBStatistics(); - - switch (iter) { - case 0: - // only uncompressed block cache - options.block_cache = NewLRUCache(8*1024); - options.block_cache_compressed = nullptr; - break; - case 1: - // no block cache, only compressed cache - options.no_block_cache = true; - options.block_cache = nullptr; - options.block_cache_compressed = NewLRUCache(8*1024); - break; - case 2: - // both compressed and uncompressed block cache - options.block_cache = NewLRUCache(1024); - options.block_cache_compressed = NewLRUCache(8*1024); - break; - default: - ASSERT_TRUE(false); - } - Reopen(&options); - - Random rnd(301); - - // Write 8MB (80 values, each 100K) - 
ASSERT_EQ(NumTableFilesAtLevel(0), 0); - std::vector values; - std::string str; - for (int i = 0; i < num_iter; i++) { - if (i % 4 == 0) { // high compression ratio - str = RandomString(&rnd, 1000); - } - values.push_back(str); - ASSERT_OK(Put(Key(i), values[i])); - } - - // flush all data from memtable so that reads are from block cache - dbfull()->Flush(FlushOptions()); - - for (int i = 0; i < num_iter; i++) { - ASSERT_EQ(Get(Key(i)), values[i]); - } - - // check that we triggered the appropriate code paths in the cache - switch (iter) { - case 0: - // only uncompressed block cache - ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), - 0); - ASSERT_EQ(options.statistics.get()->getTickerCount - (BLOCK_CACHE_COMPRESSED_MISS), 0); - break; - case 1: - // no block cache, only compressed cache - ASSERT_EQ(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), - 0); - ASSERT_GT(options.statistics.get()->getTickerCount - (BLOCK_CACHE_COMPRESSED_MISS), 0); - break; - case 2: - // both compressed and uncompressed block cache - ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), - 0); - ASSERT_GT(options.statistics.get()->getTickerCount - (BLOCK_CACHE_COMPRESSED_MISS), 0); - break; - default: - ASSERT_TRUE(false); - } - } -} -#endif - TEST(DBTest, CompactionTrigger) { Options options = CurrentOptions(); options.write_buffer_size = 100<<10; //100KB @@ -2144,9 +2056,91 @@ TEST(DBTest, UniversalCompactionOptions) { } } -// TODO(kailiu) disable the in non-linux platforms to temporarily solve -// the unit test failure. -#ifdef OS_LINUX +#if defined(SNAPPY) && defined(ZLIB) && defined(BZIP2) +TEST(DBTest, CompressedCache) { + int num_iter = 80; + + // Run this test three iterations. 
+ // Iteration 1: only a uncompressed block cache + // Iteration 2: only a compressed block cache + // Iteration 3: both block cache and compressed cache + for (int iter = 0; iter < 3; iter++) { + Options options = CurrentOptions(); + options.write_buffer_size = 64*1024; // small write buffer + options.statistics = rocksdb::CreateDBStatistics(); + + switch (iter) { + case 0: + // only uncompressed block cache + options.block_cache = NewLRUCache(8*1024); + options.block_cache_compressed = nullptr; + break; + case 1: + // no block cache, only compressed cache + options.no_block_cache = true; + options.block_cache = nullptr; + options.block_cache_compressed = NewLRUCache(8*1024); + break; + case 2: + // both compressed and uncompressed block cache + options.block_cache = NewLRUCache(1024); + options.block_cache_compressed = NewLRUCache(8*1024); + break; + default: + ASSERT_TRUE(false); + } + Reopen(&options); + + Random rnd(301); + + // Write 8MB (80 values, each 100K) + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + std::vector values; + std::string str; + for (int i = 0; i < num_iter; i++) { + if (i % 4 == 0) { // high compression ratio + str = RandomString(&rnd, 1000); + } + values.push_back(str); + ASSERT_OK(Put(Key(i), values[i])); + } + + // flush all data from memtable so that reads are from block cache + dbfull()->Flush(FlushOptions()); + + for (int i = 0; i < num_iter; i++) { + ASSERT_EQ(Get(Key(i)), values[i]); + } + + // check that we triggered the appropriate code paths in the cache + switch (iter) { + case 0: + // only uncompressed block cache + ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), + 0); + ASSERT_EQ(options.statistics.get()->getTickerCount + (BLOCK_CACHE_COMPRESSED_MISS), 0); + break; + case 1: + // no block cache, only compressed cache + ASSERT_EQ(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), + 0); + ASSERT_GT(options.statistics.get()->getTickerCount + (BLOCK_CACHE_COMPRESSED_MISS), 0); + break; + case 2: + // 
both compressed and uncompressed block cache + ASSERT_GT(options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS), + 0); + ASSERT_GT(options.statistics.get()->getTickerCount + (BLOCK_CACHE_COMPRESSED_MISS), 0); + break; + default: + ASSERT_TRUE(false); + } + } +} + static std::string CompressibleString(Random* rnd, int len) { std::string r; test::CompressibleString(rnd, 0.8, len, &r); From fe754fe7e3f4deb6d7406fe88babba16c321fd76 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 27 Nov 2013 13:33:14 -0800 Subject: [PATCH 02/17] Readrandom with small block cache Summary: Added readrandom benchmark with 300MB block cache, while database has 1GB of data Test Plan: Ran it Reviewers: dhruba, MarkCallaghan Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D14373 --- build_tools/regression_build_test.sh | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/build_tools/regression_build_test.sh b/build_tools/regression_build_test.sh index 6ede47466..1c44e5ad2 100755 --- a/build_tools/regression_build_test.sh +++ b/build_tools/regression_build_test.sh @@ -65,7 +65,7 @@ OPT=-DNDEBUG make db_bench -j$(nproc) --sync=0 \ --threads=8 > ${STAT_FILE}.overwrite -# fill up the db for readrandom benchmark +# fill up the db for readrandom benchmark (1GB total size) ./db_bench \ --benchmarks=fillseq \ --db=$DATA_DIR \ @@ -83,7 +83,7 @@ OPT=-DNDEBUG make db_bench -j$(nproc) --sync=0 \ --threads=1 > /dev/null -# measure readrandom +# measure readrandom with 6GB block cache ./db_bench \ --benchmarks=readrandom \ --db=$DATA_DIR \ @@ -102,6 +102,25 @@ OPT=-DNDEBUG make db_bench -j$(nproc) --sync=0 \ --threads=32 > ${STAT_FILE}.readrandom +# measure readrandom with 300MB block cache +./db_bench \ + --benchmarks=readrandom \ + --db=$DATA_DIR \ + --use_existing_db=1 \ + --bloom_bits=10 \ + --num=$NUM \ + --reads=$NUM \ + --cache_size=314572800 \ + --cache_numshardbits=8 \ + --open_files=55000 \ + 
--disable_seek_compaction=1 \ + --statistics=1 \ + --histogram=1 \ + --disable_data_sync=1 \ + --disable_wal=1 \ + --sync=0 \ + --threads=32 > ${STAT_FILE}.readrandomsmallblockcache + # measure memtable performance -- none of the data gets flushed to disk ./db_bench \ --benchmarks=fillrandom,readrandom, \ @@ -154,5 +173,6 @@ function send_benchmark_to_ods { send_benchmark_to_ods overwrite overwrite $STAT_FILE.overwrite send_benchmark_to_ods fillseq fillseq $STAT_FILE.fillseq send_benchmark_to_ods readrandom readrandom $STAT_FILE.readrandom +send_benchmark_to_ods readrandom readrandom_smallblockcache $STAT_FILE.readrandomsmallblockcache send_benchmark_to_ods fillrandom memtablefillrandom $STAT_FILE.memtablefillreadrandom send_benchmark_to_ods readrandom memtablereadrandom $STAT_FILE.memtablefillreadrandom From 98968ba937f5be35b8de84fa1ff2764808ca85ce Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Wed, 27 Nov 2013 14:56:20 -0800 Subject: [PATCH 03/17] Free obsolete memtables outside the dbmutex had a memory leak. Summary: The commit at 27bbef11802d27c80df7e0b27091876df23b9986 had a memory leak that was detected by valgrind. The memtable that has a refcount decrement in MemTableList::InstallMemtableFlushResults was not freed. Test Plan: valgrind ./db_test --leak-check=full Reviewers: igor Reviewed By: igor CC: leveldb Differential Revision: https://reviews.facebook.net/D14391 --- db/db_impl.cc | 31 ++++++++++++++++++++----------- db/db_impl.h | 10 ++++++++-- db/memtablelist.cc | 7 +++++-- db/memtablelist.h | 3 ++- db/repair.cc | 2 +- 5 files changed, 36 insertions(+), 17 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 2abdd9107..ad189a2e5 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -515,6 +515,19 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, // files in sst_delete_files and log_delete_files. // It is not necessary to hold the mutex when invoking this method. 
void DBImpl::PurgeObsoleteFiles(DeletionState& state) { + + // free pending memtables + for (auto m : state.memtables_to_free) { + delete m; + } + + // check if there is anything to do + if (!state.all_files.size() && + !state.sst_delete_files.size() && + !state.log_delete_files.size()) { + return; + } + // this checks if FindObsoleteFiles() was run before. If not, don't do // PurgeObsoleteFiles(). If FindObsoleteFiles() was run, we need to also // run PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true @@ -1169,7 +1182,7 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, // Replace immutable memtable with the generated Table s = imm_.InstallMemtableFlushResults( mems, versions_.get(), s, &mutex_, options_.info_log.get(), - file_number, pending_outputs_); + file_number, pending_outputs_, &deletion_state.memtables_to_free); if (s.ok()) { if (madeProgress) { @@ -1655,7 +1668,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, void DBImpl::BackgroundCallFlush() { bool madeProgress = false; - DeletionState deletion_state; + DeletionState deletion_state(options_.max_write_buffer_number); assert(bg_flush_scheduled_); MutexLock l(&mutex_); @@ -1701,7 +1714,7 @@ void DBImpl::TEST_PurgeObsoleteteWAL() { void DBImpl::BackgroundCallCompaction() { bool madeProgress = false; - DeletionState deletion_state; + DeletionState deletion_state(options_.max_write_buffer_number); MaybeDumpStats(); @@ -1731,6 +1744,7 @@ void DBImpl::BackgroundCallCompaction() { // FindObsoleteFiles(). This is because deletion_state does not catch // all created files if compaction failed. 
FindObsoleteFiles(deletion_state, !s.ok()); + // delete unnecessary files if any, this is done outside the mutex if (deletion_state.HaveSomethingToDelete()) { mutex_.Unlock(); @@ -2491,25 +2505,20 @@ struct IterState { static void CleanupIteratorState(void* arg1, void* arg2) { IterState* state = reinterpret_cast(arg1); - std::vector to_delete; - to_delete.reserve(state->mem.size()); + DBImpl::DeletionState deletion_state(state->db->GetOptions(). + max_write_buffer_number); state->mu->Lock(); for (unsigned int i = 0; i < state->mem.size(); i++) { MemTable* m = state->mem[i]->Unref(); if (m != nullptr) { - to_delete.push_back(m); + deletion_state.memtables_to_free.push_back(m); } } state->version->Unref(); - // delete only the sst obsolete files - DBImpl::DeletionState deletion_state; // fast path FindObsoleteFiles state->db->FindObsoleteFiles(deletion_state, false, true); state->mu->Unlock(); state->db->PurgeObsoleteFiles(deletion_state); - - // delete obsolete memtables outside the db-mutex - for (MemTable* m : to_delete) delete m; delete state; } } // namespace diff --git a/db/db_impl.h b/db/db_impl.h index 8a57b92f5..7d0fadb61 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -129,10 +129,12 @@ class DBImpl : public DB { struct DeletionState { inline bool HaveSomethingToDelete() const { - return all_files.size() || + return memtables_to_free.size() || + all_files.size() || sst_delete_files.size() || log_delete_files.size(); } + // a list of all files that we'll consider deleting // (every once in a while this is filled up with all files // in the DB directory) @@ -147,14 +149,18 @@ class DBImpl : public DB { // a list of log files that we need to delete std::vector log_delete_files; + // a list of memtables to be free + std::vector memtables_to_free; + // the current manifest_file_number, log_number and prev_log_number // that corresponds to the set of files in 'live'. 
uint64_t manifest_file_number, log_number, prev_log_number; - DeletionState() { + explicit DeletionState(const int num_memtables = 0) { manifest_file_number = 0; log_number = 0; prev_log_number = 0; + memtables_to_free.reserve(num_memtables); } }; diff --git a/db/memtablelist.cc b/db/memtablelist.cc index 4453d1721..3d4d35fd8 100644 --- a/db/memtablelist.cc +++ b/db/memtablelist.cc @@ -80,7 +80,8 @@ Status MemTableList::InstallMemtableFlushResults( VersionSet* vset, Status flushStatus, port::Mutex* mu, Logger* info_log, uint64_t file_number, - std::set& pending_outputs) { + std::set& pending_outputs, + std::vector* to_delete) { mu->AssertHeld(); // If the flush was not successful, then just reset state. @@ -151,7 +152,9 @@ Status MemTableList::InstallMemtableFlushResults( // executing compaction threads do not mistakenly assume that this // file is not live. pending_outputs.erase(m->file_number_); - m->Unref(); + if (m->Unref() != nullptr) { + to_delete->push_back(m); + } size_--; } else { //commit failed. setup state so that we can flush again. diff --git a/db/memtablelist.h b/db/memtablelist.h index ef10526c9..17c6c3ae4 100644 --- a/db/memtablelist.h +++ b/db/memtablelist.h @@ -65,7 +65,8 @@ class MemTableList { VersionSet* vset, Status flushStatus, port::Mutex* mu, Logger* info_log, uint64_t file_number, - std::set& pending_outputs); + std::set& pending_outputs, + std::vector* to_delete); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). 
diff --git a/db/repair.cc b/db/repair.cc index 66aa95ae2..230be565e 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -227,7 +227,7 @@ class Repairer { table_cache_, iter, &meta, icmp_.user_comparator(), 0, 0, true); delete iter; - mem->Unref(); + delete mem->Unref(); mem = nullptr; if (status.ok()) { if (meta.file_size > 0) { From 38feca4f35ca123d71d869b8f7bb814bf442ea12 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Fri, 29 Nov 2013 18:03:02 -0800 Subject: [PATCH 04/17] Removed redundant slice_transform.h and memtablerep.h Summary: Removed redundant slice_transform.h and memtablerep.h Test Plan: make check Reviewers: CC: Task ID: # Blame Rev: --- include/rocksdb/options.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 0cc33be68..9a4644c8e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -15,11 +15,9 @@ #include #include -#include "rocksdb/memtablerep.h" #include "rocksdb/memtablerep.h" #include "rocksdb/slice.h" #include "rocksdb/slice_transform.h" -#include "rocksdb/slice_transform.h" #include "rocksdb/statistics.h" #include "rocksdb/table_properties.h" #include "rocksdb/universal_compaction.h" From 45a2f2d8d30103f74b20ae19cd61be1f775c5ddb Mon Sep 17 00:00:00 2001 From: lovro Date: Sun, 1 Dec 2013 11:25:31 -0800 Subject: [PATCH 05/17] Fix build without glibc Summary: The preprocessor does not follow normal rules of && evaluation, tries to evaluate __GLIBC_PREREQ(2, 12) even though the defined() check fails. This breaks the build if __GLIBC_PREREQ is absent. 
Test Plan: Try adding #undef __GLIBC_PREREQ above the offending line, build no longer breaks Reviewed By: igor Blame Rev: 4c81383628db46d35b674000a3668b5a9a2498a6 --- util/env_posix.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/util/env_posix.cc b/util/env_posix.cc index 16c3d1c61..dd052bb25 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -1398,11 +1398,13 @@ class PosixEnv : public Env { (unsigned long)t); // Set the thread name to aid debugging -#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) && (__GLIBC_PREREQ(2, 12)) +#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) +#if __GLIBC_PREREQ(2, 12) char name_buf[16]; snprintf(name_buf, sizeof name_buf, "rocksdb:bg%zu", bgthreads_.size()); name_buf[sizeof name_buf - 1] = '\0'; pthread_setname_np(t, name_buf); +#endif #endif bgthreads_.push_back(t); From 96bc3ec297e116b2fe83edf1c17605f628842959 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Sun, 1 Dec 2013 21:23:44 -0800 Subject: [PATCH 06/17] Memtables should be deleted appropriately in the unit test. Summary: Memtables should be deleted appropriately in the unit test. 
Test Plan: Reviewers: CC: Task ID: # Blame Rev: --- db/write_batch_test.cc | 2 +- table/table_test.cc | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 4aba21ca4..6b66e9293 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -69,7 +69,7 @@ static std::string PrintContents(WriteBatch* b) { } else if (count != WriteBatchInternal::Count(b)) { state.append("CountMismatch()"); } - mem->Unref(); + delete mem->Unref(); return state; } diff --git a/table/table_test.cc b/table/table_test.cc index 394aa4b9d..2214c25a1 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -374,10 +374,10 @@ class MemTableConstructor: public Constructor { memtable_->Ref(); } ~MemTableConstructor() { - memtable_->Unref(); + delete memtable_->Unref(); } virtual Status FinishImpl(const Options& options, const KVMap& data) { - memtable_->Unref(); + delete memtable_->Unref(); memtable_ = new MemTable(internal_comparator_, table_factory_); memtable_->Ref(); int seq = 1; @@ -1289,7 +1289,7 @@ TEST(MemTableTest, Simple) { } delete iter; - memtable->Unref(); + delete memtable->Unref(); } From 43d073dff081637fdd557889efd03fb1a5b84a05 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Mon, 2 Dec 2013 06:48:23 -0800 Subject: [PATCH 07/17] Cleaning up INSTALL.md -- there were two occurrences of gflags --- INSTALL.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/INSTALL.md b/INSTALL.md index ed1cbfba1..07d975068 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -19,13 +19,13 @@ libraries. You are on your own. * **Linux** * Upgrade your gcc to version at least 4.7 to get C++11 support. - * Install gflags. If you're on Ubuntu, here's a nice tutorial: + * Install gflags. First, try: `sudo apt-get install libgflags-dev`. + If this doesn't work and you're using Ubuntu, here's a nice tutorial: (http://askubuntu.com/questions/312173/installing-gflags-12-04) * Install snappy. 
This is usually as easy as: `sudo apt-get install libsnappy-dev`. * Install zlib. Try: `sudo apt-get install zlib1g-dev`. * Install bzip2: `sudo apt-get install libbz2-dev`. - * Install gflags: `sudo apt-get install libgflags-dev`. * **OS X**: * Install latest C++ compiler that supports C++ 11: * Update XCode: run `xcode-select --install` (or install it from XCode App's settting). From 0b5b81a15421aa48b99b740ee81af697e8651554 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Mon, 2 Dec 2013 11:51:20 -0800 Subject: [PATCH 08/17] Removing reference to doc/impl.html --- README | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README b/README index 076f66d74..c55149d44 100644 --- a/README +++ b/README @@ -16,8 +16,8 @@ The core of this code has been derived from open-source leveldb. The code under this directory implements a system for maintaining a persistent key/value store. -See doc/index.html for more explanation. -See doc/impl.html for a brief overview of the implementation. +See doc/index.html and github wiki (https://github.com/facebook/rocksdb/wiki) +for more explanation. The public interface is in include/*. Callers should not include or rely on the details of any other header files in this package. 
Those From 930cb0b9ee12c18eb461ef78748ed5b9bcf80d98 Mon Sep 17 00:00:00 2001 From: lovro Date: Mon, 2 Dec 2013 14:59:23 -0800 Subject: [PATCH 09/17] Clarify CompactionFilter thread safety requirements Summary: Documenting our discussion Test Plan: make Reviewers: dhruba, haobo Reviewed By: dhruba CC: igor Differential Revision: https://reviews.facebook.net/D14403 --- include/rocksdb/compaction_filter.h | 10 ++++++++++ include/rocksdb/options.h | 28 ++++++++++++++++++++-------- util/options.cc | 6 +++--- 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/include/rocksdb/compaction_filter.h b/include/rocksdb/compaction_filter.h index cb55ac44f..f24132a6f 100644 --- a/include/rocksdb/compaction_filter.h +++ b/include/rocksdb/compaction_filter.h @@ -40,6 +40,16 @@ class CompactionFilter { // When the value is to be preserved, the application has the option // to modify the existing_value and pass it back through new_value. // value_changed needs to be set to true in this case. + // + // If multithreaded compaction is being used *and* a single CompactionFilter + // instance was supplied via Options::compaction_filter, this method may be + // called from different threads concurrently. The application must ensure + // that the call is thread-safe. + // + // If the CompactionFilter was created by a factory, then it will only ever + // be used by a single thread that is doing the compaction run, and this + // call does not need to be thread-safe. However, multiple filters may be + // in existence and operating concurrently. 
virtual bool Filter(int level, const Slice& key, const Slice& existing_value, diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 9a4644c8e..85c1db059 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -93,16 +93,33 @@ struct Options { // Default: nullptr shared_ptr merge_operator; - // The client must provide compaction_filter_factory if it requires a new - // compaction filter to be used for different compaction processes + // A single CompactionFilter instance to call into during compaction. // Allows an application to modify/delete a key-value during background // compaction. - // Ideally, client should specify only one of filter or factory. + // + // If the client requires a new compaction filter to be used for different + // compaction runs, it can specify compaction_filter_factory instead of this + // option. The client should specify only one of the two. // compaction_filter takes precedence over compaction_filter_factory if // client specifies both. + // + // If multithreaded compaction is being used, the supplied CompactionFilter + // instance may be used from different threads concurrently and so should be + // thread-safe. + // // Default: nullptr const CompactionFilter* compaction_filter; + // This is a factory that provides compaction filter objects which allow + // an application to modify/delete a key-value during background compaction. + // + // A new filter will be created on each compaction run. If multithreaded + // compaction is being used, each created CompactionFilter will only be used + // from a single thread and so does not need to be thread-safe. + // + // Default: a factory that doesn't provide any object + std::shared_ptr compaction_filter_factory; + // If true, the database will be created if it is missing. // Default: false bool create_if_missing; @@ -600,11 +617,6 @@ struct Options { // Table and TableBuilder. 
std::shared_ptr table_factory; - // This is a factory that provides compaction filter objects which allow - // an application to modify/delete a key-value during background compaction. - // Default: a factory that doesn't provide any object - std::shared_ptr compaction_filter_factory; - // This option allows user to to collect their own interested statistics of // the tables. // Default: emtpy vector -- no user-defined statistics collection will be diff --git a/util/options.cc b/util/options.cc index 7fa7586e3..fffcce0a1 100644 --- a/util/options.cc +++ b/util/options.cc @@ -25,6 +25,9 @@ Options::Options() : comparator(BytewiseComparator()), merge_operator(nullptr), compaction_filter(nullptr), + compaction_filter_factory( + std::shared_ptr( + new DefaultCompactionFilterFactory())), create_if_missing(false), error_if_exists(false), paranoid_checks(false), @@ -97,9 +100,6 @@ Options::Options() memtable_factory(std::shared_ptr(new SkipListFactory)), table_factory( std::shared_ptr(new BlockBasedTableFactory())), - compaction_filter_factory( - std::shared_ptr( - new DefaultCompactionFilterFactory())), inplace_update_support(false), inplace_update_num_locks(10000) { assert(memtable_factory.get() != nullptr); From 043fc14c3e4d3d829a1468be29bce5eb4a8802b9 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 3 Dec 2013 11:17:58 -0800 Subject: [PATCH 10/17] Get rid of some shared_ptrs Summary: I went through all remaining shared_ptrs and removed the ones that I found not-necessary. Only GenerateCachePrefix() is called fairly often, so don't expect much perf wins. The ones that are left are accessed infrequently and I think we're fine with keeping them. 
Test Plan: make asan_check Reviewers: dhruba, haobo Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D14427 --- db/db_impl.cc | 2 +- db/db_impl.h | 2 +- db/db_iter.cc | 10 ++++----- db/memtable.cc | 4 ++-- db/memtable.h | 2 +- db/repair.cc | 2 +- db/write_batch_test.cc | 2 +- table/block_based_table_builder.cc | 7 +++--- table/block_based_table_reader.cc | 10 ++++----- table/block_based_table_reader.h | 4 ++-- table/table_test.cc | 36 +++++++++++++++--------------- 11 files changed, 41 insertions(+), 40 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index ad189a2e5..61f03327c 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -235,7 +235,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) mutex_(options.use_adaptive_mutex), shutting_down_(nullptr), bg_cv_(&mutex_), - mem_rep_factory_(options_.memtable_factory), + mem_rep_factory_(options_.memtable_factory.get()), mem_(new MemTable(internal_comparator_, mem_rep_factory_, NumberLevels(), options_)), logfile_number_(0), diff --git a/db/db_impl.h b/db/db_impl.h index 7d0fadb61..15b6013d0 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -315,7 +315,7 @@ class DBImpl : public DB { port::Mutex mutex_; port::AtomicPointer shutting_down_; port::CondVar bg_cv_; // Signalled when background work finishes - std::shared_ptr mem_rep_factory_; + MemTableRepFactory* mem_rep_factory_; MemTable* mem_; MemTableList imm_; // Memtable that are not changing uint64_t logfile_number_; diff --git a/db/db_iter.cc b/db/db_iter.cc index 89eaf0949..596a9f651 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -61,7 +61,7 @@ class DBIter: public Iterator { const Comparator* cmp, Iterator* iter, SequenceNumber s) : dbname_(dbname), env_(env), - logger_(options.info_log), + logger_(options.info_log.get()), user_comparator_(cmp), user_merge_operator_(options.merge_operator.get()), iter_(iter), @@ -122,7 +122,7 @@ class DBIter: public Iterator { const std::string* const dbname_; Env* const 
env_; - shared_ptr logger_; + Logger* logger_; const Comparator* const user_comparator_; const MergeOperator* const user_merge_operator_; Iterator* const iter_; @@ -293,7 +293,7 @@ void DBIter::MergeValuesNewToOld() { // ignore corruption if there is any. const Slice value = iter_->value(); user_merge_operator_->FullMerge(ikey.user_key, &value, operands, - &saved_value_, logger_.get()); + &saved_value_, logger_); // iter_ is positioned after put iter_->Next(); return; @@ -310,7 +310,7 @@ void DBIter::MergeValuesNewToOld() { Slice(operands[0]), Slice(operands[1]), &merge_result, - logger_.get())) { + logger_)) { operands.pop_front(); swap(operands.front(), merge_result); } else { @@ -327,7 +327,7 @@ void DBIter::MergeValuesNewToOld() { // feed null as the existing value to the merge operator, such that // client can differentiate this scenario and do things accordingly. user_merge_operator_->FullMerge(saved_key_, nullptr, operands, - &saved_value_, logger_.get()); + &saved_value_, logger_); } void DBIter::Prev() { diff --git a/db/memtable.cc b/db/memtable.cc index ac589a563..f86af4e33 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -33,7 +33,7 @@ struct hash { namespace rocksdb { MemTable::MemTable(const InternalKeyComparator& cmp, - std::shared_ptr table_factory, + MemTableRepFactory* table_factory, int numlevel, const Options& options) : comparator_(cmp), @@ -274,7 +274,7 @@ bool MemTable::Update(SequenceNumber seq, ValueType type, Slice memkey = lkey.memtable_key(); std::shared_ptr iter( - table_.get()->GetIterator(lkey.user_key())); + table_->GetIterator(lkey.user_key())); iter->Seek(memkey.data()); if (iter->Valid()) { diff --git a/db/memtable.h b/db/memtable.h index 5648b7716..7a0d6b343 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -35,7 +35,7 @@ class MemTable { // is zero and the caller must call Ref() at least once. 
explicit MemTable( const InternalKeyComparator& comparator, - std::shared_ptr table_factory, + MemTableRepFactory* table_factory, int numlevel = 7, const Options& options = Options()); diff --git a/db/repair.cc b/db/repair.cc index 230be565e..fc9ba282d 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -196,7 +196,7 @@ class Repairer { std::string scratch; Slice record; WriteBatch batch; - MemTable* mem = new MemTable(icmp_, options_.memtable_factory, + MemTable* mem = new MemTable(icmp_, options_.memtable_factory.get(), options_.num_levels); mem->Ref(); int counter = 0; diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 6b66e9293..ff9aa63ee 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -22,7 +22,7 @@ namespace rocksdb { static std::string PrintContents(WriteBatch* b) { InternalKeyComparator cmp(BytewiseComparator()); auto factory = std::make_shared(); - MemTable* mem = new MemTable(cmp, factory); + MemTable* mem = new MemTable(cmp, factory.get()); mem->Ref(); std::string state; Options options; diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index f846b1ffd..a5e546be8 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -127,9 +127,10 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( rep_->filter_block->StartBlock(0); } if (options.block_cache_compressed.get() != nullptr) { - BlockBasedTable::GenerateCachePrefix(options.block_cache_compressed, file, - &rep_->compressed_cache_key_prefix[0], - &rep_->compressed_cache_key_prefix_size); + BlockBasedTable::GenerateCachePrefix( + options.block_cache_compressed.get(), file, + &rep_->compressed_cache_key_prefix[0], + &rep_->compressed_cache_key_prefix_size); } } diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 095c2999c..dcb55fc36 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -95,18 +95,18 @@ void 
BlockBasedTable::SetupCacheKeyPrefix(Rep* rep) { rep->cache_key_prefix_size = 0; rep->compressed_cache_key_prefix_size = 0; if (rep->options.block_cache != nullptr) { - GenerateCachePrefix(rep->options.block_cache, rep->file.get(), + GenerateCachePrefix(rep->options.block_cache.get(), rep->file.get(), &rep->cache_key_prefix[0], &rep->cache_key_prefix_size); } if (rep->options.block_cache_compressed != nullptr) { - GenerateCachePrefix(rep->options.block_cache_compressed, rep->file.get(), - &rep->compressed_cache_key_prefix[0], + GenerateCachePrefix(rep->options.block_cache_compressed.get(), + rep->file.get(), &rep->compressed_cache_key_prefix[0], &rep->compressed_cache_key_prefix_size); } } -void BlockBasedTable::GenerateCachePrefix(shared_ptr cc, +void BlockBasedTable::GenerateCachePrefix(Cache* cc, RandomAccessFile* file, char* buffer, size_t* size) { // generate an id from the file @@ -120,7 +120,7 @@ void BlockBasedTable::GenerateCachePrefix(shared_ptr cc, } } -void BlockBasedTable::GenerateCachePrefix(shared_ptr cc, +void BlockBasedTable::GenerateCachePrefix(Cache* cc, WritableFile* file, char* buffer, size_t* size) { // generate an id from the file diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index 02bbfd74c..66f63fc59 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -167,9 +167,9 @@ class BlockBasedTable : public TableReader { rep_ = rep; } // Generate a cache key prefix from the file - static void GenerateCachePrefix(shared_ptr cc, + static void GenerateCachePrefix(Cache* cc, RandomAccessFile* file, char* buffer, size_t* size); - static void GenerateCachePrefix(shared_ptr cc, + static void GenerateCachePrefix(Cache* cc, WritableFile* file, char* buffer, size_t* size); // The longest prefix of the cache key used to identify blocks. 
diff --git a/table/table_test.cc b/table/table_test.cc index 2214c25a1..e4aab1f69 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -370,7 +370,7 @@ class MemTableConstructor: public Constructor { : Constructor(cmp), internal_comparator_(cmp), table_factory_(new SkipListFactory) { - memtable_ = new MemTable(internal_comparator_, table_factory_); + memtable_ = new MemTable(internal_comparator_, table_factory_.get()); memtable_->Ref(); } ~MemTableConstructor() { @@ -378,7 +378,7 @@ class MemTableConstructor: public Constructor { } virtual Status FinishImpl(const Options& options, const KVMap& data) { delete memtable_->Unref(); - memtable_ = new MemTable(internal_comparator_, table_factory_); + memtable_ = new MemTable(internal_comparator_, table_factory_.get()); memtable_->Ref(); int seq = 1; for (KVMap::const_iterator it = data.begin(); @@ -930,19 +930,19 @@ TEST(TableTest, NumBlockStat) { class BlockCacheProperties { public: - explicit BlockCacheProperties(std::shared_ptr statistics) { + explicit BlockCacheProperties(Statistics* statistics) { block_cache_miss = - statistics.get()->getTickerCount(BLOCK_CACHE_MISS); + statistics->getTickerCount(BLOCK_CACHE_MISS); block_cache_hit = - statistics.get()->getTickerCount(BLOCK_CACHE_HIT); + statistics->getTickerCount(BLOCK_CACHE_HIT); index_block_cache_miss = - statistics.get()->getTickerCount(BLOCK_CACHE_INDEX_MISS); + statistics->getTickerCount(BLOCK_CACHE_INDEX_MISS); index_block_cache_hit = - statistics.get()->getTickerCount(BLOCK_CACHE_INDEX_HIT); + statistics->getTickerCount(BLOCK_CACHE_INDEX_HIT); data_block_cache_miss = - statistics.get()->getTickerCount(BLOCK_CACHE_DATA_MISS); + statistics->getTickerCount(BLOCK_CACHE_DATA_MISS); data_block_cache_hit = - statistics.get()->getTickerCount(BLOCK_CACHE_DATA_HIT); + statistics->getTickerCount(BLOCK_CACHE_DATA_HIT); } // Check if the fetched props matches the expected ones. 
@@ -993,7 +993,7 @@ TEST(TableTest, BlockCacheTest) { // At first, no block will be accessed. { - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); // index will be added to block cache. props.AssertEqual( 1, // index block miss @@ -1006,7 +1006,7 @@ TEST(TableTest, BlockCacheTest) { // Only index block will be accessed { iter.reset(c.NewIterator()); - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); // NOTE: to help better highlight the "detla" of each ticker, I use // + to indicate the increment of changed // value; other numbers remain the same. @@ -1021,7 +1021,7 @@ TEST(TableTest, BlockCacheTest) { // Only data block will be accessed { iter->SeekToFirst(); - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); props.AssertEqual( 1, 1, @@ -1034,7 +1034,7 @@ TEST(TableTest, BlockCacheTest) { { iter.reset(c.NewIterator()); iter->SeekToFirst(); - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); props.AssertEqual( 1, 1 + 1, // index block hit @@ -1054,7 +1054,7 @@ TEST(TableTest, BlockCacheTest) { iter.reset(c.NewIterator()); iter->SeekToFirst(); ASSERT_EQ("key", iter->key().ToString()); - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); // Nothing is affected at all props.AssertEqual(0, 0, 0, 0); } @@ -1065,7 +1065,7 @@ TEST(TableTest, BlockCacheTest) { options.block_cache = NewLRUCache(1); c.Reopen(options); { - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); props.AssertEqual( 1, // index block miss 0, @@ -1080,7 +1080,7 @@ TEST(TableTest, BlockCacheTest) { // It first cache index block then data block. But since the cache size // is only 1, index block will be purged after data block is inserted. 
iter.reset(c.NewIterator()); - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); props.AssertEqual( 1 + 1, // index block miss 0, @@ -1093,7 +1093,7 @@ TEST(TableTest, BlockCacheTest) { // SeekToFirst() accesses data block. With similar reason, we expect data // block's cache miss. iter->SeekToFirst(); - BlockCacheProperties props(options.statistics); + BlockCacheProperties props(options.statistics.get()); props.AssertEqual( 2, 0, @@ -1268,7 +1268,7 @@ class MemTableTest { }; TEST(MemTableTest, Simple) { InternalKeyComparator cmp(BytewiseComparator()); auto table_factory = std::make_shared(); - MemTable* memtable = new MemTable(cmp, table_factory); + MemTable* memtable = new MemTable(cmp, table_factory.get()); memtable->Ref(); WriteBatch batch; Options options; From eb12e47e0e38ddf18890451f536c14ae7b1aa188 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 3 Dec 2013 12:42:15 -0800 Subject: [PATCH 11/17] Killing Transform Rep Summary: Let's get rid of TransformRep and its children. We have confirmed that HashSkipListRep works better with multifeed, so there is no benefit to keeping this around. This diff is mostly just deleting references to obsoleted functions. I also have a diff for fbcode that we'll need to push when we switch to the new release. I had to expose HashSkipListRepFactory in the client header files because db_impl.cc needs access to the GetTransform() function for SanitizeOptions.
Test Plan: make check Reviewers: dhruba, haobo, kailiu, sdong Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D14397 --- db/db_bench.cc | 18 +- db/db_impl.cc | 5 +- db/db_test.cc | 164 ++++++------- db/perf_context_test.cc | 4 +- db/prefix_test.cc | 11 +- include/rocksdb/memtablerep.h | 109 ++------- tools/db_stress.cc | 28 +-- util/hash_skiplist_rep.cc | 33 +-- util/hash_skiplist_rep.h | 38 +++ util/stl_wrappers.h | 19 -- util/transformrep.cc | 422 ---------------------------------- 11 files changed, 158 insertions(+), 693 deletions(-) create mode 100644 util/hash_skiplist_rep.h delete mode 100644 util/transformrep.cc diff --git a/db/db_bench.cc b/db/db_bench.cc index 63cc906e7..d7c0223e4 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -431,12 +431,11 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) { } return true; } -DEFINE_int32(prefix_size, 0, "Control the prefix size for PrefixHashRep"); +DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipList"); enum RepFactory { kSkipList, kPrefixHash, - kUnsorted, kVectorRep }; enum RepFactory StringToRepFactory(const char* ctype) { @@ -446,8 +445,6 @@ enum RepFactory StringToRepFactory(const char* ctype) { return kSkipList; else if (!strcasecmp(ctype, "prefix_hash")) return kPrefixHash; - else if (!strcasecmp(ctype, "unsorted")) - return kUnsorted; else if (!strcasecmp(ctype, "vector")) return kVectorRep; @@ -803,9 +800,6 @@ class Benchmark { case kSkipList: fprintf(stdout, "Memtablerep: skip_list\n"); break; - case kUnsorted: - fprintf(stdout, "Memtablerep: unsorted\n"); - break; case kVectorRep: fprintf(stdout, "Memtablerep: vector\n"); break; @@ -1328,14 +1322,8 @@ class Benchmark { } switch (FLAGS_rep_factory) { case kPrefixHash: - options.memtable_factory.reset( - new PrefixHashRepFactory(NewFixedPrefixTransform(FLAGS_prefix_size)) - ); - break; - case kUnsorted: - options.memtable_factory.reset( - new UnsortedRepFactory - ); + 
options.memtable_factory.reset(NewHashSkipListRepFactory( + NewFixedPrefixTransform(FLAGS_prefix_size))); break; case kSkipList: // no need to do anything diff --git a/db/db_impl.cc b/db/db_impl.cc index 61f03327c..5ac6956b1 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -50,6 +50,7 @@ #include "util/auto_roll_logger.h" #include "util/build_version.h" #include "util/coding.h" +#include "util/hash_skiplist_rep.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/perf_context_imp.h" @@ -162,10 +163,10 @@ Options SanitizeOptions(const std::string& dbname, Log(result.info_log, "Compaction filter specified, ignore factory"); } if (result.prefix_extractor) { - // If a prefix extractor has been supplied and a PrefixHashRepFactory is + // If a prefix extractor has been supplied and a HashSkipListRepFactory is // being used, make sure that the latter uses the former as its transform // function. - auto factory = dynamic_cast( + auto factory = dynamic_cast( result.memtable_factory.get()); if (factory && factory->GetTransform() != result.prefix_extractor) { diff --git a/db/db_test.cc b/db/db_test.cc index c698d4bde..069ab679f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -244,7 +244,6 @@ class DBTest { enum OptionConfig { kDefault, kVectorRep, - kUnsortedRep, kMergePut, kFilter, kUncompressed, @@ -255,7 +254,7 @@ class DBTest { kCompactOnFlush, kPerfOptions, kDeletesFilterFirst, - kPrefixHashRep, + kHashSkipList, kUniversalCompaction, kCompressedBlockCache, kEnd @@ -339,9 +338,9 @@ class DBTest { Options CurrentOptions() { Options options; switch (option_config_) { - case kPrefixHashRep: - options.memtable_factory.reset(new - PrefixHashRepFactory(NewFixedPrefixTransform(1))); + case kHashSkipList: + options.memtable_factory.reset( + NewHashSkipListRepFactory(NewFixedPrefixTransform(1))); break; case kMergePut: options.merge_operator = MergeOperators::CreatePutOperator(); @@ -375,9 +374,6 @@ class DBTest { case kDeletesFilterFirst: 
options.filter_deletes = true; break; - case kUnsortedRep: - options.memtable_factory.reset(new UnsortedRepFactory); - break; case kVectorRep: options.memtable_factory.reset(new VectorRepFactory(100)); break; @@ -4600,7 +4596,7 @@ TEST(DBTest, Randomized) { // TODO(sanjay): Test Get() works int p = rnd.Uniform(100); int minimum = 0; - if (option_config_ == kPrefixHashRep) { + if (option_config_ == kHashSkipList) { minimum = 1; } if (p < 45) { // Put @@ -4770,90 +4766,82 @@ void PrefixScanInit(DBTest *dbtest) { } TEST(DBTest, PrefixScan) { - for (int it = 0; it < 2; ++it) { - ReadOptions ro = ReadOptions(); - int count; - Slice prefix; - Slice key; - char buf[100]; - Iterator* iter; - snprintf(buf, sizeof(buf), "03______:"); - prefix = Slice(buf, 8); - key = Slice(buf, 9); - auto prefix_extractor = NewFixedPrefixTransform(8); - // db configs - env_->count_random_reads_ = true; - Options options = CurrentOptions(); - options.env = env_; - options.no_block_cache = true; - options.filter_policy = NewBloomFilterPolicy(10); - options.prefix_extractor = prefix_extractor; - options.whole_key_filtering = false; - options.disable_auto_compactions = true; - options.max_background_compactions = 2; - options.create_if_missing = true; - options.disable_seek_compaction = true; - if (it == 0) { - options.memtable_factory.reset(NewHashSkipListRepFactory( - prefix_extractor)); - } else { - options.memtable_factory = std::make_shared( - prefix_extractor); - } + ReadOptions ro = ReadOptions(); + int count; + Slice prefix; + Slice key; + char buf[100]; + Iterator* iter; + snprintf(buf, sizeof(buf), "03______:"); + prefix = Slice(buf, 8); + key = Slice(buf, 9); + auto prefix_extractor = NewFixedPrefixTransform(8); + // db configs + env_->count_random_reads_ = true; + Options options = CurrentOptions(); + options.env = env_; + options.no_block_cache = true; + options.filter_policy = NewBloomFilterPolicy(10); + options.prefix_extractor = prefix_extractor; + options.whole_key_filtering = 
false; + options.disable_auto_compactions = true; + options.max_background_compactions = 2; + options.create_if_missing = true; + options.disable_seek_compaction = true; + options.memtable_factory.reset(NewHashSkipListRepFactory(prefix_extractor)); - // prefix specified, with blooms: 2 RAND I/Os - // SeekToFirst - DestroyAndReopen(&options); - PrefixScanInit(this); - count = 0; - env_->random_read_counter_.Reset(); - ro.prefix = &prefix; - iter = db_->NewIterator(ro); - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - assert(iter->key().starts_with(prefix)); - count++; - } - ASSERT_OK(iter->status()); - delete iter; - ASSERT_EQ(count, 2); - ASSERT_EQ(env_->random_read_counter_.Read(), 2); + // prefix specified, with blooms: 2 RAND I/Os + // SeekToFirst + DestroyAndReopen(&options); + PrefixScanInit(this); + count = 0; + env_->random_read_counter_.Reset(); + ro.prefix = &prefix; + iter = db_->NewIterator(ro); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + assert(iter->key().starts_with(prefix)); + count++; + } + ASSERT_OK(iter->status()); + delete iter; + ASSERT_EQ(count, 2); + ASSERT_EQ(env_->random_read_counter_.Read(), 2); - // prefix specified, with blooms: 2 RAND I/Os - // Seek - DestroyAndReopen(&options); - PrefixScanInit(this); - count = 0; - env_->random_read_counter_.Reset(); - ro.prefix = &prefix; - iter = db_->NewIterator(ro); - for (iter->Seek(key); iter->Valid(); iter->Next()) { - assert(iter->key().starts_with(prefix)); - count++; - } - ASSERT_OK(iter->status()); - delete iter; - ASSERT_EQ(count, 2); - ASSERT_EQ(env_->random_read_counter_.Read(), 2); + // prefix specified, with blooms: 2 RAND I/Os + // Seek + DestroyAndReopen(&options); + PrefixScanInit(this); + count = 0; + env_->random_read_counter_.Reset(); + ro.prefix = &prefix; + iter = db_->NewIterator(ro); + for (iter->Seek(key); iter->Valid(); iter->Next()) { + assert(iter->key().starts_with(prefix)); + count++; + } + ASSERT_OK(iter->status()); + delete iter; + 
ASSERT_EQ(count, 2); + ASSERT_EQ(env_->random_read_counter_.Read(), 2); - // no prefix specified: 11 RAND I/Os - DestroyAndReopen(&options); - PrefixScanInit(this); - count = 0; - env_->random_read_counter_.Reset(); - iter = db_->NewIterator(ReadOptions()); - for (iter->Seek(prefix); iter->Valid(); iter->Next()) { - if (! iter->key().starts_with(prefix)) { - break; - } - count++; + // no prefix specified: 11 RAND I/Os + DestroyAndReopen(&options); + PrefixScanInit(this); + count = 0; + env_->random_read_counter_.Reset(); + iter = db_->NewIterator(ReadOptions()); + for (iter->Seek(prefix); iter->Valid(); iter->Next()) { + if (! iter->key().starts_with(prefix)) { + break; } - ASSERT_OK(iter->status()); - delete iter; - ASSERT_EQ(count, 2); - ASSERT_EQ(env_->random_read_counter_.Read(), 11); - Close(); - delete options.filter_policy; + count++; } + ASSERT_OK(iter->status()); + delete iter; + ASSERT_EQ(count, 2); + ASSERT_EQ(env_->random_read_counter_.Read(), 11); + Close(); + delete options.filter_policy; } std::string MakeKey(unsigned int num) { diff --git a/db/perf_context_test.cc b/db/perf_context_test.cc index 05416748d..0934de0cd 100644 --- a/db/perf_context_test.cc +++ b/db/perf_context_test.cc @@ -38,8 +38,8 @@ std::shared_ptr OpenDb() { if (FLAGS_use_set_based_memetable) { auto prefix_extractor = rocksdb::NewFixedPrefixTransform(0); - options.memtable_factory = - std::make_shared(prefix_extractor); + options.memtable_factory.reset( + NewHashSkipListRepFactory(prefix_extractor)); } Status s = DB::Open(options, kDbName, &db); diff --git a/db/prefix_test.cc b/db/prefix_test.cc index 6c7fc1697..7e5e9cc0e 100644 --- a/db/prefix_test.cc +++ b/db/prefix_test.cc @@ -11,7 +11,6 @@ #include "util/testharness.h" DEFINE_bool(use_prefix_hash_memtable, true, ""); -DEFINE_bool(use_nolock_version, true, ""); DEFINE_bool(trigger_deadlock, false, "issue delete in range scan to trigger PrefixHashMap deadlock"); DEFINE_uint64(bucket_count, 100000, "number of buckets"); @@ -109,14 
+108,8 @@ class PrefixTest { if (FLAGS_use_prefix_hash_memtable) { auto prefix_extractor = NewFixedPrefixTransform(8); options.prefix_extractor = prefix_extractor; - if (FLAGS_use_nolock_version) { - options.memtable_factory.reset(NewHashSkipListRepFactory( - prefix_extractor, FLAGS_bucket_count)); - } else { - options.memtable_factory = - std::make_shared( - prefix_extractor, FLAGS_bucket_count, FLAGS_num_locks); - } + options.memtable_factory.reset(NewHashSkipListRepFactory( + prefix_extractor, FLAGS_bucket_count)); } Status s = DB::Open(options, kDbName, &db); diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index 4147e5f3a..fcb782d41 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -17,21 +17,13 @@ // The factory will be passed an Arena object when a new MemTableRep is // requested. The API for this object is in rocksdb/arena.h. // -// Users can implement their own memtable representations. We include four +// Users can implement their own memtable representations. We include three // types built in: // - SkipListRep: This is the default; it is backed by a skip list. -// - TransformRep: This is backed by an custom hash map. -// On construction, they are given a SliceTransform object. This -// object is applied to the user key of stored items which indexes into the -// hash map to yield a skiplist containing all records that share the same -// user key under the transform function. -// - UnsortedRep: A subclass of TransformRep where the transform function is -// the identity function. Optimized for point lookups. -// - PrefixHashRep: A subclass of TransformRep where the transform function is -// a fixed-size prefix extractor. If you use PrefixHashRepFactory, the transform -// must be identical to options.prefix_extractor, otherwise it will be discarded -// and the default will be used. It is optimized for ranged scans over a -// prefix. 
+// - HashSkipListRep: The memtable rep that is best used for keys that are +// structured like "prefix:suffix" where iteration within a prefix is +// common and iteration across different prefixes is rare. It is backed by +// a hash map where each bucket is a skip list. // - VectorRep: This is backed by an unordered std::vector. On iteration, the // vector is sorted. It is intelligent about sorting; once the MarkReadOnly() // has been called, the vector will only be sorted once. It is optimized for @@ -186,88 +178,23 @@ public: } }; -// TransformReps are backed by an unordered map of buffers to buckets. When -// looking up a key, the user key is extracted and a user-supplied transform -// function (see rocksdb/slice_transform.h) is applied to get the key into the -// unordered map. This allows the user to bin user keys based on arbitrary -// criteria. Two example implementations are UnsortedRepFactory and -// PrefixHashRepFactory. +// HashSkipListRep is backed by a hash map of buckets. Each bucket is a skip +// list. All the keys with the same prefix will be in the same bucket. +// The prefix is determined using a user-supplied SliceTransform. It has +// to match prefix_extractor in options.prefix_extractor. // // Iteration over the entire collection is implemented by dumping all the keys -// into an std::set. Thus, these data structures are best used when iteration -// over the entire collection is rare. +// into a separate skip list. Thus, these data structures are best used when +// iteration over the entire collection is rare. // // Parameters: -// transform: The SliceTransform to bucket user keys on. TransformRepFactory -// owns the pointer. -// bucket_count: Passed to the constructor of the underlying -// std::unordered_map of each TransformRep. On initialization, the -// underlying array will be at least bucket_count size. -// num_locks: Number of read-write locks to have for the rep.
Each bucket is -// hashed onto a read-write lock which controls access to that lock. More -// locks means finer-grained concurrency but more memory overhead. -class TransformRepFactory : public MemTableRepFactory { - public: - explicit TransformRepFactory(const SliceTransform* transform, - size_t bucket_count, size_t num_locks = 1000) - : transform_(transform), - bucket_count_(bucket_count), - num_locks_(num_locks) { } - - virtual ~TransformRepFactory() { delete transform_; } - - virtual std::shared_ptr CreateMemTableRep( - MemTableRep::KeyComparator&, Arena*) override; - - virtual const char* Name() const override { - return "TransformRepFactory"; - } - - const SliceTransform* GetTransform() { return transform_; } - - protected: - const SliceTransform* transform_; - const size_t bucket_count_; - const size_t num_locks_; -}; - -// UnsortedReps bin user keys based on an identity function transform -- that -// is, transform(key) = key. This optimizes for point look-ups. -// -// Parameters: See TransformRepFactory. -class UnsortedRepFactory : public TransformRepFactory { -public: - explicit UnsortedRepFactory(size_t bucket_count = 0, size_t num_locks = 1000) - : TransformRepFactory(NewNoopTransform(), - bucket_count, - num_locks) { } - virtual const char* Name() const override { - return "UnsortedRepFactory"; - } -}; - -// PrefixHashReps bin user keys based on a fixed-size prefix. This optimizes for -// short ranged scans over a given prefix. -// -// Parameters: See TransformRepFactory. 
-class PrefixHashRepFactory : public TransformRepFactory { -public: - explicit PrefixHashRepFactory(const SliceTransform* prefix_extractor, - size_t bucket_count = 0, size_t num_locks = 1000) - : TransformRepFactory(prefix_extractor, bucket_count, num_locks) - { } - - virtual std::shared_ptr CreateMemTableRep( - MemTableRep::KeyComparator&, Arena*) override; - - virtual const char* Name() const override { - return "PrefixHashRepFactory"; - } -}; - -// The same as TransformRepFactory except it doesn't use locks. -// Experimental, will replace TransformRepFactory once we are sure -// it performs better +// transform: The prefix extractor that returns prefix when supplied a user +// key. Has to match options.prefix_extractor +// bucket_count: Number of buckets in a hash_map. Each bucket needs +// 8 bytes. By default, we set buckets to one million, which +// will take 8MB of memory. If you know the number of keys you'll +// keep in hash map, set bucket count to be approximately twice +// the number of keys extern MemTableRepFactory* NewHashSkipListRepFactory( const SliceTransform* transform, size_t bucket_count = 1000000); diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 71e36e901..966f007e8 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -305,8 +305,7 @@ DEFINE_bool(filter_deletes, false, "On true, deletes use KeyMayExist to drop" enum RepFactory { kSkipList, - kPrefixHash, - kUnsorted, + kHashSkipList, kVectorRep }; enum RepFactory StringToRepFactory(const char* ctype) { @@ -315,9 +314,7 @@ enum RepFactory StringToRepFactory(const char* ctype) { if (!strcasecmp(ctype, "skip_list")) return kSkipList; else if (!strcasecmp(ctype, "prefix_hash")) - return kPrefixHash; - else if (!strcasecmp(ctype, "unsorted")) - return kUnsorted; + return kHashSkipList; else if (!strcasecmp(ctype, "vector")) return kVectorRep; @@ -335,7 +332,7 @@ static bool ValidatePrefixSize(const char* flagname, int32_t value) { } return true; } -DEFINE_int32(prefix_size, 0, 
"Control the prefix size for PrefixHashRep"); +DEFINE_int32(prefix_size, 0, "Control the prefix size for HashSkipListRep"); static const bool FLAGS_prefix_size_dummy = google::RegisterFlagValidator(&FLAGS_prefix_size, &ValidatePrefixSize); @@ -1338,12 +1335,9 @@ class StressTest { case kSkipList: memtablerep = "skip_list"; break; - case kPrefixHash: + case kHashSkipList: memtablerep = "prefix_hash"; break; - case kUnsorted: - memtablerep = "unsorted"; - break; case kVectorRep: memtablerep = "vector"; break; @@ -1393,21 +1387,15 @@ class StressTest { FLAGS_delete_obsolete_files_period_micros; options.max_manifest_file_size = 1024; options.filter_deletes = FLAGS_filter_deletes; - if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kPrefixHash)) { + if ((FLAGS_prefix_size == 0) == (FLAGS_rep_factory == kHashSkipList)) { fprintf(stderr, "prefix_size should be non-zero iff memtablerep == prefix_hash\n"); exit(1); } switch (FLAGS_rep_factory) { - case kPrefixHash: - options.memtable_factory.reset( - new PrefixHashRepFactory(NewFixedPrefixTransform(FLAGS_prefix_size)) - ); - break; - case kUnsorted: - options.memtable_factory.reset( - new UnsortedRepFactory() - ); + case kHashSkipList: + options.memtable_factory.reset(NewHashSkipListRepFactory( + NewFixedPrefixTransform(FLAGS_prefix_size))); break; case kSkipList: // no need to do anything diff --git a/util/hash_skiplist_rep.cc b/util/hash_skiplist_rep.cc index b67911f93..c669769e0 100644 --- a/util/hash_skiplist_rep.cc +++ b/util/hash_skiplist_rep.cc @@ -4,6 +4,8 @@ // of patent rights can be found in the PATENTS file in the same directory. 
// +#include "util/hash_skiplist_rep.h" + #include "rocksdb/memtablerep.h" #include "rocksdb/arena.h" #include "rocksdb/slice.h" @@ -296,31 +298,12 @@ std::shared_ptr } // anon namespace -class HashSkipListRepFactory : public MemTableRepFactory { - public: - explicit HashSkipListRepFactory(const SliceTransform* transform, - size_t bucket_count = 1000000) - : transform_(transform), - bucket_count_(bucket_count) { } - - virtual ~HashSkipListRepFactory() { delete transform_; } - - virtual std::shared_ptr CreateMemTableRep( - MemTableRep::KeyComparator& compare, Arena* arena) override { - return std::make_shared(compare, arena, transform_, - bucket_count_); - } - - virtual const char* Name() const override { - return "HashSkipListRepFactory"; - } - - const SliceTransform* GetTransform() { return transform_; } - - private: - const SliceTransform* transform_; - const size_t bucket_count_; -}; +std::shared_ptr +HashSkipListRepFactory::CreateMemTableRep(MemTableRep::KeyComparator &compare, + Arena *arena) { + return std::make_shared(compare, arena, transform_, + bucket_count_); +} MemTableRepFactory* NewHashSkipListRepFactory( const SliceTransform* transform, size_t bucket_count) { diff --git a/util/hash_skiplist_rep.h b/util/hash_skiplist_rep.h new file mode 100644 index 000000000..b946cf05e --- /dev/null +++ b/util/hash_skiplist_rep.h @@ -0,0 +1,38 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#pragma once +#include "rocksdb/slice_transform.h" +#include "rocksdb/memtablerep.h" + +namespace rocksdb { + +class HashSkipListRepFactory : public MemTableRepFactory { + public: + explicit HashSkipListRepFactory(const SliceTransform* transform, + size_t bucket_count = 1000000) + : transform_(transform), + bucket_count_(bucket_count) { } + + virtual ~HashSkipListRepFactory() { delete transform_; } + + virtual std::shared_ptr CreateMemTableRep( + MemTableRep::KeyComparator& compare, Arena* arena) override; + + virtual const char* Name() const override { + return "HashSkipListRepFactory"; + } + + const SliceTransform* GetTransform() { return transform_; } + + private: + const SliceTransform* transform_; + const size_t bucket_count_; +}; + +} diff --git a/util/stl_wrappers.h b/util/stl_wrappers.h index b42a58427..b4c14b4ba 100644 --- a/util/stl_wrappers.h +++ b/util/stl_wrappers.h @@ -28,24 +28,5 @@ namespace stl_wrappers { } }; - struct Hash { - inline size_t operator()(const char* buf) const { - Slice internal_key = GetLengthPrefixedSlice(buf); - Slice value = - GetLengthPrefixedSlice(internal_key.data() + internal_key.size()); - unsigned int hval = MurmurHash(internal_key.data(), internal_key.size(), - 0); - hval = MurmurHash(value.data(), value.size(), hval); - return hval; - } - }; - - struct KeyEqual : private Base { - explicit KeyEqual(const MemTableRep::KeyComparator& compare) - : Base(compare) { } - inline bool operator()(const char* a, const char* b) const { - return this->compare_(a, b) == 0; - } - }; } } diff --git a/util/transformrep.cc b/util/transformrep.cc deleted file mode 100644 index 4c7df1321..000000000 --- a/util/transformrep.cc +++ /dev/null @@ -1,422 +0,0 @@ -// Copyright (c) 2013, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. 
An additional grant -// of patent rights can be found in the PATENTS file in the same directory. -// -#include -#include -#include -#include -#include - -#include "rocksdb/memtablerep.h" -#include "rocksdb/arena.h" -#include "rocksdb/slice.h" -#include "rocksdb/slice_transform.h" -#include "port/port.h" -#include "util/mutexlock.h" -#include "util/murmurhash.h" -#include "util/stl_wrappers.h" - -namespace std { -template <> -struct hash { - size_t operator()(const rocksdb::Slice& slice) const { - return MurmurHash(slice.data(), slice.size(), 0); - } -}; -} - -namespace rocksdb { -namespace { - -using namespace stl_wrappers; - -class TransformRep : public MemTableRep { - public: - TransformRep(const KeyComparator& compare, Arena* arena, - const SliceTransform* transform, size_t bucket_size, - size_t num_locks); - - virtual void Insert(const char* key) override; - - virtual bool Contains(const char* key) const override; - - virtual size_t ApproximateMemoryUsage() override; - - virtual ~TransformRep() { } - - virtual std::shared_ptr GetIterator() override; - - virtual std::shared_ptr GetIterator( - const Slice& slice) override; - - virtual std::shared_ptr GetDynamicPrefixIterator() - override { - return std::make_shared(*this); - } - - std::shared_ptr GetTransformIterator( - const Slice& transformed); - - private: - friend class DynamicPrefixIterator; - typedef std::set Bucket; - typedef std::unordered_map> BucketMap; - - // Maps slices (which are transformed user keys) to buckets of keys sharing - // the same transform. - BucketMap buckets_; - - // rwlock_ protects access to the buckets_ data structure itself. Each bucket - // has its own read-write lock as well. - mutable port::RWMutex rwlock_; - - // Keep track of approximately how much memory is being used. - size_t memory_usage_ = 0; - - // The user-supplied transform whose domain is the user keys. - const SliceTransform* transform_; - - // Get a bucket from buckets_. 
If the bucket hasn't been initialized yet, - // initialize it before returning. Must be externally synchronized. - std::shared_ptr& GetBucket(const Slice& transformed); - - port::RWMutex* GetLock(const Slice& transformed) const; - - mutable std::vector locks_; - - const KeyComparator& compare_; - - class Iterator : public MemTableRep::Iterator { - public: - explicit Iterator(std::shared_ptr items); - - virtual ~Iterator() { }; - - // Returns true iff the iterator is positioned at a valid node. - virtual bool Valid() const; - - // Returns the key at the current position. - // REQUIRES: Valid() - virtual const char* key() const; - - // Advances to the next position. - // REQUIRES: Valid() - virtual void Next(); - - // Advances to the previous position. - // REQUIRES: Valid() - virtual void Prev(); - - // Advance to the first entry with a key >= target - virtual void Seek(const char* target); - - // Position at the first entry in collection. - // Final state of iterator is Valid() iff collection is not empty. - virtual void SeekToFirst(); - - // Position at the last entry in collection. - // Final state of iterator is Valid() iff collection is not empty. - virtual void SeekToLast(); - private: - std::shared_ptr items_; - Bucket::const_iterator cit_; - }; - - class EmptyIterator : public MemTableRep::Iterator { - // This is used when there wasn't a bucket. It is cheaper than - // instantiating an empty bucket over which to iterate. 
- public: - virtual bool Valid() const { - return false; - } - virtual const char* key() const { - assert(false); - return nullptr; - } - virtual void Next() { } - virtual void Prev() { } - virtual void Seek(const char* target) { } - virtual void SeekToFirst() { } - virtual void SeekToLast() { } - static std::shared_ptr GetInstance(); - private: - static std::shared_ptr instance; - EmptyIterator() { } - }; - - class TransformIterator : public Iterator { - public: - explicit TransformIterator(std::shared_ptr items, - port::RWMutex* rwlock); - virtual ~TransformIterator() { } - private: - const ReadLock l_; - }; - - - class DynamicPrefixIterator : public MemTableRep::Iterator { - private: - // the underlying memtable rep - const TransformRep& memtable_rep_; - // the result of a prefix seek - std::unique_ptr bucket_iterator_; - - public: - explicit DynamicPrefixIterator(const TransformRep& memtable_rep) - : memtable_rep_(memtable_rep) {} - - virtual ~DynamicPrefixIterator() { }; - - // Returns true iff the iterator is positioned at a valid node. - virtual bool Valid() const { - return bucket_iterator_ && bucket_iterator_->Valid(); - } - - // Returns the key at the current position. - // REQUIRES: Valid() - virtual const char* key() const { - assert(Valid()); - return bucket_iterator_->key(); - } - - // Advances to the next position. - // REQUIRES: Valid() - virtual void Next() { - assert(Valid()); - bucket_iterator_->Next(); - } - - // Advances to the previous position. 
- // REQUIRES: Valid() - virtual void Prev() { - assert(Valid()); - bucket_iterator_->Prev(); - } - - // Advance to the first entry with a key >= target within the - // same bucket as target - virtual void Seek(const char* target) { - Slice prefix = memtable_rep_.transform_->Transform( - memtable_rep_.UserKey(target)); - - ReadLock l(&memtable_rep_.rwlock_); - auto bucket = memtable_rep_.buckets_.find(prefix); - if (bucket == memtable_rep_.buckets_.end()) { - bucket_iterator_.reset(nullptr); - } else { - bucket_iterator_.reset( - new TransformIterator(bucket->second, memtable_rep_.GetLock(prefix))); - bucket_iterator_->Seek(target); - } - } - - // Position at the first entry in collection. - // Final state of iterator is Valid() iff collection is not empty. - virtual void SeekToFirst() { - // Prefix iterator does not support total order. - // We simply set the iterator to invalid state - bucket_iterator_.reset(nullptr); - } - - // Position at the last entry in collection. - // Final state of iterator is Valid() iff collection is not empty. - virtual void SeekToLast() { - // Prefix iterator does not support total order. 
- // We simply set the iterator to invalid state - bucket_iterator_.reset(nullptr); - } - }; -}; - -class PrefixHashRep : public TransformRep { - public: - PrefixHashRep(const KeyComparator& compare, Arena* arena, - const SliceTransform* transform, size_t bucket_size, - size_t num_locks) - : TransformRep(compare, arena, transform, - bucket_size, num_locks) { } - - virtual std::shared_ptr GetPrefixIterator( - const Slice& prefix) override; -}; - -std::shared_ptr& TransformRep::GetBucket( - const Slice& transformed) { - WriteLock l(&rwlock_); - auto& bucket = buckets_[transformed]; - if (!bucket) { - bucket.reset( - new decltype(buckets_)::mapped_type::element_type(Compare(compare_))); - // To memory_usage_ we add the size of the std::set and the size of the - // std::pair (decltype(buckets_)::value_type) which includes the - // Slice and the std::shared_ptr - memory_usage_ += sizeof(*bucket) + - sizeof(decltype(buckets_)::value_type); - } - return bucket; -} - -port::RWMutex* TransformRep::GetLock(const Slice& transformed) const { - return &locks_[std::hash()(transformed) % locks_.size()]; -} - -TransformRep::TransformRep(const KeyComparator& compare, Arena* arena, - const SliceTransform* transform, size_t bucket_size, - size_t num_locks) - : buckets_(bucket_size), - transform_(transform), - locks_(num_locks), - compare_(compare) { } - -void TransformRep::Insert(const char* key) { - assert(!Contains(key)); - auto transformed = transform_->Transform(UserKey(key)); - auto& bucket = GetBucket(transformed); - WriteLock bl(GetLock(transformed)); - bucket->insert(key); - memory_usage_ += sizeof(key); -} - -bool TransformRep::Contains(const char* key) const { - ReadLock l(&rwlock_); - auto transformed = transform_->Transform(UserKey(key)); - auto bucket = buckets_.find(transformed); - if (bucket == buckets_.end()) { - return false; - } - ReadLock bl(GetLock(transformed)); - return bucket->second->count(key) != 0; -} - -size_t TransformRep::ApproximateMemoryUsage() { - 
return memory_usage_; -} - -std::shared_ptr - TransformRep::EmptyIterator::GetInstance() { - if (!instance) { - instance.reset(new TransformRep::EmptyIterator); - } - return instance; -} - -TransformRep::Iterator::Iterator(std::shared_ptr items) - : items_(items), - cit_(items_->begin()) { } - -// Returns true iff the iterator is positioned at a valid node. -bool TransformRep::Iterator::Valid() const { - return cit_ != items_->end(); -} - -// Returns the key at the current position. -// REQUIRES: Valid() -const char* TransformRep::Iterator::key() const { - assert(Valid()); - return *cit_; -} - -// Advances to the next position. -// REQUIRES: Valid() -void TransformRep::Iterator::Next() { - assert(Valid()); - if (cit_ == items_->end()) { - return; - } - ++cit_; -} - -// Advances to the previous position. -// REQUIRES: Valid() -void TransformRep::Iterator::Prev() { - assert(Valid()); - if (cit_ == items_->begin()) { - // If you try to go back from the first element, the iterator should be - // invalidated. So we set it to past-the-end. This means that you can - // treat the container circularly. - cit_ = items_->end(); - } else { - --cit_; - } -} - -// Advance to the first entry with a key >= target -void TransformRep::Iterator::Seek(const char* target) { - cit_ = items_->lower_bound(target); -} - -// Position at the first entry in collection. -// Final state of iterator is Valid() iff collection is not empty. 
-void TransformRep::Iterator::SeekToFirst() { - cit_ = items_->begin(); -} - -void TransformRep::Iterator::SeekToLast() { - cit_ = items_->end(); - if (items_->size() != 0) { - --cit_; - } -} - -TransformRep::TransformIterator::TransformIterator( - std::shared_ptr items, port::RWMutex* rwlock) - : Iterator(items), l_(rwlock) { } - -std::shared_ptr TransformRep::GetIterator() { - auto items = std::make_shared(Compare(compare_)); - // Hold read locks on all locks - ReadLock l(&rwlock_); - std::for_each(locks_.begin(), locks_.end(), [] (port::RWMutex& lock) { - lock.ReadLock(); - }); - for (auto& bucket : buckets_) { - items->insert(bucket.second->begin(), bucket.second->end()); - } - std::for_each(locks_.begin(), locks_.end(), [] (port::RWMutex& lock) { - lock.Unlock(); - }); - return std::make_shared(std::move(items)); -} - -std::shared_ptr TransformRep::GetTransformIterator( - const Slice& transformed) { - ReadLock l(&rwlock_); - auto bucket = buckets_.find(transformed); - if (bucket == buckets_.end()) { - return EmptyIterator::GetInstance(); - } - return std::make_shared(bucket->second, - GetLock(transformed)); -} - -std::shared_ptr TransformRep::GetIterator( - const Slice& slice) { - auto transformed = transform_->Transform(slice); - return GetTransformIterator(transformed); -} - -std::shared_ptr - TransformRep::EmptyIterator::instance; - -} // anon namespace - -std::shared_ptr TransformRepFactory::CreateMemTableRep( - MemTableRep::KeyComparator& compare, Arena* arena) { - return std::make_shared(compare, arena, transform_, - bucket_count_, num_locks_); -} - -std::shared_ptr PrefixHashRepFactory::CreateMemTableRep( - MemTableRep::KeyComparator& compare, Arena* arena) { - return std::make_shared(compare, arena, transform_, - bucket_count_, num_locks_); -} - -std::shared_ptr PrefixHashRep::GetPrefixIterator( - const Slice& prefix) { - return TransformRep::GetTransformIterator(prefix); -} - -} // namespace rocksdb From 28a1b9b95ff23032ae1ab73796593458fd039c08 Mon 
Sep 17 00:00:00 2001 From: Sajal Jain Date: Tue, 3 Dec 2013 12:59:53 -0800 Subject: [PATCH 12/17] [rocksdb] statistics counters for memtable hits and misses Summary: added counters rocksdb.memtable.hit - for memtable hit rocksdb.memtable.miss - for memtable miss Test Plan: db_bench tests Reviewers: igor, dhruba, haobo Reviewed By: dhruba Differential Revision: https://reviews.facebook.net/D14433 --- db/db_impl.cc | 3 +++ include/rocksdb/statistics.h | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/db/db_impl.cc b/db/db_impl.cc index 5ac6956b1..294e0e4be 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2618,12 +2618,15 @@ Status DBImpl::GetImpl(const ReadOptions& options, LookupKey lkey(key, snapshot); if (mem->Get(lkey, value, &s, &merge_operands, options_)) { // Done + RecordTick(options_.statistics.get(), MEMTABLE_HIT); } else if (imm.Get(lkey, value, &s, &merge_operands, options_)) { // Done + RecordTick(options_.statistics.get(), MEMTABLE_HIT); } else { current->Get(options, lkey, value, &s, &merge_operands, &stats, options_, value_found); have_stat_update = true; + RecordTick(options_.statistics.get(), MEMTABLE_MISS); } mutex_.Lock(); diff --git a/include/rocksdb/statistics.h b/include/rocksdb/statistics.h index 102a4be58..286a624c8 100644 --- a/include/rocksdb/statistics.h +++ b/include/rocksdb/statistics.h @@ -51,6 +51,11 @@ enum Tickers { // # of times bloom filter has avoided file reads. BLOOM_FILTER_USEFUL, + // # of memtable hits. + MEMTABLE_HIT, + // # of memtable misses. + MEMTABLE_MISS, + /** * COMPACTION_KEY_DROP_* count the reasons for key drop during compaction * There are 3 reasons currently. 
@@ -125,6 +130,8 @@ const std::vector> TickersNameMap = { { BLOCK_CACHE_DATA_MISS, "rocksdb.block.cache.data.miss" }, { BLOCK_CACHE_DATA_HIT, "rocksdb.block.cache.data.hit" }, { BLOOM_FILTER_USEFUL, "rocksdb.bloom.filter.useful" }, + { MEMTABLE_HIT, "rocksdb.memtable.hit" }, + { MEMTABLE_MISS, "rocksdb.memtable.miss" }, { COMPACTION_KEY_DROP_NEWER_ENTRY, "rocksdb.compaction.key.drop.new" }, { COMPACTION_KEY_DROP_OBSOLETE, "rocksdb.compaction.key.drop.obsolete" }, { COMPACTION_KEY_DROP_USER, "rocksdb.compaction.key.drop.user" }, From 97aa401e2fad6457b5ec8bb24d364d4a09bff80c Mon Sep 17 00:00:00 2001 From: Mark Callaghan Date: Tue, 3 Dec 2013 12:32:07 -0800 Subject: [PATCH 13/17] Add compression options to db_bench Summary: This adds 2 options for compression to db_bench: * universal_compression_size_percent * compression_level - to set zlib compression level It also logs compression_size_percent at startup in LOG Task ID: # Blame Rev: Test Plan: make check, run db_bench Revert Plan: Database Impact: Memcache Impact: Other Notes: EImportant: - begin *PUBLIC* platform impact section - Bugzilla: # - end platform impact - Reviewers: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D14439 --- db/db_bench.cc | 26 ++++++++++++++++++++++++++ util/options.cc | 3 +++ 2 files changed, 29 insertions(+) diff --git a/db/db_bench.cc b/db/db_bench.cc index d7c0223e4..33c1ecfe1 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -190,6 +190,10 @@ DEFINE_int32(universal_max_merge_width, 0, "The max number of files to compact" DEFINE_int32(universal_max_size_amplification_percent, 0, "The max size amplification for universal style compaction"); +DEFINE_int32(universal_compression_size_percent, -1, + "The percentage of the database to compress for universal " + "compaction. -1 means compress everything."); + DEFINE_int64(cache_size, -1, "Number of bytes to use as a cache of uncompressed" "data. 
Negative means use default settings."); @@ -324,6 +328,23 @@ DEFINE_string(compression_type, "snappy", static enum rocksdb::CompressionType FLAGS_compression_type_e = rocksdb::kSnappyCompression; +DEFINE_int32(compression_level, -1, + "Compression level. For zlib this should be -1 for the " + "default level, or between 0 and 9."); + +static bool ValidateCompressionLevel(const char* flagname, int32_t value) { + if (value < -1 || value > 9) { + fprintf(stderr, "Invalid value for --%s: %d, must be between -1 and 9\n", + flagname, value); + return false; + } + return true; +} + +static const bool FLAGS_compression_level_dummy = + google::RegisterFlagValidator(&FLAGS_compression_level, + &ValidateCompressionLevel); + DEFINE_int32(min_level_to_compress, -1, "If non-negative, compression starts" " from this level. Levels with number < min_level_to_compress are" " not compressed. Otherwise, apply compression_type to " @@ -1350,6 +1371,7 @@ class Benchmark { options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger; options.compression = FLAGS_compression_type_e; + options.compression_opts.level = FLAGS_compression_level; options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds; options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB; if (FLAGS_min_level_to_compress >= 0) { @@ -1411,6 +1433,10 @@ class Benchmark { options.compaction_options_universal.max_size_amplification_percent = FLAGS_universal_max_size_amplification_percent; } + if (FLAGS_universal_compression_size_percent != -1) { + options.compaction_options_universal.compression_size_percent = + FLAGS_universal_compression_size_percent; + } Status s; if(FLAGS_readonly) { diff --git a/util/options.cc b/util/options.cc index fffcce0a1..198d55384 100644 --- a/util/options.cc +++ b/util/options.cc @@ -278,6 +278,9 @@ Options::Dump(Logger* log) const Log(log,"Options.compaction_options_universal." 
"max_size_amplification_percent: %u", compaction_options_universal.max_size_amplification_percent); + Log(log, + "Options.compaction_options_universal.compression_size_percent: %u", + compaction_options_universal.compression_size_percent); std::string collector_names; for (auto collector : table_properties_collectors) { collector_names.append(collector->Name()); From fa88cbc71e52d2c7c4d09c42a5bce24774482d9e Mon Sep 17 00:00:00 2001 From: Vamsi Ponnekanti Date: Tue, 3 Dec 2013 23:16:36 -0800 Subject: [PATCH 14/17] [Log dumper broken when merge operator is in log] Summary: $title Test Plan: on my dev box Revert Plan: OK Task ID: # Reviewers: emayanke, dhruba, haobo Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D14451 --- util/ldb_cmd.cc | 58 ++++++++++++++++++++++++------------------------- 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index e255f8196..58d81460e 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -1226,25 +1226,41 @@ void ChangeCompactionStyleCommand::DoCommand() { class InMemoryHandler : public WriteBatch::Handler { public: + InMemoryHandler(stringstream& row, bool print_values) : Handler(),row_(row) { + print_values_ = print_values; + } - virtual void Put(const Slice& key, const Slice& value) { - putMap_[key.ToString()] = value.ToString(); + void commonPutMerge(const Slice& key, const Slice& value) { + string k = LDBCommand::StringToHex(key.ToString()); + if (print_values_) { + string v = LDBCommand::StringToHex(value.ToString()); + row_ << k << " : "; + row_ << v << " "; + } else { + row_ << k << " "; + } } - virtual void Delete(const Slice& key) { - deleteList_.push_back(key.ToString(true)); + + virtual void Put(const Slice& key, const Slice& value) { + row_ << "PUT : "; + commonPutMerge(key, value); } - virtual ~InMemoryHandler() { }; - map PutMap() { - return putMap_; + virtual void Merge(const Slice& key, const Slice& value) { + row_ << "MERGE : "; + 
commonPutMerge(key, value); } - vector DeleteList() { - return deleteList_; + + virtual void Delete(const Slice& key) { + row_ <<",DELETE : "; + row_ << LDBCommand::StringToHex(key.ToString()) << " "; } + virtual ~InMemoryHandler() { }; + private: - map putMap_; - vector deleteList_; + stringstream & row_; + bool print_values_; }; const string WALDumperCommand::ARG_WAL_FILE = "walfile"; @@ -1322,26 +1338,8 @@ void WALDumperCommand::DoCommand() { row< Date: Tue, 3 Dec 2013 06:39:07 -0800 Subject: [PATCH 15/17] Make an API to get database identity from the IDENTITY file Summary: This would enable rocksdb users to get the db identity without depending on implementation details(storing that in IDENTITY file) Test Plan: db/db_test (has identity checks) Reviewers: dhruba, haobo, igor, kailiu Reviewed By: dhruba CC: leveldb Differential Revision: https://reviews.facebook.net/D14463 --- db/db_impl.cc | 27 +++++++++++++++++++++++++++ db/db_impl.h | 2 ++ db/db_test.cc | 28 ++++++++++------------------ include/rocksdb/db.h | 10 +++++++++- 4 files changed, 48 insertions(+), 19 deletions(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index 294e0e4be..9061adecc 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -3497,6 +3497,33 @@ void DBImpl::GetLiveFilesMetaData(std::vector *metadata) { return versions_->GetLiveFilesMetaData(metadata); } +Status DBImpl::GetDbIdentity(std::string& identity) { + std::string idfilename = IdentityFileName(dbname_); + unique_ptr idfile; + const EnvOptions soptions; + Status s = env_->NewSequentialFile(idfilename, &idfile, soptions); + if (!s.ok()) { + return s; + } + uint64_t file_size; + s = env_->GetFileSize(idfilename, &file_size); + if (!s.ok()) { + return s; + } + char buffer[file_size]; + Slice id; + s = idfile->Read(file_size, &id, buffer); + if (!s.ok()) { + return s; + } + identity.assign(id.ToString()); + // If last character is '\n' remove it from identity + if (identity.size() > 0 && identity.back() == '\n') { + identity.pop_back(); + } + 
return s; +} + // Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { diff --git a/db/db_impl.h b/db/db_impl.h index 15b6013d0..d7a346b6e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -85,6 +85,8 @@ class DBImpl : public DB { virtual void GetLiveFilesMetaData( std::vector *metadata); + virtual Status GetDbIdentity(std::string& identity); + // Extra methods (for testing) that are not in the public DB interface // Compact any files in the named level that overlap [*begin, *end] diff --git a/db/db_test.cc b/db/db_test.cc index 069ab679f..5850d343f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -1731,31 +1731,23 @@ TEST(DBTest, ManifestRollOver) { TEST(DBTest, IdentityAcrossRestarts) { do { - std::string idfilename = IdentityFileName(dbname_); - unique_ptr idfile; - const EnvOptions soptions; - ASSERT_OK(env_->NewSequentialFile(idfilename, &idfile, soptions)); - char buffer1[100]; - Slice id1; - ASSERT_OK(idfile->Read(100, &id1, buffer1)); + std::string id1; + ASSERT_OK(db_->GetDbIdentity(id1)); Options options = CurrentOptions(); Reopen(&options); - char buffer2[100]; - Slice id2; - ASSERT_OK(env_->NewSequentialFile(idfilename, &idfile, soptions)); - ASSERT_OK(idfile->Read(100, &id2, buffer2)); + std::string id2; + ASSERT_OK(db_->GetDbIdentity(id2)); // id1 should match id2 because identity was not regenerated - ASSERT_EQ(id1.ToString(), id2.ToString()); + ASSERT_EQ(id1.compare(id2), 0); + std::string idfilename = IdentityFileName(dbname_); ASSERT_OK(env_->DeleteFile(idfilename)); Reopen(&options); - char buffer3[100]; - Slice id3; - ASSERT_OK(env_->NewSequentialFile(idfilename, &idfile, soptions)); - ASSERT_OK(idfile->Read(100, &id3, buffer3)); - // id1 should NOT match id2 because identity was regenerated - ASSERT_NE(id1.ToString(0), id3.ToString()); + std::string id3; + ASSERT_OK(db_->GetDbIdentity(id3)); + // id1 should NOT match id3 
because identity was regenerated + ASSERT_NE(id1.compare(id3), 0); } while (ChangeCompactOptions()); } diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 73f9ac4da..9849d3adf 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -273,7 +273,7 @@ class DB { // Sets iter to an iterator that is positioned at a write-batch containing // seq_number. If the sequence number is non existent, it returns an iterator // at the first available seq_no after the requested seq_no - // Returns Status::Ok if iterator is valid + // Returns Status::OK if iterator is valid // Must set WAL_ttl_seconds or WAL_size_limit_MB to large values to // use this api, else the WAL files will get // cleared aggressively and the iterator might keep getting invalid before @@ -292,6 +292,14 @@ class DB { std::vector *metadata) { } + // Sets the globally unique ID created at database creation time by invoking + // Env::GenerateUniqueId(), in identity. Returns Status::OK if identity could + // be set properly + virtual Status GetDbIdentity(std::string& identity) { + identity.clear(); + return Status::OK(); + } + private: // No copying allowed DB(const DB&); From 92e8316118b8bf330e1e7f025252be380940e941 Mon Sep 17 00:00:00 2001 From: Mayank Agarwal Date: Thu, 5 Dec 2013 10:16:39 -0800 Subject: [PATCH 16/17] Make GetDbIdentity pure virtual and also implement it for StackableDB, DBWithTTL Summary: As title Test Plan: make clean and make Reviewers: igor Reviewed By: igor CC: leveldb Differential Revision: https://reviews.facebook.net/D14469 --- db/db_test.cc | 4 ++++ include/rocksdb/db.h | 5 +---- include/utilities/stackable_db.h | 4 ++++ utilities/ttl/db_ttl.cc | 4 ++++ utilities/ttl/db_ttl.h | 2 ++ 5 files changed, 15 insertions(+), 4 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index 5850d343f..fea7f1e1b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4476,6 +4476,10 @@ class ModelDB: public DB { return Status::OK(); } + virtual Status GetDbIdentity(std::string& 
identity) { + return Status::OK(); + } + virtual SequenceNumber GetLatestSequenceNumber() const { return 0; } diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 9849d3adf..7396f8445 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -295,10 +295,7 @@ class DB { // Sets the globally unique ID created at database creation time by invoking // Env::GenerateUniqueId(), in identity. Returns Status::OK if identity could // be set properly - virtual Status GetDbIdentity(std::string& identity) { - identity.clear(); - return Status::OK(); - } + virtual Status GetDbIdentity(std::string& identity) = 0; private: // No copying allowed diff --git a/include/utilities/stackable_db.h b/include/utilities/stackable_db.h index dc26ed852..e74bf353b 100644 --- a/include/utilities/stackable_db.h +++ b/include/utilities/stackable_db.h @@ -140,6 +140,10 @@ class StackableDB : public DB { return db_->DeleteFile(name); } + virtual Status GetDbIdentity(std::string& identity) { + return db_->GetDbIdentity(identity); + } + virtual Status GetUpdatesSince(SequenceNumber seq_number, unique_ptr* iter) override { diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index abe7408a6..ee4a948b9 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -291,6 +291,10 @@ Status DBWithTTL::DeleteFile(std::string name) { return db_->DeleteFile(name); } +Status DBWithTTL::GetDbIdentity(std::string& identity) { + return db_->GetDbIdentity(identity); +} + Status DBWithTTL::GetUpdatesSince( SequenceNumber seq_number, unique_ptr* iter) { diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index d09bae966..c5270764e 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -84,6 +84,8 @@ class DBWithTTL : public StackableDB { virtual Status DeleteFile(std::string name); + virtual Status GetDbIdentity(std::string& identity); + virtual SequenceNumber GetLatestSequenceNumber() const; virtual Status GetUpdatesSince(SequenceNumber seq_number, From 
b1d2de4a40f625011bf71d8b59d0f8436eb5f1a9 Mon Sep 17 00:00:00 2001 From: kailiu Date: Thu, 5 Dec 2013 22:29:03 -0800 Subject: [PATCH 17/17] Fix #26 by putting the implementation of CreateDBStatistics() to a cc file --- db/db_statistics.cc | 14 ++++++++++++++ db/db_statistics.h | 4 +--- table/table_test.cc | 2 +- 3 files changed, 16 insertions(+), 4 deletions(-) create mode 100644 db/db_statistics.cc diff --git a/db/db_statistics.cc b/db/db_statistics.cc new file mode 100644 index 000000000..f0cfd6740 --- /dev/null +++ b/db/db_statistics.cc @@ -0,0 +1,14 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "db/db_statistics.h" + +namespace rocksdb { + +std::shared_ptr CreateDBStatistics() { + return std::make_shared(); +} + +} // namespace rocksdb diff --git a/db/db_statistics.h b/db/db_statistics.h index 87bc86304..ec71e1688 100644 --- a/db/db_statistics.h +++ b/db/db_statistics.h @@ -58,8 +58,6 @@ class DBStatistics: public Statistics { std::vector allHistograms_; }; -std::shared_ptr CreateDBStatistics() { - return std::make_shared(); -} +std::shared_ptr CreateDBStatistics(); } // namespace rocksdb diff --git a/table/table_test.cc b/table/table_test.cc index e4aab1f69..1f79fcdf9 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -1047,7 +1047,7 @@ TEST(TableTest, BlockCacheTest) { // -- PART 2: Open without block cache options.block_cache.reset(); - options.statistics = CreateDBStatistics(); // reset the props + options.statistics = CreateDBStatistics(); // reset the stats c.Reopen(options); {