diff --git a/HISTORY.md b/HISTORY.md index a6f58d16a..9a14a8bf4 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,12 +3,13 @@ ## 4.5.0 (2/5/2016) ### Public API Changes -* Add a new perf context level between kEnableCount and kEnableTime. Level 2 now doesn't include timers for mutexes. +* Add a new perf context level between kEnableCount and kEnableTime. Level 2 now does not include timers for mutexes. * Statistics of mutex operation durations will not be measured by default. If you want to have them enabled, you need to set Statistics::stats_level_ to kAll. * DBOptions::delete_scheduler and NewDeleteScheduler() are removed, please use DBOptions::sst_file_manager and NewSstFileManager() instead ### New Features * ldb tool now supports operations to non-default column families. +* Add kPersistedTier to ReadTier. This option allows Get and MultiGet to read only the persited data and skip mem-tables if writes were done with disableWAL = true. * Add DBOptions::sst_file_manager. Use NewSstFileManager() in include/rocksdb/sst_file_manager.h to create a SstFileManager that can be used to track the total size of SST files and control the SST files deletion rate. ## 4.4.0 (1/14/2016) diff --git a/db/db_impl.cc b/db/db_impl.cc index 391bfa6db..87fb9fca7 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -275,7 +275,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) db_options_.delete_obsolete_files_period_micros), last_stats_dump_time_microsec_(0), next_job_id_(1), - flush_on_destroy_(false), + has_unpersisted_data_(false), env_options_(db_options_), #ifndef ROCKSDB_LITE wal_manager_(db_options_, env_options_), @@ -322,7 +322,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) { DBImpl::~DBImpl() { mutex_.Lock(); - if (!shutting_down_.load(std::memory_order_acquire) && flush_on_destroy_) { + if (!shutting_down_.load(std::memory_order_acquire) && + has_unpersisted_data_) { for (auto cfd : *versions_->GetColumnFamilySet()) { if (!cfd->IsDropped() && !cfd->mem()->IsEmpty()) { cfd->Ref(); @@ -3306,13 +3307,19 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, LookupKey lkey(key, snapshot); PERF_TIMER_STOP(get_snapshot_time); - if (sv->mem->Get(lkey, value, &s, &merge_context)) { - // Done - RecordTick(stats_, MEMTABLE_HIT); - } else if (sv->imm->Get(lkey, value, &s, &merge_context)) { - // Done - RecordTick(stats_, MEMTABLE_HIT); - } else { + bool skip_memtable = + (read_options.read_tier == kPersistedTier && has_unpersisted_data_); + bool done = false; + if (!skip_memtable) { + if (sv->mem->Get(lkey, value, &s, &merge_context)) { + done = true; + RecordTick(stats_, MEMTABLE_HIT); + } else if (sv->imm->Get(lkey, value, &s, &merge_context)) { + done = true; + RecordTick(stats_, MEMTABLE_HIT); + } + } + if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); sv->current->Get(read_options, lkey, value, &s, &merge_context, value_found); @@ -3397,14 +3404,23 @@ std::vector DBImpl::MultiGet( assert(mgd_iter != multiget_cf_data.end()); auto mgd = mgd_iter->second; auto super_version = mgd->super_version; - if (super_version->mem->Get(lkey, value, &s, &merge_context)) { - // Done - } else if (super_version->imm->Get(lkey, value, &s, &merge_context)) { - // Done - } else { + bool skip_memtable = + (read_options.read_tier == kPersistedTier && has_unpersisted_data_); + bool done = false; + if (!skip_memtable) { + if (super_version->mem->Get(lkey, value, &s, &merge_context)) { + done = true; + // TODO(?): RecordTick(stats_, MEMTABLE_HIT)? + } else if (super_version->imm->Get(lkey, value, &s, &merge_context)) { + done = true; + // TODO(?): RecordTick(stats_, MEMTABLE_HIT)? + } + } + if (!done) { PERF_TIMER_GUARD(get_from_output_files_time); super_version->current->Get(read_options, lkey, value, &s, &merge_context); + // TODO(?): RecordTick(stats_, MEMTABLE_MISS)? } if (s.ok()) { @@ -3843,6 +3859,10 @@ bool DBImpl::KeyMayExist(const ReadOptions& read_options, Iterator* DBImpl::NewIterator(const ReadOptions& read_options, ColumnFamilyHandle* column_family) { + if (read_options.read_tier == kPersistedTier) { + return NewErrorIterator(Status::NotSupported( + "ReadTier::kPersistedData is not yet supported in iterators.")); + } auto cfh = reinterpret_cast(column_family); auto cfd = cfh->cfd(); @@ -3949,6 +3969,10 @@ Status DBImpl::NewIterators( const ReadOptions& read_options, const std::vector& column_families, std::vector* iterators) { + if (read_options.read_tier == kPersistedTier) { + return Status::NotSupported( + "ReadTier::kPersistedData is not yet supported in iterators."); + } iterators->clear(); iterators->reserve(column_families.size()); XFUNC_TEST("", "managed_new", managed_new1, xf_manage_new, @@ -4327,7 +4351,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_STOP(write_pre_and_post_process_time); if (write_options.disableWAL) { - flush_on_destroy_ = true; + has_unpersisted_data_ = true; } uint64_t log_size = 0; diff --git a/db/db_impl.h b/db/db_impl.h index 429589360..82dd39135 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -822,7 +822,10 @@ class DBImpl : public DB { // they're unique std::atomic next_job_id_; - bool flush_on_destroy_; // Used when disableWAL is true. + // A flag indicating whether the current rocksdb database has any + // data that is not yet persisted into either WAL or SST file. + // Used when disableWAL is true. + bool has_unpersisted_data_; static const int KEEP_LOG_FILE_NUM = 1000; // MSVC version 1800 still does not have constexpr for ::max() diff --git a/db/db_test.cc b/db/db_test.cc index 7e93f35a7..76a9ec396 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -519,6 +519,135 @@ TEST_F(DBTest, PutSingleDeleteGet) { kSkipUniversalCompaction | kSkipMergePut)); } +TEST_F(DBTest, ReadFromPersistedTier) { + do { + Random rnd(301); + Options options = CurrentOptions(); + for (int disableWAL = 0; disableWAL <= 1; ++disableWAL) { + CreateAndReopenWithCF({"pikachu"}, options); + WriteOptions wopt; + wopt.disableWAL = (disableWAL == 1); + // 1st round: put but not flush + ASSERT_OK(db_->Put(wopt, handles_[1], "foo", "first")); + ASSERT_OK(db_->Put(wopt, handles_[1], "bar", "one")); + ASSERT_EQ("first", Get(1, "foo")); + ASSERT_EQ("one", Get(1, "bar")); + + // Read directly from persited data. + ReadOptions ropt; + ropt.read_tier = kPersistedTier; + std::string value; + if (wopt.disableWAL) { + // as data has not yet being flushed, we expect not found. + ASSERT_TRUE(db_->Get(ropt, handles_[1], "foo", &value).IsNotFound()); + ASSERT_TRUE(db_->Get(ropt, handles_[1], "bar", &value).IsNotFound()); + } else { + ASSERT_OK(db_->Get(ropt, handles_[1], "foo", &value)); + ASSERT_OK(db_->Get(ropt, handles_[1], "bar", &value)); + } + + // Multiget + std::vector multiget_cfs; + multiget_cfs.push_back(handles_[1]); + multiget_cfs.push_back(handles_[1]); + std::vector multiget_keys; + multiget_keys.push_back("foo"); + multiget_keys.push_back("bar"); + std::vector multiget_values; + auto statuses = + db_->MultiGet(ropt, multiget_cfs, multiget_keys, &multiget_values); + if (wopt.disableWAL) { + ASSERT_TRUE(statuses[0].IsNotFound()); + ASSERT_TRUE(statuses[1].IsNotFound()); + } else { + ASSERT_OK(statuses[0]); + ASSERT_OK(statuses[1]); + } + + // 2nd round: flush and put a new value in memtable. + ASSERT_OK(Flush(1)); + ASSERT_OK(db_->Put(wopt, handles_[1], "rocksdb", "hello")); + + // once the data has been flushed, we are able to get the + // data when kPersistedTier is used. + ASSERT_TRUE(db_->Get(ropt, handles_[1], "foo", &value).ok()); + ASSERT_EQ(value, "first"); + ASSERT_TRUE(db_->Get(ropt, handles_[1], "bar", &value).ok()); + ASSERT_EQ(value, "one"); + if (wopt.disableWAL) { + ASSERT_TRUE( + db_->Get(ropt, handles_[1], "rocksdb", &value).IsNotFound()); + } else { + ASSERT_OK(db_->Get(ropt, handles_[1], "rocksdb", &value)); + ASSERT_EQ(value, "hello"); + } + + // Expect same result in multiget + multiget_cfs.push_back(handles_[1]); + multiget_keys.push_back("rocksdb"); + statuses = + db_->MultiGet(ropt, multiget_cfs, multiget_keys, &multiget_values); + ASSERT_TRUE(statuses[0].ok()); + ASSERT_EQ("first", multiget_values[0]); + ASSERT_TRUE(statuses[1].ok()); + ASSERT_EQ("one", multiget_values[1]); + if (wopt.disableWAL) { + ASSERT_TRUE(statuses[2].IsNotFound()); + } else { + ASSERT_OK(statuses[2]); + } + + // 3rd round: delete and flush + ASSERT_OK(db_->Delete(wopt, handles_[1], "foo")); + Flush(1); + ASSERT_OK(db_->Delete(wopt, handles_[1], "bar")); + + ASSERT_TRUE(db_->Get(ropt, handles_[1], "foo", &value).IsNotFound()); + if (wopt.disableWAL) { + // Still expect finding the value as its delete has not yet being + // flushed. + ASSERT_TRUE(db_->Get(ropt, handles_[1], "bar", &value).ok()); + ASSERT_EQ(value, "one"); + } else { + ASSERT_TRUE(db_->Get(ropt, handles_[1], "bar", &value).IsNotFound()); + } + ASSERT_TRUE(db_->Get(ropt, handles_[1], "rocksdb", &value).ok()); + ASSERT_EQ(value, "hello"); + + statuses = + db_->MultiGet(ropt, multiget_cfs, multiget_keys, &multiget_values); + ASSERT_TRUE(statuses[0].IsNotFound()); + if (wopt.disableWAL) { + ASSERT_TRUE(statuses[1].ok()); + ASSERT_EQ("one", multiget_values[1]); + } else { + ASSERT_TRUE(statuses[1].IsNotFound()); + } + ASSERT_TRUE(statuses[2].ok()); + ASSERT_EQ("hello", multiget_values[2]); + if (wopt.disableWAL == 0) { + DestroyAndReopen(options); + } + } + } while (ChangeOptions(kSkipHashCuckoo)); +} + +TEST_F(DBTest, PersistedTierOnIterator) { + // The test needs to be changed if kPersistedTier is supported in iterator. + Options options = CurrentOptions(); + CreateAndReopenWithCF({"pikachu"}, options); + ReadOptions ropt; + ropt.read_tier = kPersistedTier; + + auto* iter = db_->NewIterator(ropt, handles_[1]); + ASSERT_TRUE(iter->status().IsNotSupported()); + delete iter; + + std::vector iters; + ASSERT_TRUE(db_->NewIterators(ropt, {handles_[1]}, &iters).IsNotSupported()); + Close(); +} + TEST_F(DBTest, SingleDeleteFlush) { // Test to check whether flushing preserves a single delete hidden // behind a put. diff --git a/db/table_cache.cc b/db/table_cache.cc index 663315840..1d506f002 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -273,7 +273,7 @@ Status TableCache::Get(const ReadOptions& options, if (handle != nullptr) { ReleaseHandle(handle); } - } else if (options.read_tier && s.IsIncomplete()) { + } else if (options.read_tier == kBlockCacheTier && s.IsIncomplete()) { // Couldn't find Table in cache but treat as kFound if no_io set get_context->MarkKeyMayExist(); return Status::OK(); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index a26ed7d81..b1f752cf8 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -1294,8 +1294,12 @@ struct Options : public DBOptions, public ColumnFamilyOptions { // the block cache. It will not page in data from the OS cache or data that // resides in storage. enum ReadTier { - kReadAllTier = 0x0, // data in memtable, block cache, OS cache or storage - kBlockCacheTier = 0x1 // data in memtable or block cache + kReadAllTier = 0x0, // data in memtable, block cache, OS cache or storage + kBlockCacheTier = 0x1, // data in memtable or block cache + kPersistedTier = 0x2 // persisted data. When WAL is disabled, this option + // will skip data in memtable. + // Note that this ReadTier currently only supports + // Get and MultiGet and does not support iterators. }; // Options that control read operations diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 00997f3ea..8c0149fd0 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -1255,7 +1255,8 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key, BlockIter biter; NewDataBlockIterator(rep_, read_options, iiter.value(), &biter); - if (read_options.read_tier && biter.status().IsIncomplete()) { + if (read_options.read_tier == kBlockCacheTier && + biter.status().IsIncomplete()) { // couldn't get block from block_cache // Update Saver.state to Found because we are only looking for whether // we can guarantee the key is not there when "no_io" is set