diff --git a/.gitignore b/.gitignore index d987e5767..55f9639d5 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,4 @@ util/build_version.cc build_tools/VALGRIND_LOGS/ coverage/COVERAGE_REPORT util/build_version.cc.tmp +.gdbhistory diff --git a/db/db_impl.cc b/db/db_impl.cc index f19ecba6c..7f1efdb88 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2203,7 +2203,6 @@ Status DBImpl::Get(const ReadOptions& options, Status DBImpl::GetImpl(const ReadOptions& options, const Slice& key, std::string* value, - const bool no_io, bool* value_found) { Status s; @@ -2242,7 +2241,7 @@ Status DBImpl::GetImpl(const ReadOptions& options, // Done } else { current->Get(options, lkey, value, &s, &merge_operands, &stats, - options_, no_io, value_found); + options_, value_found); have_stat_update = true; } mutex_.Lock(); @@ -2348,7 +2347,9 @@ bool DBImpl::KeyMayExist(const ReadOptions& options, if (value_found != nullptr) { *value_found = true; // falsify later if key-may-exist but can't fetch value } - return GetImpl(options, key, value, true, value_found).ok(); + ReadOptions roptions = options; + roptions.read_tier = kBlockCacheTier; // read from block cache only + return GetImpl(roptions, key, value, value_found).ok(); } Iterator* DBImpl::NewIterator(const ReadOptions& options) { diff --git a/db/db_impl.h b/db/db_impl.h index 4d9b09c49..6f4b5db42 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -424,7 +424,6 @@ class DBImpl : public DB { Status GetImpl(const ReadOptions& options, const Slice& key, std::string* value, - const bool no_io = false, bool* value_found = nullptr); }; diff --git a/db/db_test.cc b/db/db_test.cc index faa24c6ba..c64b620c5 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -11,6 +11,7 @@ #include "db/filename.h" #include "db/version_set.h" #include "db/write_batch_internal.h" +#include "db/db_statistics.h" #include "rocksdb/cache.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/env.h" @@ -829,6 +830,7 @@ TEST(DBTest, KeyMayExist) { std::string value; Options options = CurrentOptions(); options.filter_policy = NewBloomFilterPolicy(20); + options.statistics = leveldb::CreateDBStatistics(); Reopen(&options); ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value)); @@ -841,24 +843,114 @@ TEST(DBTest, KeyMayExist) { dbfull()->Flush(FlushOptions()); value.clear(); - value_found = false; + + long numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); + long cache_miss = + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); ASSERT_TRUE(db_->KeyMayExist(ropts, "a", &value, &value_found)); ASSERT_TRUE(!value_found); + // assert that no new files were opened and no new blocks were + // read into block cache. + ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); + ASSERT_EQ(cache_miss, + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); ASSERT_OK(db_->Delete(WriteOptions(), "a")); + + numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); + cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value)); + ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); + ASSERT_EQ(cache_miss, + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); dbfull()->Flush(FlushOptions()); dbfull()->CompactRange(nullptr, nullptr); + + numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); + cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value)); + ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); + ASSERT_EQ(cache_miss, + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); ASSERT_OK(db_->Delete(WriteOptions(), "c")); + + numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); + cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); ASSERT_TRUE(!db_->KeyMayExist(ropts, "c", &value)); + ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); + ASSERT_EQ(cache_miss, + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); delete options.filter_policy; } while (ChangeOptions()); } +TEST(DBTest, NonBlockingIteration) { + do { + ReadOptions non_blocking_opts, regular_opts; + Options options = CurrentOptions(); + options.statistics = leveldb::CreateDBStatistics(); + non_blocking_opts.read_tier = kBlockCacheTier; + Reopen(&options); + + // write one kv to the database. + ASSERT_OK(db_->Put(WriteOptions(), "a", "b")); + + // scan using non-blocking iterator. We should find it because + // it is in memtable. + Iterator* iter = db_->NewIterator(non_blocking_opts); + int count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_TRUE(iter->status().ok()); + count++; + } + ASSERT_EQ(count, 1); + delete iter; + + // flush memtable to storage. Now, the key should not be in the + // memtable neither in the block cache. + dbfull()->Flush(FlushOptions()); + + // verify that a non-blocking iterator does not find any + // kvs. Neither does it do any IOs to storage. + long numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); + long cache_miss = + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); + iter = db_->NewIterator(non_blocking_opts); + count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + count++; + } + ASSERT_EQ(count, 0); + ASSERT_TRUE(iter->status().IsIncomplete()); + ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); + ASSERT_EQ(cache_miss, + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); + delete iter; + + // read in the specified block via a regular get + ASSERT_EQ(Get("a"), "b"); + + // verify that we can find it via a non-blocking scan + numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS); + cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS); + iter = db_->NewIterator(non_blocking_opts); + count = 0; + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + ASSERT_TRUE(iter->status().ok()); + count++; + } + ASSERT_EQ(count, 1); + ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS)); + ASSERT_EQ(cache_miss, + options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS)); + delete iter; + + } while (ChangeOptions()); +} + // A delete is skipped for key if KeyMayExist(key) returns False // Tests Writebatch consistency and proper delete behaviour TEST(DBTest, FilterDeletes) { diff --git a/db/table_cache.cc b/db/table_cache.cc index 48d117755..dc8769f48 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -48,7 +48,7 @@ Status TableCache::FindTable(const EnvOptions& toptions, *handle = cache_->Lookup(key); if (*handle == nullptr) { if (no_io) { // Dont do IO and return a not-found status - return Status::NotFound("Table not found in table_cache, no_io is set"); + return Status::Incomplete("Table not found in table_cache, no_io is set"); } if (table_io != nullptr) { *table_io = true; // we had to do IO from storage @@ -90,7 +90,8 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, } Cache::Handle* handle = nullptr; - Status s = FindTable(toptions, file_number, file_size, &handle); + Status s = FindTable(toptions, file_number, file_size, &handle, + nullptr, options.read_tier == kBlockCacheTier); if (!s.ok()) { return NewErrorIterator(s); } @@ -117,17 +118,17 @@ Status TableCache::Get(const ReadOptions& options, void* arg, bool (*saver)(void*, const Slice&, const Slice&, bool), bool* table_io, - void (*mark_key_may_exist)(void*), - const bool no_io) { + void (*mark_key_may_exist)(void*)) { Cache::Handle* handle = nullptr; Status s = FindTable(storage_options_, file_number, file_size, - &handle, table_io, no_io); + &handle, table_io, + options.read_tier == kBlockCacheTier); if (s.ok()) { Table* t = reinterpret_cast(cache_->Value(handle)); - s = t->InternalGet(options, k, arg, saver, mark_key_may_exist, no_io); + s = t->InternalGet(options, k, arg, saver, mark_key_may_exist); cache_->Release(handle); - } else if (no_io && s.IsNotFound()) { + } else if (options.read_tier && s.IsIncomplete()) { // Couldnt find Table in cache but treat as kFound if no_io set (*mark_key_may_exist)(arg); return Status::OK(); diff --git a/db/table_cache.h b/db/table_cache.h index d7308020c..c9e68738b 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -49,8 +49,7 @@ class TableCache { void* arg, bool (*handle_result)(void*, const Slice&, const Slice&, bool), bool* table_io, - void (*mark_key_may_exist)(void*) = nullptr, - const bool no_io = false); + void (*mark_key_may_exist)(void*) = nullptr); // Determine whether the table may contain the specified prefix. If // the table index of blooms are not in memory, this may cause an I/O diff --git a/db/version_set.cc b/db/version_set.cc index dca8c7228..54be370bd 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -415,7 +415,6 @@ void Version::Get(const ReadOptions& options, std::deque* operands, GetStats* stats, const Options& db_options, - const bool no_io, bool* value_found) { Slice ikey = k.internal_key(); Slice user_key = k.user_key(); @@ -425,9 +424,6 @@ void Version::Get(const ReadOptions& options, auto logger = db_options.info_log; assert(status->ok() || status->IsMergeInProgress()); - if (no_io) { - assert(status->ok()); - } Saver saver; saver.state = status->ok()? kNotFound : kMerge; saver.ucmp = ucmp; @@ -516,7 +512,7 @@ void Version::Get(const ReadOptions& options, bool tableIO = false; *status = vset_->table_cache_->Get(options, f->number, f->file_size, ikey, &saver, SaveValue, &tableIO, - MarkKeyMayExist, no_io); + MarkKeyMayExist); // TODO: examine the behavior for corrupted key if (!status->ok()) { return; diff --git a/db/version_set.h b/db/version_set.h index 9a7aeb20b..9a1068297 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -76,7 +76,7 @@ class Version { }; void Get(const ReadOptions&, const LookupKey& key, std::string* val, Status* status, std::deque* operands, GetStats* stats, - const Options& db_option, const bool no_io = false, + const Options& db_option, bool* value_found = nullptr); // Adds "stats" into the current state. Returns true if a new diff --git a/include/rocksdb/iterator.h b/include/rocksdb/iterator.h index 9dde6d70f..4270e95f7 100644 --- a/include/rocksdb/iterator.h +++ b/include/rocksdb/iterator.h @@ -65,6 +65,8 @@ class Iterator { virtual Slice value() const = 0; // If an error has occurred, return it. Else return an ok status. + // If non-blocking IO is requested and this operation cannot be + // satisfied without doing some IO, then this returns Status::Incomplete(). virtual Status status() const = 0; // Clients are allowed to register function/arg1/arg2 triples that diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index e3701af09..8e66811ac 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -543,6 +543,18 @@ struct Options { std::shared_ptr compaction_filter_factory; }; +// +// An application can issue a read request (via Get/Iterators) and specify +// if that read should process data that ALREADY resides on a specified cache +// level. For example, if an application specifies kBlockCacheTier then the +// Get call will process data that is already processed in the memtable or +// the block cache. It will not page in data from the OS cache or data that +// resides in storage. +enum ReadTier { + kReadAllTier = 0x0, // data in memtable, block cache, OS cache or storage + kBlockCacheTier = 0x1 // data in memtable or block cache +}; + // Options that control read operations struct ReadOptions { // If true, all data read from underlying storage will be @@ -575,15 +587,23 @@ struct ReadOptions { // Default: nullptr const Slice* prefix; + // Specify if this read request should process data that ALREADY + // resides on a particular cache. If the required data is not + // found at the specified cache, then Status::WouldBlock is returned. + // Default: kReadAllTier + ReadTier read_tier; + ReadOptions() : verify_checksums(false), fill_cache(true), snapshot(nullptr), - prefix(nullptr) { + prefix(nullptr), + read_tier(kReadAllTier) { } ReadOptions(bool cksum, bool cache) : verify_checksums(cksum), fill_cache(cache), - snapshot(nullptr), prefix(nullptr) { + snapshot(nullptr), prefix(nullptr), + read_tier(kReadAllTier) { } }; diff --git a/include/rocksdb/status.h b/include/rocksdb/status.h index f8cdbc7a1..f3af5bfab 100644 --- a/include/rocksdb/status.h +++ b/include/rocksdb/status.h @@ -50,6 +50,9 @@ class Status { static Status MergeInProgress(const Slice& msg, const Slice& msg2 = Slice()) { return Status(kMergeInProgress, msg, msg2); } + static Status Incomplete(const Slice& msg, const Slice& msg2 = Slice()) { + return Status(kIncomplete, msg, msg2); + } // Returns true iff the status indicates success. bool ok() const { return (state_ == nullptr); } @@ -72,6 +75,9 @@ class Status { // Returns true iff the status indicates an MergeInProgress. bool IsMergeInProgress() const { return code() == kMergeInProgress; } + // Returns true iff the status indicates Incomplete + bool IsIncomplete() const { return code() == kIncomplete; } + // Return a string representation of this status suitable for printing. // Returns the string "OK" for success. std::string ToString() const; @@ -92,6 +98,7 @@ class Status { kInvalidArgument = 4, kIOError = 5, kMergeInProgress = 6, + kIncomplete = 7 }; Code code() const { diff --git a/table/table.cc b/table/table.cc index 6d7ddb6ac..f2b80cbbc 100644 --- a/table/table.cc +++ b/table/table.cc @@ -237,8 +237,8 @@ Iterator* Table::BlockReader(void* arg, const ReadOptions& options, const Slice& index_value, bool* didIO, - bool for_compaction, - const bool no_io) { + bool for_compaction) { + const bool no_io = (options.read_tier == kBlockCacheTier); Table* table = reinterpret_cast(arg); Cache* block_cache = table->rep_->options.block_cache.get(); std::shared_ptr statistics = table->rep_->options.statistics; @@ -268,7 +268,8 @@ Iterator* Table::BlockReader(void* arg, RecordTick(statistics, BLOCK_CACHE_HIT); } else if (no_io) { - return nullptr; // Did not find in block_cache and can't do IO + // Did not find in block_cache and can't do IO + return NewErrorIterator(Status::Incomplete("no blocking io")); } else { Histograms histogram = for_compaction ? READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS; @@ -292,7 +293,8 @@ Iterator* Table::BlockReader(void* arg, RecordTick(statistics, BLOCK_CACHE_MISS); } } else if (no_io) { - return nullptr; // Could not read from block_cache and can't do IO + // Could not read from block_cache and can't do IO + return NewErrorIterator(Status::Incomplete("no blocking io")); }else { s = ReadBlock(table->rep_->file.get(), options, handle, &block, didIO); } @@ -401,8 +403,7 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, void* arg, bool (*saver)(void*, const Slice&, const Slice&, bool), - void (*mark_key_may_exist)(void*), - const bool no_io) { + void (*mark_key_may_exist)(void*)) { Status s; Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator); bool done = false; @@ -421,9 +422,10 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, } else { bool didIO = false; Iterator* block_iter = BlockReader(this, options, iiter->value(), - &didIO, false, no_io); + &didIO); - if (no_io && !block_iter) { // couldn't get block from block_cache + if (options.read_tier && block_iter->status().IsIncomplete()) { + // couldn't get block from block_cache // Update Saver.state to Found because we are only looking for whether // we can guarantee the key is not there when "no_io" is set (*mark_key_may_exist)(arg); diff --git a/table/table.h b/table/table.h index a7014f911..52d618f38 100644 --- a/table/table.h +++ b/table/table.h @@ -79,8 +79,7 @@ class Table { const EnvOptions& soptions, const Slice&, bool for_compaction); static Iterator* BlockReader(void*, const ReadOptions&, const Slice&, - bool* didIO, bool for_compaction = false, - const bool no_io = false); + bool* didIO, bool for_compaction = false); // Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found // after a call to Seek(key), until handle_result returns false. @@ -90,8 +89,7 @@ class Table { const ReadOptions&, const Slice& key, void* arg, bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool), - void (*mark_key_may_exist)(void*) = nullptr, - const bool no_io = false); + void (*mark_key_may_exist)(void*) = nullptr); void ReadMeta(const Footer& footer);