use GetContext to replace callback function pointer

Summary:
Intead of passing callback function pointer and its arg on Table::Get()
interface, passing GetContext. This makes the interface cleaner and
possible better perf. Also adding a fast pass for SaveValue()

Test Plan: make all check

Reviewers: igor, yhchiang, sdong

Reviewed By: sdong

Subscribers: leveldb

Differential Revision: https://reviews.facebook.net/D24057
main
Lei Jin 10 years ago
parent 983d2de2de
commit 2faf49d5f1
  1. 12
      Makefile
  2. 815
      db/simple_table_db_test.cc
  3. 11
      db/table_cache.cc
  4. 7
      db/table_cache.h
  5. 114
      db/version_set.cc
  6. 22
      db/version_set.h
  7. 12
      table/block_based_table_reader.cc
  8. 7
      table/block_based_table_reader.h
  9. 14
      table/cuckoo_table_reader.cc
  10. 8
      table/cuckoo_table_reader.h
  11. 108
      table/cuckoo_table_reader_test.cc
  12. 101
      table/get_context.cc
  13. 47
      table/get_context.h
  14. 10
      table/plain_table_reader.cc
  15. 7
      table/plain_table_reader.h
  16. 21
      table/table_reader.h
  17. 16
      table/table_reader_bench.cc
  18. 6
      table/table_test.cc
  19. 50
      utilities/compacted_db/compacted_db_impl.cc
  20. 3
      utilities/compacted_db/compacted_db_impl.h

@ -122,7 +122,6 @@ TESTS = \
reduce_levels_test \
plain_table_db_test \
prefix_test \
simple_table_db_test \
skiplist_test \
stringappend_test \
ttl_test \
@ -371,9 +370,6 @@ log_write_bench: util/log_write_bench.o $(LIBOBJECTS) $(TESTHARNESS)
plain_table_db_test: db/plain_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/plain_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
simple_table_db_test: db/simple_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) db/simple_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS)
table_reader_bench: table/table_reader_bench.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) table/table_reader_bench.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) -pg
@ -523,11 +519,11 @@ libz.a:
curl -O http://zlib.net/zlib-1.2.8.tar.gz
tar xvzf zlib-1.2.8.tar.gz
cd zlib-1.2.8 && CFLAGS='-fPIC' ./configure --static && make
cp zlib-1.2.8/libz.a .
cp zlib-1.2.8/libz.a .
libbz2.a:
-rm -rf bzip2-1.0.6
curl -O http://www.bzip.org/1.0.6/bzip2-1.0.6.tar.gz
curl -O http://www.bzip.org/1.0.6/bzip2-1.0.6.tar.gz
tar xvzf bzip2-1.0.6.tar.gz
cd bzip2-1.0.6 && make CFLAGS='-fPIC -Wall -Winline -O2 -g -D_FILE_OFFSET_BITS=64'
cp bzip2-1.0.6/libbz2.a .
@ -539,7 +535,7 @@ libsnappy.a:
cd snappy-1.1.1 && ./configure --with-pic --enable-static
cd snappy-1.1.1 && make
cp snappy-1.1.1/.libs/libsnappy.a .
rocksdbjavastatic: libz.a libbz2.a libsnappy.a
OPT="-fPIC -DNDEBUG -O2" $(MAKE) $(LIBRARY) -j
@ -547,7 +543,7 @@ rocksdbjavastatic: libz.a libbz2.a libsnappy.a
rm -f ./java/$(ROCKSDBJNILIB)
$(CXX) $(CXXFLAGS) -I./java/. $(JAVA_INCLUDE) -shared -fPIC -o ./java/$(ROCKSDBJNILIB) $(JNI_NATIVE_SOURCES) $(LIBOBJECTS) $(COVERAGEFLAGS) libz.a libbz2.a libsnappy.a
cd java;jar -cf $(ROCKSDB_JAR) org/rocksdb/*.class org/rocksdb/util/*.class HISTORY*.md $(ROCKSDBJNILIB)
rocksdbjava:
OPT="-fPIC -DNDEBUG -O2" $(MAKE) $(LIBRARY) -j32

@ -1,815 +0,0 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <algorithm>
#include <set>
#include "rocksdb/db.h"
#include "rocksdb/filter_policy.h"
#include "db/db_impl.h"
#include "db/filename.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "rocksdb/statistics.h"
#include "rocksdb/cache.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "rocksdb/table.h"
#include "rocksdb/table_properties.h"
#include "table/table_builder.h"
#include "util/hash.h"
#include "util/logging.h"
#include "util/mutexlock.h"
#include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/merge_operators.h"
using std::unique_ptr;
// IS THIS FILE STILL NEEDED?
namespace rocksdb {
// SimpleTable is a simple table format for UNIT TEST ONLY. It is not built
// as production quality.
// SimpleTable requires the input key size to be fixed 16 bytes, value cannot
// be longer than 150000 bytes and stored data on disk in this format:
// +--------------------------------------------+ <= key1 offset
// | key1 | value_size (4 bytes) | |
// +----------------------------------------+ |
// | value1 |
// | |
// +----------------------------------------+---+ <= key2 offset
// | key2 | value_size (4 bytes) | |
// +----------------------------------------+ |
// | value2 |
// | |
// | ...... |
// +-----------------+--------------------------+ <= index_block_offset
// | key1 | key1 offset (8 bytes) |
// +-----------------+--------------------------+
// | key2 | key2 offset (8 bytes) |
// +-----------------+--------------------------+
// | key3 | key3 offset (8 bytes) |
// +-----------------+--------------------------+
// | ...... |
// +-----------------+------------+-------------+
// | index_block_offset (8 bytes) |
// +------------------------------+
// SimpleTable is a simple table format for UNIT TEST ONLY. It is not built
// as production quality.
class SimpleTableReader: public TableReader {
public:
// Attempt to open the table that is stored in bytes [0..file_size)
// of "file", and read the metadata entries necessary to allow
// retrieving data from the table.
//
// If successful, returns ok and sets "*table" to the newly opened
// table. The client should delete "*table" when no longer needed.
// If there was an error while initializing the table, sets "*table"
// to nullptr and returns a non-ok status. Does not take ownership of
// "*source", but the client must ensure that "source" remains live
// for the duration of the returned table's lifetime.
//
// *file must remain live while this Table is in use.
static Status Open(const ImmutableCFOptions& options,
const EnvOptions& env_options,
unique_ptr<RandomAccessFile> && file, uint64_t file_size,
unique_ptr<TableReader>* table_reader);
Iterator* NewIterator(const ReadOptions&, Arena* arena) override;
Status Get(const ReadOptions&, const Slice& key, void* arg,
bool (*handle_result)(void* arg, const ParsedInternalKey& k,
const Slice& v),
void (*mark_key_may_exist)(void*) = nullptr) override;
uint64_t ApproximateOffsetOf(const Slice& key) override;
virtual size_t ApproximateMemoryUsage() const override { return 0; }
void SetupForCompaction() override;
std::shared_ptr<const TableProperties> GetTableProperties() const override;
~SimpleTableReader();
private:
struct Rep;
Rep* rep_;
explicit SimpleTableReader(Rep* rep) {
rep_ = rep;
}
friend class TableCache;
friend class SimpleTableIterator;
Status GetOffset(const Slice& target, uint64_t* offset);
// No copying allowed
explicit SimpleTableReader(const TableReader&) = delete;
void operator=(const TableReader&) = delete;
};
// Iterator to iterate SimpleTable
class SimpleTableIterator: public Iterator {
public:
explicit SimpleTableIterator(SimpleTableReader* table);
~SimpleTableIterator();
bool Valid() const;
void SeekToFirst();
void SeekToLast();
void Seek(const Slice& target);
void Next();
void Prev();
Slice key() const;
Slice value() const;
Status status() const;
private:
SimpleTableReader* table_;
uint64_t offset_;
uint64_t next_offset_;
Slice key_;
Slice value_;
char tmp_str_[4];
char* key_str_;
char* value_str_;
int value_str_len_;
Status status_;
// No copying allowed
SimpleTableIterator(const SimpleTableIterator&) = delete;
void operator=(const Iterator&) = delete;
};
struct SimpleTableReader::Rep {
~Rep() {
}
Rep(const ImmutableCFOptions& ioptions, const EnvOptions& env_options,
uint64_t index_start_offset, int num_entries) :
ioptions(ioptions), env_options(env_options),
index_start_offset(index_start_offset), num_entries(num_entries) {
}
const ImmutableCFOptions& ioptions;
const EnvOptions& env_options;
Status status;
unique_ptr<RandomAccessFile> file;
uint64_t index_start_offset;
int num_entries;
std::shared_ptr<TableProperties> table_properties;
const static int user_key_size = 16;
const static int offset_length = 8;
const static int key_footer_len = 8;
static int GetInternalKeyLength() {
return user_key_size + key_footer_len;
}
};
SimpleTableReader::~SimpleTableReader() {
delete rep_;
}
Status SimpleTableReader::Open(const ImmutableCFOptions& ioptions,
const EnvOptions& env_options,
unique_ptr<RandomAccessFile> && file,
uint64_t size,
unique_ptr<TableReader>* table_reader) {
char footer_space[Rep::offset_length];
Slice footer_input;
Status s = file->Read(size - Rep::offset_length, Rep::offset_length,
&footer_input, footer_space);
if (s.ok()) {
uint64_t index_start_offset = DecodeFixed64(footer_space);
int num_entries = (size - Rep::offset_length - index_start_offset)
/ (Rep::GetInternalKeyLength() + Rep::offset_length);
SimpleTableReader::Rep* rep = new SimpleTableReader::Rep(
ioptions, env_options, index_start_offset, num_entries);
rep->file = std::move(file);
table_reader->reset(new SimpleTableReader(rep));
}
return s;
}
void SimpleTableReader::SetupForCompaction() {
}
std::shared_ptr<const TableProperties> SimpleTableReader::GetTableProperties()
const {
return rep_->table_properties;
}
Iterator* SimpleTableReader::NewIterator(const ReadOptions& options,
Arena* arena) {
if (arena == nullptr) {
return new SimpleTableIterator(this);
} else {
auto mem = arena->AllocateAligned(sizeof(SimpleTableIterator));
return new (mem) SimpleTableIterator(this);
}
}
Status SimpleTableReader::GetOffset(const Slice& target, uint64_t* offset) {
uint32_t left = 0;
uint32_t right = rep_->num_entries - 1;
char key_chars[Rep::GetInternalKeyLength()];
Slice tmp_slice;
uint32_t target_offset = 0;
while (left <= right) {
uint32_t mid = (left + right + 1) / 2;
uint64_t offset_to_read = rep_->index_start_offset
+ (Rep::GetInternalKeyLength() + Rep::offset_length) * mid;
Status s = rep_->file->Read(offset_to_read, Rep::GetInternalKeyLength(),
&tmp_slice, key_chars);
if (!s.ok()) {
return s;
}
InternalKeyComparator ikc(rep_->ioptions.comparator);
int compare_result = ikc.Compare(tmp_slice, target);
if (compare_result < 0) {
if (left == right) {
target_offset = right + 1;
break;
}
left = mid;
} else {
if (left == right) {
target_offset = left;
break;
}
right = mid - 1;
}
}
if (target_offset >= (uint32_t) rep_->num_entries) {
*offset = rep_->index_start_offset;
return Status::OK();
}
char value_offset_chars[Rep::offset_length];
int64_t offset_for_value_offset = rep_->index_start_offset
+ (Rep::GetInternalKeyLength() + Rep::offset_length) * target_offset
+ Rep::GetInternalKeyLength();
Status s = rep_->file->Read(offset_for_value_offset, Rep::offset_length,
&tmp_slice, value_offset_chars);
if (s.ok()) {
*offset = DecodeFixed64(value_offset_chars);
}
return s;
}
Status SimpleTableReader::Get(const ReadOptions& options, const Slice& k,
void* arg,
bool (*saver)(void*, const ParsedInternalKey&,
const Slice&),
void (*mark_key_may_exist)(void*)) {
Status s;
SimpleTableIterator* iter = new SimpleTableIterator(this);
for (iter->Seek(k); iter->Valid(); iter->Next()) {
ParsedInternalKey parsed_key;
if (!ParseInternalKey(iter->key(), &parsed_key)) {
return Status::Corruption(Slice());
}
if (!(*saver)(arg, parsed_key, iter->value())) {
break;
}
}
s = iter->status();
delete iter;
return s;
}
uint64_t SimpleTableReader::ApproximateOffsetOf(const Slice& key) {
return 0;
}
SimpleTableIterator::SimpleTableIterator(SimpleTableReader* table) :
table_(table) {
key_str_ = new char[SimpleTableReader::Rep::GetInternalKeyLength()];
value_str_len_ = -1;
SeekToFirst();
}
SimpleTableIterator::~SimpleTableIterator() {
delete[] key_str_;
if (value_str_len_ >= 0) {
delete[] value_str_;
}
}
bool SimpleTableIterator::Valid() const {
return offset_ < table_->rep_->index_start_offset;
}
void SimpleTableIterator::SeekToFirst() {
next_offset_ = 0;
Next();
}
void SimpleTableIterator::SeekToLast() {
assert(false);
}
void SimpleTableIterator::Seek(const Slice& target) {
Status s = table_->GetOffset(target, &next_offset_);
if (!s.ok()) {
status_ = s;
}
Next();
}
void SimpleTableIterator::Next() {
offset_ = next_offset_;
if (offset_ >= table_->rep_->index_start_offset) {
return;
}
Slice result;
int internal_key_size = SimpleTableReader::Rep::GetInternalKeyLength();
Status s = table_->rep_->file->Read(next_offset_, internal_key_size, &result,
key_str_);
next_offset_ += internal_key_size;
key_ = result;
Slice value_size_slice;
s = table_->rep_->file->Read(next_offset_, 4, &value_size_slice, tmp_str_);
next_offset_ += 4;
uint32_t value_size = DecodeFixed32(tmp_str_);
Slice value_slice;
if ((int) value_size > value_str_len_) {
if (value_str_len_ >= 0) {
delete[] value_str_;
}
value_str_ = new char[value_size];
value_str_len_ = value_size;
}
s = table_->rep_->file->Read(next_offset_, value_size, &value_slice,
value_str_);
next_offset_ += value_size;
value_ = value_slice;
}
void SimpleTableIterator::Prev() {
assert(false);
}
Slice SimpleTableIterator::key() const {
Log(table_->rep_->ioptions.info_log, "key!!!!");
return key_;
}
Slice SimpleTableIterator::value() const {
return value_;
}
Status SimpleTableIterator::status() const {
return status_;
}
class SimpleTableBuilder: public TableBuilder {
public:
// Create a builder that will store the contents of the table it is
// building in *file. Does not close the file. It is up to the
// caller to close the file after calling Finish(). The output file
// will be part of level specified by 'level'. A value of -1 means
// that the caller does not know which level the output file will reside.
SimpleTableBuilder(const ImmutableCFOptions& ioptions, WritableFile* file,
CompressionType compression_type);
// REQUIRES: Either Finish() or Abandon() has been called.
~SimpleTableBuilder();
// Add key,value to the table being constructed.
// REQUIRES: key is after any previously added key according to comparator.
// REQUIRES: Finish(), Abandon() have not been called
void Add(const Slice& key, const Slice& value) override;
// Return non-ok iff some error has been detected.
Status status() const override;
// Finish building the table. Stops using the file passed to the
// constructor after this function returns.
// REQUIRES: Finish(), Abandon() have not been called
Status Finish() override;
// Indicate that the contents of this builder should be abandoned. Stops
// using the file passed to the constructor after this function returns.
// If the caller is not going to call Finish(), it must call Abandon()
// before destroying this builder.
// REQUIRES: Finish(), Abandon() have not been called
void Abandon() override;
// Number of calls to Add() so far.
uint64_t NumEntries() const override;
// Size of the file generated so far. If invoked after a successful
// Finish() call, returns the size of the final generated file.
uint64_t FileSize() const override;
private:
struct Rep;
Rep* rep_;
// No copying allowed
SimpleTableBuilder(const SimpleTableBuilder&) = delete;
void operator=(const SimpleTableBuilder&) = delete;
};
struct SimpleTableBuilder::Rep {
const ImmutableCFOptions& ioptions;
WritableFile* file;
uint64_t offset = 0;
Status status;
uint64_t num_entries = 0;
bool closed = false; // Either Finish() or Abandon() has been called.
const static int user_key_size = 16;
const static int offset_length = 8;
const static int key_footer_len = 8;
static int GetInternalKeyLength() {
return user_key_size + key_footer_len;
}
std::string index;
Rep(const ImmutableCFOptions& iopt, WritableFile* f) :
ioptions(iopt), file(f) {
}
~Rep() {
}
};
SimpleTableBuilder::SimpleTableBuilder(const ImmutableCFOptions& ioptions,
WritableFile* file,
CompressionType compression_type) :
rep_(new SimpleTableBuilder::Rep(ioptions, file)) {
}
SimpleTableBuilder::~SimpleTableBuilder() {
delete (rep_);
}
void SimpleTableBuilder::Add(const Slice& key, const Slice& value) {
assert((int ) key.size() == Rep::GetInternalKeyLength());
// Update index
rep_->index.append(key.data(), key.size());
PutFixed64(&(rep_->index), rep_->offset);
// Write key-value pair
rep_->file->Append(key);
rep_->offset += Rep::GetInternalKeyLength();
std::string size;
int value_size = value.size();
PutFixed32(&size, value_size);
Slice sizeSlice(size);
rep_->file->Append(sizeSlice);
rep_->file->Append(value);
rep_->offset += value_size + 4;
rep_->num_entries++;
}
Status SimpleTableBuilder::status() const {
return Status::OK();
}
Status SimpleTableBuilder::Finish() {
Rep* r = rep_;
assert(!r->closed);
r->closed = true;
uint64_t index_offset = rep_->offset;
Slice index_slice(rep_->index);
rep_->file->Append(index_slice);
rep_->offset += index_slice.size();
std::string index_offset_str;
PutFixed64(&index_offset_str, index_offset);
Slice foot_slice(index_offset_str);
rep_->file->Append(foot_slice);
rep_->offset += foot_slice.size();
return Status::OK();
}
void SimpleTableBuilder::Abandon() {
rep_->closed = true;
}
uint64_t SimpleTableBuilder::NumEntries() const {
return rep_->num_entries;
}
uint64_t SimpleTableBuilder::FileSize() const {
return rep_->offset;
}
class SimpleTableFactory: public TableFactory {
public:
~SimpleTableFactory() {
}
SimpleTableFactory() {
}
const char* Name() const override {
return "SimpleTable";
}
Status NewTableReader(const ImmutableCFOptions& ioptions,
const EnvOptions& env_options,
const InternalKeyComparator& internal_key,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader) const;
TableBuilder* NewTableBuilder(
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_key,
WritableFile* file,
const CompressionType compression_type,
const CompressionOptions& compression_opts) const;
virtual Status SanitizeDBOptions(const DBOptions* db_opts) const override {
return Status::OK();
}
virtual std::string GetPrintableTableOptions() const override {
return std::string();
}
};
Status SimpleTableFactory::NewTableReader(
const ImmutableCFOptions& ioptions,
const EnvOptions& env_options,
const InternalKeyComparator& internal_key,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader) const {
return SimpleTableReader::Open(ioptions, env_options, std::move(file),
file_size, table_reader);
}
TableBuilder* SimpleTableFactory::NewTableBuilder(
const ImmutableCFOptions& ioptions,
const InternalKeyComparator& internal_key,
WritableFile* file, const CompressionType compression_type,
const CompressionOptions& compression_opts) const {
return new SimpleTableBuilder(ioptions, file, compression_type);
}
class SimpleTableDBTest {
protected:
public:
std::string dbname_;
Env* env_;
DB* db_;
Options last_options_;
SimpleTableDBTest() :
env_(Env::Default()) {
dbname_ = test::TmpDir() + "/simple_table_db_test";
ASSERT_OK(DestroyDB(dbname_, Options()));
db_ = nullptr;
Reopen();
}
~SimpleTableDBTest() {
delete db_;
ASSERT_OK(DestroyDB(dbname_, Options()));
}
// Return the current option configuration.
Options CurrentOptions() {
Options options;
options.table_factory.reset(new SimpleTableFactory());
return options;
}
DBImpl* dbfull() {
return reinterpret_cast<DBImpl*>(db_);
}
void Reopen(Options* options = nullptr) {
ASSERT_OK(TryReopen(options));
}
void Close() {
delete db_;
db_ = nullptr;
}
void DestroyAndReopen(Options* options = nullptr) {
//Destroy using last options
Destroy(&last_options_);
ASSERT_OK(TryReopen(options));
}
void Destroy(Options* options) {
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, *options));
}
Status PureReopen(Options* options, DB** db) {
return DB::Open(*options, dbname_, db);
}
Status TryReopen(Options* options = nullptr) {
delete db_;
db_ = nullptr;
Options opts;
if (options != nullptr) {
opts = *options;
} else {
opts = CurrentOptions();
opts.create_if_missing = true;
}
last_options_ = opts;
return DB::Open(opts, dbname_, &db_);
}
Status Put(const Slice& k, const Slice& v) {
return db_->Put(WriteOptions(), k, v);
}
Status Delete(const std::string& k) {
return db_->Delete(WriteOptions(), k);
}
std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
ReadOptions options;
options.snapshot = snapshot;
std::string result;
Status s = db_->Get(options, k, &result);
if (s.IsNotFound()) {
result = "NOT_FOUND";
} else if (!s.ok()) {
result = s.ToString();
}
return result;
}
int NumTableFilesAtLevel(int level) {
std::string property;
ASSERT_TRUE(
db_->GetProperty("rocksdb.num-files-at-level" + NumberToString(level),
&property));
return atoi(property.c_str());
}
// Return spread of files per level
std::string FilesPerLevel() {
std::string result;
int last_non_zero_offset = 0;
for (int level = 0; level < db_->NumberLevels(); level++) {
int f = NumTableFilesAtLevel(level);
char buf[100];
snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
result += buf;
if (f > 0) {
last_non_zero_offset = result.size();
}
}
result.resize(last_non_zero_offset);
return result;
}
std::string IterStatus(Iterator* iter) {
std::string result;
if (iter->Valid()) {
result = iter->key().ToString() + "->" + iter->value().ToString();
} else {
result = "(invalid)";
}
return result;
}
};
TEST(SimpleTableDBTest, Empty) {
ASSERT_TRUE(db_ != nullptr);
ASSERT_EQ("NOT_FOUND", Get("0000000000000foo"));
}
TEST(SimpleTableDBTest, ReadWrite) {
ASSERT_OK(Put("0000000000000foo", "v1"));
ASSERT_EQ("v1", Get("0000000000000foo"));
ASSERT_OK(Put("0000000000000bar", "v2"));
ASSERT_OK(Put("0000000000000foo", "v3"));
ASSERT_EQ("v3", Get("0000000000000foo"));
ASSERT_EQ("v2", Get("0000000000000bar"));
}
TEST(SimpleTableDBTest, Flush) {
ASSERT_OK(Put("0000000000000foo", "v1"));
ASSERT_OK(Put("0000000000000bar", "v2"));
ASSERT_OK(Put("0000000000000foo", "v3"));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("v3", Get("0000000000000foo"));
ASSERT_EQ("v2", Get("0000000000000bar"));
}
TEST(SimpleTableDBTest, Flush2) {
ASSERT_OK(Put("0000000000000bar", "b"));
ASSERT_OK(Put("0000000000000foo", "v1"));
dbfull()->TEST_FlushMemTable();
ASSERT_OK(Put("0000000000000foo", "v2"));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("v2", Get("0000000000000foo"));
ASSERT_OK(Put("0000000000000eee", "v3"));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("v3", Get("0000000000000eee"));
ASSERT_OK(Delete("0000000000000bar"));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("NOT_FOUND", Get("0000000000000bar"));
ASSERT_OK(Put("0000000000000eee", "v5"));
dbfull()->TEST_FlushMemTable();
ASSERT_EQ("v5", Get("0000000000000eee"));
}
static std::string Key(int i) {
char buf[100];
snprintf(buf, sizeof(buf), "key_______%06d", i);
return std::string(buf);
}
static std::string RandomString(Random* rnd, int len) {
std::string r;
test::RandomString(rnd, len, &r);
return r;
}
TEST(SimpleTableDBTest, CompactionTrigger) {
Options options = CurrentOptions();
options.write_buffer_size = 100 << 10; //100KB
options.num_levels = 3;
options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = 3;
Reopen(&options);
Random rnd(301);
for (int num = 0; num < options.level0_file_num_compaction_trigger - 1;
num++) {
std::vector<std::string> values;
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
values.push_back(RandomString(&rnd, 10000));
ASSERT_OK(Put(Key(i), values[i]));
}
dbfull()->TEST_WaitForFlushMemTable();
ASSERT_EQ(NumTableFilesAtLevel(0), num + 1);
}
//generate one more file in level-0, and should trigger level-0 compaction
std::vector<std::string> values;
for (int i = 0; i < 12; i++) {
values.push_back(RandomString(&rnd, 10000));
ASSERT_OK(Put(Key(i), values[i]));
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 1);
}
} // namespace rocksdb
int main(int argc, char** argv) {
return rocksdb::test::RunAllTests();
}

@ -15,6 +15,7 @@
#include "rocksdb/statistics.h"
#include "table/iterator_wrapper.h"
#include "table/table_reader.h"
#include "table/get_context.h"
#include "util/coding.h"
#include "util/stop_watch.h"
@ -132,10 +133,8 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
Status TableCache::Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& fd, const Slice& k, void* arg,
bool (*saver)(void*, const ParsedInternalKey&,
const Slice&),
void (*mark_key_may_exist)(void*)) {
const FileDescriptor& fd, const Slice& k,
GetContext* get_context) {
TableReader* t = fd.table_reader;
Status s;
Cache::Handle* handle = nullptr;
@ -147,13 +146,13 @@ Status TableCache::Get(const ReadOptions& options,
}
}
if (s.ok()) {
s = t->Get(options, k, arg, saver, mark_key_may_exist);
s = t->Get(options, k, get_context);
if (handle != nullptr) {
ReleaseHandle(handle);
}
} else if (options.read_tier && s.IsIncomplete()) {
// Couldnt find Table in cache but treat as kFound if no_io set
(*mark_key_may_exist)(arg);
get_context->MarkKeyMayExist();
return Status::OK();
}
return s;

@ -27,6 +27,7 @@ namespace rocksdb {
class Env;
class Arena;
struct FileDescriptor;
class GetContext;
class TableCache {
public:
@ -52,10 +53,8 @@ class TableCache {
// it returns false.
Status Get(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
const FileDescriptor& file_fd, const Slice& k, void* arg,
bool (*handle_result)(void*, const ParsedInternalKey&,
const Slice&),
void (*mark_key_may_exist)(void*) = nullptr);
const FileDescriptor& file_fd, const Slice& k,
GetContext* get_context);
// Evict any entry for the specified file number
static void Evict(Cache* cache, uint64_t file_number);

@ -37,6 +37,7 @@
#include "table/format.h"
#include "table/plain_table_factory.h"
#include "table/meta_blocks.h"
#include "table/get_context.h"
#include "util/coding.h"
#include "util/logging.h"
#include "util/stop_watch.h"
@ -627,81 +628,6 @@ void Version::AddIterators(const ReadOptions& read_options,
}
// Called from TableCache::Get and Table::Get when file/block in which
// key may exist are not there in TableCache/BlockCache respectively. In this
// case we can't guarantee that key does not exist and are not permitted to do
// IO to be certain.Set the status=kFound and value_found=false to let the
// caller know that key may exist but is not there in memory
void MarkKeyMayExist(void* arg) {
Version::Saver* s = reinterpret_cast<Version::Saver*>(arg);
s->state = Version::kFound;
if (s->value_found != nullptr) {
*(s->value_found) = false;
}
}
bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
const Slice& v) {
Version::Saver* s = reinterpret_cast<Version::Saver*>(arg);
MergeContext* merge_contex = s->merge_context;
std::string merge_result; // temporary area for merge results later
assert(s != nullptr && merge_contex != nullptr);
// TODO: Merge?
if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
// Key matches. Process it
switch (parsed_key.type) {
case kTypeValue:
if (Version::kNotFound == s->state) {
s->state = Version::kFound;
s->value->assign(v.data(), v.size());
} else if (Version::kMerge == s->state) {
assert(s->merge_operator != nullptr);
s->state = Version::kFound;
if (!s->merge_operator->FullMerge(s->user_key, &v,
merge_contex->GetOperands(),
s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = Version::kCorrupt;
}
} else {
assert(false);
}
return false;
case kTypeDeletion:
if (Version::kNotFound == s->state) {
s->state = Version::kDeleted;
} else if (Version::kMerge == s->state) {
s->state = Version::kFound;
if (!s->merge_operator->FullMerge(s->user_key, nullptr,
merge_contex->GetOperands(),
s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = Version::kCorrupt;
}
} else {
assert(false);
}
return false;
case kTypeMerge:
assert(s->state == Version::kNotFound || s->state == Version::kMerge);
s->state = Version::kMerge;
merge_contex->PushOperand(v);
return true;
default:
assert(false);
break;
}
}
// s->state could be Corrupt, merge or notfound
return false;
}
Version::Version(ColumnFamilyData* cfd, VersionSet* vset,
uint64_t version_number)
@ -756,46 +682,42 @@ void Version::Get(const ReadOptions& options,
Slice user_key = k.user_key();
assert(status->ok() || status->IsMergeInProgress());
Saver saver;
saver.state = status->ok()? kNotFound : kMerge;
saver.ucmp = user_comparator_;
saver.user_key = user_key;
saver.value_found = value_found;
saver.value = value;
saver.merge_operator = merge_operator_;
saver.merge_context = merge_context;
saver.logger = info_log_;
saver.statistics = db_statistics_;
GetContext get_context(user_comparator_, merge_operator_, info_log_,
db_statistics_, status->ok() ? GetContext::kNotFound : GetContext::kMerge,
user_key, value, value_found, merge_context);
FilePicker fp(files_, user_key, ikey, &file_levels_, num_non_empty_levels_,
&file_indexer_, user_comparator_, internal_comparator_);
FdWithKeyRange* f = fp.GetNextFile();
while (f != nullptr) {
*status = table_cache_->Get(options, *internal_comparator_, f->fd, ikey,
&saver, SaveValue, MarkKeyMayExist);
&get_context);
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
return;
}
switch (saver.state) {
case kNotFound:
break; // Keep searching in other files
case kFound:
switch (get_context.State()) {
case GetContext::kNotFound:
// Keep searching in other files
break;
case GetContext::kFound:
return;
case kDeleted:
*status = Status::NotFound(); // Use empty error message for speed
case GetContext::kDeleted:
// Use empty error message for speed
*status = Status::NotFound();
return;
case kCorrupt:
case GetContext::kCorrupt:
*status = Status::Corruption("corrupted key for ", user_key);
return;
case kMerge:
case GetContext::kMerge:
break;
}
f = fp.GetNextFile();
}
if (kMerge == saver.state) {
if (GetContext::kMerge == get_context.State()) {
if (!merge_operator_) {
*status = Status::InvalidArgument(
"merge_operator is not properly initialized.");
@ -804,7 +726,7 @@ void Version::Get(const ReadOptions& options,
// merge_operands are in saver and we hit the beginning of the key history
// do a final merge of nullptr and operands;
if (merge_operator_->FullMerge(user_key, nullptr,
saver.merge_context->GetOperands(), value,
merge_context->GetOperands(), value,
info_log_)) {
*status = Status::OK();
} else {

@ -241,28 +241,6 @@ class Version {
FileMetaData* file;
};
enum SaverState {
kNotFound,
kFound,
kDeleted,
kCorrupt,
kMerge // saver contains the current merge result (the operands)
};
// Callback from TableCache::Get()
struct Saver {
SaverState state;
const Comparator* ucmp;
Slice user_key;
bool* value_found; // Is value set correctly? Used by KeyMayExist
std::string* value;
const MergeOperator* merge_operator;
// the merge operations encountered;
MergeContext* merge_context;
Logger* logger;
Statistics* statistics;
};
private:
friend class Compaction;
friend class VersionSet;

@ -33,6 +33,7 @@
#include "table/format.h"
#include "table/meta_blocks.h"
#include "table/two_level_iterator.h"
#include "table/get_context.h"
#include "util/coding.h"
#include "util/perf_context_imp.h"
@ -1100,10 +1101,8 @@ Iterator* BlockBasedTable::NewIterator(const ReadOptions& read_options,
}
Status BlockBasedTable::Get(
const ReadOptions& read_options, const Slice& key, void* handle_context,
bool (*result_handler)(void* handle_context, const ParsedInternalKey& k,
const Slice& v),
void (*mark_key_may_exist_handler)(void* handle_context)) {
const ReadOptions& read_options, const Slice& key,
GetContext* get_context) {
Status s;
auto filter_entry = GetFilter(read_options.read_tier == kBlockCacheTier);
FilterBlockReader* filter = filter_entry.value;
@ -1141,7 +1140,7 @@ Status BlockBasedTable::Get(
// couldn't get block from block_cache
// Update Saver.state to Found because we are only looking for whether
// we can guarantee the key is not there when "no_io" is set
(*mark_key_may_exist_handler)(handle_context);
get_context->MarkKeyMayExist();
break;
}
if (!biter.status().ok()) {
@ -1156,8 +1155,7 @@ Status BlockBasedTable::Get(
s = Status::Corruption(Slice());
}
if (!(*result_handler)(handle_context, parsed_key,
biter.value())) {
if (!get_context->SaveValue(parsed_key, biter.value())) {
done = true;
break;
}

@ -40,6 +40,7 @@ class WritableFile;
struct BlockBasedTableOptions;
struct EnvOptions;
struct ReadOptions;
class GetContext;
using std::unique_ptr;
@ -76,11 +77,7 @@ class BlockBasedTable : public TableReader {
Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override;
Status Get(const ReadOptions& readOptions, const Slice& key,
void* handle_context,
bool (*result_handler)(void* handle_context,
const ParsedInternalKey& k, const Slice& v),
void (*mark_key_may_exist_handler)(void* handle_context) =
nullptr) override;
GetContext* get_context) override;
// Given a key, return an approximate byte offset in the file where
// the data for that key begins (or would begin if the key were

@ -19,6 +19,7 @@
#include "rocksdb/table.h"
#include "table/meta_blocks.h"
#include "table/cuckoo_table_factory.h"
#include "table/get_context.h"
#include "util/arena.h"
#include "util/coding.h"
@ -126,11 +127,8 @@ CuckooTableReader::CuckooTableReader(
status_ = file_->Read(0, file_size, &file_data_, nullptr);
}
Status CuckooTableReader::Get(
const ReadOptions& readOptions, const Slice& key, void* handle_context,
bool (*result_handler)(void* arg, const ParsedInternalKey& k,
const Slice& v),
void (*mark_key_may_exist_handler)(void* handle_context)) {
Status CuckooTableReader::Get(const ReadOptions& readOptions, const Slice& key,
GetContext* get_context) {
assert(key.size() == key_length_ + (is_last_level_ ? 8 : 0));
Slice user_key = ExtractUserKey(key);
for (uint32_t hash_cnt = 0; hash_cnt < num_hash_func_; ++hash_cnt) {
@ -149,14 +147,12 @@ Status CuckooTableReader::Get(
if (ucomp_->Compare(user_key, Slice(bucket, user_key.size())) == 0) {
Slice value(bucket + key_length_, value_length_);
if (is_last_level_) {
ParsedInternalKey found_ikey(
Slice(bucket, key_length_), 0, kTypeValue);
result_handler(handle_context, found_ikey, value);
get_context->SaveValue(value);
} else {
Slice full_key(bucket, key_length_);
ParsedInternalKey found_ikey;
ParseInternalKey(full_key, &found_ikey);
result_handler(handle_context, found_ikey, value);
get_context->SaveValue(found_ikey, value);
}
// We don't support merge operations. So, we return here.
return Status::OK();

@ -40,12 +40,8 @@ class CuckooTableReader: public TableReader {
Status status() const { return status_; }
Status Get(
const ReadOptions& read_options, const Slice& key, void* handle_context,
bool (*result_handler)(void* arg, const ParsedInternalKey& k,
const Slice& v),
void (*mark_key_may_exist_handler)(void* handle_context) = nullptr)
override;
Status Get(const ReadOptions& read_options, const Slice& key,
GetContext* get_context) override;
Iterator* NewIterator(const ReadOptions&, Arena* arena = nullptr) override;
void Prepare(const Slice& target) override;

@ -25,6 +25,7 @@ int main() {
#include "table/cuckoo_table_builder.h"
#include "table/cuckoo_table_reader.h"
#include "table/cuckoo_table_factory.h"
#include "table/get_context.h"
#include "util/arena.h"
#include "util/random.h"
#include "util/testharness.h"
@ -61,25 +62,6 @@ uint64_t GetSliceHash(const Slice& s, uint32_t index,
return hash_map[s.ToString()][index];
}
// Methods, variables for checking key and values read.
struct ValuesToAssert {
ValuesToAssert(const std::string& key, const Slice& value)
: expected_user_key(key),
expected_value(value),
call_count(0) {}
std::string expected_user_key;
Slice expected_value;
int call_count;
};
bool AssertValues(void* assert_obj,
const ParsedInternalKey& k, const Slice& v) {
ValuesToAssert *ptr = reinterpret_cast<ValuesToAssert*>(assert_obj);
ASSERT_EQ(ptr->expected_value.ToString(), v.ToString());
ASSERT_EQ(ptr->expected_user_key, k.user_key.ToString());
++ptr->call_count;
return false;
}
} // namespace
class CuckooReaderTest {
@ -134,11 +116,14 @@ class CuckooReaderTest {
ucomp,
GetSliceHash);
ASSERT_OK(reader.status());
// Assume no merge/deletion
for (uint32_t i = 0; i < num_items; ++i) {
ValuesToAssert v(user_keys[i], values[i]);
ASSERT_OK(reader.Get(
ReadOptions(), Slice(keys[i]), &v, AssertValues, nullptr));
ASSERT_EQ(1, v.call_count);
std::string value;
GetContext get_context(ucomp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(user_keys[i]), &value,
nullptr, nullptr);
ASSERT_OK(reader.Get(ReadOptions(), Slice(keys[i]), &get_context));
ASSERT_EQ(values[i], value);
}
}
void UpdateKeys(bool with_zero_seqno) {
@ -329,6 +314,7 @@ TEST(CuckooReaderTest, WhenKeyNotFound) {
// Make all hash values collide.
AddHashLookups(user_keys[i], 0, kNumHashFunc);
}
auto* ucmp = BytewiseComparator();
CreateCuckooFileAndCheckReader();
std::unique_ptr<RandomAccessFile> read_file;
ASSERT_OK(env->NewRandomAccessFile(fname, &read_file, env_options));
@ -337,7 +323,7 @@ TEST(CuckooReaderTest, WhenKeyNotFound) {
ioptions,
std::move(read_file),
file_size,
BytewiseComparator(),
ucmp,
GetSliceHash);
ASSERT_OK(reader.status());
// Search for a key with colliding hash values.
@ -346,10 +332,11 @@ TEST(CuckooReaderTest, WhenKeyNotFound) {
AddHashLookups(not_found_user_key, 0, kNumHashFunc);
ParsedInternalKey ikey(not_found_user_key, 1000, kTypeValue);
AppendInternalKey(&not_found_key, ikey);
ValuesToAssert v("", "");
ASSERT_OK(reader.Get(
ReadOptions(), Slice(not_found_key), &v, AssertValues, nullptr));
ASSERT_EQ(0, v.call_count);
std::string value;
GetContext get_context(ucmp, nullptr, nullptr, nullptr, GetContext::kNotFound,
Slice(not_found_key), &value, nullptr, nullptr);
ASSERT_OK(reader.Get(ReadOptions(), Slice(not_found_key), &get_context));
ASSERT_TRUE(value.empty());
ASSERT_OK(reader.status());
// Search for a key with an independent hash value.
std::string not_found_user_key2 = "key" + NumToStr(num_items + 1);
@ -357,9 +344,11 @@ TEST(CuckooReaderTest, WhenKeyNotFound) {
ParsedInternalKey ikey2(not_found_user_key2, 1000, kTypeValue);
std::string not_found_key2;
AppendInternalKey(&not_found_key2, ikey2);
ASSERT_OK(reader.Get(
ReadOptions(), Slice(not_found_key2), &v, AssertValues, nullptr));
ASSERT_EQ(0, v.call_count);
GetContext get_context2(ucmp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(not_found_key2), &value,
nullptr, nullptr);
ASSERT_OK(reader.Get(ReadOptions(), Slice(not_found_key2), &get_context2));
ASSERT_TRUE(value.empty());
ASSERT_OK(reader.status());
// Test read when key is unused key.
@ -369,34 +358,16 @@ TEST(CuckooReaderTest, WhenKeyNotFound) {
// Add hash values that map to empty buckets.
AddHashLookups(ExtractUserKey(unused_key).ToString(),
kNumHashFunc, kNumHashFunc);
ASSERT_OK(reader.Get(
ReadOptions(), Slice(unused_key), &v, AssertValues, nullptr));
ASSERT_EQ(0, v.call_count);
GetContext get_context3(ucmp, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(unused_key), &value,
nullptr, nullptr);
ASSERT_OK(reader.Get(ReadOptions(), Slice(unused_key), &get_context3));
ASSERT_TRUE(value.empty());
ASSERT_OK(reader.status());
}
// Performance tests
namespace {
int64_t found_count = 0;
std::string value;
bool DoNothing(void* arg, const ParsedInternalKey& k, const Slice& v) {
// Deliberately empty.
if (*reinterpret_cast<const int32_t*>(k.user_key.data()) ==
*reinterpret_cast<const int32_t*>(v.data())) {
++found_count;
value.assign(v.data(), v.size());
}
return false;
}
bool CheckValue(void* cnt_ptr, const ParsedInternalKey& k, const Slice& v) {
++*reinterpret_cast<int*>(cnt_ptr);
std::string expected_value;
AppendInternalKey(&expected_value, k);
ASSERT_EQ(0, v.compare(Slice(&expected_value[0], v.size())));
return false;
}
void GetKeys(uint64_t num, std::vector<std::string>* keys) {
keys->clear();
IterKey k;
@ -457,13 +428,15 @@ void WriteFile(const std::vector<std::string>& keys,
test::Uint64Comparator(), nullptr);
ASSERT_OK(reader.status());
ReadOptions r_options;
std::string value;
// Assume only the fast path is triggered
GetContext get_context(nullptr, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(), &value,
nullptr, nullptr);
for (uint64_t i = 0; i < num; ++i) {
int cnt = 0;
ASSERT_OK(reader.Get(r_options, Slice(keys[i]), &cnt, CheckValue, nullptr));
if (cnt != 1) {
fprintf(stderr, "%" PRIu64 " not found.\n", i);
ASSERT_EQ(1, cnt);
}
value.clear();
ASSERT_OK(reader.Get(r_options, Slice(keys[i]), &get_context));
ASSERT_TRUE(Slice(keys[i]) == Slice(&keys[i][0], 4));
}
}
@ -501,7 +474,11 @@ void ReadKeys(uint64_t num, uint32_t batch_size) {
}
std::random_shuffle(keys.begin(), keys.end());
found_count = 0;
std::string value;
// Assume only the fast path is triggered
GetContext get_context(nullptr, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(), &value,
nullptr, nullptr);
uint64_t start_time = env->NowMicros();
if (batch_size > 0) {
for (uint64_t i = 0; i < num; i += batch_size) {
@ -510,20 +487,19 @@ void ReadKeys(uint64_t num, uint32_t batch_size) {
}
for (uint64_t j = i; j < i+batch_size && j < num; ++j) {
reader.Get(r_options, Slice(reinterpret_cast<char*>(&keys[j]), 16),
nullptr, DoNothing, nullptr);
&get_context);
}
}
} else {
for (uint64_t i = 0; i < num; i++) {
reader.Get(r_options, Slice(reinterpret_cast<char*>(&keys[i]), 16),
nullptr, DoNothing, nullptr);
&get_context);
}
}
float time_per_op = (env->NowMicros() - start_time) * 1.0 / num;
fprintf(stderr,
"Time taken per op is %.3fus (%.1f Mqps) with batch size of %u, "
"# of found keys %" PRId64 "\n",
time_per_op, 1.0 / time_per_op, batch_size, found_count);
"Time taken per op is %.3fus (%.1f Mqps) with batch size of %u\n",
time_per_op, 1.0 / time_per_op, batch_size);
}
} // namespace.

@ -0,0 +1,101 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "table/get_context.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/statistics.h"
#include "util/statistics.h"
namespace rocksdb {
GetContext::GetContext(const Comparator* ucmp,
const MergeOperator* merge_operator,
Logger* logger, Statistics* statistics,
GetState init_state, const Slice& user_key, std::string* ret_value,
bool* value_found, MergeContext* merge_context)
: ucmp_(ucmp),
merge_operator_(merge_operator),
logger_(logger),
statistics_(statistics),
state_(init_state),
user_key_(user_key),
value_(ret_value),
value_found_(value_found),
merge_context_(merge_context) {
}
// Called from TableCache::Get and Table::Get when file/block in which
// key may exist are not there in TableCache/BlockCache respectively. In this
// case we can't guarantee that key does not exist and are not permitted to do
// IO to be certain.Set the status=kFound and value_found=false to let the
// caller know that key may exist but is not there in memory
void GetContext::MarkKeyMayExist() {
state_ = kFound;
if (value_found_ != nullptr) {
*value_found_ = false;
}
}
void GetContext::SaveValue(const Slice& value) {
state_ = kFound;
value_->assign(value.data(), value.size());
}
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
const Slice& value) {
assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
merge_context_ != nullptr);
if (ucmp_->Compare(parsed_key.user_key, user_key_) == 0) {
// Key matches. Process it
switch (parsed_key.type) {
case kTypeValue:
assert(state_ == kNotFound || state_ == kMerge);
if (kNotFound == state_) {
state_ = kFound;
value_->assign(value.data(), value.size());
} else if (kMerge == state_) {
assert(merge_operator_ != nullptr);
state_ = kFound;
if (!merge_operator_->FullMerge(user_key_, &value,
merge_context_->GetOperands(),
value_, logger_)) {
RecordTick(statistics_, NUMBER_MERGE_FAILURES);
state_ = kCorrupt;
}
}
return false;
case kTypeDeletion:
assert(state_ == kNotFound || state_ == kMerge);
if (kNotFound == state_) {
state_ = kDeleted;
} else if (kMerge == state_) {
state_ = kFound;
if (!merge_operator_->FullMerge(user_key_, nullptr,
merge_context_->GetOperands(),
value_, logger_)) {
RecordTick(statistics_, NUMBER_MERGE_FAILURES);
state_ = kCorrupt;
}
}
return false;
case kTypeMerge:
assert(state_ == kNotFound || state_ == kMerge);
state_ = kMerge;
merge_context_->PushOperand(value);
return true;
default:
assert(false);
break;
}
}
// state_ could be Corrupt, merge or notfound
return false;
}
} // namespace rocksdb

@ -0,0 +1,47 @@
// Copyright (c) 2014, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <string>
#include "db/merge_context.h"
namespace rocksdb {
class MergeContext;
class GetContext {
public:
enum GetState {
kNotFound,
kFound,
kDeleted,
kCorrupt,
kMerge // saver contains the current merge result (the operands)
};
GetContext(const Comparator* ucmp, const MergeOperator* merge_operator,
Logger* logger, Statistics* statistics,
GetState init_state, const Slice& user_key, std::string* ret_value,
bool* value_found, MergeContext* merge_context);
void MarkKeyMayExist();
void SaveValue(const Slice& value);
bool SaveValue(const ParsedInternalKey& parsed_key, const Slice& value);
GetState State() const { return state_; }
private:
const Comparator* ucmp_;
const MergeOperator* merge_operator_;
// the merge operations encountered;
Logger* logger_;
Statistics* statistics_;
GetState state_;
Slice user_key_;
std::string* value_;
bool* value_found_; // Is value set correctly? Used by KeyMayExist
MergeContext* merge_context_;
};
} // namespace rocksdb

@ -26,6 +26,7 @@
#include "table/two_level_iterator.h"
#include "table/plain_table_factory.h"
#include "table/plain_table_key_coding.h"
#include "table/get_context.h"
#include "util/arena.h"
#include "util/coding.h"
@ -525,10 +526,7 @@ void PlainTableReader::Prepare(const Slice& target) {
}
Status PlainTableReader::Get(const ReadOptions& ro, const Slice& target,
void* arg,
bool (*saver)(void*, const ParsedInternalKey&,
const Slice&),
void (*mark_key_may_exist)(void*)) {
GetContext* get_context) {
// Check bloom filter first.
Slice prefix_slice;
uint32_t prefix_hash;
@ -580,8 +578,10 @@ Status PlainTableReader::Get(const ReadOptions& ro, const Slice& target,
}
prefix_match = true;
}
// TODO(ljin): since we know the key comparison result here,
// can we enable the fast path?
if (internal_comparator_.Compare(found_key, parsed_target) >= 0) {
if (!(*saver)(arg, found_key, found_value)) {
if (!get_context->SaveValue(found_key, found_value)) {
break;
}
}

@ -36,6 +36,7 @@ class TableCache;
class TableReader;
class InternalKeyComparator;
class PlainTableKeyDecoder;
class GetContext;
using std::unique_ptr;
using std::unordered_map;
@ -65,10 +66,8 @@ class PlainTableReader: public TableReader {
void Prepare(const Slice& target);
Status Get(const ReadOptions&, const Slice& key, void* arg,
bool (*result_handler)(void* arg, const ParsedInternalKey& k,
const Slice& v),
void (*mark_key_may_exist)(void*) = nullptr);
Status Get(const ReadOptions&, const Slice& key,
GetContext* get_context) override;
uint64_t ApproximateOffsetOf(const Slice& key);

@ -18,6 +18,7 @@ class Slice;
class Arena;
struct ReadOptions;
struct TableProperties;
class GetContext;
// A Table is a sorted map from strings to strings. Tables are
// immutable and persistent. A Table may be safely accessed from
@ -55,23 +56,17 @@ class TableReader {
// Report an approximation of how much memory has been used.
virtual size_t ApproximateMemoryUsage() const = 0;
// Calls (*result_handler)(handle_context, ...) repeatedly, starting with
// the entry found after a call to Seek(key), until result_handler returns
// false, where k is the actual internal key for a row found and v as the
// value of the key. May not make such a call if filter policy says that key
// is not present.
// Calls get_context->SaveValue() repeatedly, starting with
// the entry found after a call to Seek(key), until it returns false.
// May not make such a call if filter policy says that key is not present.
//
// mark_key_may_exist_handler needs to be called when it is configured to be
// memory only and the key is not found in the block cache, with
// the parameter to be handle_context.
// get_context->MarkKeyMayExist needs to be called when it is configured to be
// memory only and the key is not found in the block cache.
//
// readOptions is the options for the read
// key is the key to search for
virtual Status Get(
const ReadOptions& readOptions, const Slice& key, void* handle_context,
bool (*result_handler)(void* arg, const ParsedInternalKey& k,
const Slice& v),
void (*mark_key_may_exist_handler)(void* handle_context) = nullptr) = 0;
virtual Status Get(const ReadOptions& readOptions, const Slice& key,
GetContext* get_context) = 0;
};
} // namespace rocksdb

@ -22,6 +22,7 @@ int main() {
#include "table/block_based_table_factory.h"
#include "table/plain_table_factory.h"
#include "table/table_builder.h"
#include "table/get_context.h"
#include "util/histogram.h"
#include "util/testharness.h"
#include "util/testutil.h"
@ -48,11 +49,6 @@ static std::string MakeKey(int i, int j, bool through_db) {
return key.Encode().ToString();
}
static bool DummySaveValue(void* arg, const ParsedInternalKey& ikey,
const Slice& v) {
return false;
}
uint64_t Now(Env* env, bool measured_by_nanosecond) {
return measured_by_nanosecond ? env->NowNanos() : env->NowMicros();
}
@ -131,7 +127,6 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
std::string result;
HistogramImpl hist;
void* arg = nullptr;
for (int it = 0; it < num_iter; it++) {
for (int i = 0; i < num_keys1; i++) {
for (int j = 0; j < num_keys2; j++) {
@ -147,8 +142,13 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
std::string key = MakeKey(r1, r2, through_db);
uint64_t start_time = Now(env, measured_by_nanosecond);
if (!through_db) {
s = table_reader->Get(read_options, key, arg, DummySaveValue,
nullptr);
std::string value;
MergeContext merge_context;
GetContext get_context(ioptions.comparator, ioptions.merge_operator,
ioptions.info_log, ioptions.statistics,
GetContext::kNotFound, Slice(key), &value,
nullptr, &merge_context);
s = table_reader->Get(read_options, key, &get_context);
} else {
s = db->Get(read_options, key, &result);
}

@ -37,6 +37,7 @@
#include "table/format.h"
#include "table/meta_blocks.h"
#include "table/plain_table_factory.h"
#include "table/get_context.h"
#include "util/random.h"
#include "util/statistics.h"
@ -1485,8 +1486,11 @@ TEST(BlockBasedTableTest, BlockCacheDisabledTest) {
}
{
GetContext get_context(options.comparator, nullptr, nullptr, nullptr,
GetContext::kNotFound, Slice(), nullptr,
nullptr, nullptr);
// a hack that just to trigger BlockBasedTable::GetFilter.
reader->Get(ReadOptions(), "non-exist-key", nullptr, nullptr, nullptr);
reader->Get(ReadOptions(), "non-exist-key", &get_context);
BlockCachePropertiesSnapshot props(options.statistics.get());
props.AssertIndexBlockStat(0, 0);
props.AssertFilterBlockStat(0, 0);

@ -7,13 +7,13 @@
#include "utilities/compacted_db/compacted_db_impl.h"
#include "db/db_impl.h"
#include "db/version_set.h"
#include "db/merge_context.h"
#include "table/get_context.h"
namespace rocksdb {
extern void MarkKeyMayExist(void* arg);
extern bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
const Slice& v);
const Slice& v, bool hit_and_return);
CompactedDBImpl::CompactedDBImpl(
const DBOptions& options, const std::string& dbname)
@ -44,25 +44,12 @@ size_t CompactedDBImpl::FindFile(const Slice& key) {
Status CompactedDBImpl::Get(const ReadOptions& options,
ColumnFamilyHandle*, const Slice& key, std::string* value) {
const FdWithKeyRange& f = files_.files[FindFile(key)];
bool value_found;
MergeContext merge_context;
Version::Saver saver;
saver.state = Version::kNotFound;
saver.ucmp = user_comparator_;
saver.user_key = key;
saver.value_found = &value_found;
saver.value = value;
saver.merge_operator = nullptr;
saver.merge_context = &merge_context;
saver.logger = info_log_;
saver.statistics = statistics_;
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, key, value, nullptr, nullptr);
LookupKey lkey(key, kMaxSequenceNumber);
f.fd.table_reader->Get(options, lkey.internal_key(),
reinterpret_cast<void*>(&saver), SaveValue,
MarkKeyMayExist);
if (saver.state == Version::kFound) {
files_.files[FindFile(key)].fd.table_reader->Get(
options, lkey.internal_key(), &get_context);
if (get_context.State() == GetContext::kFound) {
return Status::OK();
}
return Status::NotFound();
@ -84,26 +71,15 @@ std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
}
std::vector<Status> statuses(keys.size(), Status::NotFound());
values->resize(keys.size());
bool value_found;
MergeContext merge_context;
Version::Saver saver;
saver.ucmp = user_comparator_;
saver.value_found = &value_found;
saver.merge_operator = nullptr;
saver.merge_context = &merge_context;
saver.logger = info_log_;
saver.statistics = statistics_;
int idx = 0;
for (auto* r : reader_list) {
if (r != nullptr) {
saver.state = Version::kNotFound;
saver.user_key = keys[idx];
saver.value = &(*values)[idx];
GetContext get_context(user_comparator_, nullptr, nullptr, nullptr,
GetContext::kNotFound, keys[idx], &(*values)[idx],
nullptr, nullptr);
LookupKey lkey(keys[idx], kMaxSequenceNumber);
r->Get(options, lkey.internal_key(),
reinterpret_cast<void*>(&saver), SaveValue,
MarkKeyMayExist);
if (saver.state == Version::kFound) {
r->Get(options, lkey.internal_key(), &get_context);
if (get_context.State() == GetContext::kFound) {
statuses[idx] = Status::OK();
}
}
@ -128,8 +104,6 @@ Status CompactedDBImpl::Init(const Options& options) {
}
version_ = cfd_->GetSuperVersion()->current;
user_comparator_ = cfd_->user_comparator();
statistics_ = cfd_->ioptions()->statistics;
info_log_ = cfd_->ioptions()->info_log;
// L0 should not have files
if (version_->file_levels_[0].num_files > 1) {
return Status::NotSupported("L0 contain more than 1 file");

@ -88,9 +88,6 @@ class CompactedDBImpl : public DBImpl {
const Comparator* user_comparator_;
FileLevel files_;
Statistics* statistics_;
Logger* info_log_;
// No copying allowed
CompactedDBImpl(const CompactedDBImpl&);
void operator=(const CompactedDBImpl&);

Loading…
Cancel
Save