diff --git a/db/builder.cc b/db/builder.cc index 3b51bf88e..96fb29eef 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -26,20 +26,18 @@ namespace rocksdb { class TableFactory; -TableBuilder* NewTableBuilder(const Options& options, WritableFile* file, +TableBuilder* NewTableBuilder(const Options& options, + const InternalKeyComparator& internal_comparator, + WritableFile* file, CompressionType compression_type) { - return options.table_factory->NewTableBuilder(options, file, - compression_type); + return options.table_factory->NewTableBuilder(options, internal_comparator, + file, compression_type); } -Status BuildTable(const std::string& dbname, - Env* env, - const Options& options, - const EnvOptions& soptions, - TableCache* table_cache, - Iterator* iter, - FileMetaData* meta, - const Comparator* user_comparator, +Status BuildTable(const std::string& dbname, Env* env, const Options& options, + const EnvOptions& soptions, TableCache* table_cache, + Iterator* iter, FileMetaData* meta, + const InternalKeyComparator& internal_comparator, const SequenceNumber newest_snapshot, const SequenceNumber earliest_seqno_in_memtable, const CompressionType compression) { @@ -64,7 +62,8 @@ Status BuildTable(const std::string& dbname, return s; } - TableBuilder* builder = NewTableBuilder(options, file.get(), compression); + TableBuilder* builder = + NewTableBuilder(options, internal_comparator, file.get(), compression); // the first key is the smallest key Slice key = iter->key(); @@ -72,8 +71,8 @@ Status BuildTable(const std::string& dbname, meta->smallest_seqno = GetInternalKeySeqno(key); meta->largest_seqno = meta->smallest_seqno; - MergeHelper merge(user_comparator, options.merge_operator.get(), - options.info_log.get(), + MergeHelper merge(internal_comparator.user_comparator(), + options.merge_operator.get(), options.info_log.get(), true /* internal key corruption is not ok */); if (purge) { @@ -102,8 +101,8 @@ Status BuildTable(const std::string& dbname, // If the key is the same as the previous key (and it is not the // first key), then we skip it, since it is an older version. // Otherwise we output the key and mark it as the "new" previous key. - if (!is_first_key && !user_comparator->Compare(prev_ikey.user_key, - this_ikey.user_key)) { + if (!is_first_key && !internal_comparator.user_comparator()->Compare( + prev_ikey.user_key, this_ikey.user_key)) { // seqno within the same key are in decreasing order assert(this_ikey.sequence < prev_ikey.sequence); } else { @@ -201,9 +200,8 @@ Status BuildTable(const std::string& dbname, if (s.ok()) { // Verify that the table is usable - Iterator* it = table_cache->NewIterator(ReadOptions(), - soptions, - *meta); + Iterator* it = table_cache->NewIterator(ReadOptions(), soptions, + internal_comparator, *meta); s = it->status(); delete it; } diff --git a/db/builder.h b/db/builder.h index 189bfe6fe..630162968 100644 --- a/db/builder.h +++ b/db/builder.h @@ -24,22 +24,20 @@ class VersionEdit; class TableBuilder; class WritableFile; -extern TableBuilder* NewTableBuilder(const Options& options, WritableFile* file, - CompressionType compression_type); +extern TableBuilder* NewTableBuilder( + const Options& options, const InternalKeyComparator& internal_comparator, + WritableFile* file, CompressionType compression_type); // Build a Table file from the contents of *iter. The generated file // will be named according to meta->number. On success, the rest of // *meta will be filled with metadata about the generated table. // If no data is present in *iter, meta->file_size will be set to // zero, and no Table file will be produced. -extern Status BuildTable(const std::string& dbname, - Env* env, - const Options& options, - const EnvOptions& soptions, - TableCache* table_cache, - Iterator* iter, +extern Status BuildTable(const std::string& dbname, Env* env, + const Options& options, const EnvOptions& soptions, + TableCache* table_cache, Iterator* iter, FileMetaData* meta, - const Comparator* user_comparator, + const InternalKeyComparator& internal_comparator, const SequenceNumber newest_snapshot, const SequenceNumber earliest_seqno_in_memtable, const CompressionType compression); diff --git a/db/db_impl.cc b/db/db_impl.cc index 9a7d0c178..e7351d09e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -127,7 +127,6 @@ Options SanitizeOptions(const std::string& dbname, const InternalFilterPolicy* ipolicy, const Options& src) { Options result = src; - result.comparator = icmp; result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr; // result.max_open_files means an "infinite" open files. if (result.max_open_files != -1) { @@ -1107,9 +1106,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) { { mutex_.Unlock(); s = BuildTable(dbname_, env_, options_, storage_options_, - table_cache_.get(), iter, &meta, - user_comparator(), newest_snapshot, - earliest_seqno_in_memtable, + table_cache_.get(), iter, &meta, internal_comparator_, + newest_snapshot, earliest_seqno_in_memtable, GetCompressionFlush(options_)); LogFlush(options_.info_log); mutex_.Lock(); @@ -1173,9 +1171,9 @@ Status DBImpl::WriteLevel0Table(autovector& mems, VersionEdit* edit, (unsigned long)meta.number); s = BuildTable(dbname_, env_, options_, storage_options_, - table_cache_.get(), iter, &meta, - user_comparator(), newest_snapshot, - earliest_seqno_in_memtable, GetCompressionFlush(options_)); + table_cache_.get(), iter, &meta, internal_comparator_, + newest_snapshot, earliest_seqno_in_memtable, + GetCompressionFlush(options_)); LogFlush(options_.info_log); delete iter; Log(options_.info_log, "Level-0 flush table #%lu: %lu bytes %s", @@ -2137,8 +2135,9 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { options_, compact->compaction->output_level(), compact->compaction->enable_compression()); - compact->builder.reset( - NewTableBuilder(options_, compact->outfile.get(), compression_type)); + compact->builder.reset(NewTableBuilder(options_, internal_comparator_, + compact->outfile.get(), + compression_type)); } LogFlush(options_.info_log); return s; @@ -2186,9 +2185,8 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, if (s.ok() && current_entries > 0) { // Verify that the table is usable FileMetaData meta(output_number, current_bytes); - Iterator* iter = table_cache_->NewIterator(ReadOptions(), - storage_options_, - meta); + Iterator* iter = table_cache_->NewIterator(ReadOptions(), storage_options_, + internal_comparator_, meta); s = iter->status(); delete iter; if (s.ok()) { @@ -2522,8 +2520,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact, // If this is the bottommost level (no files in lower levels) // and the earliest snapshot is larger than this seqno // then we can squash the seqno to zero. - if (options_.compaction_style == kCompactionStyleLevel && - bottommost_level && ikey.sequence < earliest_snapshot && + if (bottommost_level && ikey.sequence < earliest_snapshot && ikey.type != kTypeMerge) { assert(ikey.type != kTypeDeletion); // make a copy because updating in place would cause problems diff --git a/db/db_iter.cc b/db/db_iter.cc index 1b5ae5688..b8d9038a1 100644 --- a/db/db_iter.cc +++ b/db/db_iter.cc @@ -235,7 +235,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping) { valid_ = true; MergeValuesNewToOld(); // Go to a different state machine return; - case kTypeLogData: + default: assert(false); break; } diff --git a/db/db_test.cc b/db/db_test.cc index b1fb3f9de..0c4e21c85 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -11,6 +11,7 @@ #include #include +#include "db/dbformat.h" #include "db/db_impl.h" #include "db/filename.h" #include "db/version_set.h" diff --git a/db/dbformat.cc b/db/dbformat.cc index 3d7e61010..43560bc83 100644 --- a/db/dbformat.cc +++ b/db/dbformat.cc @@ -6,9 +6,9 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include "db/dbformat.h" #include -#include "db/dbformat.h" #include "port/port.h" #include "util/coding.h" #include "util/perf_context_imp.h" @@ -72,6 +72,28 @@ int InternalKeyComparator::Compare(const Slice& akey, const Slice& bkey) const { return r; } +int InternalKeyComparator::Compare(const ParsedInternalKey& a, + const ParsedInternalKey& b) const { + // Order by: + // increasing user key (according to user-supplied comparator) + // decreasing sequence number + // decreasing type (though sequence# should be enough to disambiguate) + int r = user_comparator_->Compare(a.user_key, b.user_key); + BumpPerfCount(&perf_context.user_key_comparison_count); + if (r == 0) { + if (a.sequence > b.sequence) { + r = -1; + } else if (a.sequence < b.sequence) { + r = +1; + } else if (a.type > b.type) { + r = -1; + } else if (a.type < b.type) { + r = +1; + } + } + return r; +} + void InternalKeyComparator::FindShortestSeparator( std::string* start, const Slice& limit) const { diff --git a/db/dbformat.h b/db/dbformat.h index 64a2c9f05..e3dbe0ba3 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -25,12 +25,16 @@ class InternalKey; // Value types encoded as the last component of internal keys. // DO NOT CHANGE THESE ENUM VALUES: they are embedded in the on-disk // data structures. -enum ValueType { +// The highest bit of the value type needs to be reserved to SST tables +// for them to do more flexible encoding. +enum ValueType : unsigned char { kTypeDeletion = 0x0, kTypeValue = 0x1, kTypeMerge = 0x2, - kTypeLogData = 0x3 + kTypeLogData = 0x3, + kMaxValue = 0x7F }; + // kValueTypeForSeek defines the ValueType that should be passed when // constructing a ParsedInternalKey object for seeking to a particular // sequence number (since we sort sequence numbers in decreasing order @@ -96,6 +100,7 @@ class InternalKeyComparator : public Comparator { name_("rocksdb.InternalKeyComparator:" + std::string(user_comparator_->Name())) { } + virtual ~InternalKeyComparator() {} virtual const char* Name() const; virtual int Compare(const Slice& a, const Slice& b) const; @@ -107,6 +112,7 @@ class InternalKeyComparator : public Comparator { const Comparator* user_comparator() const { return user_comparator_; } int Compare(const InternalKey& a, const InternalKey& b) const; + int Compare(const ParsedInternalKey& a, const ParsedInternalKey& b) const; }; // Filter policy wrapper that converts from internal keys to user keys @@ -163,6 +169,7 @@ inline bool ParseInternalKey(const Slice& internal_key, unsigned char c = num & 0xff; result->sequence = num >> 8; result->type = static_cast(c); + assert(result->type <= ValueType::kMaxValue); result->user_key = Slice(internal_key.data(), n - 8); return (c <= static_cast(kValueTypeForSeek)); } diff --git a/db/repair.cc b/db/repair.cc index f72bca6b7..5a6cba44d 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -222,10 +222,8 @@ class Repairer { FileMetaData meta; meta.number = next_file_number_++; Iterator* iter = mem->NewIterator(); - status = BuildTable(dbname_, env_, options_, storage_options_, - table_cache_, iter, &meta, - icmp_.user_comparator(), 0, 0, - kNoCompression); + status = BuildTable(dbname_, env_, options_, storage_options_, table_cache_, + iter, &meta, icmp_, 0, 0, kNoCompression); delete iter; delete mem->Unref(); mem = nullptr; @@ -267,7 +265,7 @@ class Repairer { if (status.ok()) { FileMetaData dummy_meta(t->meta.number, t->meta.file_size); Iterator* iter = table_cache_->NewIterator( - ReadOptions(), storage_options_, dummy_meta); + ReadOptions(), storage_options_, icmp_, dummy_meta); bool empty = true; ParsedInternalKey parsed; t->min_sequence = 0; diff --git a/db/simple_table_db_test.cc b/db/simple_table_db_test.cc index 845165ec2..3d1420c0c 100644 --- a/db/simple_table_db_test.cc +++ b/db/simple_table_db_test.cc @@ -87,10 +87,10 @@ public: Iterator* NewIterator(const ReadOptions&) override; - Status Get( - const ReadOptions&, const Slice& key, void* arg, - bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool), - void (*mark_key_may_exist)(void*) = nullptr) override; + Status Get(const ReadOptions&, const Slice& key, void* arg, + bool (*handle_result)(void* arg, const ParsedInternalKey& k, + const Slice& v, bool), + void (*mark_key_may_exist)(void*) = nullptr) override; uint64_t ApproximateOffsetOf(const Slice& key) override; @@ -245,7 +245,8 @@ Status SimpleTableReader::GetOffset(const Slice& target, uint64_t* offset) { return s; } - int compare_result = rep_->options.comparator->Compare(tmp_slice, target); + InternalKeyComparator ikc(rep_->options.comparator); + int compare_result = ikc.Compare(tmp_slice, target); if (compare_result < 0) { if (left == right) { @@ -280,14 +281,20 @@ Status SimpleTableReader::GetOffset(const Slice& target, uint64_t* offset) { return s; } -Status SimpleTableReader::Get( - const ReadOptions& options, const Slice& k, void* arg, - bool (*saver)(void*, const Slice&, const Slice&, bool), - void (*mark_key_may_exist)(void*)) { +Status SimpleTableReader::Get(const ReadOptions& options, const Slice& k, + void* arg, + bool (*saver)(void*, const ParsedInternalKey&, + const Slice&, bool), + void (*mark_key_may_exist)(void*)) { Status s; SimpleTableIterator* iter = new SimpleTableIterator(this); for (iter->Seek(k); iter->Valid(); iter->Next()) { - if (!(*saver)(arg, iter->key(), iter->value(), true)) { + ParsedInternalKey parsed_key; + if (!ParseInternalKey(iter->key(), &parsed_key)) { + return Status::Corruption(Slice()); + } + + if (!(*saver)(arg, parsed_key, iter->value(), true)) { break; } } @@ -537,15 +544,19 @@ public: return "SimpleTable"; } Status NewTableReader(const Options& options, const EnvOptions& soptions, + const InternalKeyComparator& internal_key, unique_ptr&& file, uint64_t file_size, unique_ptr* table_reader) const; - TableBuilder* NewTableBuilder(const Options& options, WritableFile* file, + TableBuilder* NewTableBuilder(const Options& options, + const InternalKeyComparator& internal_key, + WritableFile* file, CompressionType compression_type) const; }; Status SimpleTableFactory::NewTableReader( const Options& options, const EnvOptions& soptions, + const InternalKeyComparator& internal_key, unique_ptr&& file, uint64_t file_size, unique_ptr* table_reader) const { @@ -554,8 +565,8 @@ Status SimpleTableFactory::NewTableReader( } TableBuilder* SimpleTableFactory::NewTableBuilder( - const Options& options, WritableFile* file, - CompressionType compression_type) const { + const Options& options, const InternalKeyComparator& internal_key, + WritableFile* file, CompressionType compression_type) const { return new SimpleTableBuilder(options, file, compression_type); } diff --git a/db/table_cache.cc b/db/table_cache.cc index 6e48e1a90..591933cef 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -60,6 +60,7 @@ void TableCache::ReleaseHandle(Cache::Handle* handle) { } Status TableCache::FindTable(const EnvOptions& toptions, + const InternalKeyComparator& internal_comparator, uint64_t file_number, uint64_t file_size, Cache::Handle** handle, bool* table_io, const bool no_io) { @@ -84,7 +85,8 @@ Status TableCache::FindTable(const EnvOptions& toptions, } StopWatch sw(env_, options_->statistics.get(), TABLE_OPEN_IO_MICROS); s = options_->table_factory->NewTableReader( - *options_, toptions, std::move(file), file_size, &table_reader); + *options_, toptions, internal_comparator, std::move(file), file_size, + &table_reader); } if (!s.ok()) { @@ -102,6 +104,7 @@ Status TableCache::FindTable(const EnvOptions& toptions, Iterator* TableCache::NewIterator(const ReadOptions& options, const EnvOptions& toptions, + const InternalKeyComparator& icomparator, const FileMetaData& file_meta, TableReader** table_reader_ptr, bool for_compaction) { @@ -111,8 +114,8 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, Cache::Handle* handle = file_meta.table_reader_handle; Status s; if (!handle) { - s = FindTable(toptions, file_meta.number, file_meta.file_size, &handle, - nullptr, options.read_tier == kBlockCacheTier); + s = FindTable(toptions, icomparator, file_meta.number, file_meta.file_size, + &handle, nullptr, options.read_tier == kBlockCacheTier); } if (!s.ok()) { return NewErrorIterator(s); @@ -135,17 +138,17 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, } Status TableCache::Get(const ReadOptions& options, - const FileMetaData& file_meta, - const Slice& k, - void* arg, - bool (*saver)(void*, const Slice&, const Slice&, bool), - bool* table_io, - void (*mark_key_may_exist)(void*)) { + const InternalKeyComparator& internal_comparator, + const FileMetaData& file_meta, const Slice& k, void* arg, + bool (*saver)(void*, const ParsedInternalKey&, + const Slice&, bool), + bool* table_io, void (*mark_key_may_exist)(void*)) { Cache::Handle* handle = file_meta.table_reader_handle; Status s; if (!handle) { - s = FindTable(storage_options_, file_meta.number, file_meta.file_size, - &handle, table_io, options.read_tier == kBlockCacheTier); + s = FindTable(storage_options_, internal_comparator, file_meta.number, + file_meta.file_size, &handle, table_io, + options.read_tier == kBlockCacheTier); } if (s.ok()) { TableReader* t = GetTableReaderFromHandle(handle); @@ -162,13 +165,12 @@ Status TableCache::Get(const ReadOptions& options, } bool TableCache::PrefixMayMatch(const ReadOptions& options, - uint64_t file_number, - uint64_t file_size, - const Slice& internal_prefix, - bool* table_io) { + const InternalKeyComparator& icomparator, + uint64_t file_number, uint64_t file_size, + const Slice& internal_prefix, bool* table_io) { Cache::Handle* handle = nullptr; - Status s = FindTable(storage_options_, file_number, - file_size, &handle, table_io); + Status s = FindTable(storage_options_, icomparator, file_number, file_size, + &handle, table_io); bool may_match = true; if (s.ok()) { TableReader* t = GetTableReaderFromHandle(handle); diff --git a/db/table_cache.h b/db/table_cache.h index 665d3b901..316a31888 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -38,8 +38,8 @@ class TableCache { // the returned iterator. The returned "*tableptr" object is owned by // the cache and should not be deleted, and is valid for as long as the // returned iterator is live. - Iterator* NewIterator(const ReadOptions& options, - const EnvOptions& toptions, + Iterator* NewIterator(const ReadOptions& options, const EnvOptions& toptions, + const InternalKeyComparator& internal_comparator, const FileMetaData& file_meta, TableReader** table_reader_ptr = nullptr, bool for_compaction = false); @@ -48,26 +48,27 @@ class TableCache { // call (*handle_result)(arg, found_key, found_value) repeatedly until // it returns false. Status Get(const ReadOptions& options, - const FileMetaData& file_meta, - const Slice& k, - void* arg, - bool (*handle_result)(void*, const Slice&, const Slice&, bool), - bool* table_io, - void (*mark_key_may_exist)(void*) = nullptr); + const InternalKeyComparator& internal_comparator, + const FileMetaData& file_meta, const Slice& k, void* arg, + bool (*handle_result)(void*, const ParsedInternalKey&, + const Slice&, bool), + bool* table_io, void (*mark_key_may_exist)(void*) = nullptr); // Determine whether the table may contain the specified prefix. If // the table index or blooms are not in memory, this may cause an I/O - bool PrefixMayMatch(const ReadOptions& options, uint64_t file_number, - uint64_t file_size, const Slice& internal_prefix, - bool* table_io); + bool PrefixMayMatch(const ReadOptions& options, + const InternalKeyComparator& internal_comparator, + uint64_t file_number, uint64_t file_size, + const Slice& internal_prefix, bool* table_io); // Evict any entry for the specified file number void Evict(uint64_t file_number); // Find table reader - Status FindTable(const EnvOptions& toptions, uint64_t file_number, - uint64_t file_size, Cache::Handle**, bool* table_io=nullptr, - const bool no_io = false); + Status FindTable(const EnvOptions& toptions, + const InternalKeyComparator& internal_comparator, + uint64_t file_number, uint64_t file_size, Cache::Handle**, + bool* table_io = nullptr, const bool no_io = false); // Get TableReader from a cache handle. TableReader* GetTableReaderFromHandle(Cache::Handle* handle); diff --git a/db/table_properties_collector_test.cc b/db/table_properties_collector_test.cc index 15cbe9213..961a7302b 100644 --- a/db/table_properties_collector_test.cc +++ b/db/table_properties_collector_test.cc @@ -83,13 +83,13 @@ class DumbLogger : public Logger { }; // Utilities test functions -void MakeBuilder( - const Options& options, - std::unique_ptr* writable, - std::unique_ptr* builder) { +void MakeBuilder(const Options& options, + const InternalKeyComparator& internal_comparator, + std::unique_ptr* writable, + std::unique_ptr* builder) { writable->reset(new FakeWritableFile); builder->reset(options.table_factory->NewTableBuilder( - options, writable->get(), options.compression)); + options, internal_comparator, writable->get(), options.compression)); } // Collects keys that starts with "A" in a table. @@ -127,9 +127,8 @@ class RegularKeysStartWithA: public TablePropertiesCollector { extern uint64_t kBlockBasedTableMagicNumber; extern uint64_t kPlainTableMagicNumber; void TestCustomizedTablePropertiesCollector( - uint64_t magic_number, - bool encode_as_internal, - const Options& options) { + uint64_t magic_number, bool encode_as_internal, const Options& options, + const InternalKeyComparator& internal_comparator) { // make sure the entries will be inserted with order. std::map kvs = { {"About ", "val5"}, // starts with 'A' @@ -144,7 +143,7 @@ void TestCustomizedTablePropertiesCollector( // -- Step 1: build table std::unique_ptr builder; std::unique_ptr writable; - MakeBuilder(options, &writable, &builder); + MakeBuilder(options, internal_comparator, &writable, &builder); for (const auto& kv : kvs) { if (encode_as_internal) { @@ -193,11 +192,9 @@ TEST(TablePropertiesTest, CustomizedTablePropertiesCollector) { options.table_properties_collectors.resize(1); options.table_properties_collectors[0].reset(collector); } - TestCustomizedTablePropertiesCollector( - kBlockBasedTableMagicNumber, - encode_as_internal, - options - ); + test::PlainInternalKeyComparator ikc(options.comparator); + TestCustomizedTablePropertiesCollector(kBlockBasedTableMagicNumber, + encode_as_internal, options, ikc); } // test plain table @@ -206,9 +203,9 @@ TEST(TablePropertiesTest, CustomizedTablePropertiesCollector) { std::make_shared() ); options.table_factory = std::make_shared(8, 8, 0); - TestCustomizedTablePropertiesCollector( - kPlainTableMagicNumber, true, options - ); + test::PlainInternalKeyComparator ikc(options.comparator); + TestCustomizedTablePropertiesCollector(kPlainTableMagicNumber, true, options, + ikc); } void TestInternalKeyPropertiesCollector( @@ -228,6 +225,8 @@ void TestInternalKeyPropertiesCollector( std::unique_ptr builder; std::unique_ptr writable; Options options; + test::PlainInternalKeyComparator pikc(options.comparator); + options.table_factory = table_factory; if (sanitized) { options.table_properties_collectors = { @@ -239,12 +238,9 @@ void TestInternalKeyPropertiesCollector( // HACK: Set options.info_log to avoid writing log in // SanitizeOptions(). options.info_log = std::make_shared(); - options = SanitizeOptions( - "db", // just a place holder - nullptr, // with skip internal key comparator - nullptr, // don't care filter policy - options - ); + options = SanitizeOptions("db", // just a place holder + &pikc, nullptr, // don't care filter policy + options); options.comparator = comparator; } else { options.table_properties_collectors = { @@ -252,7 +248,7 @@ void TestInternalKeyPropertiesCollector( }; } - MakeBuilder(options, &writable, &builder); + MakeBuilder(options, pikc, &writable, &builder); for (const auto& k : keys) { builder->Add(k.Encode(), "val"); } diff --git a/db/version_set.cc b/db/version_set.cc index f15a3a49a..20a81801d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -191,11 +191,10 @@ class Version::LevelFileNumIterator : public Iterator { mutable char value_buf_[16]; }; -static Iterator* GetFileIterator(void* arg, - const ReadOptions& options, +static Iterator* GetFileIterator(void* arg, const ReadOptions& options, const EnvOptions& soptions, - const Slice& file_value, - bool for_compaction) { + const InternalKeyComparator& icomparator, + const Slice& file_value, bool for_compaction) { TableCache* cache = reinterpret_cast(arg); if (file_value.size() != 16) { return NewErrorIterator( @@ -210,11 +209,9 @@ static Iterator* GetFileIterator(void* arg, } FileMetaData meta(DecodeFixed64(file_value.data()), DecodeFixed64(file_value.data() + 8)); - return cache->NewIterator(options.prefix ? options_copy : options, - soptions, - meta, - nullptr /* don't need reference to table*/, - for_compaction); + return cache->NewIterator( + options.prefix ? options_copy : options, soptions, icomparator, meta, + nullptr /* don't need reference to table*/, for_compaction); } } @@ -234,10 +231,9 @@ bool Version::PrefixMayMatch(const ReadOptions& options, may_match = true; } else { may_match = vset_->table_cache_->PrefixMayMatch( - options, - DecodeFixed64(level_iter->value().data()), - DecodeFixed64(level_iter->value().data() + 8), - internal_prefix, nullptr); + options, vset_->icmp_, DecodeFixed64(level_iter->value().data()), + DecodeFixed64(level_iter->value().data() + 8), internal_prefix, + nullptr); } return may_match; } @@ -255,8 +251,8 @@ Iterator* Version::NewConcatenatingIterator(const ReadOptions& options, return NewEmptyIterator(); } } - return NewTwoLevelIterator(level_iter, &GetFileIterator, - vset_->table_cache_, options, soptions); + return NewTwoLevelIterator(level_iter, &GetFileIterator, vset_->table_cache_, + options, soptions, vset_->icmp_); } void Version::AddIterators(const ReadOptions& options, @@ -265,7 +261,7 @@ void Version::AddIterators(const ReadOptions& options, // Merge all level zero files together since they may overlap for (const FileMetaData* file : files_[0]) { iters->push_back(vset_->table_cache_->NewIterator(options, soptions, - *file)); + vset_->icmp_, *file)); } // For levels > 0, we can use a concatenating iterator that sequentially @@ -315,80 +311,73 @@ static void MarkKeyMayExist(void* arg) { } } -static bool SaveValue(void* arg, const Slice& ikey, const Slice& v, bool didIO){ +static bool SaveValue(void* arg, const ParsedInternalKey& parsed_key, + const Slice& v, bool didIO) { Saver* s = reinterpret_cast(arg); MergeContext* merge_contex = s->merge_context; std::string merge_result; // temporary area for merge results later assert(s != nullptr && merge_contex != nullptr); - ParsedInternalKey parsed_key; // TODO: didIO and Merge? s->didIO = didIO; - if (!ParseInternalKey(ikey, &parsed_key)) { - // TODO: what about corrupt during Merge? - s->state = kCorrupt; - } else { - if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) { - // Key matches. Process it - switch (parsed_key.type) { - case kTypeValue: - if (kNotFound == s->state) { - s->state = kFound; - s->value->assign(v.data(), v.size()); - } else if (kMerge == s->state) { - assert(s->merge_operator != nullptr); - s->state = kFound; - if (!s->merge_operator->FullMerge(s->user_key, &v, - merge_contex->GetOperands(), - s->value, s->logger)) { - RecordTick(s->statistics, NUMBER_MERGE_FAILURES); - s->state = kCorrupt; - } - } else { - assert(false); + if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) { + // Key matches. Process it + switch (parsed_key.type) { + case kTypeValue: + if (kNotFound == s->state) { + s->state = kFound; + s->value->assign(v.data(), v.size()); + } else if (kMerge == s->state) { + assert(s->merge_operator != nullptr); + s->state = kFound; + if (!s->merge_operator->FullMerge(s->user_key, &v, + merge_contex->GetOperands(), + s->value, s->logger)) { + RecordTick(s->statistics, NUMBER_MERGE_FAILURES); + s->state = kCorrupt; } - return false; + } else { + assert(false); + } + return false; - case kTypeDeletion: - if (kNotFound == s->state) { - s->state = kDeleted; - } else if (kMerge == s->state) { - s->state = kFound; + case kTypeDeletion: + if (kNotFound == s->state) { + s->state = kDeleted; + } else if (kMerge == s->state) { + s->state = kFound; if (!s->merge_operator->FullMerge(s->user_key, nullptr, merge_contex->GetOperands(), s->value, s->logger)) { - RecordTick(s->statistics, NUMBER_MERGE_FAILURES); - s->state = kCorrupt; - } - } else { - assert(false); - } - return false; - - case kTypeMerge: - assert(s->state == kNotFound || s->state == kMerge); - s->state = kMerge; - merge_contex->PushOperand(v); - while (merge_contex->GetNumOperands() >= 2) { - // Attempt to merge operands together via user associateive merge - if (s->merge_operator->PartialMerge(s->user_key, - merge_contex->GetOperand(0), - merge_contex->GetOperand(1), - &merge_result, - s->logger)) { - merge_contex->PushPartialMergeResult(merge_result); - } else { - // Associative merge returns false ==> stack the operands - break; - } + RecordTick(s->statistics, NUMBER_MERGE_FAILURES); + s->state = kCorrupt; } - return true; - - case kTypeLogData: + } else { assert(false); + } + return false; + + case kTypeMerge: + assert(s->state == kNotFound || s->state == kMerge); + s->state = kMerge; + merge_contex->PushOperand(v); + while (merge_contex->GetNumOperands() >= 2) { + // Attempt to merge operands together via user associateive merge + if (s->merge_operator->PartialMerge( + s->user_key, merge_contex->GetOperand(0), + merge_contex->GetOperand(1), &merge_result, s->logger)) { + merge_contex->PushPartialMergeResult(merge_result); + } else { + // Associative merge returns false ==> stack the operands break; + } } + return true; + + default: + assert(false); + break; } } @@ -521,8 +510,9 @@ void Version::Get(const ReadOptions& options, prev_file = f; #endif bool tableIO = false; - *status = vset_->table_cache_->Get(options, *f, ikey, &saver, SaveValue, - &tableIO, MarkKeyMayExist); + *status = + vset_->table_cache_->Get(options, vset_->icmp_, *f, ikey, &saver, + SaveValue, &tableIO, MarkKeyMayExist); // TODO: examine the behavior for corrupted key if (!status->ok()) { return; @@ -1355,9 +1345,8 @@ class VersionSet::Builder { for (auto& file_meta : *(levels_[level].added_files)) { assert (!file_meta->table_reader_handle); bool table_io; - vset_->table_cache_->FindTable(vset_->storage_options_, - file_meta->number, - file_meta->file_size, + vset_->table_cache_->FindTable(vset_->storage_options_, vset_->icmp_, + file_meta->number, file_meta->file_size, &file_meta->table_reader_handle, &table_io, false); } @@ -2069,8 +2058,9 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { // "ikey" falls in the range for this table. Add the // approximate offset of "ikey" within the table. TableReader* table_reader_ptr; - Iterator* iter = table_cache_->NewIterator( - ReadOptions(), storage_options_, *(files[i]), &table_reader_ptr); + Iterator* iter = + table_cache_->NewIterator(ReadOptions(), storage_options_, icmp_, + *(files[i]), &table_reader_ptr); if (table_reader_ptr != nullptr) { result += table_reader_ptr->ApproximateOffsetOf(ikey.Encode()); } @@ -2134,14 +2124,14 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) { if (c->level() + which == 0) { for (const auto& file : *c->inputs(which)) { list[num++] = table_cache_->NewIterator( - options, storage_options_compactions_, *file, nullptr, + options, storage_options_compactions_, icmp_, *file, nullptr, true /* for compaction */); } } else { // Create concatenating iterator for the files from this level list[num++] = NewTwoLevelIterator( new Version::LevelFileNumIterator(icmp_, c->inputs(which)), - &GetFileIterator, table_cache_, options, storage_options_, + &GetFileIterator, table_cache_, options, storage_options_, icmp_, true /* for compaction */); } } diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 931d8f3f5..d3454c343 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -57,7 +57,7 @@ static std::string PrintContents(WriteBatch* b) { state.append(")"); count++; break; - case kTypeLogData: + default: assert(false); break; } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 219f05630..61ff84c0e 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -34,6 +34,7 @@ class TablePropertiesCollector; class Slice; class SliceTransform; class Statistics; +class InternalKeyComparator; using std::shared_ptr; diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h index a9be3e572..1bdea049f 100644 --- a/include/rocksdb/table.h +++ b/include/rocksdb/table.h @@ -27,8 +27,6 @@ namespace rocksdb { -class TableFactory; - // -- Block-based Table class FlushBlockPolicyFactory; diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index feda28c1a..e5f3bd4d2 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -21,6 +21,7 @@ #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" #include "rocksdb/options.h" +#include "db/dbformat.h" #include "table/block_based_table_reader.h" #include "table/block.h" #include "table/block_builder.h" @@ -52,6 +53,7 @@ extern const uint64_t kBlockBasedTableMagicNumber struct BlockBasedTableBuilder::Rep { Options options; + const InternalKeyComparator& internal_comparator; WritableFile* file; uint64_t offset = 0; Status status; @@ -71,31 +73,30 @@ struct BlockBasedTableBuilder::Rep { std::string compressed_output; std::unique_ptr flush_block_policy; - Rep(const Options& opt, - WritableFile* f, - FlushBlockPolicyFactory* flush_block_policy_factory, + Rep(const Options& opt, const InternalKeyComparator& icomparator, + WritableFile* f, FlushBlockPolicyFactory* flush_block_policy_factory, CompressionType compression_type) : options(opt), + internal_comparator(icomparator), file(f), - data_block(options), + data_block(options, &internal_comparator), // To avoid linear scan, we make the block_restart_interval to be `1` // in index block builder - index_block(1 /* block_restart_interval */, options.comparator), + index_block(1 /* block_restart_interval */, &internal_comparator), compression_type(compression_type), - filter_block(opt.filter_policy == nullptr ? nullptr - : new FilterBlockBuilder(opt)), + filter_block(opt.filter_policy == nullptr + ? nullptr + : new FilterBlockBuilder(opt, &internal_comparator)), flush_block_policy( - flush_block_policy_factory->NewFlushBlockPolicy(data_block)) { - } + flush_block_policy_factory->NewFlushBlockPolicy(data_block)) {} }; BlockBasedTableBuilder::BlockBasedTableBuilder( - const Options& options, - WritableFile* file, - FlushBlockPolicyFactory* flush_block_policy_factory, + const Options& options, const InternalKeyComparator& internal_comparator, + WritableFile* file, FlushBlockPolicyFactory* flush_block_policy_factory, CompressionType compression_type) - : rep_(new Rep(options, - file, flush_block_policy_factory, compression_type)) { + : rep_(new Rep(options, internal_comparator, file, + flush_block_policy_factory, compression_type)) { if (rep_->filter_block != nullptr) { rep_->filter_block->StartBlock(0); } @@ -118,7 +119,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { assert(!r->closed); if (!ok()) return; if (r->props.num_entries > 0) { - assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0); + assert(r->internal_comparator.Compare(key, Slice(r->last_key)) > 0); } auto should_flush = r->flush_block_policy->Update(key, value); @@ -135,7 +136,7 @@ void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { // entries in the first block and < all entries in subsequent // blocks. if (ok()) { - r->options.comparator->FindShortestSeparator(&r->last_key, key); + r->internal_comparator.FindShortestSeparator(&r->last_key, key); std::string handle_encoding; r->pending_handle.EncodeTo(&handle_encoding); r->index_block.Add(r->last_key, Slice(handle_encoding)); @@ -339,7 +340,7 @@ Status BlockBasedTableBuilder::Finish() { // block, we will finish writing all index entries here and flush them // to storage after metaindex block is written. if (ok() && !empty_data_block) { - r->options.comparator->FindShortSuccessor(&r->last_key); + r->internal_comparator.FindShortSuccessor(&r->last_key); std::string handle_encoding; r->pending_handle.EncodeTo(&handle_encoding); diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index 0752eb399..1c4be1f83 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -26,6 +26,7 @@ class BlockBasedTableBuilder : public TableBuilder { // building in *file. Does not close the file. It is up to the // caller to close the file after calling Finish(). BlockBasedTableBuilder(const Options& options, + const InternalKeyComparator& internal_comparator, WritableFile* file, FlushBlockPolicyFactory* flush_block_policy_factory, CompressionType compression_type); diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 3cf064867..6a4a64462 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -20,15 +20,17 @@ namespace rocksdb { Status BlockBasedTableFactory::NewTableReader( const Options& options, const EnvOptions& soptions, + const InternalKeyComparator& internal_comparator, unique_ptr&& file, uint64_t file_size, unique_ptr* table_reader) const { return BlockBasedTable::Open(options, soptions, table_options_, - std::move(file), file_size, table_reader); + internal_comparator, std::move(file), file_size, + table_reader); } TableBuilder* BlockBasedTableFactory::NewTableBuilder( - const Options& options, WritableFile* file, - CompressionType compression_type) const { + const Options& options, const InternalKeyComparator& internal_comparator, + WritableFile* file, CompressionType compression_type) const { auto flush_block_policy_factory = table_options_.flush_block_policy_factory.get(); @@ -45,11 +47,9 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder( options.block_size_deviation); } - auto table_builder = new BlockBasedTableBuilder( - options, - file, - flush_block_policy_factory, - compression_type); + auto table_builder = + new BlockBasedTableBuilder(options, internal_comparator, file, + flush_block_policy_factory, compression_type); // Delete flush_block_policy_factory only when it's just created from the // options. diff --git a/table/block_based_table_factory.h b/table/block_based_table_factory.h index bdae45a87..2513b9f83 100644 --- a/table/block_based_table_factory.h +++ b/table/block_based_table_factory.h @@ -35,12 +35,13 @@ class BlockBasedTableFactory : public TableFactory { const char* Name() const override { return "BlockBasedTable"; } Status NewTableReader(const Options& options, const EnvOptions& soptions, + const InternalKeyComparator& internal_comparator, unique_ptr&& file, uint64_t file_size, unique_ptr* table_reader) const override; - TableBuilder* NewTableBuilder(const Options& options, WritableFile* file, - CompressionType compression_type) - const override; + TableBuilder* NewTableBuilder( + const Options& options, const InternalKeyComparator& internal_comparator, + WritableFile* file, CompressionType compression_type) const override; private: BlockBasedTableOptions table_options_; diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index da100fee9..f4dd5b2ec 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -39,12 +39,13 @@ const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length*3+1; using std::unique_ptr; struct BlockBasedTable::Rep { - Rep(const EnvOptions& storage_options) : - soptions(storage_options) { - } + Rep(const EnvOptions& storage_options, + const InternalKeyComparator& internal_comparator) + : soptions(storage_options), internal_comparator_(internal_comparator) {} Options options; const EnvOptions& soptions; + const InternalKeyComparator& internal_comparator_; Status status; unique_ptr file; char cache_key_prefix[kMaxCacheKeyPrefixSize]; @@ -225,6 +226,7 @@ Cache::Handle* GetFromBlockCache( Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, const BlockBasedTableOptions& table_options, + const InternalKeyComparator& internal_comparator, unique_ptr&& file, uint64_t file_size, unique_ptr* table_reader) { @@ -236,7 +238,7 @@ Status BlockBasedTable::Open(const Options& options, const EnvOptions& soptions, // We've successfully read the footer and the index block: we're // ready to serve requests. - Rep* rep = new BlockBasedTable::Rep(soptions); + Rep* rep = new BlockBasedTable::Rep(soptions, internal_comparator); rep->options = options; rep->file = std::move(file); rep->metaindex_handle = footer.metaindex_handle(); @@ -661,7 +663,7 @@ Iterator* BlockBasedTable::BlockReader(void* arg, Iterator* iter; if (block != nullptr) { - iter = block->NewIterator(table->rep_->options.comparator); + iter = block->NewIterator(&(table->rep_->internal_comparator_)); if (cache_handle != nullptr) { iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle); } else { @@ -734,7 +736,7 @@ BlockBasedTable::GetFilter(bool no_io) const { // Get the iterator from the index block. Iterator* BlockBasedTable::IndexBlockReader(const ReadOptions& options) const { if (rep_->index_block) { - return rep_->index_block->NewIterator(rep_->options.comparator); + return rep_->index_block->NewIterator(&(rep_->internal_comparator_)); } // get index block from cache @@ -755,7 +757,7 @@ Iterator* BlockBasedTable::IndexBlockReader(const ReadOptions& options) const { Iterator* iter; if (entry.value != nullptr) { - iter = entry.value->NewIterator(rep_->options.comparator); + iter = entry.value->NewIterator(&(rep_->internal_comparator_)); if (entry.cache_handle) { iter->RegisterCleanup( &ReleaseBlock, rep_->options.block_cache.get(), entry.cache_handle @@ -769,9 +771,9 @@ Iterator* BlockBasedTable::IndexBlockReader(const ReadOptions& options) const { return iter; } -Iterator* BlockBasedTable::BlockReader(void* arg, - const ReadOptions& options, +Iterator* BlockBasedTable::BlockReader(void* arg, const ReadOptions& options, const EnvOptions& soptions, + const InternalKeyComparator& icomparator, const Slice& index_value, bool for_compaction) { return BlockReader(arg, options, index_value, nullptr, for_compaction); @@ -862,20 +864,15 @@ Iterator* BlockBasedTable::NewIterator(const ReadOptions& options) { } } - return NewTwoLevelIterator( - IndexBlockReader(options), - &BlockBasedTable::BlockReader, - const_cast(this), - options, - rep_->soptions - ); + return NewTwoLevelIterator(IndexBlockReader(options), + &BlockBasedTable::BlockReader, + const_cast(this), options, + rep_->soptions, rep_->internal_comparator_); } Status BlockBasedTable::Get( - const ReadOptions& readOptions, - const Slice& key, - void* handle_context, - bool (*result_handler)(void* handle_context, const Slice& k, + const ReadOptions& readOptions, const Slice& key, void* handle_context, + bool (*result_handler)(void* handle_context, const ParsedInternalKey& k, const Slice& v, bool didIO), void (*mark_key_may_exist_handler)(void* handle_context)) { Status s; @@ -913,8 +910,13 @@ Status BlockBasedTable::Get( // Call the *saver function on each entry/block until it returns false for (block_iter->Seek(key); block_iter->Valid(); block_iter->Next()) { - if (!(*result_handler)(handle_context, block_iter->key(), - block_iter->value(), didIO)) { + ParsedInternalKey parsed_key; + if (!ParseInternalKey(block_iter->key(), &parsed_key)) { + s = Status::Corruption(Slice()); + } + + if (!(*result_handler)(handle_context, parsed_key, block_iter->value(), + didIO)) { done = true; break; } @@ -931,7 +933,8 @@ Status BlockBasedTable::Get( return s; } -bool SaveDidIO(void* arg, const Slice& key, const Slice& value, bool didIO) { +bool SaveDidIO(void* arg, const ParsedInternalKey& key, const Slice& value, + bool didIO) { *reinterpret_cast(arg) = didIO; return false; } diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h index d540f65ad..58e5b0716 100644 --- a/table/block_based_table_reader.h +++ b/table/block_based_table_reader.h @@ -51,6 +51,7 @@ class BlockBasedTable : public TableReader { // *file must remain live while this Table is in use. static Status Open(const Options& db_options, const EnvOptions& env_options, const BlockBasedTableOptions& table_options, + const InternalKeyComparator& internal_key_comparator, unique_ptr&& file, uint64_t file_size, unique_ptr* table_reader); @@ -63,10 +64,11 @@ class BlockBasedTable : public TableReader { Status Get(const ReadOptions& readOptions, const Slice& key, void* handle_context, - bool (*result_handler)(void* handle_context, const Slice& k, - const Slice& v, bool didIO), - void (*mark_key_may_exist_handler)(void* handle_context) = nullptr) - override; + bool (*result_handler)(void* handle_context, + const ParsedInternalKey& k, const Slice& v, + bool didIO), + void (*mark_key_may_exist_handler)(void* handle_context) = + nullptr) override; // Given a key, return an approximate byte offset in the file where // the data for that key begins (or would begin if the key were @@ -97,8 +99,9 @@ class BlockBasedTable : public TableReader { bool compaction_optimized_; static Iterator* BlockReader(void*, const ReadOptions&, - const EnvOptions& soptions, const Slice&, - bool for_compaction); + const EnvOptions& soptions, + const InternalKeyComparator& icomparator, + const Slice&, bool for_compaction); static Iterator* BlockReader(void*, const ReadOptions&, const Slice&, bool* didIO, bool for_compaction = false); diff --git a/table/block_builder.cc b/table/block_builder.cc index 917601865..f812dbae7 100644 --- a/table/block_builder.cc +++ b/table/block_builder.cc @@ -36,6 +36,7 @@ #include #include #include "rocksdb/comparator.h" +#include "db/dbformat.h" #include "util/coding.h" namespace rocksdb { @@ -51,9 +52,8 @@ BlockBuilder::BlockBuilder(int block_restart_interval, restarts_.push_back(0); // First restart point is at offset 0 } -BlockBuilder::BlockBuilder(const Options& options) - : BlockBuilder(options.block_restart_interval, options.comparator) { -} +BlockBuilder::BlockBuilder(const Options& options, const Comparator* comparator) + : BlockBuilder(options.block_restart_interval, comparator) {} void BlockBuilder::Reset() { buffer_.clear(); diff --git a/table/block_builder.h b/table/block_builder.h index 31faf19b8..ed2f290fd 100644 --- a/table/block_builder.h +++ b/table/block_builder.h @@ -21,7 +21,7 @@ class Comparator; class BlockBuilder { public: BlockBuilder(int block_builder, const Comparator* comparator); - explicit BlockBuilder(const Options& options); + explicit BlockBuilder(const Options& options, const Comparator* comparator); // Reset the contents as if the BlockBuilder was just constructed. void Reset(); diff --git a/table/block_test.cc b/table/block_test.cc index 7f33e3a90..588ce6729 100644 --- a/table/block_test.cc +++ b/table/block_test.cc @@ -32,9 +32,12 @@ class BlockTest {}; TEST(BlockTest, SimpleTest) { Random rnd(301); Options options = Options(); + std::unique_ptr ic; + ic.reset(new test::PlainInternalKeyComparator(options.comparator)); + std::vector keys; std::vector values; - BlockBuilder builder(options); + BlockBuilder builder(options, ic.get()); int num_records = 100000; char buf[10]; char* p = &buf[0]; diff --git a/table/filter_block.cc b/table/filter_block.cc index 82b6c6ee1..356096d0e 100644 --- a/table/filter_block.cc +++ b/table/filter_block.cc @@ -21,11 +21,12 @@ namespace rocksdb { static const size_t kFilterBaseLg = 11; static const size_t kFilterBase = 1 << kFilterBaseLg; -FilterBlockBuilder::FilterBlockBuilder(const Options& opt) - : policy_(opt.filter_policy), - prefix_extractor_(opt.prefix_extractor), - whole_key_filtering_(opt.whole_key_filtering), - comparator_(opt.comparator){} +FilterBlockBuilder::FilterBlockBuilder(const Options& opt, + const Comparator* internal_comparator) + : policy_(opt.filter_policy), + prefix_extractor_(opt.prefix_extractor), + whole_key_filtering_(opt.whole_key_filtering), + comparator_(internal_comparator) {} void FilterBlockBuilder::StartBlock(uint64_t block_offset) { uint64_t filter_index = (block_offset / kFilterBase); diff --git a/table/filter_block.h b/table/filter_block.h index e47f94653..da19d42e9 100644 --- a/table/filter_block.h +++ b/table/filter_block.h @@ -35,7 +35,8 @@ class FilterPolicy; // (StartBlock AddKey*)* Finish class FilterBlockBuilder { public: - explicit FilterBlockBuilder(const Options& opt); + explicit FilterBlockBuilder(const Options& opt, + const Comparator* internal_comparator); void StartBlock(uint64_t block_offset); void AddKey(const Slice& key); diff --git a/table/filter_block_test.cc b/table/filter_block_test.cc index bc1a0d0ab..1703d59d1 100644 --- a/table/filter_block_test.cc +++ b/table/filter_block_test.cc @@ -55,7 +55,7 @@ class FilterBlockTest { }; TEST(FilterBlockTest, EmptyBuilder) { - FilterBlockBuilder builder(options_); + FilterBlockBuilder builder(options_, options_.comparator); Slice block = builder.Finish(); ASSERT_EQ("\\x00\\x00\\x00\\x00\\x0b", EscapeString(block)); FilterBlockReader reader(options_, block); @@ -64,7 +64,7 @@ TEST(FilterBlockTest, EmptyBuilder) { } TEST(FilterBlockTest, SingleChunk) { - FilterBlockBuilder builder(options_); + FilterBlockBuilder builder(options_, options_.comparator); builder.StartBlock(100); builder.AddKey("foo"); builder.AddKey("bar"); @@ -85,7 +85,7 @@ TEST(FilterBlockTest, SingleChunk) { } TEST(FilterBlockTest, MultiChunk) { - FilterBlockBuilder builder(options_); + FilterBlockBuilder builder(options_, options_.comparator); // First filter builder.StartBlock(0); diff --git a/table/plain_table_builder.cc b/table/plain_table_builder.cc index 5c3252360..e33ac39f2 100644 --- a/table/plain_table_builder.cc +++ b/table/plain_table_builder.cc @@ -11,6 +11,8 @@ #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" #include "rocksdb/options.h" +#include "table/plain_table_factory.h" +#include "db/dbformat.h" #include "table/block_builder.h" #include "table/filter_block.h" #include "table/format.h" @@ -67,20 +69,32 @@ PlainTableBuilder::~PlainTableBuilder() { } void PlainTableBuilder::Add(const Slice& key, const Slice& value) { - assert(user_key_len_ == 0 || key.size() == user_key_len_ + 8); + size_t user_key_size = key.size() - 8; + assert(user_key_len_ == 0 || user_key_size == user_key_len_); if (!IsFixedLength()) { // Write key length - int key_size = key.size(); key_size_str_.clear(); - PutVarint32(&key_size_str_, key_size); + PutVarint32(&key_size_str_, user_key_size); file_->Append(key_size_str_); offset_ += key_size_str_.length(); } // Write key - file_->Append(key); - offset_ += key.size(); + ParsedInternalKey parsed_key; + if (!ParseInternalKey(key, &parsed_key)) { + status_ = Status::Corruption(Slice()); + return; + } + if (parsed_key.sequence == 0 && parsed_key.type == kTypeValue) { + file_->Append(Slice(key.data(), user_key_size)); + char tmp_char = PlainTableFactory::kValueTypeSeqId0; + file_->Append(Slice(&tmp_char, 1)); + offset_ += key.size() - 7; + } else { + file_->Append(key); + offset_ += key.size(); + } // Write value length value_size_str_.clear(); @@ -105,9 +119,7 @@ void PlainTableBuilder::Add(const Slice& key, const Slice& value) { ); } -Status PlainTableBuilder::status() const { - return Status::OK(); -} +Status PlainTableBuilder::status() const { return status_; } Status PlainTableBuilder::Finish() { assert(!closed_); diff --git a/table/plain_table_factory.cc b/table/plain_table_factory.cc index 45ae71c64..c7ee8eb2f 100644 --- a/table/plain_table_factory.cc +++ b/table/plain_table_factory.cc @@ -6,6 +6,7 @@ #include #include +#include "db/dbformat.h" #include "table/plain_table_builder.h" #include "table/plain_table_reader.h" #include "port/port.h" @@ -14,16 +15,18 @@ namespace rocksdb { Status PlainTableFactory::NewTableReader(const Options& options, const EnvOptions& soptions, + const InternalKeyComparator& icomp, unique_ptr&& file, uint64_t file_size, unique_ptr* table) const { - return PlainTableReader::Open(options, soptions, std::move(file), file_size, - table, bloom_bits_per_key_, hash_table_ratio_); + return PlainTableReader::Open(options, soptions, icomp, std::move(file), + file_size, table, bloom_bits_per_key_, + hash_table_ratio_); } TableBuilder* PlainTableFactory::NewTableBuilder( - const Options& options, WritableFile* file, - CompressionType compression_type) const { + const Options& options, const InternalKeyComparator& internal_comparator, + WritableFile* file, CompressionType compression_type) const { return new PlainTableBuilder(options, file, user_key_len_); } diff --git a/table/plain_table_factory.h b/table/plain_table_factory.h index 55680a3ec..88745ca1b 100644 --- a/table/plain_table_factory.h +++ b/table/plain_table_factory.h @@ -57,12 +57,16 @@ class PlainTableFactory : public TableFactory { hash_table_ratio_(hash_table_ratio) {} const char* Name() const override { return "PlainTable"; } Status NewTableReader(const Options& options, const EnvOptions& soptions, + const InternalKeyComparator& internal_comparator, unique_ptr&& file, uint64_t file_size, unique_ptr* table) const override; + TableBuilder* NewTableBuilder(const Options& options, + const InternalKeyComparator& icomparator, + WritableFile* file, + CompressionType compression_type) const + override; - TableBuilder* NewTableBuilder(const Options& options, WritableFile* file, - CompressionType compression_type) - const override; + static const char kValueTypeSeqId0 = 0xFF; private: uint32_t user_key_len_; diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 5d769eea2..b07862bad 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -4,8 +4,7 @@ #include "table/plain_table_reader.h" -#include -#include +#include #include "db/dbformat.h" @@ -77,6 +76,7 @@ class PlainTableIterator : public Iterator { Slice key_; Slice value_; Status status_; + std::string tmp_str_; // No copying allowed PlainTableIterator(const PlainTableIterator&) = delete; void operator=(const Iterator&) = delete; @@ -84,10 +84,12 @@ class PlainTableIterator : public Iterator { extern const uint64_t kPlainTableMagicNumber; PlainTableReader::PlainTableReader(const EnvOptions& storage_options, + const InternalKeyComparator& icomparator, uint64_t file_size, int bloom_bits_per_key, double hash_table_ratio, const TableProperties& table_properties) : soptions_(storage_options), + internal_comparator_(icomparator), file_size_(file_size), kHashTableRatio(hash_table_ratio), kBloomBitsPerKey(bloom_bits_per_key), @@ -103,6 +105,7 @@ PlainTableReader::~PlainTableReader() { Status PlainTableReader::Open(const Options& options, const EnvOptions& soptions, + const InternalKeyComparator& internal_comparator, unique_ptr&& file, uint64_t file_size, unique_ptr* table_reader, @@ -122,9 +125,9 @@ Status PlainTableReader::Open(const Options& options, return s; } - std::unique_ptr new_reader( - new PlainTableReader(soptions, file_size, bloom_bits_per_key, - hash_table_ratio, table_properties)); + std::unique_ptr new_reader(new PlainTableReader( + soptions, internal_comparator, file_size, bloom_bits_per_key, + hash_table_ratio, table_properties)); new_reader->file_ = std::move(file); new_reader->options_ = options; @@ -215,10 +218,10 @@ int PlainTableReader::PopulateIndexRecordList(IndexRecordList* record_list) { int num_prefixes = 0; while (pos < data_end_offset_) { uint32_t key_offset = pos; - Slice key_slice; + ParsedInternalKey key; Slice value_slice; - status_ = Next(pos, &key_slice, &value_slice, pos); - Slice key_prefix_slice = GetPrefix(key_slice); + status_ = Next(pos, &key, &value_slice, pos); + Slice key_prefix_slice = GetPrefix(key); if (is_first_record || prev_key_prefix_slice != key_prefix_slice) { ++num_prefixes; @@ -413,7 +416,11 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix, index_ptr + 4, &upper_bound); uint32_t high = upper_bound; - Slice mid_key; + ParsedInternalKey mid_key; + ParsedInternalKey parsed_target; + if (!ParseInternalKey(target, &parsed_target)) { + return Status::Corruption(Slice()); + } // The key is between [low, high). Do a binary search between it. while (high - low > 1) { @@ -424,8 +431,8 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix, if (!s.ok()) { return s; } - int cmp_result = options_.comparator->Compare(target, mid_key); - if (cmp_result > 0) { + int cmp_result = internal_comparator_.Compare(mid_key, parsed_target); + if (cmp_result < 0) { low = mid; } else { if (cmp_result == 0) { @@ -442,7 +449,7 @@ Status PlainTableReader::GetOffset(const Slice& target, const Slice& prefix, // Both of the key at the position low or low+1 could share the same // prefix as target. We need to rule out one of them to avoid to go // to the wrong prefix. - Slice low_key; + ParsedInternalKey low_key; size_t tmp; uint32_t low_key_offset = base_ptr[low]; Status s = ReadKey(file_data_.data() + low_key_offset, &low_key, tmp); @@ -465,31 +472,53 @@ bool PlainTableReader::MayHavePrefix(uint32_t hash) { return bloom_ == nullptr || bloom_->MayContainHash(hash); } -Status PlainTableReader::ReadKey(const char* row_ptr, Slice* key, +Slice PlainTableReader::GetPrefix(const ParsedInternalKey& target) { + return options_.prefix_extractor->Transform(target.user_key); +} + +Status PlainTableReader::ReadKey(const char* row_ptr, ParsedInternalKey* key, size_t& bytes_read) { const char* key_ptr = nullptr; bytes_read = 0; - size_t internal_key_size = 0; + size_t user_key_size = 0; if (IsFixedLength()) { - internal_key_size = GetFixedInternalKeyLength(); + user_key_size = user_key_len_; key_ptr = row_ptr; } else { - uint32_t key_size = 0; + uint32_t tmp_size = 0; key_ptr = GetVarint32Ptr(row_ptr, file_data_.data() + data_end_offset_, - &key_size); - internal_key_size = (size_t)key_size; + &tmp_size); + if (key_ptr == nullptr) { + return Status::Corruption("Unable to read the next key"); + } + user_key_size = (size_t)tmp_size; bytes_read = key_ptr - row_ptr; } - if (row_ptr + internal_key_size >= file_data_.data() + data_end_offset_) { + if (key_ptr + user_key_size + 1 >= file_data_.data() + data_end_offset_) { return Status::Corruption("Unable to read the next key"); } - *key = Slice(key_ptr, internal_key_size); - bytes_read += internal_key_size; + + if (*(key_ptr + user_key_size) == PlainTableFactory::kValueTypeSeqId0) { + // Special encoding for the row with seqID=0 + key->user_key = Slice(key_ptr, user_key_size); + key->sequence = 0; + key->type = kTypeValue; + bytes_read += user_key_size + 1; + } else { + if (row_ptr + user_key_size + 8 >= file_data_.data() + data_end_offset_) { + return Status::Corruption("Unable to read the next key"); + } + if (!ParseInternalKey(Slice(key_ptr, user_key_size + 8), key)) { + return Status::Corruption(Slice()); + } + bytes_read += user_key_size + 8; + } + return Status::OK(); } -Status PlainTableReader::Next(uint32_t offset, Slice* key, Slice* value, - uint32_t& next_offset) { +Status PlainTableReader::Next(uint32_t offset, ParsedInternalKey* key, + Slice* value, uint32_t& next_offset) { if (offset == data_end_offset_) { next_offset = data_end_offset_; return Status::OK(); @@ -518,10 +547,11 @@ Status PlainTableReader::Next(uint32_t offset, Slice* key, Slice* value, return Status::OK(); } -Status PlainTableReader::Get( - const ReadOptions& ro, const Slice& target, void* arg, - bool (*saver)(void*, const Slice&, const Slice&, bool), - void (*mark_key_may_exist)(void*)) { +Status PlainTableReader::Get(const ReadOptions& ro, const Slice& target, + void* arg, + bool (*saver)(void*, const ParsedInternalKey&, + const Slice&, bool), + void (*mark_key_may_exist)(void*)) { // Check bloom filter first. Slice prefix_slice = GetPrefix(target); uint32_t prefix_hash = GetSliceHash(prefix_slice); @@ -534,7 +564,12 @@ Status PlainTableReader::Get( if (!s.ok()) { return s; } - Slice found_key; + ParsedInternalKey found_key; + ParsedInternalKey parsed_target; + if (!ParseInternalKey(target, &parsed_target)) { + return Status::Corruption(Slice()); + } + Slice found_value; while (offset < data_end_offset_) { Status s = Next(offset, &found_key, &found_value, offset); @@ -549,9 +584,10 @@ Status PlainTableReader::Get( } prefix_match = true; } - if (options_.comparator->Compare(found_key, target) >= 0 - && !(*saver)(arg, found_key, found_value, true)) { - break; + if (internal_comparator_.Compare(found_key, parsed_target) >= 0) { + if (!(*saver)(arg, found_key, found_value, true)) { + break; + } } } return Status::OK(); @@ -612,7 +648,7 @@ void PlainTableIterator::Seek(const Slice& target) { } prefix_match = true; } - if (table_->options_.comparator->Compare(key(), target) >= 0) { + if (table_->internal_comparator_.Compare(key(), target) >= 0) { break; } } @@ -623,8 +659,19 @@ void PlainTableIterator::Seek(const Slice& target) { void PlainTableIterator::Next() { offset_ = next_offset_; - Slice tmp_slice; - status_ = table_->Next(next_offset_, &key_, &value_, next_offset_); + if (offset_ < table_->data_end_offset_) { + Slice tmp_slice; + ParsedInternalKey parsed_key; + status_ = table_->Next(next_offset_, &parsed_key, &value_, next_offset_); + if (status_.ok()) { + // Make a copy in this case. TODO optimize. + tmp_str_.clear(); + AppendInternalKey(&tmp_str_, parsed_key); + key_ = Slice(tmp_str_); + } else { + offset_ = next_offset_ = table_->data_end_offset_; + } + } } void PlainTableIterator::Prev() { @@ -632,10 +679,12 @@ void PlainTableIterator::Prev() { } Slice PlainTableIterator::key() const { + assert(Valid()); return key_; } Slice PlainTableIterator::value() const { + assert(Valid()); return value_; } diff --git a/table/plain_table_reader.h b/table/plain_table_reader.h index d223a13d5..1abe4e35c 100644 --- a/table/plain_table_reader.h +++ b/table/plain_table_reader.h @@ -6,8 +6,10 @@ #include #include #include +#include #include +#include "db/dbformat.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/slice_transform.h" @@ -27,6 +29,7 @@ struct ReadOptions; class TableCache; class TableReader; class DynamicBloom; +class InternalKeyComparator; using std::unique_ptr; using std::unordered_map; @@ -43,6 +46,7 @@ extern const uint32_t kPlainTableVariableLength; class PlainTableReader: public TableReader { public: static Status Open(const Options& options, const EnvOptions& soptions, + const InternalKeyComparator& internal_comparator, unique_ptr&& file, uint64_t file_size, unique_ptr* table, const int bloom_bits_per_key, double hash_table_ratio); @@ -51,10 +55,10 @@ class PlainTableReader: public TableReader { Iterator* NewIterator(const ReadOptions&); - Status Get( - const ReadOptions&, const Slice& key, void* arg, - bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool), - void (*mark_key_may_exist)(void*) = nullptr); + Status Get(const ReadOptions&, const Slice& key, void* arg, + bool (*result_handler)(void* arg, const ParsedInternalKey& k, + const Slice& v, bool), + void (*mark_key_may_exist)(void*) = nullptr); uint64_t ApproximateOffsetOf(const Slice& key); @@ -62,8 +66,10 @@ class PlainTableReader: public TableReader { const TableProperties& GetTableProperties() { return table_properties_; } - PlainTableReader(const EnvOptions& storage_options, uint64_t file_size, - int bloom_bits_per_key, double hash_table_ratio, + PlainTableReader(const EnvOptions& storage_options, + const InternalKeyComparator& internal_comparator, + uint64_t file_size, int bloom_num_bits, + double hash_table_ratio, const TableProperties& table_properties); ~PlainTableReader(); @@ -77,6 +83,7 @@ class PlainTableReader: public TableReader { Options options_; const EnvOptions& soptions_; + const InternalKeyComparator internal_comparator_; Status status_; unique_ptr file_; @@ -184,11 +191,13 @@ class PlainTableReader: public TableReader { // too. bool MayHavePrefix(uint32_t hash); - Status ReadKey(const char* row_ptr, Slice* key, size_t& bytes_read); + Status ReadKey(const char* row_ptr, ParsedInternalKey* key, + size_t& bytes_read); // Read the key and value at offset to key and value. // tmp_slice is a tmp slice. // return next_offset as the offset for the next key. - Status Next(uint32_t offset, Slice* key, Slice* value, uint32_t& next_offset); + Status Next(uint32_t offset, ParsedInternalKey* key, Slice* value, + uint32_t& next_offset); // Get file offset for key target. // return value prefix_matched is set to true if the offset is confirmed // for a key with the same prefix as target. @@ -202,6 +211,8 @@ class PlainTableReader: public TableReader { Slice(target.data(), target.size() - 8)); } + Slice GetPrefix(const ParsedInternalKey& target); + // No copying allowed explicit PlainTableReader(const TableReader&) = delete; void operator=(const TableReader&) = delete; diff --git a/table/table_factory.h b/table/table_factory.h index d4b222657..f606a916a 100644 --- a/table/table_factory.h +++ b/table/table_factory.h @@ -53,6 +53,7 @@ class TableFactory { // table_reader is the output table reader virtual Status NewTableReader( const Options& options, const EnvOptions& soptions, + const InternalKeyComparator& internal_comparator, unique_ptr&& file, uint64_t file_size, unique_ptr* table_reader) const = 0; @@ -75,8 +76,8 @@ class TableFactory { // keep the file open and close the file after closing the table builder. // compression_type is the compression type to use in this table. virtual TableBuilder* NewTableBuilder( - const Options& options, WritableFile* file, - CompressionType compression_type) const = 0; + const Options& options, const InternalKeyComparator& internal_comparator, + WritableFile* file, CompressionType compression_type) const = 0; }; } // namespace rocksdb diff --git a/table/table_reader.h b/table/table_reader.h index 983c998e7..681ce7233 100644 --- a/table/table_reader.h +++ b/table/table_reader.h @@ -12,6 +12,7 @@ namespace rocksdb { class Iterator; +class ParsedInternalKey; class Slice; struct ReadOptions; struct TableProperties; @@ -62,7 +63,7 @@ class TableReader { // key is the key to search for virtual Status Get( const ReadOptions& readOptions, const Slice& key, void* handle_context, - bool (*result_handler)(void* handle_context, const Slice& k, + bool (*result_handler)(void* arg, const ParsedInternalKey& k, const Slice& v, bool didIO), void (*mark_key_may_exist_handler)(void* handle_context) = nullptr) = 0; }; diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc index 88436c1f3..f746592fe 100644 --- a/table/table_reader_bench.cc +++ b/table/table_reader_bench.cc @@ -34,8 +34,8 @@ static std::string MakeKey(int i, int j, bool through_db) { return key.Encode().ToString(); } -static bool DummySaveValue(void* arg, const Slice& ikey, const Slice& v, - bool didIO) { +static bool DummySaveValue(void* arg, const ParsedInternalKey& ikey, + const Slice& v, bool didIO) { return false; } @@ -237,6 +237,8 @@ int main(int argc, char** argv) { rocksdb::EnvOptions env_options; options.create_if_missing = true; options.compression = rocksdb::CompressionType::kNoCompression; + options.internal_comparator = + new rocksdb::InternalKeyComparator(options.comparator); if (FLAGS_plain_table) { options.allow_mmap_reads = true; diff --git a/table/table_test.cc b/table/table_test.cc index 39f341131..076f5eb07 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -183,8 +183,9 @@ class Constructor { // been added so far. Returns the keys in sorted order in "*keys" // and stores the key/value pairs in "*kvmap" void Finish(const Options& options, - std::vector* keys, - KVMap* kvmap) { + const InternalKeyComparator& internal_comparator, + std::vector* keys, KVMap* kvmap) { + last_internal_key_ = &internal_comparator; *kvmap = data_; keys->clear(); for (KVMap::const_iterator it = data_.begin(); @@ -193,12 +194,14 @@ class Constructor { keys->push_back(it->first); } data_.clear(); - Status s = FinishImpl(options, *kvmap); + Status s = FinishImpl(options, internal_comparator, *kvmap); ASSERT_TRUE(s.ok()) << s.ToString(); } // Construct the data structure from the data in "data" - virtual Status FinishImpl(const Options& options, const KVMap& data) = 0; + virtual Status FinishImpl(const Options& options, + const InternalKeyComparator& internal_comparator, + const KVMap& data) = 0; virtual Iterator* NewIterator() const = 0; @@ -206,6 +209,9 @@ class Constructor { virtual DB* db() const { return nullptr; } // Overridden in DBConstructor + protected: + const InternalKeyComparator* last_internal_key_; + private: KVMap data_; }; @@ -219,10 +225,12 @@ class BlockConstructor: public Constructor { ~BlockConstructor() { delete block_; } - virtual Status FinishImpl(const Options& options, const KVMap& data) { + virtual Status FinishImpl(const Options& options, + const InternalKeyComparator& internal_comparator, + const KVMap& data) { delete block_; block_ = nullptr; - BlockBuilder builder(options); + BlockBuilder builder(options, &internal_comparator); for (KVMap::const_iterator it = data.begin(); it != data.end(); @@ -298,12 +306,14 @@ class TableConstructor: public Constructor { : Constructor(cmp), convert_to_internal_key_(convert_to_internal_key) {} ~TableConstructor() { Reset(); } - virtual Status FinishImpl(const Options& options, const KVMap& data) { + virtual Status FinishImpl(const Options& options, + const InternalKeyComparator& internal_comparator, + const KVMap& data) { Reset(); sink_.reset(new StringSink()); unique_ptr builder; - builder.reset(options.table_factory->NewTableBuilder(options, sink_.get(), - options.compression)); + builder.reset(options.table_factory->NewTableBuilder( + options, internal_comparator, sink_.get(), options.compression)); for (KVMap::const_iterator it = data.begin(); it != data.end(); @@ -328,8 +338,8 @@ class TableConstructor: public Constructor { source_.reset(new StringSource(sink_->contents(), uniq_id_, options.allow_mmap_reads)); return options.table_factory->NewTableReader( - options, soptions, std::move(source_), sink_->contents().size(), - &table_reader_); + options, soptions, internal_comparator, std::move(source_), + sink_->contents().size(), &table_reader_); } virtual Iterator* NewIterator() const { @@ -350,8 +360,8 @@ class TableConstructor: public Constructor { new StringSource(sink_->contents(), uniq_id_, options.allow_mmap_reads)); return options.table_factory->NewTableReader( - options, soptions, std::move(source_), sink_->contents().size(), - &table_reader_); + options, soptions, *last_internal_key_, std::move(source_), + sink_->contents().size(), &table_reader_); } virtual TableReader* table_reader() { @@ -393,7 +403,9 @@ class MemTableConstructor: public Constructor { ~MemTableConstructor() { delete memtable_->Unref(); } - virtual Status FinishImpl(const Options& options, const KVMap& data) { + virtual Status FinishImpl(const Options& options, + const InternalKeyComparator& internal_comparator, + const KVMap& data) { delete memtable_->Unref(); Options memtable_options; memtable_options.memtable_factory = table_factory_; @@ -429,7 +441,9 @@ class DBConstructor: public Constructor { ~DBConstructor() { delete db_; } - virtual Status FinishImpl(const Options& options, const KVMap& data) { + virtual Status FinishImpl(const Options& options, + const InternalKeyComparator& internal_comparator, + const KVMap& data) { delete db_; db_ = nullptr; NewDB(); @@ -619,7 +633,10 @@ class Harness { if (args.reverse_compare) { options_.comparator = &reverse_key_comparator; } - internal_comparator_.reset(new InternalKeyComparator(options_.comparator)); + + internal_comparator_.reset( + new test::PlainInternalKeyComparator(options_.comparator)); + support_prev_ = true; only_support_prefix_seek_ = false; BlockBasedTableOptions table_options; @@ -638,7 +655,8 @@ class Harness { options_.allow_mmap_reads = true; options_.table_factory.reset(new PlainTableFactory()); constructor_ = new TableConstructor(options_.comparator, true); - options_.comparator = internal_comparator_.get(); + internal_comparator_.reset( + new InternalKeyComparator(options_.comparator)); break; case PLAIN_TABLE_FULL_STR_PREFIX: support_prev_ = false; @@ -647,7 +665,8 @@ class Harness { options_.allow_mmap_reads = true; options_.table_factory.reset(new PlainTableFactory()); constructor_ = new TableConstructor(options_.comparator, true); - options_.comparator = internal_comparator_.get(); + internal_comparator_.reset( + new InternalKeyComparator(options_.comparator)); break; case BLOCK_TEST: constructor_ = new BlockConstructor(options_.comparator); @@ -672,7 +691,7 @@ class Harness { void Test(Random* rnd) { std::vector keys; KVMap data; - constructor_->Finish(options_, &keys, &data); + constructor_->Finish(options_, *internal_comparator_, &keys, &data); TestForwardScan(keys, data); if (support_prev_) { @@ -844,7 +863,7 @@ class Harness { Constructor* constructor_; bool support_prev_; bool only_support_prefix_seek_; - shared_ptr internal_comparator_; + shared_ptr internal_comparator_; static std::unique_ptr noop_transform; static std::unique_ptr prefix_transform; }; @@ -866,9 +885,24 @@ static bool Between(uint64_t val, uint64_t low, uint64_t high) { } // Tests against all kinds of tables -class GeneralTableTest {}; -class BlockBasedTableTest {}; -class PlainTableTest {}; +class TableTest { + public: + const InternalKeyComparator& GetPlainInternalComparator( + const Comparator* comp) { + if (!plain_internal_comparator) { + plain_internal_comparator.reset( + new test::PlainInternalKeyComparator(comp)); + } + return *plain_internal_comparator; + } + + private: + std::unique_ptr plain_internal_comparator; +}; + +class GeneralTableTest : public TableTest {}; +class BlockBasedTableTest : public TableTest {}; +class PlainTableTest : public TableTest {}; // This test include all the basic checks except those for index size and block // size, which will be conducted in separated unit tests. @@ -891,7 +925,8 @@ TEST(BlockBasedTableTest, BasicBlockBasedTableProperties) { options.compression = kNoCompression; options.block_restart_interval = 1; - c.Finish(options, &keys, &kvmap); + c.Finish(options, GetPlainInternalComparator(options.comparator), &keys, + &kvmap); auto& props = c.table_reader()->GetTableProperties(); ASSERT_EQ(kvmap.size(), props.num_entries); @@ -905,7 +940,7 @@ TEST(BlockBasedTableTest, BasicBlockBasedTableProperties) { ASSERT_EQ("", props.filter_policy_name); // no filter policy is used // Verify data size. - BlockBuilder block_builder(options); + BlockBuilder block_builder(options, options.comparator); for (const auto& item : kvmap) { block_builder.Add(item.first, item.second); } @@ -927,7 +962,8 @@ TEST(BlockBasedTableTest, FilterPolicyNameProperties) { ); options.filter_policy = filter_policy.get(); - c.Finish(options, &keys, &kvmap); + c.Finish(options, GetPlainInternalComparator(options.comparator), &keys, + &kvmap); auto& props = c.table_reader()->GetTableProperties(); ASSERT_EQ("rocksdb.BuiltinBloomFilter", props.filter_policy_name); } @@ -968,7 +1004,8 @@ TEST(BlockBasedTableTest, IndexSizeStat) { options.compression = kNoCompression; options.block_restart_interval = 1; - c.Finish(options, &ks, &kvmap); + c.Finish(options, GetPlainInternalComparator(options.comparator), &ks, + &kvmap); auto index_size = c.table_reader()->GetTableProperties().index_size; ASSERT_GT(index_size, last_index_size); @@ -992,7 +1029,8 @@ TEST(BlockBasedTableTest, NumBlockStat) { std::vector ks; KVMap kvmap; - c.Finish(options, &ks, &kvmap); + c.Finish(options, GetPlainInternalComparator(options.comparator), &ks, + &kvmap); ASSERT_EQ( kvmap.size(), c.table_reader()->GetTableProperties().num_data_blocks @@ -1055,7 +1093,8 @@ TEST(BlockBasedTableTest, BlockCacheTest) { TableConstructor c(BytewiseComparator()); c.Add("key", "value"); - c.Finish(options, &keys, &kvmap); + c.Finish(options, GetPlainInternalComparator(options.comparator), &keys, + &kvmap); // -- PART 1: Open with regular block cache. // Since block_cache is disabled, no cache activities will be involved. @@ -1179,6 +1218,8 @@ TEST(BlockBasedTableTest, BlockCacheLeak) { // unique ID from the file. Options opt; + unique_ptr ikc; + ikc.reset(new test::PlainInternalKeyComparator(opt.comparator)); opt.block_size = 1024; opt.compression = kNoCompression; opt.block_cache = @@ -1195,7 +1236,7 @@ TEST(BlockBasedTableTest, BlockCacheLeak) { c.Add("k07", std::string(100000, 'x')); std::vector keys; KVMap kvmap; - c.Finish(opt, &keys, &kvmap); + c.Finish(opt, *ikc, &keys, &kvmap); unique_ptr iter(c.NewIterator()); iter->SeekToFirst(); @@ -1217,11 +1258,14 @@ extern const uint64_t kPlainTableMagicNumber; TEST(PlainTableTest, BasicPlainTableProperties) { PlainTableFactory factory(8, 8, 0); StringSink sink; + Options options; + InternalKeyComparator ikc(options.comparator); std::unique_ptr builder( - factory.NewTableBuilder(Options(), &sink, kNoCompression)); + factory.NewTableBuilder(options, ikc, &sink, kNoCompression)); for (char c = 'a'; c <= 'z'; ++c) { - std::string key(16, c); + std::string key(8, c); + key.append("\1 "); // PlainTable expects internal key structure std::string value(28, c + 42); builder->Add(key, value); } @@ -1255,9 +1299,10 @@ TEST(GeneralTableTest, ApproximateOffsetOfPlain) { std::vector keys; KVMap kvmap; Options options; + test::PlainInternalKeyComparator internal_comparator(options.comparator); options.block_size = 1024; options.compression = kNoCompression; - c.Finish(options, &keys, &kvmap); + c.Finish(options, internal_comparator, &keys, &kvmap); ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"), 0, 0)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"), 0, 0)); @@ -1284,9 +1329,10 @@ static void DoCompressionTest(CompressionType comp) { std::vector keys; KVMap kvmap; Options options; + test::PlainInternalKeyComparator ikc(options.comparator); options.block_size = 1024; options.compression = comp; - c.Finish(options, &keys, &kvmap); + c.Finish(options, ikc, &keys, &kvmap); ASSERT_TRUE(Between(c.ApproximateOffsetOf("abc"), 0, 0)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("k01"), 0, 0)); diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc index ac2d8d3d9..65a58ad93 100644 --- a/table/two_level_iterator.cc +++ b/table/two_level_iterator.cc @@ -20,18 +20,17 @@ namespace rocksdb { namespace { typedef Iterator* (*BlockFunction)(void*, const ReadOptions&, - const EnvOptions& soptions, const Slice&, - bool for_compaction); + const EnvOptions& soptions, + const InternalKeyComparator& icomparator, + const Slice&, bool for_compaction); class TwoLevelIterator: public Iterator { public: - TwoLevelIterator( - Iterator* index_iter, - BlockFunction block_function, - void* arg, - const ReadOptions& options, - const EnvOptions& soptions, - bool for_compaction); + TwoLevelIterator(Iterator* index_iter, BlockFunction block_function, + void* arg, const ReadOptions& options, + const EnvOptions& soptions, + const InternalKeyComparator& internal_comparator, + bool for_compaction); virtual ~TwoLevelIterator(); @@ -76,6 +75,7 @@ class TwoLevelIterator: public Iterator { void* arg_; const ReadOptions options_; const EnvOptions& soptions_; + const InternalKeyComparator& internal_comparator_; Status status_; IteratorWrapper index_iter_; IteratorWrapper data_iter_; // May be nullptr @@ -86,20 +86,17 @@ class TwoLevelIterator: public Iterator { }; TwoLevelIterator::TwoLevelIterator( - Iterator* index_iter, - BlockFunction block_function, - void* arg, - const ReadOptions& options, - const EnvOptions& soptions, - bool for_compaction) + Iterator* index_iter, BlockFunction block_function, void* arg, + const ReadOptions& options, const EnvOptions& soptions, + const InternalKeyComparator& internal_comparator, bool for_compaction) : block_function_(block_function), arg_(arg), options_(options), soptions_(soptions), + internal_comparator_(internal_comparator), index_iter_(index_iter), data_iter_(nullptr), - for_compaction_(for_compaction) { -} + for_compaction_(for_compaction) {} TwoLevelIterator::~TwoLevelIterator() { } @@ -181,8 +178,9 @@ void TwoLevelIterator::InitDataBlock() { // data_iter_ is already constructed with this iterator, so // no need to change anything } else { - Iterator* iter = (*block_function_)(arg_, options_, soptions_, handle, - for_compaction_); + Iterator* iter = + (*block_function_)(arg_, options_, soptions_, internal_comparator_, + handle, for_compaction_); data_block_handle_.assign(handle.data(), handle.size()); SetDataIterator(iter); } @@ -191,15 +189,14 @@ void TwoLevelIterator::InitDataBlock() { } // namespace -Iterator* NewTwoLevelIterator( - Iterator* index_iter, - BlockFunction block_function, - void* arg, - const ReadOptions& options, - const EnvOptions& soptions, - bool for_compaction) { - return new TwoLevelIterator(index_iter, block_function, arg, - options, soptions, for_compaction); +Iterator* NewTwoLevelIterator(Iterator* index_iter, + BlockFunction block_function, void* arg, + const ReadOptions& options, + const EnvOptions& soptions, + const InternalKeyComparator& internal_comparator, + bool for_compaction) { + return new TwoLevelIterator(index_iter, block_function, arg, options, + soptions, internal_comparator, for_compaction); } } // namespace rocksdb diff --git a/table/two_level_iterator.h b/table/two_level_iterator.h index 85aed3f14..d313dcb18 100644 --- a/table/two_level_iterator.h +++ b/table/two_level_iterator.h @@ -14,6 +14,7 @@ namespace rocksdb { struct ReadOptions; +class InternalKeyComparator; // Return a new two level iterator. A two-level iterator contains an // index iterator whose values point to a sequence of blocks where @@ -27,14 +28,11 @@ struct ReadOptions; extern Iterator* NewTwoLevelIterator( Iterator* index_iter, Iterator* (*block_function)( - void* arg, - const ReadOptions& options, - const EnvOptions& soptions, - const Slice& index_value, - bool for_compaction), - void* arg, - const ReadOptions& options, - const EnvOptions& soptions, + void* arg, const ReadOptions& options, const EnvOptions& soptions, + const InternalKeyComparator& internal_comparator, + const Slice& index_value, bool for_compaction), + void* arg, const ReadOptions& options, const EnvOptions& soptions, + const InternalKeyComparator& internal_comparator, bool for_compaction = false); } // namespace rocksdb diff --git a/tools/sst_dump.cc b/tools/sst_dump.cc index 7eb339659..79b361841 100644 --- a/tools/sst_dump.cc +++ b/tools/sst_dump.cc @@ -71,7 +71,6 @@ SstFileReader::SstFileReader(const std::string& file_path, } Status SstFileReader::NewTableReader(const std::string& file_path) { - table_options_.comparator = &internal_comparator_; Status s = table_options_.env->NewRandomAccessFile(file_path, &file_, soptions_); if (!s.ok()) { @@ -81,7 +80,8 @@ Status SstFileReader::NewTableReader(const std::string& file_path) { table_options_.env->GetFileSize(file_path, &file_size); unique_ptr table_factory; s = table_options_.table_factory->NewTableReader( - table_options_, soptions_, std::move(file_), file_size, &table_reader_); + table_options_, soptions_, internal_comparator_, std::move(file_), + file_size, &table_reader_); return s; } diff --git a/util/testutil.h b/util/testutil.h index c73210fec..4fc8c0f5b 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -8,6 +8,8 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include +#include "db/dbformat.h" #include "rocksdb/env.h" #include "rocksdb/slice.h" #include "util/random.h" @@ -51,5 +53,28 @@ class ErrorEnv : public EnvWrapper { } }; +// An internal comparator that just forward comparing results from the +// user comparator in it. Can be used to test entities that have no dependency +// on internal key structure but consumes InternalKeyComparator, like +// BlockBasedTable. +class PlainInternalKeyComparator : public InternalKeyComparator { + public: + explicit PlainInternalKeyComparator(const Comparator* c) + : InternalKeyComparator(c) {} + + virtual ~PlainInternalKeyComparator() {} + + virtual int Compare(const Slice& a, const Slice& b) const override { + return user_comparator()->Compare(a, b); + } + virtual void FindShortestSeparator(std::string* start, + const Slice& limit) const override { + user_comparator()->FindShortestSeparator(start, limit); + } + virtual void FindShortSuccessor(std::string* key) const override { + user_comparator()->FindShortSuccessor(key); + } +}; + } // namespace test } // namespace rocksdb