ReadOptions.total_order_seek to allow total order seek for block-based table when hash index is enabled

Summary: as title

Test Plan: table_test

Reviewers: igor, yhchiang, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D22239
main
Lei Jin 11 years ago
parent a98badff16
commit 23861857c4
  1. 10
      db/db_impl.cc
  2. 13
      db/memtable.cc
  3. 1
      db/memtable.h
  4. 3
      db/repair.cc
  5. 11
      include/rocksdb/options.h
  6. 12
      table/block.cc
  7. 6
      table/block.h
  8. 24
      table/block_based_table_reader.cc
  9. 2
      table/block_test.cc
  10. 11
      table/cuckoo_table_reader.cc
  11. 4
      table/plain_table_reader.cc
  12. 105
      table/table_test.cc

@ -1406,7 +1406,9 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem,
FileMetaData meta;
meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file.
Iterator* iter = mem->NewIterator(ReadOptions(), true);
ReadOptions ro;
ro.total_order_seek = true;
Iterator* iter = mem->NewIterator(ro);
const SequenceNumber newest_snapshot = snapshots_.GetNewest();
const SequenceNumber earliest_seqno_in_memtable =
mem->GetFirstSequenceNumber();
@ -1473,11 +1475,13 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd,
mutex_.Unlock();
log_buffer->FlushBufferToLog();
std::vector<Iterator*> memtables;
ReadOptions ro;
ro.total_order_seek = true;
for (MemTable* m : mems) {
Log(options_.info_log,
"[%s] Flushing memtable with next log file: %" PRIu64 "\n",
cfd->GetName().c_str(), m->GetNextLogNumber());
memtables.push_back(m->NewIterator(ReadOptions(), true));
memtables.push_back(m->NewIterator(ro));
}
Iterator* iter = NewMergingIterator(&cfd->internal_comparator(),
&memtables[0], memtables.size());
@ -3300,7 +3304,7 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
MergeIteratorBuilder merge_iter_builder(&cfd->internal_comparator(), arena);
// Collect iterator for mutable mem
merge_iter_builder.AddIterator(
super_version->mem->NewIterator(options, false, arena));
super_version->mem->NewIterator(options, arena));
// Collect all needed child iterators for immutable memtables
super_version->imm->AddIterators(options, &merge_iter_builder);
// Collect iterators for files in L0 - Ln

@ -174,13 +174,13 @@ const char* EncodeKey(std::string* scratch, const Slice& target) {
class MemTableIterator: public Iterator {
public:
MemTableIterator(const MemTable& mem, const ReadOptions& options,
bool enforce_total_order, Arena* arena)
MemTableIterator(
const MemTable& mem, const ReadOptions& options, Arena* arena)
: bloom_(nullptr),
prefix_extractor_(mem.prefix_extractor_),
valid_(false),
arena_mode_(arena != nullptr) {
if (prefix_extractor_ != nullptr && !enforce_total_order) {
if (prefix_extractor_ != nullptr && !options.total_order_seek) {
bloom_ = mem.prefix_bloom_.get();
iter_ = mem.table_->GetDynamicPrefixIterator(arena);
} else {
@ -248,14 +248,13 @@ class MemTableIterator: public Iterator {
void operator=(const MemTableIterator&);
};
Iterator* MemTable::NewIterator(const ReadOptions& options,
bool enforce_total_order, Arena* arena) {
Iterator* MemTable::NewIterator(const ReadOptions& options, Arena* arena) {
if (arena == nullptr) {
return new MemTableIterator(*this, options, enforce_total_order, nullptr);
return new MemTableIterator(*this, options, nullptr);
} else {
auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
return new (mem)
MemTableIterator(*this, options, enforce_total_order, arena);
MemTableIterator(*this, options, arena);
}
}

@ -82,7 +82,6 @@ class MemTable {
// Calling ~Iterator of the iterator will destroy all the states but
// those allocated in arena.
Iterator* NewIterator(const ReadOptions& options,
bool enforce_total_order = false,
Arena* arena = nullptr);
// Add an entry into memtable that maps key to value at the

@ -237,7 +237,8 @@ class Repairer {
FileMetaData meta;
meta.fd = FileDescriptor(next_file_number_++, 0, 0);
ReadOptions ro;
Iterator* iter = mem->NewIterator(ro, true /* enforce_total_order */);
ro.total_order_seek = true;
Iterator* iter = mem->NewIterator(ro);
status = BuildTable(dbname_, env_, options_, storage_options_, table_cache_,
iter, &meta, icmp_, 0, 0, kNoCompression);
delete iter;

@ -902,18 +902,25 @@ struct ReadOptions {
// Not supported in ROCKSDB_LITE mode!
bool tailing;
// Enable a total order seek regardless of index format (e.g. hash index)
// used in the table. Some table format (e.g. plain table) may not support
// this option.
bool total_order_seek;
ReadOptions()
: verify_checksums(true),
fill_cache(true),
snapshot(nullptr),
read_tier(kReadAllTier),
tailing(false) {}
tailing(false),
total_order_seek(false) {}
ReadOptions(bool cksum, bool cache)
: verify_checksums(cksum),
fill_cache(cache),
snapshot(nullptr),
read_tier(kReadAllTier),
tailing(false) {}
tailing(false),
total_order_seek(false) {}
};
// Options that control write operations

@ -321,7 +321,8 @@ Block::~Block() {
}
}
Iterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter) {
Iterator* Block::NewIterator(
const Comparator* cmp, BlockIter* iter, bool total_order_seek) {
if (size_ < 2*sizeof(uint32_t)) {
if (iter != nullptr) {
iter->SetStatus(Status::Corruption("bad block contents"));
@ -339,12 +340,17 @@ Iterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter) {
return NewEmptyIterator();
}
} else {
BlockHashIndex* hash_index_ptr =
total_order_seek ? nullptr : hash_index_.get();
BlockPrefixIndex* prefix_index_ptr =
total_order_seek ? nullptr : prefix_index_.get();
if (iter != nullptr) {
iter->Initialize(cmp, data_, restart_offset_, num_restarts,
hash_index_.get(), prefix_index_.get());
hash_index_ptr, prefix_index_ptr);
} else {
iter = new BlockIter(cmp, data_, restart_offset_, num_restarts,
hash_index_.get(), prefix_index_.get());
hash_index_ptr, prefix_index_ptr);
}
}

@ -45,8 +45,12 @@ class Block {
//
// If iter is null, return new Iterator
// If iter is not null, update this one and return it as Iterator*
//
// If total_order_seek is true, hash_index_ and prefix_index_ are ignored.
// This option only applies for index block. For data block, hash_index_
// and prefix_index_ are null, so this option does not matter.
Iterator* NewIterator(const Comparator* comparator,
BlockIter* iter = nullptr);
BlockIter* iter = nullptr, bool total_order_seek = true);
void SetBlockHashIndex(BlockHashIndex* hash_index);
void SetBlockPrefixIndex(BlockPrefixIndex* prefix_index);

@ -137,7 +137,8 @@ class BlockBasedTable::IndexReader {
// Create an iterator for index access.
// An iter is passed in, if it is not null, update this one and return it
// If it is null, create a new Iterator
virtual Iterator* NewIterator(BlockIter* iter = nullptr) = 0;
virtual Iterator* NewIterator(
BlockIter* iter = nullptr, bool total_order_seek = true) = 0;
// The size of the index.
virtual size_t size() const = 0;
@ -174,8 +175,9 @@ class BinarySearchIndexReader : public IndexReader {
return s;
}
virtual Iterator* NewIterator(BlockIter* iter = nullptr) override {
return index_block_->NewIterator(comparator_, iter);
virtual Iterator* NewIterator(
BlockIter* iter = nullptr, bool dont_care = true) override {
return index_block_->NewIterator(comparator_, iter, true);
}
virtual size_t size() const override { return index_block_->size(); }
@ -295,8 +297,9 @@ class HashIndexReader : public IndexReader {
return Status::OK();
}
virtual Iterator* NewIterator(BlockIter* iter = nullptr) override {
return index_block_->NewIterator(comparator_, iter);
virtual Iterator* NewIterator(
BlockIter* iter = nullptr, bool total_order_seek = true) override {
return index_block_->NewIterator(comparator_, iter, total_order_seek);
}
virtual size_t size() const override { return index_block_->size(); }
@ -818,7 +821,8 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options,
BlockIter* input_iter) {
// index reader has already been pre-populated.
if (rep_->index_reader) {
return rep_->index_reader->NewIterator(input_iter);
return rep_->index_reader->NewIterator(
input_iter, read_options.total_order_seek);
}
bool no_io = read_options.read_tier == kBlockCacheTier;
@ -866,10 +870,9 @@ Iterator* BlockBasedTable::NewIndexIterator(const ReadOptions& read_options,
}
assert(cache_handle);
Iterator* iter;
iter = index_reader->NewIterator(input_iter);
auto* iter = index_reader->NewIterator(
input_iter, read_options.total_order_seek);
iter->RegisterCleanup(&ReleaseCachedEntry, block_cache, cache_handle);
return iter;
}
@ -988,6 +991,9 @@ class BlockBasedTable::BlockEntryIteratorState : public TwoLevelIteratorState {
}
bool PrefixMayMatch(const Slice& internal_key) override {
if (read_options_.total_order_seek) {
return true;
}
return table_->PrefixMayMatch(internal_key);
}

@ -172,7 +172,7 @@ void CheckBlockContents(BlockContents contents, const int max_key,
}
std::unique_ptr<Iterator> hash_iter(
reader1.NewIterator(BytewiseComparator()));
reader1.NewIterator(BytewiseComparator(), nullptr, false));
std::unique_ptr<Iterator> regular_iter(
reader2.NewIterator(BytewiseComparator()));

@ -271,10 +271,17 @@ Slice CuckooTableIterator::value() const {
return curr_value_;
}
Iterator* CuckooTableReader::NewIterator(const ReadOptions&, Arena* arena) {
extern Iterator* NewErrorIterator(const Status& status, Arena* arena);
Iterator* CuckooTableReader::NewIterator(
const ReadOptions& read_options, Arena* arena) {
if (!status().ok()) {
return NewErrorIterator(
Status::Corruption("CuckooTableReader status is not okay."));
Status::Corruption("CuckooTableReader status is not okay."), arena);
}
if (read_options.total_order_seek) {
return NewErrorIterator(
Status::InvalidArgument("total_order_seek is not supported."), arena);
}
CuckooTableIterator* iter;
if (arena == nullptr) {

@ -187,6 +187,10 @@ void PlainTableReader::SetupForCompaction() {
Iterator* PlainTableReader::NewIterator(const ReadOptions& options,
Arena* arena) {
if (options.total_order_seek && !IsTotalOrderMode()) {
return NewErrorIterator(
Status::InvalidArgument("total_order_seek not supported"), arena);
}
if (arena == nullptr) {
return new PlainTableIterator(this, prefix_extractor_ != nullptr);
} else {

@ -382,7 +382,7 @@ class TableConstructor: public Constructor {
sink_->contents().size(), &table_reader_);
}
virtual TableReader* table_reader() {
virtual TableReader* GetTableReader() {
return table_reader_.get();
}
@ -1042,7 +1042,7 @@ TEST(BlockBasedTableTest, BasicBlockBasedTableProperties) {
c.Finish(options, table_options,
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
auto& props = *c.table_reader()->GetTableProperties();
auto& props = *c.GetTableReader()->GetTableProperties();
ASSERT_EQ(kvmap.size(), props.num_entries);
auto raw_key_size = kvmap.size() * 2ul;
@ -1074,10 +1074,93 @@ TEST(BlockBasedTableTest, FilterPolicyNameProperties) {
c.Finish(options, table_options,
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
auto& props = *c.table_reader()->GetTableProperties();
auto& props = *c.GetTableReader()->GetTableProperties();
ASSERT_EQ("rocksdb.BuiltinBloomFilter", props.filter_policy_name);
}
TEST(BlockBasedTableTest, TotalOrderSeekOnHashIndex) {
BlockBasedTableOptions table_options;
for (int i = 0; i < 4; ++i) {
Options options;
// Make each key/value an individual block
table_options.block_size = 64;
switch (i) {
case 0:
// Binary search index
table_options.index_type = BlockBasedTableOptions::kBinarySearch;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
break;
case 1:
// Hash search index
table_options.index_type = BlockBasedTableOptions::kHashSearch;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
options.prefix_extractor.reset(NewFixedPrefixTransform(4));
break;
case 2:
// Hash search index with hash_index_allow_collision
table_options.index_type = BlockBasedTableOptions::kHashSearch;
table_options.hash_index_allow_collision = true;
options.table_factory.reset(new BlockBasedTableFactory(table_options));
options.prefix_extractor.reset(NewFixedPrefixTransform(4));
break;
case 3:
default:
// Hash search index with filter policy
table_options.index_type = BlockBasedTableOptions::kHashSearch;
table_options.filter_policy.reset(NewBloomFilterPolicy(10));
options.table_factory.reset(new BlockBasedTableFactory(table_options));
options.prefix_extractor.reset(NewFixedPrefixTransform(4));
break;
}
TableConstructor c(BytewiseComparator(), true);
c.Add("aaaa1", std::string('a', 56));
c.Add("bbaa1", std::string('a', 56));
c.Add("cccc1", std::string('a', 56));
c.Add("bbbb1", std::string('a', 56));
c.Add("baaa1", std::string('a', 56));
c.Add("abbb1", std::string('a', 56));
c.Add("cccc2", std::string('a', 56));
std::vector<std::string> keys;
KVMap kvmap;
c.Finish(options, table_options,
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
auto props = c.GetTableReader()->GetTableProperties();
ASSERT_EQ(7u, props->num_data_blocks);
auto* reader = c.GetTableReader();
ReadOptions ro;
ro.total_order_seek = true;
std::unique_ptr<Iterator> iter(reader->NewIterator(ro));
iter->Seek(InternalKey("b", 0, kTypeValue).Encode());
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("baaa1", ExtractUserKey(iter->key()).ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("bbaa1", ExtractUserKey(iter->key()).ToString());
iter->Seek(InternalKey("bb", 0, kTypeValue).Encode());
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("bbaa1", ExtractUserKey(iter->key()).ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("bbbb1", ExtractUserKey(iter->key()).ToString());
iter->Seek(InternalKey("bbb", 0, kTypeValue).Encode());
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("bbbb1", ExtractUserKey(iter->key()).ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("cccc1", ExtractUserKey(iter->key()).ToString());
}
}
static std::string RandomString(Random* rnd, int len) {
std::string r;
test::RandomString(rnd, len, &r);
@ -1125,9 +1208,9 @@ TEST(TableTest, HashIndexTest) {
std::unique_ptr<InternalKeyComparator> comparator(
new InternalKeyComparator(BytewiseComparator()));
c.Finish(options, table_options, *comparator, &keys, &kvmap);
auto reader = c.table_reader();
auto reader = c.GetTableReader();
auto props = c.table_reader()->GetTableProperties();
auto props = reader->GetTableProperties();
ASSERT_EQ(5u, props->num_data_blocks);
std::unique_ptr<Iterator> hash_iter(reader->NewIterator(ReadOptions()));
@ -1234,7 +1317,7 @@ TEST(BlockBasedTableTest, IndexSizeStat) {
c.Finish(options, table_options,
GetPlainInternalComparator(options.comparator), &ks, &kvmap);
auto index_size = c.table_reader()->GetTableProperties()->index_size;
auto index_size = c.GetTableReader()->GetTableProperties()->index_size;
ASSERT_GT(index_size, last_index_size);
last_index_size = index_size;
}
@ -1261,7 +1344,7 @@ TEST(BlockBasedTableTest, NumBlockStat) {
c.Finish(options, table_options,
GetPlainInternalComparator(options.comparator), &ks, &kvmap);
ASSERT_EQ(kvmap.size(),
c.table_reader()->GetTableProperties()->num_data_blocks);
c.GetTableReader()->GetTableProperties()->num_data_blocks);
}
// A simple tool that takes the snapshot of block cache statistics.
@ -1338,7 +1421,7 @@ TEST(BlockBasedTableTest, BlockCacheDisabledTest) {
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
// preloading filter/index blocks is enabled.
auto reader = dynamic_cast<BlockBasedTable*>(c.table_reader());
auto reader = dynamic_cast<BlockBasedTable*>(c.GetTableReader());
ASSERT_TRUE(reader->TEST_filter_block_preloaded());
ASSERT_TRUE(reader->TEST_index_reader_preloaded());
@ -1379,7 +1462,7 @@ TEST(BlockBasedTableTest, FilterBlockInBlockCache) {
c.Finish(options, table_options,
GetPlainInternalComparator(options.comparator), &keys, &kvmap);
// preloading filter/index blocks is prohibited.
auto reader = dynamic_cast<BlockBasedTable*>(c.table_reader());
auto reader = dynamic_cast<BlockBasedTable*>(c.GetTableReader());
ASSERT_TRUE(!reader->TEST_filter_block_preloaded());
ASSERT_TRUE(!reader->TEST_index_reader_preloaded());
@ -1513,7 +1596,7 @@ TEST(BlockBasedTableTest, BlockCacheLeak) {
ASSERT_OK(iter->status());
ASSERT_OK(c.Reopen(opt));
auto table_reader = dynamic_cast<BlockBasedTable*>(c.table_reader());
auto table_reader = dynamic_cast<BlockBasedTable*>(c.GetTableReader());
for (const std::string& key : keys) {
ASSERT_TRUE(table_reader->TEST_KeyInCache(ReadOptions(), key));
}
@ -1522,7 +1605,7 @@ TEST(BlockBasedTableTest, BlockCacheLeak) {
table_options.block_cache = NewLRUCache(16 * 1024 * 1024);
opt.table_factory.reset(NewBlockBasedTableFactory(table_options));
ASSERT_OK(c.Reopen(opt));
table_reader = dynamic_cast<BlockBasedTable*>(c.table_reader());
table_reader = dynamic_cast<BlockBasedTable*>(c.GetTableReader());
for (const std::string& key : keys) {
ASSERT_TRUE(!table_reader->TEST_KeyInCache(ReadOptions(), key));
}

Loading…
Cancel
Save