Introduced a new flag non_blocking_io in ReadOptions.

Summary:
If ReadOptions.non_blocking_io is set to true, then KeyMayExists
and Iterators will return data that is cached in RAM.
If the Iterator needs to do IO from storage to serve the data,
then the Iterator.status() will return Status::IsRetry().

Test Plan:
Enhanced unit test DBTest.KeyMayExist to detect if there were are IOs
issues from storage. Added DBTest.NonBlockingIteration to verify
nonblocking Iterations.

Reviewers: emayanke, haobo

Reviewed By: haobo

CC: leveldb

Maniphest Tasks: T63

Differential Revision: https://reviews.facebook.net/D12531
main
Dhruba Borthakur 12 years ago
parent 43eef52001
commit fc0c399d2e
  1. 1
      .gitignore
  2. 7
      db/db_impl.cc
  3. 1
      db/db_impl.h
  4. 94
      db/db_test.cc
  5. 15
      db/table_cache.cc
  6. 3
      db/table_cache.h
  7. 6
      db/version_set.cc
  8. 2
      db/version_set.h
  9. 2
      include/rocksdb/iterator.h
  10. 24
      include/rocksdb/options.h
  11. 7
      include/rocksdb/status.h
  12. 18
      table/table.cc
  13. 6
      table/table.h

1
.gitignore vendored

@ -21,3 +21,4 @@ util/build_version.cc
build_tools/VALGRIND_LOGS/
coverage/COVERAGE_REPORT
util/build_version.cc.tmp
.gdbhistory

@ -2203,7 +2203,6 @@ Status DBImpl::Get(const ReadOptions& options,
Status DBImpl::GetImpl(const ReadOptions& options,
const Slice& key,
std::string* value,
const bool no_io,
bool* value_found) {
Status s;
@ -2242,7 +2241,7 @@ Status DBImpl::GetImpl(const ReadOptions& options,
// Done
} else {
current->Get(options, lkey, value, &s, &merge_operands, &stats,
options_, no_io, value_found);
options_, value_found);
have_stat_update = true;
}
mutex_.Lock();
@ -2348,7 +2347,9 @@ bool DBImpl::KeyMayExist(const ReadOptions& options,
if (value_found != nullptr) {
*value_found = true; // falsify later if key-may-exist but can't fetch value
}
return GetImpl(options, key, value, true, value_found).ok();
ReadOptions roptions = options;
roptions.read_tier = kBlockCacheTier; // read from block cache only
return GetImpl(roptions, key, value, value_found).ok();
}
Iterator* DBImpl::NewIterator(const ReadOptions& options) {

@ -424,7 +424,6 @@ class DBImpl : public DB {
Status GetImpl(const ReadOptions& options,
const Slice& key,
std::string* value,
const bool no_io = false,
bool* value_found = nullptr);
};

@ -11,6 +11,7 @@
#include "db/filename.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "db/db_statistics.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
@ -829,6 +830,7 @@ TEST(DBTest, KeyMayExist) {
std::string value;
Options options = CurrentOptions();
options.filter_policy = NewBloomFilterPolicy(20);
options.statistics = leveldb::CreateDBStatistics();
Reopen(&options);
ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value));
@ -841,24 +843,114 @@ TEST(DBTest, KeyMayExist) {
dbfull()->Flush(FlushOptions());
value.clear();
value_found = false;
long numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS);
long cache_miss =
options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS);
ASSERT_TRUE(db_->KeyMayExist(ropts, "a", &value, &value_found));
ASSERT_TRUE(!value_found);
// assert that no new files were opened and no new blocks were
// read into block cache.
ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS));
ASSERT_EQ(cache_miss,
options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS));
ASSERT_OK(db_->Delete(WriteOptions(), "a"));
numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS);
cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS);
ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value));
ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS));
ASSERT_EQ(cache_miss,
options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS));
dbfull()->Flush(FlushOptions());
dbfull()->CompactRange(nullptr, nullptr);
numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS);
cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS);
ASSERT_TRUE(!db_->KeyMayExist(ropts, "a", &value));
ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS));
ASSERT_EQ(cache_miss,
options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS));
ASSERT_OK(db_->Delete(WriteOptions(), "c"));
numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS);
cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS);
ASSERT_TRUE(!db_->KeyMayExist(ropts, "c", &value));
ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS));
ASSERT_EQ(cache_miss,
options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS));
delete options.filter_policy;
} while (ChangeOptions());
}
TEST(DBTest, NonBlockingIteration) {
do {
ReadOptions non_blocking_opts, regular_opts;
Options options = CurrentOptions();
options.statistics = leveldb::CreateDBStatistics();
non_blocking_opts.read_tier = kBlockCacheTier;
Reopen(&options);
// write one kv to the database.
ASSERT_OK(db_->Put(WriteOptions(), "a", "b"));
// scan using non-blocking iterator. We should find it because
// it is in memtable.
Iterator* iter = db_->NewIterator(non_blocking_opts);
int count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_TRUE(iter->status().ok());
count++;
}
ASSERT_EQ(count, 1);
delete iter;
// flush memtable to storage. Now, the key should not be in the
// memtable neither in the block cache.
dbfull()->Flush(FlushOptions());
// verify that a non-blocking iterator does not find any
// kvs. Neither does it do any IOs to storage.
long numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS);
long cache_miss =
options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS);
iter = db_->NewIterator(non_blocking_opts);
count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
count++;
}
ASSERT_EQ(count, 0);
ASSERT_TRUE(iter->status().IsIncomplete());
ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS));
ASSERT_EQ(cache_miss,
options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS));
delete iter;
// read in the specified block via a regular get
ASSERT_EQ(Get("a"), "b");
// verify that we can find it via a non-blocking scan
numopen = options.statistics.get()->getTickerCount(NO_FILE_OPENS);
cache_miss = options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS);
iter = db_->NewIterator(non_blocking_opts);
count = 0;
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_TRUE(iter->status().ok());
count++;
}
ASSERT_EQ(count, 1);
ASSERT_EQ(numopen, options.statistics.get()->getTickerCount(NO_FILE_OPENS));
ASSERT_EQ(cache_miss,
options.statistics.get()->getTickerCount(BLOCK_CACHE_MISS));
delete iter;
} while (ChangeOptions());
}
// A delete is skipped for key if KeyMayExist(key) returns False
// Tests Writebatch consistency and proper delete behaviour
TEST(DBTest, FilterDeletes) {

@ -48,7 +48,7 @@ Status TableCache::FindTable(const EnvOptions& toptions,
*handle = cache_->Lookup(key);
if (*handle == nullptr) {
if (no_io) { // Dont do IO and return a not-found status
return Status::NotFound("Table not found in table_cache, no_io is set");
return Status::Incomplete("Table not found in table_cache, no_io is set");
}
if (table_io != nullptr) {
*table_io = true; // we had to do IO from storage
@ -90,7 +90,8 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
}
Cache::Handle* handle = nullptr;
Status s = FindTable(toptions, file_number, file_size, &handle);
Status s = FindTable(toptions, file_number, file_size, &handle,
nullptr, options.read_tier == kBlockCacheTier);
if (!s.ok()) {
return NewErrorIterator(s);
}
@ -117,17 +118,17 @@ Status TableCache::Get(const ReadOptions& options,
void* arg,
bool (*saver)(void*, const Slice&, const Slice&, bool),
bool* table_io,
void (*mark_key_may_exist)(void*),
const bool no_io) {
void (*mark_key_may_exist)(void*)) {
Cache::Handle* handle = nullptr;
Status s = FindTable(storage_options_, file_number, file_size,
&handle, table_io, no_io);
&handle, table_io,
options.read_tier == kBlockCacheTier);
if (s.ok()) {
Table* t =
reinterpret_cast<Table*>(cache_->Value(handle));
s = t->InternalGet(options, k, arg, saver, mark_key_may_exist, no_io);
s = t->InternalGet(options, k, arg, saver, mark_key_may_exist);
cache_->Release(handle);
} else if (no_io && s.IsNotFound()) {
} else if (options.read_tier && s.IsIncomplete()) {
// Couldnt find Table in cache but treat as kFound if no_io set
(*mark_key_may_exist)(arg);
return Status::OK();

@ -49,8 +49,7 @@ class TableCache {
void* arg,
bool (*handle_result)(void*, const Slice&, const Slice&, bool),
bool* table_io,
void (*mark_key_may_exist)(void*) = nullptr,
const bool no_io = false);
void (*mark_key_may_exist)(void*) = nullptr);
// Determine whether the table may contain the specified prefix. If
// the table index of blooms are not in memory, this may cause an I/O

@ -415,7 +415,6 @@ void Version::Get(const ReadOptions& options,
std::deque<std::string>* operands,
GetStats* stats,
const Options& db_options,
const bool no_io,
bool* value_found) {
Slice ikey = k.internal_key();
Slice user_key = k.user_key();
@ -425,9 +424,6 @@ void Version::Get(const ReadOptions& options,
auto logger = db_options.info_log;
assert(status->ok() || status->IsMergeInProgress());
if (no_io) {
assert(status->ok());
}
Saver saver;
saver.state = status->ok()? kNotFound : kMerge;
saver.ucmp = ucmp;
@ -516,7 +512,7 @@ void Version::Get(const ReadOptions& options,
bool tableIO = false;
*status = vset_->table_cache_->Get(options, f->number, f->file_size,
ikey, &saver, SaveValue, &tableIO,
MarkKeyMayExist, no_io);
MarkKeyMayExist);
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
return;

@ -76,7 +76,7 @@ class Version {
};
void Get(const ReadOptions&, const LookupKey& key, std::string* val,
Status* status, std::deque<std::string>* operands, GetStats* stats,
const Options& db_option, const bool no_io = false,
const Options& db_option,
bool* value_found = nullptr);
// Adds "stats" into the current state. Returns true if a new

@ -65,6 +65,8 @@ class Iterator {
virtual Slice value() const = 0;
// If an error has occurred, return it. Else return an ok status.
// If non-blocking IO is requested and this operation cannot be
// satisfied without doing some IO, then this returns Status::Incomplete().
virtual Status status() const = 0;
// Clients are allowed to register function/arg1/arg2 triples that

@ -543,6 +543,18 @@ struct Options {
std::shared_ptr<CompactionFilterFactory> compaction_filter_factory;
};
//
// An application can issue a read request (via Get/Iterators) and specify
// if that read should process data that ALREADY resides on a specified cache
// level. For example, if an application specifies kBlockCacheTier then the
// Get call will process data that is already processed in the memtable or
// the block cache. It will not page in data from the OS cache or data that
// resides in storage.
enum ReadTier {
kReadAllTier = 0x0, // data in memtable, block cache, OS cache or storage
kBlockCacheTier = 0x1 // data in memtable or block cache
};
// Options that control read operations
struct ReadOptions {
// If true, all data read from underlying storage will be
@ -575,15 +587,23 @@ struct ReadOptions {
// Default: nullptr
const Slice* prefix;
// Specify if this read request should process data that ALREADY
// resides on a particular cache. If the required data is not
// found at the specified cache, then Status::WouldBlock is returned.
// Default: kReadAllTier
ReadTier read_tier;
ReadOptions()
: verify_checksums(false),
fill_cache(true),
snapshot(nullptr),
prefix(nullptr) {
prefix(nullptr),
read_tier(kReadAllTier) {
}
ReadOptions(bool cksum, bool cache) :
verify_checksums(cksum), fill_cache(cache),
snapshot(nullptr), prefix(nullptr) {
snapshot(nullptr), prefix(nullptr),
read_tier(kReadAllTier) {
}
};

@ -50,6 +50,9 @@ class Status {
static Status MergeInProgress(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kMergeInProgress, msg, msg2);
}
static Status Incomplete(const Slice& msg, const Slice& msg2 = Slice()) {
return Status(kIncomplete, msg, msg2);
}
// Returns true iff the status indicates success.
bool ok() const { return (state_ == nullptr); }
@ -72,6 +75,9 @@ class Status {
// Returns true iff the status indicates an MergeInProgress.
bool IsMergeInProgress() const { return code() == kMergeInProgress; }
// Returns true iff the status indicates Incomplete
bool IsIncomplete() const { return code() == kIncomplete; }
// Return a string representation of this status suitable for printing.
// Returns the string "OK" for success.
std::string ToString() const;
@ -92,6 +98,7 @@ class Status {
kInvalidArgument = 4,
kIOError = 5,
kMergeInProgress = 6,
kIncomplete = 7
};
Code code() const {

@ -237,8 +237,8 @@ Iterator* Table::BlockReader(void* arg,
const ReadOptions& options,
const Slice& index_value,
bool* didIO,
bool for_compaction,
const bool no_io) {
bool for_compaction) {
const bool no_io = (options.read_tier == kBlockCacheTier);
Table* table = reinterpret_cast<Table*>(arg);
Cache* block_cache = table->rep_->options.block_cache.get();
std::shared_ptr<Statistics> statistics = table->rep_->options.statistics;
@ -268,7 +268,8 @@ Iterator* Table::BlockReader(void* arg,
RecordTick(statistics, BLOCK_CACHE_HIT);
} else if (no_io) {
return nullptr; // Did not find in block_cache and can't do IO
// Did not find in block_cache and can't do IO
return NewErrorIterator(Status::Incomplete("no blocking io"));
} else {
Histograms histogram = for_compaction ?
READ_BLOCK_COMPACTION_MICROS : READ_BLOCK_GET_MICROS;
@ -292,7 +293,8 @@ Iterator* Table::BlockReader(void* arg,
RecordTick(statistics, BLOCK_CACHE_MISS);
}
} else if (no_io) {
return nullptr; // Could not read from block_cache and can't do IO
// Could not read from block_cache and can't do IO
return NewErrorIterator(Status::Incomplete("no blocking io"));
}else {
s = ReadBlock(table->rep_->file.get(), options, handle, &block, didIO);
}
@ -401,8 +403,7 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k,
void* arg,
bool (*saver)(void*, const Slice&, const Slice&,
bool),
void (*mark_key_may_exist)(void*),
const bool no_io) {
void (*mark_key_may_exist)(void*)) {
Status s;
Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator);
bool done = false;
@ -421,9 +422,10 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k,
} else {
bool didIO = false;
Iterator* block_iter = BlockReader(this, options, iiter->value(),
&didIO, false, no_io);
&didIO);
if (no_io && !block_iter) { // couldn't get block from block_cache
if (options.read_tier && block_iter->status().IsIncomplete()) {
// couldn't get block from block_cache
// Update Saver.state to Found because we are only looking for whether
// we can guarantee the key is not there when "no_io" is set
(*mark_key_may_exist)(arg);

@ -79,8 +79,7 @@ class Table {
const EnvOptions& soptions, const Slice&,
bool for_compaction);
static Iterator* BlockReader(void*, const ReadOptions&, const Slice&,
bool* didIO, bool for_compaction = false,
const bool no_io = false);
bool* didIO, bool for_compaction = false);
// Calls (*handle_result)(arg, ...) repeatedly, starting with the entry found
// after a call to Seek(key), until handle_result returns false.
@ -90,8 +89,7 @@ class Table {
const ReadOptions&, const Slice& key,
void* arg,
bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool),
void (*mark_key_may_exist)(void*) = nullptr,
const bool no_io = false);
void (*mark_key_may_exist)(void*) = nullptr);
void ReadMeta(const Footer& footer);

Loading…
Cancel
Save