[Performance Branch] PlainTable: encode rows with seqID = 0 and value type using 1 internal byte.

Summary: In PlainTable, use a single byte to represent the 8 internal key bytes when seqID = 0 and the entry is of value type (which should be common for bottommost files). This saves 7 bytes per such row in uncompressed cases.
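
The idea behind the encoding, as a minimal standalone sketch (not code from this commit): the 0xFF sentinel mirrors PlainTableFactory::kValueTypeSeqId0 introduced in the diff below, and the helper names AppendTrailer/EncodeKey/DecodeKey are hypothetical, not RocksDB APIs. A row whose internal key has sequence number 0 and type kTypeValue drops the usual 8-byte (sequence << 8 | type) trailer and appends one sentinel byte instead; every other row keeps the full trailer. Since real value types are capped at 0x7F, a first trailer byte with the high bit set can only be the sentinel.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Simplified stand-ins for RocksDB's internal-key pieces (illustrative only).
enum ValueType : unsigned char { kTypeDeletion = 0x0, kTypeValue = 0x1, kMaxValue = 0x7F };
constexpr unsigned char kValueTypeSeqId0 = 0xFF;  // sentinel byte; high bit set, so > kMaxValue

// Append the little-endian 8-byte trailer (sequence << 8 | type); the low byte,
// written first, is the value type itself.
void AppendTrailer(std::string* out, uint64_t sequence, ValueType type) {
  uint64_t packed = (sequence << 8) | type;
  for (int i = 0; i < 8; ++i) {
    out->push_back(static_cast<char>((packed >> (8 * i)) & 0xFF));
  }
}

// Encode one row's key: 1 byte for the common "seqno 0, value" case, 8 bytes otherwise.
std::string EncodeKey(const std::string& user_key, uint64_t sequence, ValueType type) {
  std::string out = user_key;
  if (sequence == 0 && type == kTypeValue) {
    out.push_back(static_cast<char>(kValueTypeSeqId0));  // saves 7 bytes for this row
  } else {
    AppendTrailer(&out, sequence, type);
  }
  return out;
}

// Decode: a trailing sentinel byte means (seqno = 0, kTypeValue); otherwise read 8 bytes.
void DecodeKey(const std::string& encoded, size_t user_key_size, uint64_t* sequence,
               ValueType* type) {
  unsigned char first = static_cast<unsigned char>(encoded[user_key_size]);
  if (first == kValueTypeSeqId0) {
    *sequence = 0;
    *type = kTypeValue;
  } else {
    uint64_t packed = 0;
    for (int i = 0; i < 8; ++i) {
      packed |= static_cast<uint64_t>(
                    static_cast<unsigned char>(encoded[user_key_size + i]))
                << (8 * i);
    }
    *sequence = packed >> 8;
    *type = static_cast<ValueType>(packed & 0xFF);
  }
}

int main() {
  // Common bottommost-file case: 1-byte trailer instead of 8 bytes.
  std::string row = EncodeKey("foo", 0, kTypeValue);
  assert(row.size() == 3 + 1);
  uint64_t seq;
  ValueType type;
  DecodeKey(row, 3, &seq, &type);
  assert(seq == 0 && type == kTypeValue);

  // Any other row keeps the full 8-byte trailer.
  std::string row2 = EncodeKey("foo", 7, kTypeDeletion);
  assert(row2.size() == 3 + 8);
  DecodeKey(row2, 3, &seq, &type);
  assert(seq == 7 && type == kTypeDeletion);
  return 0;
}
```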

Test Plan: make all check

Reviewers: haobo, dhruba, kailiu

Reviewed By: haobo

CC: igor, leveldb

Differential Revision: https://reviews.facebook.net/D15489
Branch: main
Author: Siying Dong, 11 years ago
Parent: 4f6cb17bdb
Commit: d169b67680
  1. db/builder.cc (36)
  2. db/builder.h (16)
  3. db/db_impl.cc (25)
  4. db/db_iter.cc (2)
  5. db/db_test.cc (1)
  6. db/dbformat.cc (24)
  7. db/dbformat.h (11)
  8. db/repair.cc (8)
  9. db/simple_table_db_test.cc (37)
  10. db/table_cache.cc (36)
  11. db/table_cache.h (29)
  12. db/table_properties_collector_test.cc (44)
  13. db/version_set.cc (154)
  14. db/write_batch_test.cc (2)
  15. include/rocksdb/options.h (1)
  16. include/rocksdb/table.h (2)
  17. table/block_based_table_builder.cc (35)
  18. table/block_based_table_builder.h (1)
  19. table/block_based_table_factory.cc (16)
  20. table/block_based_table_factory.h (7)
  21. table/block_based_table_reader.cc (49)
  22. table/block_based_table_reader.h (15)
  23. table/block_builder.cc (6)
  24. table/block_builder.h (2)
  25. table/block_test.cc (5)
  26. table/filter_block.cc (11)
  27. table/filter_block.h (3)
  28. table/filter_block_test.cc (6)
  29. table/plain_table_builder.cc (28)
  30. table/plain_table_factory.cc (11)
  31. table/plain_table_factory.h (10)
  32. table/plain_table_reader.cc (117)
  33. table/plain_table_reader.h (27)
  34. table/table_factory.h (5)
  35. table/table_reader.h (3)
  36. table/table_reader_bench.cc (6)
  37. table/table_test.cc (114)
  38. table/two_level_iterator.cc (53)
  39. table/two_level_iterator.h (14)
  40. tools/sst_dump.cc (4)
  41. util/testutil.h (25)

@ -26,20 +26,18 @@ namespace rocksdb {
class TableFactory;
TableBuilder* NewTableBuilder(const Options& options, WritableFile* file,
TableBuilder* NewTableBuilder(const Options& options,
const InternalKeyComparator& internal_comparator,
WritableFile* file,
CompressionType compression_type) {
return options.table_factory->NewTableBuilder(options, file,
compression_type);
return options.table_factory->NewTableBuilder(options, internal_comparator,
file, compression_type);
}
Status BuildTable(const std::string& dbname,
Env* env,
const Options& options,
const EnvOptions& soptions,
TableCache* table_cache,
Iterator* iter,
FileMetaData* meta,
const Comparator* user_comparator,
Status BuildTable(const std::string& dbname, Env* env, const Options& options,
const EnvOptions& soptions, TableCache* table_cache,
Iterator* iter, FileMetaData* meta,
const InternalKeyComparator& internal_comparator,
const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable,
const CompressionType compression) {
@ -64,7 +62,8 @@ Status BuildTable(const std::string& dbname,
return s;
}
TableBuilder* builder = NewTableBuilder(options, file.get(), compression);
TableBuilder* builder =
NewTableBuilder(options, internal_comparator, file.get(), compression);
// the first key is the smallest key
Slice key = iter->key();
@ -72,8 +71,8 @@ Status BuildTable(const std::string& dbname,
meta->smallest_seqno = GetInternalKeySeqno(key);
meta->largest_seqno = meta->smallest_seqno;
MergeHelper merge(user_comparator, options.merge_operator.get(),
options.info_log.get(),
MergeHelper merge(internal_comparator.user_comparator(),
options.merge_operator.get(), options.info_log.get(),
true /* internal key corruption is not ok */);
if (purge) {
@ -102,8 +101,8 @@ Status BuildTable(const std::string& dbname,
// If the key is the same as the previous key (and it is not the
// first key), then we skip it, since it is an older version.
// Otherwise we output the key and mark it as the "new" previous key.
if (!is_first_key && !user_comparator->Compare(prev_ikey.user_key,
this_ikey.user_key)) {
if (!is_first_key && !internal_comparator.user_comparator()->Compare(
prev_ikey.user_key, this_ikey.user_key)) {
// seqno within the same key are in decreasing order
assert(this_ikey.sequence < prev_ikey.sequence);
} else {
@ -201,9 +200,8 @@ Status BuildTable(const std::string& dbname,
if (s.ok()) {
// Verify that the table is usable
Iterator* it = table_cache->NewIterator(ReadOptions(),
soptions,
*meta);
Iterator* it = table_cache->NewIterator(ReadOptions(), soptions,
internal_comparator, *meta);
s = it->status();
delete it;
}

@ -24,22 +24,20 @@ class VersionEdit;
class TableBuilder;
class WritableFile;
extern TableBuilder* NewTableBuilder(const Options& options, WritableFile* file,
CompressionType compression_type);
extern TableBuilder* NewTableBuilder(
const Options& options, const InternalKeyComparator& internal_comparator,
WritableFile* file, CompressionType compression_type);
// Build a Table file from the contents of *iter. The generated file
// will be named according to meta->number. On success, the rest of
// *meta will be filled with metadata about the generated table.
// If no data is present in *iter, meta->file_size will be set to
// zero, and no Table file will be produced.
extern Status BuildTable(const std::string& dbname,
Env* env,
const Options& options,
const EnvOptions& soptions,
TableCache* table_cache,
Iterator* iter,
extern Status BuildTable(const std::string& dbname, Env* env,
const Options& options, const EnvOptions& soptions,
TableCache* table_cache, Iterator* iter,
FileMetaData* meta,
const Comparator* user_comparator,
const InternalKeyComparator& internal_comparator,
const SequenceNumber newest_snapshot,
const SequenceNumber earliest_seqno_in_memtable,
const CompressionType compression);

@ -127,7 +127,6 @@ Options SanitizeOptions(const std::string& dbname,
const InternalFilterPolicy* ipolicy,
const Options& src) {
Options result = src;
result.comparator = icmp;
result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr;
// result.max_open_files means an "infinite" open files.
if (result.max_open_files != -1) {
@ -1107,9 +1106,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
{
mutex_.Unlock();
s = BuildTable(dbname_, env_, options_, storage_options_,
table_cache_.get(), iter, &meta,
user_comparator(), newest_snapshot,
earliest_seqno_in_memtable,
table_cache_.get(), iter, &meta, internal_comparator_,
newest_snapshot, earliest_seqno_in_memtable,
GetCompressionFlush(options_));
LogFlush(options_.info_log);
mutex_.Lock();
@ -1173,9 +1171,9 @@ Status DBImpl::WriteLevel0Table(autovector<MemTable*>& mems, VersionEdit* edit,
(unsigned long)meta.number);
s = BuildTable(dbname_, env_, options_, storage_options_,
table_cache_.get(), iter, &meta,
user_comparator(), newest_snapshot,
earliest_seqno_in_memtable, GetCompressionFlush(options_));
table_cache_.get(), iter, &meta, internal_comparator_,
newest_snapshot, earliest_seqno_in_memtable,
GetCompressionFlush(options_));
LogFlush(options_.info_log);
delete iter;
Log(options_.info_log, "Level-0 flush table #%lu: %lu bytes %s",
@ -2137,8 +2135,9 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
options_, compact->compaction->output_level(),
compact->compaction->enable_compression());
compact->builder.reset(
NewTableBuilder(options_, compact->outfile.get(), compression_type));
compact->builder.reset(NewTableBuilder(options_, internal_comparator_,
compact->outfile.get(),
compression_type));
}
LogFlush(options_.info_log);
return s;
@ -2186,9 +2185,8 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
if (s.ok() && current_entries > 0) {
// Verify that the table is usable
FileMetaData meta(output_number, current_bytes);
Iterator* iter = table_cache_->NewIterator(ReadOptions(),
storage_options_,
meta);
Iterator* iter = table_cache_->NewIterator(ReadOptions(), storage_options_,
internal_comparator_, meta);
s = iter->status();
delete iter;
if (s.ok()) {
@ -2522,8 +2520,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
// If this is the bottommost level (no files in lower levels)
// and the earliest snapshot is larger than this seqno
// then we can squash the seqno to zero.
if (options_.compaction_style == kCompactionStyleLevel &&
bottommost_level && ikey.sequence < earliest_snapshot &&
if (bottommost_level && ikey.sequence < earliest_snapshot &&
ikey.type != kTypeMerge) {
assert(ikey.type != kTypeDeletion);
// make a copy because updating in place would cause problems

@ -235,7 +235,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping) {
valid_ = true;
MergeValuesNewToOld(); // Go to a different state machine
return;
case kTypeLogData:
default:
assert(false);
break;
}

@ -11,6 +11,7 @@
#include <set>
#include <unistd.h>
#include "db/dbformat.h"
#include "db/db_impl.h"
#include "db/filename.h"
#include "db/version_set.h"

@ -6,9 +6,9 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/dbformat.h"
#include <stdio.h>
#include "db/dbformat.h"
#include "port/port.h"
#include "util/coding.h"
#include "util/perf_context_imp.h"
@ -72,6 +72,28 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const {
return r;
}
int InternalKeyComparator::Compare(const ParsedInternalKey& a,
const ParsedInternalKey& b) const {
// Order by:
// increasing user key (according to user-supplied comparator)
// decreasing sequence number
// decreasing type (though sequence# should be enough to disambiguate)
int r = user_comparator_->Compare(a.user_key, b.user_key);
BumpPerfCount(&perf_context.user_key_comparison_count);
if (r == 0) {
if (a.sequence > b.sequence) {
r = -1;
} else if (a.sequence < b.sequence) {
r = +1;
} else if (a.type > b.type) {
r = -1;
} else if (a.type < b.type) {
r = +1;
}
}
return r;
}
void InternalKeyComparator::FindShortestSeparator(
std::string* start,
const Slice& limit) const {

@ -25,12 +25,16 @@ class InternalKey;
// Value types encoded as the last component of internal keys.
// DO NOT CHANGE THESE ENUM VALUES: they are embedded in the on-disk
// data structures.
enum ValueType {
// The highest bit of the value type needs to be reserved to SST tables
// for them to do more flexible encoding.
enum ValueType : unsigned char {
kTypeDeletion = 0x0,
kTypeValue = 0x1,
kTypeMerge = 0x2,
kTypeLogData = 0x3
kTypeLogData = 0x3,
kMaxValue = 0x7F
};
// kValueTypeForSeek defines the ValueType that should be passed when
// constructing a ParsedInternalKey object for seeking to a particular
// sequence number (since we sort sequence numbers in decreasing order
@ -96,6 +100,7 @@ class InternalKeyComparator : public Comparator {
name_("rocksdb.InternalKeyComparator:" +
std::string(user_comparator_->Name())) {
}
virtual ~InternalKeyComparator() {}
virtual const char* Name() const;
virtual int Compare(const Slice& a, const Slice& b) const;
@ -107,6 +112,7 @@ class InternalKeyComparator : public Comparator {
const Comparator* user_comparator() const { return user_comparator_; }
int Compare(const InternalKey& a, const InternalKey& b) const;
int Compare(const ParsedInternalKey& a, const ParsedInternalKey& b) const;
};
// Filter policy wrapper that converts from internal keys to user keys
@ -163,6 +169,7 @@ inline bool ParseInternalKey(const Slice& internal_key,
unsigned char c = num & 0xff;
result->sequence = num >> 8;
result->type = static_cast<ValueType>(c);
assert(result->type <= ValueType::kMaxValue);
result->user_key = Slice(internal_key.data(), n - 8);
return (c <= static_cast<unsigned char>(kValueTypeForSeek));
}
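
The new kMaxValue = 0x7F cap is what makes the single-byte PlainTable encoding safe to decode. A tiny standalone check (not part of the commit; the constants mirror the enum above and PlainTableFactory::kValueTypeSeqId0 defined later in this diff) shows that the 0xFF marker, whose high bit is set, can never collide with the first byte of a normally encoded trailer, which is a legal value type:

```cpp
#include <cassert>

int main() {
  const unsigned char kMaxValue = 0x7F;         // from the ValueType enum above
  const unsigned char kValueTypeSeqId0 = 0xFF;  // PlainTable's 1-byte marker
  // Every legal type byte fits in 7 bits, so the marker is unambiguous.
  for (unsigned int t = 0; t <= kMaxValue; ++t) {
    assert(static_cast<unsigned char>(t) != kValueTypeSeqId0);
  }
  return 0;
}
```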

@ -222,10 +222,8 @@ class Repairer {
FileMetaData meta;
meta.number = next_file_number_++;
Iterator* iter = mem->NewIterator();
status = BuildTable(dbname_, env_, options_, storage_options_,
table_cache_, iter, &meta,
icmp_.user_comparator(), 0, 0,
kNoCompression);
status = BuildTable(dbname_, env_, options_, storage_options_, table_cache_,
iter, &meta, icmp_, 0, 0, kNoCompression);
delete iter;
delete mem->Unref();
mem = nullptr;
@ -267,7 +265,7 @@ class Repairer {
if (status.ok()) {
FileMetaData dummy_meta(t->meta.number, t->meta.file_size);
Iterator* iter = table_cache_->NewIterator(
ReadOptions(), storage_options_, dummy_meta);
ReadOptions(), storage_options_, icmp_, dummy_meta);
bool empty = true;
ParsedInternalKey parsed;
t->min_sequence = 0;

@ -87,10 +87,10 @@ public:
Iterator* NewIterator(const ReadOptions&) override;
Status Get(
const ReadOptions&, const Slice& key, void* arg,
bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool),
void (*mark_key_may_exist)(void*) = nullptr) override;
Status Get(const ReadOptions&, const Slice& key, void* arg,
bool (*handle_result)(void* arg, const ParsedInternalKey& k,
const Slice& v, bool),
void (*mark_key_may_exist)(void*) = nullptr) override;
uint64_t ApproximateOffsetOf(const Slice& key) override;
@ -245,7 +245,8 @@ Status SimpleTableReader::GetOffset(const Slice& target, uint64_t* offset) {
return s;
}
int compare_result = rep_->options.comparator->Compare(tmp_slice, target);
InternalKeyComparator ikc(rep_->options.comparator);
int compare_result = ikc.Compare(tmp_slice, target);
if (compare_result < 0) {
if (left == right) {
@ -280,14 +281,20 @@ Status SimpleTableReader::GetOffset(const Slice& target, uint64_t* offset) {
return s;
}
Status SimpleTableReader::Get(
const ReadOptions& options, const Slice& k, void* arg,
bool (*saver)(void*, const Slice&, const Slice&, bool),
void (*mark_key_may_exist)(void*)) {
Status SimpleTableReader::Get(const ReadOptions& options, const Slice& k,
void* arg,
bool (*saver)(void*, const ParsedInternalKey&,
const Slice&, bool),
void (*mark_key_may_exist)(void*)) {
Status s;
SimpleTableIterator* iter = new SimpleTableIterator(this);
for (iter->Seek(k); iter->Valid(); iter->Next()) {
if (!(*saver)(arg, iter->key(), iter->value(), true)) {
ParsedInternalKey parsed_key;
if (!ParseInternalKey(iter->key(), &parsed_key)) {
return Status::Corruption(Slice());
}
if (!(*saver)(arg, parsed_key, iter->value(), true)) {
break;
}
}
@ -537,15 +544,19 @@ public:
return "SimpleTable";
}
Status NewTableReader(const Options& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_key,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader) const;
TableBuilder* NewTableBuilder(const Options& options, WritableFile* file,
TableBuilder* NewTableBuilder(const Options& options,
const InternalKeyComparator& internal_key,
WritableFile* file,
CompressionType compression_type) const;
};
Status SimpleTableFactory::NewTableReader(
const Options& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_key,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader) const {
@ -554,8 +565,8 @@ Status SimpleTableFactory::NewTableReader(
}
TableBuilder* SimpleTableFactory::NewTableBuilder(
const Options& options, WritableFile* file,
CompressionType compression_type) const {
const Options& options, const InternalKeyComparator& internal_key,
WritableFile* file, CompressionType compression_type) const {
return new SimpleTableBuilder(options, file, compression_type);
}

@ -60,6 +60,7 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) {
}
Status TableCache::FindTable(const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator,
uint64_t file_number, uint64_t file_size,
Cache::Handle** handle, bool* table_io,
const bool no_io) {
@ -84,7 +85,8 @@ Status TableCache::FindTable(const EnvOptions& toptions,
}
StopWatch sw(env_, options_->statistics.get(), TABLE_OPEN_IO_MICROS);
s = options_->table_factory->NewTableReader(
*options_, toptions, std::move(file), file_size, &table_reader);
*options_, toptions, internal_comparator, std::move(file), file_size,
&table_reader);
}
if (!s.ok()) {
@ -102,6 +104,7 @@ Status TableCache::FindTable(const EnvOptions& toptions,
Iterator* TableCache::NewIterator(const ReadOptions& options,
const EnvOptions& toptions,
const InternalKeyComparator& icomparator,
const FileMetaData& file_meta,
TableReader** table_reader_ptr,
bool for_compaction) {
@ -111,8 +114,8 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
Cache::Handle* handle = file_meta.table_reader_handle;
Status s;
if (!handle) {
s = FindTable(toptions, file_meta.number, file_meta.file_size, &handle,
nullptr, options.read_tier == kBlockCacheTier);
s = FindTable(toptions, icomparator, file_meta.number, file_meta.file_size,
&handle, nullptr, options.read_tier == kBlockCacheTier);
}
if (!s.ok()) {
return NewErrorIterator(s);
@ -135,17 +138,17 @@ Iterator* TableCache::NewIterator(const ReadOptions& options,
}
Status TableCache::Get(const ReadOptions& options,
const FileMetaData& file_meta,
const Slice& k,
void* arg,
bool (*saver)(void*, const Slice&, const Slice&, bool),
bool* table_io,
void (*mark_key_may_exist)(void*)) {
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const Slice& k, void* arg,
bool (*saver)(void*, const ParsedInternalKey&,
const Slice&, bool),
bool* table_io, void (*mark_key_may_exist)(void*)) {
Cache::Handle* handle = file_meta.table_reader_handle;
Status s;
if (!handle) {
s = FindTable(storage_options_, file_meta.number, file_meta.file_size,
&handle, table_io, options.read_tier == kBlockCacheTier);
s = FindTable(storage_options_, internal_comparator, file_meta.number,
file_meta.file_size, &handle, table_io,
options.read_tier == kBlockCacheTier);
}
if (s.ok()) {
TableReader* t = GetTableReaderFromHandle(handle);
@ -162,13 +165,12 @@ Status TableCache::Get(const ReadOptions& options,
}
bool TableCache::PrefixMayMatch(const ReadOptions& options,
uint64_t file_number,
uint64_t file_size,
const Slice& internal_prefix,
bool* table_io) {
const InternalKeyComparator& icomparator,
uint64_t file_number, uint64_t file_size,
const Slice& internal_prefix, bool* table_io) {
Cache::Handle* handle = nullptr;
Status s = FindTable(storage_options_, file_number,
file_size, &handle, table_io);
Status s = FindTable(storage_options_, icomparator, file_number, file_size,
&handle, table_io);
bool may_match = true;
if (s.ok()) {
TableReader* t = GetTableReaderFromHandle(handle);

@ -38,8 +38,8 @@ class TableCache {
// the returned iterator. The returned "*tableptr" object is owned by
// the cache and should not be deleted, and is valid for as long as the
// returned iterator is live.
Iterator* NewIterator(const ReadOptions& options,
const EnvOptions& toptions,
Iterator* NewIterator(const ReadOptions& options, const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta,
TableReader** table_reader_ptr = nullptr,
bool for_compaction = false);
@ -48,26 +48,27 @@ class TableCache {
// call (*handle_result)(arg, found_key, found_value) repeatedly until
// it returns false.
Status Get(const ReadOptions& options,
const FileMetaData& file_meta,
const Slice& k,
void* arg,
bool (*handle_result)(void*, const Slice&, const Slice&, bool),
bool* table_io,
void (*mark_key_may_exist)(void*) = nullptr);
const InternalKeyComparator& internal_comparator,
const FileMetaData& file_meta, const Slice& k, void* arg,
bool (*handle_result)(void*, const ParsedInternalKey&,
const Slice&, bool),
bool* table_io, void (*mark_key_may_exist)(void*) = nullptr);
// Determine whether the table may contain the specified prefix. If
// the table index or blooms are not in memory, this may cause an I/O
bool PrefixMayMatch(const ReadOptions& options, uint64_t file_number,
uint64_t file_size, const Slice& internal_prefix,
bool* table_io);
bool PrefixMayMatch(const ReadOptions& options,
const InternalKeyComparator& internal_comparator,
uint64_t file_number, uint64_t file_size,
const Slice& internal_prefix, bool* table_io);
// Evict any entry for the specified file number
void Evict(uint64_t file_number);
// Find table reader
Status FindTable(const EnvOptions& toptions, uint64_t file_number,
uint64_t file_size, Cache::Handle**, bool* table_io=nullptr,
const bool no_io = false);
Status FindTable(const EnvOptions& toptions,
const InternalKeyComparator& internal_comparator,
uint64_t file_number, uint64_t file_size, Cache::Handle**,
bool* table_io = nullptr, const bool no_io = false);
// Get TableReader from a cache handle.
TableReader* GetTableReaderFromHandle(Cache::Handle* handle);

@ -83,13 +83,13 @@ class DumbLogger : public Logger {
};
// Utilities test functions
void MakeBuilder(
const Options& options,
std::unique_ptr<FakeWritableFile>* writable,
std::unique_ptr<TableBuilder>* builder) {
void MakeBuilder(const Options& options,
const InternalKeyComparator& internal_comparator,
std::unique_ptr<FakeWritableFile>* writable,
std::unique_ptr<TableBuilder>* builder) {
writable->reset(new FakeWritableFile);
builder->reset(options.table_factory->NewTableBuilder(
options, writable->get(), options.compression));
options, internal_comparator, writable->get(), options.compression));
}
// Collects keys that starts with "A" in a table.
@ -127,9 +127,8 @@ class RegularKeysStartWithA: public TablePropertiesCollector {
extern uint64_t kBlockBasedTableMagicNumber;
extern uint64_t kPlainTableMagicNumber;
void TestCustomizedTablePropertiesCollector(
uint64_t magic_number,
bool encode_as_internal,
const Options& options) {
uint64_t magic_number, bool encode_as_internal, const Options& options,
const InternalKeyComparator& internal_comparator) {
// make sure the entries will be inserted with order.
std::map<std::string, std::string> kvs = {
{"About ", "val5"}, // starts with 'A'
@ -144,7 +143,7 @@ void TestCustomizedTablePropertiesCollector(
// -- Step 1: build table
std::unique_ptr<TableBuilder> builder;
std::unique_ptr<FakeWritableFile> writable;
MakeBuilder(options, &writable, &builder);
MakeBuilder(options, internal_comparator, &writable, &builder);
for (const auto& kv : kvs) {
if (encode_as_internal) {
@ -193,11 +192,9 @@ TEST(TablePropertiesTest, CustomizedTablePropertiesCollector) {
options.table_properties_collectors.resize(1);
options.table_properties_collectors[0].reset(collector);
}
TestCustomizedTablePropertiesCollector(
kBlockBasedTableMagicNumber,
encode_as_internal,
options
);
test::PlainInternalKeyComparator ikc(options.comparator);
TestCustomizedTablePropertiesCollector(kBlockBasedTableMagicNumber,
encode_as_internal, options, ikc);
}
// test plain table
@ -206,9 +203,9 @@ TEST(TablePropertiesTest, CustomizedTablePropertiesCollector) {
std::make_shared<RegularKeysStartWithA>()
);
options.table_factory = std::make_shared<PlainTableFactory>(8, 8, 0);
TestCustomizedTablePropertiesCollector(
kPlainTableMagicNumber, true, options
);
test::PlainInternalKeyComparator ikc(options.comparator);
TestCustomizedTablePropertiesCollector(kPlainTableMagicNumber, true, options,
ikc);
}
void TestInternalKeyPropertiesCollector(
@ -228,6 +225,8 @@ void TestInternalKeyPropertiesCollector(
std::unique_ptr<TableBuilder> builder;
std::unique_ptr<FakeWritableFile> writable;
Options options;
test::PlainInternalKeyComparator pikc(options.comparator);
options.table_factory = table_factory;
if (sanitized) {
options.table_properties_collectors = {
@ -239,12 +238,9 @@ void TestInternalKeyPropertiesCollector(
// HACK: Set options.info_log to avoid writing log in
// SanitizeOptions().
options.info_log = std::make_shared<DumbLogger>();
options = SanitizeOptions(
"db", // just a place holder
nullptr, // with skip internal key comparator
nullptr, // don't care filter policy
options
);
options = SanitizeOptions("db", // just a place holder
&pikc, nullptr, // don't care filter policy
options);
options.comparator = comparator;
} else {
options.table_properties_collectors = {
@ -252,7 +248,7 @@ void TestInternalKeyPropertiesCollector(
};
}
MakeBuilder(options, &writable, &builder);
MakeBuilder(options, pikc, &writable, &builder);
for (const auto& k : keys) {
builder->Add(k.Encode(), "val");
}

@ -191,11 +191,10 @@ class Version::LevelFileNumIterator : public Iterator {
mutable char value_buf_[16];
};
static Iterator* GetFileIterator(void* arg,
const ReadOptions& options,
static Iterator* GetFileIterator(void* arg, const ReadOptions& options,
const EnvOptions& soptions,
const Slice& file_value,
bool for_compaction) {
const InternalKeyComparator& icomparator,
const Slice& file_value, bool for_compaction) {
TableCache* cache = reinterpret_cast<TableCache*>(arg);
if (file_value.size() != 16) {
return NewErrorIterator(
@ -210,11 +209,9 @@ static Iterator* GetFileIterator(void* arg,
}
FileMetaData meta(DecodeFixed64(file_value.data()),
DecodeFixed64(file_value.data() + 8));
return cache->NewIterator(options.prefix ? options_copy : options,
soptions,
meta,
nullptr /* don't need reference to table*/,
for_compaction);
return cache->NewIterator(
options.prefix ? options_copy : options, soptions, icomparator, meta,
nullptr /* don't need reference to table*/, for_compaction);
}
}
@ -234,10 +231,9 @@ bool Version::PrefixMayMatch(const ReadOptions& options,
may_match = true;
} else {
may_match = vset_->table_cache_->PrefixMayMatch(
options,
DecodeFixed64(level_iter->value().data()),
DecodeFixed64(level_iter->value().data() + 8),
internal_prefix, nullptr);
options, vset_->icmp_, DecodeFixed64(level_iter->value().data()),
DecodeFixed64(level_iter->value().data() + 8), internal_prefix,
nullptr);
}
return may_match;
}
@ -255,8 +251,8 @@ Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
return NewEmptyIterator();
}
}
return NewTwoLevelIterator(level_iter, &GetFileIterator,
vset_->table_cache_, options, soptions);
return NewTwoLevelIterator(level_iter, &GetFileIterator, vset_->table_cache_,
options, soptions, vset_->icmp_);
}
void Version::AddIterators(const ReadOptions& options,
@ -265,7 +261,7 @@ void Version::AddIterators(const ReadOptions& options,
// Merge all level zero files together since they may overlap
for (const FileMetaData* file : files_[0]) {
iters->push_back(vset_->table_cache_->NewIterator(options, soptions,
*file));
vset_->icmp_, *file));
}
// For levels > 0, we can use a concatenating iterator that sequentially
@ -315,80 +311,73 @@ static void MarkKeyMayExist(void* arg) {
}
}
static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){
static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key,
const Slice& v, bool didIO) {
Saver* s = reinterpret_cast<Saver*>(arg);
MergeContext* merge_contex = s->merge_context;
std::string merge_result; // temporary area for merge results later
assert(s != nullptr && merge_contex != nullptr);
ParsedInternalKey parsed_key;
// TODO: didIO and Merge?
s->didIO = didIO;
if (!ParseInternalKey(ikey, &parsed_key)) {
// TODO: what about corrupt during Merge?
s->state = kCorrupt;
} else {
if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
// Key matches. Process it
switch (parsed_key.type) {
case kTypeValue:
if (kNotFound == s->state) {
s->state = kFound;
s->value->assign(v.data(), v.size());
} else if (kMerge == s->state) {
assert(s->merge_operator != nullptr);
s->state = kFound;
if (!s->merge_operator->FullMerge(s->user_key, &v,
merge_contex->GetOperands(),
s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = kCorrupt;
}
} else {
assert(false);
if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) {
// Key matches. Process it
switch (parsed_key.type) {
case kTypeValue:
if (kNotFound == s->state) {
s->state = kFound;
s->value->assign(v.data(), v.size());
} else if (kMerge == s->state) {
assert(s->merge_operator != nullptr);
s->state = kFound;
if (!s->merge_operator->FullMerge(s->user_key, &v,
merge_contex->GetOperands(),
s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = kCorrupt;
}
return false;
} else {
assert(false);
}
return false;
case kTypeDeletion:
if (kNotFound == s->state) {
s->state = kDeleted;
} else if (kMerge == s->state) {
s->state = kFound;
case kTypeDeletion:
if (kNotFound == s->state) {
s->state = kDeleted;
} else if (kMerge == s->state) {
s->state = kFound;
if (!s->merge_operator->FullMerge(s->user_key, nullptr,
merge_contex->GetOperands(),
s->value, s->logger)) {
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = kCorrupt;
}
} else {
assert(false);
}
return false;
case kTypeMerge:
assert(s->state == kNotFound || s->state == kMerge);
s->state = kMerge;
merge_contex->PushOperand(v);
while (merge_contex->GetNumOperands() >= 2) {
// Attempt to merge operands together via user associateive merge
if (s->merge_operator->PartialMerge(s->user_key,
merge_contex->GetOperand(0),
merge_contex->GetOperand(1),
&merge_result,
s->logger)) {
merge_contex->PushPartialMergeResult(merge_result);
} else {
// Associative merge returns false ==> stack the operands
break;
}
RecordTick(s->statistics, NUMBER_MERGE_FAILURES);
s->state = kCorrupt;
}
return true;
case kTypeLogData:
} else {
assert(false);
}
return false;
case kTypeMerge:
assert(s->state == kNotFound || s->state == kMerge);
s->state = kMerge;
merge_contex->PushOperand(v);
while (merge_contex->GetNumOperands() >= 2) {
// Attempt to merge operands together via user associateive merge
if (s->merge_operator->PartialMerge(
s->user_key, merge_contex->GetOperand(0),
merge_contex->GetOperand(1), &merge_result, s->logger)) {
merge_contex->PushPartialMergeResult(merge_result);
} else {
// Associative merge returns false ==> stack the operands
break;
}
}
return true;
default:
assert(false);
break;
}
}
@ -521,8 +510,9 @@ void Version::Get(const ReadOptions& options,
prev_file = f;
#endif
bool tableIO = false;
*status = vset_->table_cache_->Get(options, *f, ikey, &saver, SaveValue,
&tableIO, MarkKeyMayExist);
*status =
vset_->table_cache_->Get(options, vset_->icmp_, *f, ikey, &saver,
SaveValue, &tableIO, MarkKeyMayExist);
// TODO: examine the behavior for corrupted key
if (!status->ok()) {
return;
@ -1355,9 +1345,8 @@ class VersionSet::Builder {
for (auto& file_meta : *(levels_[level].added_files)) {
assert (!file_meta->table_reader_handle);
bool table_io;
vset_->table_cache_->FindTable(vset_->storage_options_,
file_meta->number,
file_meta->file_size,
vset_->table_cache_->FindTable(vset_->storage_options_, vset_->icmp_,
file_meta->number, file_meta->file_size,
&file_meta->table_reader_handle,
&table_io, false);
}
@ -2069,8 +2058,9 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
// "ikey" falls in the range for this table. Add the
// approximate offset of "ikey" within the table.
TableReader* table_reader_ptr;
Iterator* iter = table_cache_->NewIterator(
ReadOptions(), storage_options_, *(files[i]), &table_reader_ptr);
Iterator* iter =
table_cache_->NewIterator(ReadOptions(), storage_options_, icmp_,
*(files[i]), &table_reader_ptr);
if (table_reader_ptr != nullptr) {
result += table_reader_ptr->ApproximateOffsetOf(ikey.Encode());
}
@ -2134,14 +2124,14 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
if (c->level() + which == 0) {
for (const auto& file : *c->inputs(which)) {
list[num++] = table_cache_->NewIterator(
options, storage_options_compactions_, *file, nullptr,
options, storage_options_compactions_, icmp_, *file, nullptr,
true /* for compaction */);
}
} else {
// Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator(
new Version::LevelFileNumIterator(icmp_, c->inputs(which)),
&GetFileIterator, table_cache_, options, storage_options_,
&GetFileIterator, table_cache_, options, storage_options_, icmp_,
true /* for compaction */);
}
}

@ -57,7 +57,7 @@ static std::string PrintContents(WriteBatch* b) {
state.append(")");
count++;
break;
case kTypeLogData:
default:
assert(false);
break;
}

@ -34,6 +34,7 @@ class TablePropertiesCollector;
class Slice;
class SliceTransform;
class Statistics;
class InternalKeyComparator;
using std::shared_ptr;

@ -27,8 +27,6 @@
namespace rocksdb {
class TableFactory;
// -- Block-based Table
class FlushBlockPolicyFactory;

@ -21,6 +21,7 @@
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "db/dbformat.h"
#include "table/block_based_table_reader.h"
#include "table/block.h"
#include "table/block_builder.h"
@ -52,6 +53,7 @@ extern const uint64_t kBlockBasedTableMagicNumber
struct BlockBasedTableBuilder::Rep {
Options options;
const InternalKeyComparator& internal_comparator;
WritableFile* file;
uint64_t offset = 0;
Status status;
@ -71,31 +73,30 @@ struct BlockBasedTableBuilder::Rep {
std::string compressed_output;
std::unique_ptr<FlushBlockPolicy> flush_block_policy;
Rep(const Options& opt,
WritableFile* f,
FlushBlockPolicyFactory* flush_block_policy_factory,
Rep(const Options& opt, const InternalKeyComparator& icomparator,
WritableFile* f, FlushBlockPolicyFactory* flush_block_policy_factory,
CompressionType compression_type)
: options(opt),
internal_comparator(icomparator),
file(f),
data_block(options),
data_block(options, &internal_comparator),
// To avoid linear scan, we make the block_restart_interval to be `1`
// in index block builder
index_block(1 /* block_restart_interval */, options.comparator),
index_block(1 /* block_restart_interval */, &internal_comparator),
compression_type(compression_type),
filter_block(opt.filter_policy == nullptr ? nullptr
: new FilterBlockBuilder(opt)),
filter_block(opt.filter_policy == nullptr
? nullptr
: new FilterBlockBuilder(opt, &internal_comparator)),
flush_block_policy(
flush_block_policy_factory->NewFlushBlockPolicy(data_block)) {
}
flush_block_policy_factory->NewFlushBlockPolicy(data_block)) {}
};
BlockBasedTableBuilder::BlockBasedTableBuilder(
const Options& options,
WritableFile* file,
FlushBlockPolicyFactory* flush_block_policy_factory,
const Options& options, const InternalKeyComparator& internal_comparator,
WritableFile* file, FlushBlockPolicyFactory* flush_block_policy_factory,
CompressionType compression_type)
: rep_(new Rep(options,
file, flush_block_policy_factory, compression_type)) {
: rep_(new Rep(options, internal_comparator, file,
flush_block_policy_factory, compression_type)) {
if (rep_->filter_block != nullptr) {
rep_->filter_block->StartBlock(0);
}
@ -118,7 +119,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
assert(!r->closed);
if (!ok()) return;
if (r->props.num_entries > 0) {
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0);
}
auto should_flush = r->flush_block_policy->Update(key, value);
@ -135,7 +136,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) {
// entries in the first block and < all entries in subsequent
// blocks.
if (ok()) {
r->options.comparator->FindShortestSeparator(&r->last_key, key);
r->internal_comparator.FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
@ -339,7 +340,7 @@ Status BlockBasedTableBuilder::Finish() {
// block, we will finish writing all index entries here and flush them
// to storage after metaindex block is written.
if (ok() && !empty_data_block) {
r->options.comparator->FindShortSuccessor(&r->last_key);
r->internal_comparator.FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);

@ -26,6 +26,7 @@ class BlockBasedTableBuilder : public TableBuilder {
// building in *file. Does not close the file. It is up to the
// caller to close the file after calling Finish().
BlockBasedTableBuilder(const Options& options,
const InternalKeyComparator& internal_comparator,
WritableFile* file,
FlushBlockPolicyFactory* flush_block_policy_factory,
CompressionType compression_type);

@ -20,15 +20,17 @@ namespace rocksdb {
Status BlockBasedTableFactory::NewTableReader(
const Options& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader) const {
return BlockBasedTable::Open(options, soptions, table_options_,
std::move(file), file_size, table_reader);
internal_comparator, std::move(file), file_size,
table_reader);
}
TableBuilder* BlockBasedTableFactory::NewTableBuilder(
const Options& options, WritableFile* file,
CompressionType compression_type) const {
const Options& options, const InternalKeyComparator& internal_comparator,
WritableFile* file, CompressionType compression_type) const {
auto flush_block_policy_factory =
table_options_.flush_block_policy_factory.get();
@ -45,11 +47,9 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder(
options.block_size_deviation);
}
auto table_builder = new BlockBasedTableBuilder(
options,
file,
flush_block_policy_factory,
compression_type);
auto table_builder =
new BlockBasedTableBuilder(options, internal_comparator, file,
flush_block_policy_factory, compression_type);
// Delete flush_block_policy_factory only when it's just created from the
// options.

@ -35,12 +35,13 @@ class BlockBasedTableFactory : public TableFactory {
const char* Name() const override { return "BlockBasedTable"; }
Status NewTableReader(const Options& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader) const override;
TableBuilder* NewTableBuilder(const Options& options, WritableFile* file,
CompressionType compression_type)
const override;
TableBuilder* NewTableBuilder(
const Options& options, const InternalKeyComparator& internal_comparator,
WritableFile* file, CompressionType compression_type) const override;
private:
BlockBasedTableOptions table_options_;

@ -39,12 +39,13 @@ const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length*3+1;
using std::unique_ptr;
struct BlockBasedTable::Rep {
Rep(const EnvOptions& storage_options) :
soptions(storage_options) {
}
Rep(const EnvOptions& storage_options,
const InternalKeyComparator& internal_comparator)
: soptions(storage_options), internal_comparator_(internal_comparator) {}
Options options;
const EnvOptions& soptions;
const InternalKeyComparator& internal_comparator_;
Status status;
unique_ptr<RandomAccessFile> file;
char cache_key_prefix[kMaxCacheKeyPrefixSize];
@ -225,6 +226,7 @@ Cache::Handle* GetFromBlockCache(
Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions,
const BlockBasedTableOptions& table_options,
const InternalKeyComparator& internal_comparator,
unique_ptr<RandomAccessFile>&& file,
uint64_t file_size,
unique_ptr<TableReader>* table_reader) {
@ -236,7 +238,7 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions,
// We've successfully read the footer and the index block: we're
// ready to serve requests.
Rep* rep = new BlockBasedTable::Rep(soptions);
Rep* rep = new BlockBasedTable::Rep(soptions, internal_comparator);
rep->options = options;
rep->file = std::move(file);
rep->metaindex_handle = footer.metaindex_handle();
@ -661,7 +663,7 @@ Iterator* BlockBasedTable::BlockReader(void* arg,
Iterator* iter;
if (block != nullptr) {
iter = block->NewIterator(table->rep_->options.comparator);
iter = block->NewIterator(&(table->rep_->internal_comparator_));
if (cache_handle != nullptr) {
iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
} else {
@ -734,7 +736,7 @@ BlockBasedTable::GetFilter(bool no_io) const {
// Get the iterator from the index block.
Iterator* BlockBasedTable::IndexBlockReader(const ReadOptions& options) const {
if (rep_->index_block) {
return rep_->index_block->NewIterator(rep_->options.comparator);
return rep_->index_block->NewIterator(&(rep_->internal_comparator_));
}
// get index block from cache
@ -755,7 +757,7 @@ Iterator* BlockBasedTable::IndexBlockReader(const ReadOptions& options) const {
Iterator* iter;
if (entry.value != nullptr) {
iter = entry.value->NewIterator(rep_->options.comparator);
iter = entry.value->NewIterator(&(rep_->internal_comparator_));
if (entry.cache_handle) {
iter->RegisterCleanup(
&ReleaseBlock, rep_->options.block_cache.get(), entry.cache_handle
@ -769,9 +771,9 @@ Iterator* BlockBasedTable::IndexBlockReader(const ReadOptions& options) const {
return iter;
}
Iterator* BlockBasedTable::BlockReader(void* arg,
const ReadOptions& options,
Iterator* BlockBasedTable::BlockReader(void* arg, const ReadOptions& options,
const EnvOptions& soptions,
const InternalKeyComparator& icomparator,
const Slice& index_value,
bool for_compaction) {
return BlockReader(arg, options, index_value, nullptr, for_compaction);
@ -862,20 +864,15 @@ Iterator* BlockBasedTable::NewIterator(const ReadOptions& options) {
}
}
return NewTwoLevelIterator(
IndexBlockReader(options),
&BlockBasedTable::BlockReader,
const_cast<BlockBasedTable*>(this),
options,
rep_->soptions
);
return NewTwoLevelIterator(IndexBlockReader(options),
&BlockBasedTable::BlockReader,
const_cast<BlockBasedTable*>(this), options,
rep_->soptions, rep_->internal_comparator_);
}
Status BlockBasedTable::Get(
const ReadOptions& readOptions,
const Slice& key,
void* handle_context,
bool (*result_handler)(void* handle_context, const Slice& k,
const ReadOptions& readOptions, const Slice& key, void* handle_context,
bool (*result_handler)(void* handle_context, const ParsedInternalKey& k,
const Slice& v, bool didIO),
void (*mark_key_may_exist_handler)(void* handle_context)) {
Status s;
@ -913,8 +910,13 @@ Status BlockBasedTable::Get(
// Call the *saver function on each entry/block until it returns false
for (block_iter->Seek(key); block_iter->Valid(); block_iter->Next()) {
if (!(*result_handler)(handle_context, block_iter->key(),
block_iter->value(), didIO)) {
ParsedInternalKey parsed_key;
if (!ParseInternalKey(block_iter->key(), &parsed_key)) {
s = Status::Corruption(Slice());
}
if (!(*result_handler)(handle_context, parsed_key, block_iter->value(),
didIO)) {
done = true;
break;
}
@ -931,7 +933,8 @@ Status BlockBasedTable::Get(
return s;
}
bool SaveDidIO(void* arg, const Slice& key, const Slice& value, bool didIO) {
bool SaveDidIO(void* arg, const ParsedInternalKey& key, const Slice& value,
bool didIO) {
*reinterpret_cast<bool*>(arg) = didIO;
return false;
}

@ -51,6 +51,7 @@ class BlockBasedTable : public TableReader {
// *file must remain live while this Table is in use.
static Status Open(const Options& db_options, const EnvOptions& env_options,
const BlockBasedTableOptions& table_options,
const InternalKeyComparator& internal_key_comparator,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader);
@ -63,10 +64,11 @@ class BlockBasedTable : public TableReader {
Status Get(const ReadOptions& readOptions, const Slice& key,
void* handle_context,
bool (*result_handler)(void* handle_context, const Slice& k,
const Slice& v, bool didIO),
void (*mark_key_may_exist_handler)(void* handle_context) = nullptr)
override;
bool (*result_handler)(void* handle_context,
const ParsedInternalKey& k, const Slice& v,
bool didIO),
void (*mark_key_may_exist_handler)(void* handle_context) =
nullptr) override;
// Given a key, return an approximate byte offset in the file where
// the data for that key begins (or would begin if the key were
@ -97,8 +99,9 @@ class BlockBasedTable : public TableReader {
bool compaction_optimized_;
static Iterator* BlockReader(void*, const ReadOptions&,
const EnvOptions& soptions, const Slice&,
bool for_compaction);
const EnvOptions& soptions,
const InternalKeyComparator& icomparator,
const Slice&, bool for_compaction);
static Iterator* BlockReader(void*, const ReadOptions&, const Slice&,
bool* didIO, bool for_compaction = false);

@ -36,6 +36,7 @@
#include <algorithm>
#include <assert.h>
#include "rocksdb/comparator.h"
#include "db/dbformat.h"
#include "util/coding.h"
namespace rocksdb {
@ -51,9 +52,8 @@ BlockBuilder::BlockBuilder(int block_restart_interval,
restarts_.push_back(0); // First restart point is at offset 0
}
BlockBuilder::BlockBuilder(const Options& options)
: BlockBuilder(options.block_restart_interval, options.comparator) {
}
BlockBuilder::BlockBuilder(const Options& options, const Comparator* comparator)
: BlockBuilder(options.block_restart_interval, comparator) {}
void BlockBuilder::Reset() {
buffer_.clear();

@ -21,7 +21,7 @@ class Comparator;
class BlockBuilder {
public:
BlockBuilder(int block_builder, const Comparator* comparator);
explicit BlockBuilder(const Options& options);
explicit BlockBuilder(const Options& options, const Comparator* comparator);
// Reset the contents as if the BlockBuilder was just constructed.
void Reset();

@ -32,9 +32,12 @@ class BlockTest {};
TEST(BlockTest, SimpleTest) {
Random rnd(301);
Options options = Options();
std::unique_ptr<InternalKeyComparator> ic;
ic.reset(new test::PlainInternalKeyComparator(options.comparator));
std::vector<std::string> keys;
std::vector<std::string> values;
BlockBuilder builder(options);
BlockBuilder builder(options, ic.get());
int num_records = 100000;
char buf[10];
char* p = &buf[0];

@ -21,11 +21,12 @@ namespace rocksdb {
static const size_t kFilterBaseLg = 11;
static const size_t kFilterBase = 1 << kFilterBaseLg;
FilterBlockBuilder::FilterBlockBuilder(const Options& opt)
: policy_(opt.filter_policy),
prefix_extractor_(opt.prefix_extractor),
whole_key_filtering_(opt.whole_key_filtering),
comparator_(opt.comparator){}
FilterBlockBuilder::FilterBlockBuilder(const Options& opt,
const Comparator* internal_comparator)
: policy_(opt.filter_policy),
prefix_extractor_(opt.prefix_extractor),
whole_key_filtering_(opt.whole_key_filtering),
comparator_(internal_comparator) {}
void FilterBlockBuilder::StartBlock(uint64_t block_offset) {
uint64_t filter_index = (block_offset / kFilterBase);

@ -35,7 +35,8 @@ class FilterPolicy;
// (StartBlock AddKey*)* Finish
class FilterBlockBuilder {
public:
explicit FilterBlockBuilder(const Options& opt);
explicit FilterBlockBuilder(const Options& opt,
const Comparator* internal_comparator);
void StartBlock(uint64_t block_offset);
void AddKey(const Slice& key);

@ -55,7 +55,7 @@ class FilterBlockTest {
};
TEST(FilterBlockTest, EmptyBuilder) {
FilterBlockBuilder builder(options_);
FilterBlockBuilder builder(options_, options_.comparator);
Slice block = builder.Finish();
ASSERT_EQ("\\x00\\x00\\x00\\x00\\x0b", EscapeString(block));
FilterBlockReader reader(options_, block);
@ -64,7 +64,7 @@ TEST(FilterBlockTest, EmptyBuilder) {
}
TEST(FilterBlockTest, SingleChunk) {
FilterBlockBuilder builder(options_);
FilterBlockBuilder builder(options_, options_.comparator);
builder.StartBlock(100);
builder.AddKey("foo");
builder.AddKey("bar");
@ -85,7 +85,7 @@ TEST(FilterBlockTest, SingleChunk) {
}
TEST(FilterBlockTest, MultiChunk) {
FilterBlockBuilder builder(options_);
FilterBlockBuilder builder(options_, options_.comparator);
// First filter
builder.StartBlock(0);

@ -11,6 +11,8 @@
#include "rocksdb/env.h"
#include "rocksdb/filter_policy.h"
#include "rocksdb/options.h"
#include "table/plain_table_factory.h"
#include "db/dbformat.h"
#include "table/block_builder.h"
#include "table/filter_block.h"
#include "table/format.h"
@ -67,20 +69,32 @@ PlainTableBuilder::~PlainTableBuilder() {
}
void PlainTableBuilder::Add(const Slice& key, const Slice& value) {
assert(user_key_len_ == 0 || key.size() == user_key_len_ + 8);
size_t user_key_size = key.size() - 8;
assert(user_key_len_ == 0 || user_key_size == user_key_len_);
if (!IsFixedLength()) {
// Write key length
int key_size = key.size();
key_size_str_.clear();
PutVarint32(&key_size_str_, key_size);
PutVarint32(&key_size_str_, user_key_size);
file_->Append(key_size_str_);
offset_ += key_size_str_.length();
}
// Write key
file_->Append(key);
offset_ += key.size();
ParsedInternalKey parsed_key;
if (!ParseInternalKey(key, &parsed_key)) {
status_ = Status::Corruption(Slice());
return;
}
if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) {
file_->Append(Slice(key.data(), user_key_size));
char tmp_char = PlainTableFactory::kValueTypeSeqId0;
file_->Append(Slice(&tmp_char, 1));
offset_ += key.size() - 7;
} else {
file_->Append(key);
offset_ += key.size();
}
// Write value length
value_size_str_.clear();
@ -105,9 +119,7 @@ void PlainTableBuilder::Add(const Slice& key, const Slice& value) {
);
}
Status PlainTableBuilder::status() const {
return Status::OK();
}
Status PlainTableBuilder::status() const { return status_; }
Status PlainTableBuilder::Finish() {
assert(!closed_);

@ -6,6 +6,7 @@
#include <memory>
#include <stdint.h>
#include "db/dbformat.h"
#include "table/plain_table_builder.h"
#include "table/plain_table_reader.h"
#include "port/port.h"
@ -14,16 +15,18 @@ namespace rocksdb {
Status PlainTableFactory::NewTableReader(const Options& options,
const EnvOptions& soptions,
const InternalKeyComparator& icomp,
unique_ptr<RandomAccessFile>&& file,
uint64_t file_size,
unique_ptr<TableReader>* table) const {
return PlainTableReader::Open(options, soptions, std::move(file), file_size,
table, bloom_bits_per_key_, hash_table_ratio_);
return PlainTableReader::Open(options, soptions, icomp, std::move(file),
file_size, table, bloom_bits_per_key_,
hash_table_ratio_);
}
TableBuilder* PlainTableFactory::NewTableBuilder(
const Options& options, WritableFile* file,
CompressionType compression_type) const {
const Options& options, const InternalKeyComparator& internal_comparator,
WritableFile* file, CompressionType compression_type) const {
return new PlainTableBuilder(options, file, user_key_len_);
}

@ -57,12 +57,16 @@ class PlainTableFactory : public TableFactory {
hash_table_ratio_(hash_table_ratio) {}
const char* Name() const override { return "PlainTable"; }
Status NewTableReader(const Options& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table) const override;
TableBuilder* NewTableBuilder(const Options& options,
const InternalKeyComparator& icomparator,
WritableFile* file,
CompressionType compression_type) const
override;
TableBuilder* NewTableBuilder(const Options& options, WritableFile* file,
CompressionType compression_type)
const override;
static const char kValueTypeSeqId0 = 0xFF;
private:
uint32_t user_key_len_;

@ -4,8 +4,7 @@
#include "table/plain_table_reader.h"
#include <unordered_map>
#include <map>
#include <string>
#include "db/dbformat.h"
@ -77,6 +76,7 @@ class PlainTableIterator : public Iterator {
Slice key_;
Slice value_;
Status status_;
std::string tmp_str_;
// No copying allowed
PlainTableIterator(const PlainTableIterator&) = delete;
void operator=(const Iterator&) = delete;
@ -84,10 +84,12 @@ class PlainTableIterator : public Iterator {
extern const uint64_t kPlainTableMagicNumber;
PlainTableReader::PlainTableReader(const EnvOptions& storage_options,
const InternalKeyComparator& icomparator,
uint64_t file_size, int bloom_bits_per_key,
double hash_table_ratio,
const TableProperties& table_properties)
: soptions_(storage_options),
internal_comparator_(icomparator),
file_size_(file_size),
kHashTableRatio(hash_table_ratio),
kBloomBitsPerKey(bloom_bits_per_key),
@ -103,6 +105,7 @@ PlainTableReader::~PlainTableReader() {
Status PlainTableReader::Open(const Options& options,
const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
unique_ptr<RandomAccessFile>&& file,
uint64_t file_size,
unique_ptr<TableReader>* table_reader,
@ -122,9 +125,9 @@ Status PlainTableReader::Open(const Options& options,
return s;
}
std::unique_ptr<PlainTableReader> new_reader(
new PlainTableReader(soptions, file_size, bloom_bits_per_key,
hash_table_ratio, table_properties));
std::unique_ptr<PlainTableReader> new_reader(new PlainTableReader(
soptions, internal_comparator, file_size, bloom_bits_per_key,
hash_table_ratio, table_properties));
new_reader->file_ = std::move(file);
new_reader->options_ = options;
@ -215,10 +218,10 @@ int PlainTableReader::PopulateIndexRecordList(IndexRecordList* record_list) {
int num_prefixes = 0;
while (pos < data_end_offset_) {
uint32_t key_offset = pos;
Slice key_slice;
ParsedInternalKey key;
Slice value_slice;
status_ = Next(pos, &key_slice, &value_slice, pos);
Slice key_prefix_slice = GetPrefix(key_slice);
status_ = Next(pos, &key, &value_slice, pos);
Slice key_prefix_slice = GetPrefix(key);
if (is_first_record || prev_key_prefix_slice != key_prefix_slice) {
++num_prefixes;
@ -413,7 +416,11 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix,
index_ptr + 4,
&upper_bound);
uint32_t high = upper_bound;
Slice mid_key;
ParsedInternalKey mid_key;
ParsedInternalKey parsed_target;
if (!ParseInternalKey(target, &parsed_target)) {
return Status::Corruption(Slice());
}
// The key is between [low, high). Do a binary search between it.
while (high - low > 1) {
@ -424,8 +431,8 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix,
if (!s.ok()) {
return s;
}
int cmp_result = options_.comparator->Compare(target, mid_key);
if (cmp_result > 0) {
int cmp_result = internal_comparator_.Compare(mid_key, parsed_target);
if (cmp_result < 0) {
low = mid;
} else {
if (cmp_result == 0) {
@ -442,7 +449,7 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix,
// Both of the key at the position low or low+1 could share the same
// prefix as target. We need to rule out one of them to avoid to go
// to the wrong prefix.
Slice low_key;
ParsedInternalKey low_key;
size_t tmp;
uint32_t low_key_offset = base_ptr[low];
Status s = ReadKey(file_data_.data() + low_key_offset, &low_key, tmp);
@ -465,31 +472,53 @@ bool PlainTableReader::MayHavePrefix(uint32_t hash) {
return bloom_ == nullptr || bloom_->MayContainHash(hash);
}
Status PlainTableReader::ReadKey(const char* row_ptr, Slice* key,
Slice PlainTableReader::GetPrefix(const ParsedInternalKey& target) {
return options_.prefix_extractor->Transform(target.user_key);
}
Status PlainTableReader::ReadKey(const char* row_ptr, ParsedInternalKey* key,
size_t& bytes_read) {
const char* key_ptr = nullptr;
bytes_read = 0;
size_t internal_key_size = 0;
size_t user_key_size = 0;
if (IsFixedLength()) {
internal_key_size = GetFixedInternalKeyLength();
user_key_size = user_key_len_;
key_ptr = row_ptr;
} else {
uint32_t key_size = 0;
uint32_t tmp_size = 0;
key_ptr = GetVarint32Ptr(row_ptr, file_data_.data() + data_end_offset_,
&key_size);
internal_key_size = (size_t)key_size;
&tmp_size);
if (key_ptr == nullptr) {
return Status::Corruption("Unable to read the next key");
}
user_key_size = (size_t)tmp_size;
bytes_read = key_ptr - row_ptr;
}
if (row_ptr + internal_key_size >= file_data_.data() + data_end_offset_) {
if (key_ptr + user_key_size + 1 >= file_data_.data() + data_end_offset_) {
return Status::Corruption("Unable to read the next key");
}
*key = Slice(key_ptr, internal_key_size);
bytes_read += internal_key_size;
if (*(key_ptr + user_key_size) == PlainTableFactory::kValueTypeSeqId0) {
// Special encoding for the row with seqID=0
key->user_key = Slice(key_ptr, user_key_size);
key->sequence = 0;
key->type = kTypeValue;
bytes_read += user_key_size + 1;
} else {
if (row_ptr + user_key_size + 8 >= file_data_.data() + data_end_offset_) {
return Status::Corruption("Unable to read the next key");
}
if (!ParseInternalKey(Slice(key_ptr, user_key_size + 8), key)) {
return Status::Corruption(Slice());
}
bytes_read += user_key_size + 8;
}
return Status::OK();
}
Status PlainTableReader::Next(uint32_t offset, Slice* key, Slice* value,
uint32_t& next_offset) {
Status PlainTableReader::Next(uint32_t offset, ParsedInternalKey* key,
Slice* value, uint32_t& next_offset) {
if (offset == data_end_offset_) {
next_offset = data_end_offset_;
return Status::OK();
@ -518,10 +547,11 @@ Status PlainTableReader::Next(uint32_t offset, Slice* key, Slice* value,
return Status::OK();
}
Status PlainTableReader::Get(
const ReadOptions& ro, const Slice& target, void* arg,
bool (*saver)(void*, const Slice&, const Slice&, bool),
void (*mark_key_may_exist)(void*)) {
Status PlainTableReader::Get(const ReadOptions& ro, const Slice& target,
void* arg,
bool (*saver)(void*, const ParsedInternalKey&,
const Slice&, bool),
void (*mark_key_may_exist)(void*)) {
// Check bloom filter first.
Slice prefix_slice = GetPrefix(target);
uint32_t prefix_hash = GetSliceHash(prefix_slice);
@ -534,7 +564,12 @@ Status PlainTableReader::Get(
if (!s.ok()) {
return s;
}
Slice found_key;
ParsedInternalKey found_key;
ParsedInternalKey parsed_target;
if (!ParseInternalKey(target, &parsed_target)) {
return Status::Corruption(Slice());
}
Slice found_value;
while (offset < data_end_offset_) {
Status s = Next(offset, &found_key, &found_value, offset);
@ -549,9 +584,10 @@ Status PlainTableReader::Get(
}
prefix_match = true;
}
if (options_.comparator->Compare(found_key, target) >= 0
&& !(*saver)(arg, found_key, found_value, true)) {
break;
if (internal_comparator_.Compare(found_key, parsed_target) >= 0) {
if (!(*saver)(arg, found_key, found_value, true)) {
break;
}
}
}
return Status::OK();
@ -612,7 +648,7 @@ void PlainTableIterator::Seek(const Slice& target) {
}
prefix_match = true;
}
if (table_->options_.comparator->Compare(key(), target) >= 0) {
if (table_->internal_comparator_.Compare(key(), target) >= 0) {
break;
}
}
@ -623,8 +659,19 @@ void PlainTableIterator::Seek(const Slice& target) {
void PlainTableIterator::Next() {
offset_ = next_offset_;
Slice tmp_slice;
status_ = table_->Next(next_offset_, &key_, &value_, next_offset_);
if (offset_ < table_->data_end_offset_) {
Slice tmp_slice;
ParsedInternalKey parsed_key;
status_ = table_->Next(next_offset_, &parsed_key, &value_, next_offset_);
if (status_.ok()) {
// Make a copy in this case. TODO optimize.
tmp_str_.clear();
AppendInternalKey(&tmp_str_, parsed_key);
key_ = Slice(tmp_str_);
} else {
offset_ = next_offset_ = table_->data_end_offset_;
}
}
}
void PlainTableIterator::Prev() {
@ -632,10 +679,12 @@ void PlainTableIterator::Prev() {
}
Slice PlainTableIterator::key() const {
assert(Valid());
return key_;
}
Slice PlainTableIterator::value() const {
assert(Valid());
return value_;
}
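
Editorial note: the hunks above implement the one-byte tail for rows whose sequence number is 0 and whose type is a plain value. Below is a minimal, hedged sketch of the decoding rule, assuming the layout used here: user-key bytes followed by either the single PlainTableFactory::kValueTypeSeqId0 marker byte or the usual 8-byte packed (sequence << 8 | type) tail. The DecodedKey struct and DecodeTail helper are illustrative only; the reader itself goes through ParseInternalKey.

#include "db/dbformat.h"                // ValueType, kTypeValue
#include "rocksdb/slice.h"
#include "rocksdb/status.h"
#include "table/plain_table_factory.h"  // kValueTypeSeqId0
#include "util/coding.h"                // DecodeFixed64

namespace rocksdb {

struct DecodedKey {      // illustrative stand-in for ParsedInternalKey
  Slice user_key;
  uint64_t sequence;
  ValueType type;
};

Status DecodeTail(const char* key_ptr, size_t user_key_size,
                  const char* data_end, DecodedKey* out, size_t* bytes_read) {
  // Bounds check mirroring the reader: the tail byte plus at least one more
  // byte (the value that follows) must fit before data_end.
  if (key_ptr + user_key_size + 1 >= data_end) {
    return Status::Corruption("Unable to read the next key");
  }
  out->user_key = Slice(key_ptr, user_key_size);
  if (key_ptr[user_key_size] == PlainTableFactory::kValueTypeSeqId0) {
    // One-byte tail: sequence 0, value type. Saves 7 bytes per row.
    out->sequence = 0;
    out->type = kTypeValue;
    *bytes_read = user_key_size + 1;
  } else {
    // Regular 8-byte tail, little-endian: (sequence << 8) | type.
    if (key_ptr + user_key_size + 8 >= data_end) {
      return Status::Corruption("Unable to read the next key");
    }
    uint64_t packed = DecodeFixed64(key_ptr + user_key_size);
    out->sequence = packed >> 8;
    out->type = static_cast<ValueType>(packed & 0xff);
    *bytes_read = user_key_size + 8;
  }
  return Status::OK();
}

}  // namespace rocksdb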

@ -6,8 +6,10 @@
#include <unordered_map>
#include <memory>
#include <vector>
#include <string>
#include <stdint.h>
#include "db/dbformat.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/slice_transform.h"
@ -27,6 +29,7 @@ struct ReadOptions;
class TableCache;
class TableReader;
class DynamicBloom;
class InternalKeyComparator;
using std::unique_ptr;
using std::unordered_map;
@ -43,6 +46,7 @@ extern const uint32_t kPlainTableVariableLength;
class PlainTableReader: public TableReader {
public:
static Status Open(const Options& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table,
const int bloom_bits_per_key, double hash_table_ratio);
@ -51,10 +55,10 @@ class PlainTableReader: public TableReader {
Iterator* NewIterator(const ReadOptions&);
Status Get(
const ReadOptions&, const Slice& key, void* arg,
bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool),
void (*mark_key_may_exist)(void*) = nullptr);
Status Get(const ReadOptions&, const Slice& key, void* arg,
bool (*result_handler)(void* arg, const ParsedInternalKey& k,
const Slice& v, bool),
void (*mark_key_may_exist)(void*) = nullptr);
uint64_t ApproximateOffsetOf(const Slice& key);
@ -62,8 +66,10 @@ class PlainTableReader: public TableReader {
const TableProperties& GetTableProperties() { return table_properties_; }
PlainTableReader(const EnvOptions& storage_options, uint64_t file_size,
int bloom_bits_per_key, double hash_table_ratio,
PlainTableReader(const EnvOptions& storage_options,
const InternalKeyComparator& internal_comparator,
uint64_t file_size, int bloom_num_bits,
double hash_table_ratio,
const TableProperties& table_properties);
~PlainTableReader();
@ -77,6 +83,7 @@ class PlainTableReader: public TableReader {
Options options_;
const EnvOptions& soptions_;
const InternalKeyComparator internal_comparator_;
Status status_;
unique_ptr<RandomAccessFile> file_;
@ -184,11 +191,13 @@ class PlainTableReader: public TableReader {
// too.
bool MayHavePrefix(uint32_t hash);
Status ReadKey(const char* row_ptr, Slice* key, size_t& bytes_read);
Status ReadKey(const char* row_ptr, ParsedInternalKey* key,
size_t& bytes_read);
// Read the key and value at offset into *key and *value.
// Returns the offset of the following entry in next_offset.
Status Next(uint32_t offset, Slice* key, Slice* value, uint32_t& next_offset);
Status Next(uint32_t offset, ParsedInternalKey* key, Slice* value,
uint32_t& next_offset);
// Get file offset for key target.
// return value prefix_matched is set to true if the offset is confirmed
// for a key with the same prefix as target.
@ -202,6 +211,8 @@ class PlainTableReader: public TableReader {
Slice(target.data(), target.size() - 8));
}
Slice GetPrefix(const ParsedInternalKey& target);
// No copying allowed
explicit PlainTableReader(const TableReader&) = delete;
void operator=(const TableReader&) = delete;
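
Editorial note: PlainTableReader now derives the bloom/index prefix from the parsed user key via GetPrefix(const ParsedInternalKey&), using the prefix extractor configured in Options. A small, hedged configuration sketch; the 8-byte prefix length is only an example:

#include "rocksdb/options.h"
#include "rocksdb/slice_transform.h"

rocksdb::Options MakePlainTableOptions() {
  rocksdb::Options options;
  // First 8 bytes of the user key form the prefix; GetPrefix() then returns
  // options.prefix_extractor->Transform(ikey.user_key), whose hash feeds the
  // bloom filter and the prefix hash index.
  options.prefix_extractor = rocksdb::NewFixedPrefixTransform(8);
  return options;
}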

@ -53,6 +53,7 @@ class TableFactory {
// table_reader is the output table reader
virtual Status NewTableReader(
const Options& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
unique_ptr<RandomAccessFile>&& file, uint64_t file_size,
unique_ptr<TableReader>* table_reader) const = 0;
@ -75,8 +76,8 @@ class TableFactory {
// keep the file open and close the file after closing the table builder.
// compression_type is the compression type to use in this table.
virtual TableBuilder* NewTableBuilder(
const Options& options, WritableFile* file,
CompressionType compression_type) const = 0;
const Options& options, const InternalKeyComparator& internal_comparator,
WritableFile* file, CompressionType compression_type) const = 0;
};
} // namespace rocksdb
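
Editorial note: for callers, the interface change is mechanical: both factory methods now take the InternalKeyComparator ahead of the file argument. A hedged usage sketch, not an exact call site from this change (include paths assumed for this revision; error handling abbreviated):

#include <memory>
#include "db/dbformat.h"          // InternalKeyComparator
#include "rocksdb/env.h"          // WritableFile, RandomAccessFile
#include "rocksdb/options.h"
#include "rocksdb/table.h"        // TableFactory
#include "table/table_builder.h"  // TableBuilder (assumed internal path)

namespace rocksdb {

Status BuildThenOpen(const Options& options, const EnvOptions& soptions,
                     WritableFile* out_file,
                     std::unique_ptr<RandomAccessFile>&& in_file,
                     uint64_t in_file_size,
                     std::unique_ptr<TableReader>* reader) {
  InternalKeyComparator ikc(options.comparator);
  // Build side: the comparator now travels with the builder.
  std::unique_ptr<TableBuilder> builder(options.table_factory->NewTableBuilder(
      options, ikc, out_file, options.compression));
  // ... Add() entries in internal-key order ...
  Status s = builder->Finish();
  if (!s.ok()) {
    return s;
  }
  // Read side: the same comparator is handed to the reader.
  return options.table_factory->NewTableReader(
      options, soptions, ikc, std::move(in_file), in_file_size, reader);
}

}  // namespace rocksdb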

@ -12,6 +12,7 @@
namespace rocksdb {
class Iterator;
class ParsedInternalKey;
class Slice;
struct ReadOptions;
struct TableProperties;
@ -62,7 +63,7 @@ class TableReader {
// key is the key to search for
virtual Status Get(
const ReadOptions& readOptions, const Slice& key, void* handle_context,
bool (*result_handler)(void* handle_context, const Slice& k,
bool (*result_handler)(void* arg, const ParsedInternalKey& k,
const Slice& v, bool didIO),
void (*mark_key_may_exist_handler)(void* handle_context) = nullptr) = 0;
};
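
Editorial note: the Get() result handler now receives a ParsedInternalKey rather than an encoded internal-key slice, so callers can inspect sequence and type without re-parsing. A hedged sketch of a minimal handler with the new signature; the LookupState struct is illustrative and not the saver used by version_set.cc:

#include <string>
#include "db/dbformat.h"        // ParsedInternalKey, kTypeValue
#include "rocksdb/comparator.h"
#include "rocksdb/slice.h"

namespace rocksdb {

struct LookupState {
  Slice user_key;            // the key the caller asked for
  const Comparator* ucmp;    // user comparator
  std::string* value_out;    // destination for the value
  bool found = false;
};

// Returning false tells the table reader to stop scanning.
static bool SaveValue(void* arg, const ParsedInternalKey& ikey,
                      const Slice& v, bool didIO) {
  auto* state = static_cast<LookupState*>(arg);
  if (state->ucmp->Compare(ikey.user_key, state->user_key) != 0) {
    return false;  // moved past the requested key
  }
  if (ikey.type == kTypeValue) {
    state->value_out->assign(v.data(), v.size());
    state->found = true;
  }
  return false;  // value or deletion: either way the lookup is resolved
}

}  // namespace rocksdb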

@ -34,8 +34,8 @@ static std::string MakeKey(int i, int j, bool through_db) {
return key.Encode().ToString();
}
static bool DummySaveValue(void* arg, const Slice& ikey, const Slice& v,
bool didIO) {
static bool DummySaveValue(void* arg, const ParsedInternalKey& ikey,
const Slice& v, bool didIO) {
return false;
}
@ -237,6 +237,8 @@ int main(int argc, char** argv) {
rocksdb::EnvOptions env_options;
options.create_if_missing = true;
options.compression = rocksdb::CompressionType::kNoCompression;
options.internal_comparator =
new rocksdb::InternalKeyComparator(options.comparator);
if (FLAGS_plain_table) {
options.allow_mmap_reads = true;

@ -183,8 +183,9 @@ class Constructor {
// been added so far. Returns the keys in sorted order in "*keys"
// and stores the key/value pairs in "*kvmap"
void Finish(const Options& options,
std::vector<std::string>* keys,
KVMap* kvmap) {
const InternalKeyComparator& internal_comparator,
std::vector<std::string>* keys, KVMap* kvmap) {
last_internal_key_ = &internal_comparator;
*kvmap = data_;
keys->clear();
for (KVMap::const_iterator it = data_.begin();
@ -193,12 +194,14 @@ class Constructor {
keys->push_back(it->first);
}
data_.clear();
Status s = FinishImpl(options, *kvmap);
Status s = FinishImpl(options, internal_comparator, *kvmap);
ASSERT_TRUE(s.ok()) << s.ToString();
}
// Construct the data structure from the data in "data"
virtual Status FinishImpl(const Options& options, const KVMap& data) = 0;
virtual Status FinishImpl(const Options& options,
const InternalKeyComparator& internal_comparator,
const KVMap& data) = 0;
virtual Iterator* NewIterator() const = 0;
@ -206,6 +209,9 @@ class Constructor {
virtual DB* db() const { return nullptr; } // Overridden in DBConstructor
protected:
const InternalKeyComparator* last_internal_key_;
private:
KVMap data_;
};
@ -219,10 +225,12 @@ class BlockConstructor: public Constructor {
~BlockConstructor() {
delete block_;
}
virtual Status FinishImpl(const Options& options, const KVMap& data) {
virtual Status FinishImpl(const Options& options,
const InternalKeyComparator& internal_comparator,
const KVMap& data) {
delete block_;
block_ = nullptr;
BlockBuilder builder(options);
BlockBuilder builder(options, &internal_comparator);
for (KVMap::const_iterator it = data.begin();
it != data.end();
@ -298,12 +306,14 @@ class TableConstructor: public Constructor {
: Constructor(cmp), convert_to_internal_key_(convert_to_internal_key) {}
~TableConstructor() { Reset(); }
virtual Status FinishImpl(const Options& options, const KVMap& data) {
virtual Status FinishImpl(const Options& options,
const InternalKeyComparator& internal_comparator,
const KVMap& data) {
Reset();
sink_.reset(new StringSink());
unique_ptr<TableBuilder> builder;
builder.reset(options.table_factory->NewTableBuilder(options, sink_.get(),
options.compression));
builder.reset(options.table_factory->NewTableBuilder(
options, internal_comparator, sink_.get(), options.compression));
for (KVMap::const_iterator it = data.begin();
it != data.end();
@ -328,8 +338,8 @@ class TableConstructor: public Constructor {
source_.reset(new StringSource(sink_->contents(), uniq_id_,
options.allow_mmap_reads));
return options.table_factory->NewTableReader(
options, soptions, std::move(source_), sink_->contents().size(),
&table_reader_);
options, soptions, internal_comparator, std::move(source_),
sink_->contents().size(), &table_reader_);
}
virtual Iterator* NewIterator() const {
@ -350,8 +360,8 @@ class TableConstructor: public Constructor {
new StringSource(sink_->contents(), uniq_id_,
options.allow_mmap_reads));
return options.table_factory->NewTableReader(
options, soptions, std::move(source_), sink_->contents().size(),
&table_reader_);
options, soptions, *last_internal_key_, std::move(source_),
sink_->contents().size(), &table_reader_);
}
virtual TableReader* table_reader() {
@ -393,7 +403,9 @@ class MemTableConstructor: public Constructor {
~MemTableConstructor() {
delete memtable_->Unref();
}
virtual Status FinishImpl(const Options& options, const KVMap& data) {
virtual Status FinishImpl(const Options& options,
const InternalKeyComparator& internal_comparator,
const KVMap& data) {
delete memtable_->Unref();
Options memtable_options;
memtable_options.memtable_factory = table_factory_;
@ -429,7 +441,9 @@ class DBConstructor: public Constructor {
~DBConstructor() {
delete db_;
}
virtual Status FinishImpl(const Options& options, const KVMap& data) {
virtual Status FinishImpl(const Options& options,
const InternalKeyComparator& internal_comparator,
const KVMap& data) {
delete db_;
db_ = nullptr;
NewDB();
@ -619,7 +633,10 @@ class Harness {
if (args.reverse_compare) {
options_.comparator = &reverse_key_comparator;
}
internal_comparator_.reset(new InternalKeyComparator(options_.comparator));
internal_comparator_.reset(
new test::PlainInternalKeyComparator(options_.comparator));
support_prev_ = true;
only_support_prefix_seek_ = false;
BlockBasedTableOptions table_options;
@ -638,7 +655,8 @@ class Harness {
options_.allow_mmap_reads = true;
options_.table_factory.reset(new PlainTableFactory());
constructor_ = new TableConstructor(options_.comparator, true);
options_.comparator = internal_comparator_.get();
internal_comparator_.reset(
new InternalKeyComparator(options_.comparator));
break;
case PLAIN_TABLE_FULL_STR_PREFIX:
support_prev_ = false;
@ -647,7 +665,8 @@ class Harness {
options_.allow_mmap_reads = true;
options_.table_factory.reset(new PlainTableFactory());
constructor_ = new TableConstructor(options_.comparator, true);
options_.comparator = internal_comparator_.get();
internal_comparator_.reset(
new InternalKeyComparator(options_.comparator));
break;
case BLOCK_TEST:
constructor_ = new BlockConstructor(options_.comparator);
@ -672,7 +691,7 @@ class Harness {
void Test(Random* rnd) {
std::vector<std::string> keys;
KVMap data;
constructor_->Finish(options_, &keys, &data);
constructor_->Finish(options_, *internal_comparator_, &keys, &data);
TestForwardScan(keys, data);
if (support_prev_) {
@ -844,7 +863,7 @@ class Harness {
Constructor* constructor_;
bool support_prev_;
bool only_support_prefix_seek_;
shared_ptr<Comparator> internal_comparator_;
shared_ptr<InternalKeyComparator> internal_comparator_;
static std::unique_ptr<const SliceTransform> noop_transform;
static std::unique_ptr<const SliceTransform> prefix_transform;
};
@ -866,9 +885,24 @@ static bool Between(uint64_t val, uint64_t low, uint64_t high) {
}
// Tests against all kinds of tables
class GeneralTableTest {};
class BlockBasedTableTest {};
class PlainTableTest {};
class TableTest {
public:
const InternalKeyComparator& GetPlainInternalComparator(
const Comparator* comp) {
if (!plain_internal_comparator) {
plain_internal_comparator.reset(
new test::PlainInternalKeyComparator(comp));
}
return *plain_internal_comparator;
}
private:
std::unique_ptr<InternalKeyComparator> plain_internal_comparator;
};
class GeneralTableTest : public TableTest {};
class BlockBasedTableTest : public TableTest {};
class PlainTableTest : public TableTest {};
// This test includes all the basic checks except those for index size and
// block size, which are covered in separate unit tests.
@ -891,7 +925,8 @@ TEST(BlockBasedTableTest, BasicBlockBasedTableProperties) {
options.compression = kNoCompression;
options.block_restart_interval = 1;
c.Finish(options, &keys, &kvmap);
c.Finish(options, GetPlainInternalComparator(options.comparator), &keys,
&kvmap);
auto& props = c.table_reader()->GetTableProperties();
ASSERT_EQ(kvmap.size(), props.num_entries);
@ -905,7 +940,7 @@ TEST(BlockBasedTableTest, BasicBlockBasedTableProperties) {
ASSERT_EQ("", props.filter_policy_name); // no filter policy is used
// Verify data size.
BlockBuilder block_builder(options);
BlockBuilder block_builder(options, options.comparator);
for (const auto& item : kvmap) {
block_builder.Add(item.first, item.second);
}
@ -927,7 +962,8 @@ TEST(BlockBasedTableTest, FilterPolicyNameProperties) {
);
options.filter_policy = filter_policy.get();
c.Finish(options, &keys, &kvmap);
c.Finish(options, GetPlainInternalComparator(options.comparator), &keys,
&kvmap);
auto& props = c.table_reader()->GetTableProperties();
ASSERT_EQ("rocksdb.BuiltinBloomFilter", props.filter_policy_name);
}
@ -968,7 +1004,8 @@ TEST(BlockBasedTableTest, IndexSizeStat) {
options.compression = kNoCompression;
options.block_restart_interval = 1;
c.Finish(options, &ks, &kvmap);
c.Finish(options, GetPlainInternalComparator(options.comparator), &ks,
&kvmap);
auto index_size =
c.table_reader()->GetTableProperties().index_size;
ASSERT_GT(index_size, last_index_size);
@ -992,7 +1029,8 @@ TEST(BlockBasedTableTest, NumBlockStat) {
std::vector<std::string> ks;
KVMap kvmap;
c.Finish(options, &ks, &kvmap);
c.Finish(options, GetPlainInternalComparator(options.comparator), &ks,
&kvmap);
ASSERT_EQ(
kvmap.size(),
c.table_reader()->GetTableProperties().num_data_blocks
@ -1055,7 +1093,8 @@ TEST(BlockBasedTableTest, BlockCacheTest) {
TableConstructor c(BytewiseComparator());
c.Add("key", "value");
c.Finish(options, &keys, &kvmap);
c.Finish(options, GetPlainInternalComparator(options.comparator), &keys,
&kvmap);
// -- PART 1: Open with regular block cache.
// Since block_cache is disabled, no cache activities will be involved.
@ -1179,6 +1218,8 @@ TEST(BlockBasedTableTest, BlockCacheLeak) {
// unique ID from the file.
Options opt;
unique_ptr<InternalKeyComparator> ikc;
ikc.reset(new test::PlainInternalKeyComparator(opt.comparator));
opt.block_size = 1024;
opt.compression = kNoCompression;
opt.block_cache =
@ -1195,7 +1236,7 @@ TEST(BlockBasedTableTest, BlockCacheLeak) {
c.Add("k07", std::string(100000, 'x'));
std::vector<std::string> keys;
KVMap kvmap;
c.Finish(opt, &keys, &kvmap);
c.Finish(opt, *ikc, &keys, &kvmap);
unique_ptr<Iterator> iter(c.NewIterator());
iter->SeekToFirst();
@ -1217,11 +1258,14 @@ extern const uint64_t kPlainTableMagicNumber;
TEST(PlainTableTest, BasicPlainTableProperties) {
PlainTableFactory factory(8, 8, 0);
StringSink sink;
Options options;
InternalKeyComparator ikc(options.comparator);
std::unique_ptr<TableBuilder> builder(
factory.NewTableBuilder(Options(), &sink, kNoCompression));
factory.NewTableBuilder(options, ikc, &sink, kNoCompression));
for (char c = 'a'; c <= 'z'; ++c) {
std::string key(16, c);
std::string key(8, c);
key.append("\1 "); // PlainTable expects internal key structure
std::string value(28, c + 42);
builder->Add(key, value);
}
@ -1255,9 +1299,10 @@ TEST(GeneralTableTest, ApproximateOffsetOfPlain) {
std::vector<std::string> keys;
KVMap kvmap;
Options options;
test::PlainInternalKeyComparator internal_comparator(options.comparator);
options.block_size = 1024;
options.compression = kNoCompression;
c.Finish(options, &keys, &kvmap);
c.Finish(options, internal_comparator, &keys, &kvmap);
ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"), 0, 0));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"), 0, 0));
@ -1284,9 +1329,10 @@ static void DoCompressionTest(CompressionType comp) {
std::vector<std::string> keys;
KVMap kvmap;
Options options;
test::PlainInternalKeyComparator ikc(options.comparator);
options.block_size = 1024;
options.compression = comp;
c.Finish(options, &keys, &kvmap);
c.Finish(options, ikc, &keys, &kvmap);
ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"), 0, 0));
ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"), 0, 0));

@ -20,18 +20,17 @@ namespace rocksdb {
namespace {
typedef Iterator* (*BlockFunction)(void*, const ReadOptions&,
const EnvOptions& soptions, const Slice&,
bool for_compaction);
const EnvOptions& soptions,
const InternalKeyComparator& icomparator,
const Slice&, bool for_compaction);
class TwoLevelIterator: public Iterator {
public:
TwoLevelIterator(
Iterator* index_iter,
BlockFunction block_function,
void* arg,
const ReadOptions& options,
const EnvOptions& soptions,
bool for_compaction);
TwoLevelIterator(Iterator* index_iter, BlockFunction block_function,
void* arg, const ReadOptions& options,
const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
bool for_compaction);
virtual ~TwoLevelIterator();
@ -76,6 +75,7 @@ class TwoLevelIterator: public Iterator {
void* arg_;
const ReadOptions options_;
const EnvOptions& soptions_;
const InternalKeyComparator& internal_comparator_;
Status status_;
IteratorWrapper index_iter_;
IteratorWrapper data_iter_; // May be nullptr
@ -86,20 +86,17 @@ class TwoLevelIterator: public Iterator {
};
TwoLevelIterator::TwoLevelIterator(
Iterator* index_iter,
BlockFunction block_function,
void* arg,
const ReadOptions& options,
const EnvOptions& soptions,
bool for_compaction)
Iterator* index_iter, BlockFunction block_function, void* arg,
const ReadOptions& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator, bool for_compaction)
: block_function_(block_function),
arg_(arg),
options_(options),
soptions_(soptions),
internal_comparator_(internal_comparator),
index_iter_(index_iter),
data_iter_(nullptr),
for_compaction_(for_compaction) {
}
for_compaction_(for_compaction) {}
TwoLevelIterator::~TwoLevelIterator() {
}
@ -181,8 +178,9 @@ void TwoLevelIterator::InitDataBlock() {
// data_iter_ is already constructed with this iterator, so
// no need to change anything
} else {
Iterator* iter = (*block_function_)(arg_, options_, soptions_, handle,
for_compaction_);
Iterator* iter =
(*block_function_)(arg_, options_, soptions_, internal_comparator_,
handle, for_compaction_);
data_block_handle_.assign(handle.data(), handle.size());
SetDataIterator(iter);
}
@ -191,15 +189,14 @@ void TwoLevelIterator::InitDataBlock() {
} // namespace
Iterator* NewTwoLevelIterator(
Iterator* index_iter,
BlockFunction block_function,
void* arg,
const ReadOptions& options,
const EnvOptions& soptions,
bool for_compaction) {
return new TwoLevelIterator(index_iter, block_function, arg,
options, soptions, for_compaction);
Iterator* NewTwoLevelIterator(Iterator* index_iter,
BlockFunction block_function, void* arg,
const ReadOptions& options,
const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
bool for_compaction) {
return new TwoLevelIterator(index_iter, block_function, arg, options,
soptions, internal_comparator, for_compaction);
}
} // namespace rocksdb
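
Editorial note: the BlockFunction callback gains the internal comparator so the data-block iterator it returns is ordered consistently with the index iterator. A hedged skeleton of a conforming callback; real implementations, such as BlockBasedTable's block reader, decode index_value into a block handle and consult the block cache:

#include "db/dbformat.h"
#include "rocksdb/env.h"       // EnvOptions
#include "rocksdb/iterator.h"  // NewErrorIterator
#include "rocksdb/options.h"   // ReadOptions
#include "table/two_level_iterator.h"

namespace rocksdb {

static Iterator* DemoBlockFunction(void* arg, const ReadOptions& options,
                                   const EnvOptions& soptions,
                                   const InternalKeyComparator& icomparator,
                                   const Slice& index_value,
                                   bool for_compaction) {
  // arg is typically the table object; index_value encodes the block handle.
  // A real callback reads that block and returns an iterator over it, built
  // with icomparator so internal keys compare the same way as in the index.
  return NewErrorIterator(Status::NotSupported("demo only"));
}

// Wiring it up (index_iter, table, read_options, env_options, icmp assumed):
//   Iterator* it = NewTwoLevelIterator(index_iter, &DemoBlockFunction, table,
//                                      read_options, env_options, icmp);

}  // namespace rocksdb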

@ -14,6 +14,7 @@
namespace rocksdb {
struct ReadOptions;
class InternalKeyComparator;
// Return a new two level iterator. A two-level iterator contains an
// index iterator whose values point to a sequence of blocks where
@ -27,14 +28,11 @@ struct ReadOptions;
extern Iterator* NewTwoLevelIterator(
Iterator* index_iter,
Iterator* (*block_function)(
void* arg,
const ReadOptions& options,
const EnvOptions& soptions,
const Slice& index_value,
bool for_compaction),
void* arg,
const ReadOptions& options,
const EnvOptions& soptions,
void* arg, const ReadOptions& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
const Slice& index_value, bool for_compaction),
void* arg, const ReadOptions& options, const EnvOptions& soptions,
const InternalKeyComparator& internal_comparator,
bool for_compaction = false);
} // namespace rocksdb

@ -71,7 +71,6 @@ SstFileReader::SstFileReader(const std::string& file_path,
}
Status SstFileReader::NewTableReader(const std::string& file_path) {
table_options_.comparator = &internal_comparator_;
Status s = table_options_.env->NewRandomAccessFile(file_path, &file_,
soptions_);
if (!s.ok()) {
@ -81,7 +80,8 @@ Status SstFileReader::NewTableReader(const std::string& file_path) {
table_options_.env->GetFileSize(file_path, &file_size);
unique_ptr<TableFactory> table_factory;
s = table_options_.table_factory->NewTableReader(
table_options_, soptions_, std::move(file_), file_size, &table_reader_);
table_options_, soptions_, internal_comparator_, std::move(file_),
file_size, &table_reader_);
return s;
}

@ -8,6 +8,8 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <string>
#include "db/dbformat.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "util/random.h"
@ -51,5 +53,28 @@ class ErrorEnv : public EnvWrapper {
}
};
// An internal comparator that simply forwards comparison results from the
// wrapped user comparator. It can be used to test entities that have no
// dependency on the internal key structure but consume an
// InternalKeyComparator, such as BlockBasedTable.
class PlainInternalKeyComparator : public InternalKeyComparator {
public:
explicit PlainInternalKeyComparator(const Comparator* c)
: InternalKeyComparator(c) {}
virtual ~PlainInternalKeyComparator() {}
virtual int Compare(const Slice& a, const Slice& b) const override {
return user_comparator()->Compare(a, b);
}
virtual void FindShortestSeparator(std::string* start,
const Slice& limit) const override {
user_comparator()->FindShortestSeparator(start, limit);
}
virtual void FindShortSuccessor(std::string* key) const override {
user_comparator()->FindShortSuccessor(key);
}
};
} // namespace test
} // namespace rocksdb
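
Editorial note: because the helper delegates straight to the wrapped user comparator, tests can feed plain user keys to components that expect an InternalKeyComparator. Illustrative snippet for a test body (ASSERT_TRUE as in util/testharness.h):

  Options options;  // default BytewiseComparator
  test::PlainInternalKeyComparator plain_ikc(options.comparator);
  // Plain user keys order correctly; no 8-byte sequence/type suffix needed.
  ASSERT_TRUE(plain_ikc.Compare(Slice("abc"), Slice("abd")) < 0);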
