diff --git a/Makefile b/Makefile index 31720522f..a784046e8 100644 --- a/Makefile +++ b/Makefile @@ -60,6 +60,7 @@ TESTS = \ merge_test \ redis_test \ reduce_levels_test \ + simple_table_db_test \ skiplist_test \ stringappend_test \ table_test \ @@ -236,6 +237,9 @@ crc32c_test: util/crc32c_test.o $(LIBOBJECTS) $(TESTHARNESS) db_test: db/db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) +simple_table_db_test: db/simple_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(CXX) db/simple_table_db_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(COVERAGEFLAGS) + perf_context_test: db/perf_context_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CXX) db/perf_context_test.o $(LIBOBJECTS) $(TESTHARNESS) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) diff --git a/db/builder.cc b/db/builder.cc index b9206b4e2..b3bf894ef 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -15,12 +15,23 @@ #include "db/table_cache.h" #include "db/version_edit.h" #include "rocksdb/db.h" +#include "rocksdb/table.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" +#include "rocksdb/options.h" +#include "table/block_based_table_builder.h" #include "util/stop_watch.h" namespace rocksdb { +class TableFactory; + +TableBuilder* GetTableBuilder(const Options& options, WritableFile* file, + CompressionType compression_type) { + return options.table_factory->GetTableBuilder(options, file, + compression_type); +} + Status BuildTable(const std::string& dbname, Env* env, const Options& options, @@ -52,8 +63,9 @@ Status BuildTable(const std::string& dbname, if (!s.ok()) { return s; } - TableBuilder* builder = new TableBuilder(options, file.get(), 0, - enable_compression); + + TableBuilder* builder = GetTableBuilder(options, file.get(), + options.compression); // the first key is the smallest key Slice key = iter->key(); diff --git a/db/builder.h b/db/builder.h index a09033e94..c5810d952 100644 --- a/db/builder.h +++ 
b/db/builder.h @@ -9,6 +9,7 @@ #include "rocksdb/comparator.h" #include "rocksdb/status.h" #include "rocksdb/types.h" +#include "rocksdb/options.h" namespace rocksdb { @@ -20,6 +21,12 @@ class EnvOptions; class Iterator; class TableCache; class VersionEdit; +class TableBuilder; +class WritableFile; + + +extern TableBuilder* GetTableBuilder(const Options& options, WritableFile* file, + CompressionType compression_type); // Build a Table file from the contents of *iter. The generated file // will be named according to meta->number. On success, the rest of diff --git a/db/corruption_test.cc b/db/corruption_test.cc index 3dd92d173..e7b7b4c8b 100644 --- a/db/corruption_test.cc +++ b/db/corruption_test.cc @@ -15,12 +15,12 @@ #include #include "rocksdb/cache.h" #include "rocksdb/env.h" +#include "rocksdb/table.h" #include "rocksdb/write_batch.h" #include "db/db_impl.h" #include "db/filename.h" #include "db/log_format.h" #include "db/version_set.h" -#include "table/table.h" #include "util/logging.h" #include "util/testharness.h" #include "util/testutil.h" diff --git a/db/db_impl.cc b/db/db_impl.cc index fbbf650d8..75e25b0cd 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -41,10 +41,10 @@ #include "rocksdb/merge_operator.h" #include "rocksdb/statistics.h" #include "rocksdb/status.h" -#include "rocksdb/table_builder.h" +#include "rocksdb/table.h" +#include "port/port.h" #include "table/block.h" #include "table/merger.h" -#include "table/table.h" #include "table/two_level_iterator.h" #include "util/auto_roll_logger.h" #include "util/build_version.h" @@ -211,6 +211,27 @@ Options SanitizeOptions(const std::string& dbname, return result; } +CompressionType GetCompressionType(const Options& options, int level, + const bool enable_compression) { + if (!enable_compression) { + // disable compression + return kNoCompression; + } + // If the user has specified a different compression level for each level, + // then pick the compression for that level. 
+ if (!options.compression_per_level.empty()) { + const int n = options.compression_per_level.size() - 1; + // It is possible for level_ to be -1; in that case, we use level + // 0's compression. This occurs mostly in backwards compatibility + // situations when the builder doesn't know what level the file + // belongs to. Likewise, if level_ is beyond the end of the + // specified compression levels, use the last value. + return options.compression_per_level[std::max(0, std::min(level, n))]; + } else { + return options.compression; + } +} + DBImpl::DBImpl(const Options& options, const std::string& dbname) : env_(options.env), dbname_(dbname), @@ -1774,9 +1795,12 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { compact->outfile->SetPreallocationBlockSize( 1.1 * versions_->MaxFileSizeForLevel(compact->compaction->output_level())); - compact->builder.reset(new TableBuilder(options_, compact->outfile.get(), - compact->compaction->output_level(), - compact->compaction->enable_compression())); + CompressionType compression_type = GetCompressionType( + options_, compact->compaction->output_level(), + compact->compaction->enable_compression()); + + compact->builder.reset( + GetTableBuilder(options_, compact->outfile.get(), compression_type)); } return s; } @@ -2026,9 +2050,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { compaction_filter_value.clear(); bool to_delete = compaction_filter->Filter(compact->compaction->level(), - ikey.user_key, value, - &compaction_filter_value, - &value_changed); + ikey.user_key, value, + &compaction_filter_value, + &value_changed); if (to_delete) { // make a copy of the original key delete_key.assign(key.data(), key.data() + key.size()); diff --git a/db/db_impl.h b/db/db_impl.h index e769d19f5..556be2c77 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -444,4 +444,13 @@ extern Options SanitizeOptions(const std::string& db, const InternalFilterPolicy* ipolicy, const Options& src); + +// Determine 
compression type, based on user options, level of the output +// file and whether compression is disabled. +// If enable_compression is false, then compression is always disabled no +// matter what the values of the other two parameters are. +// Otherwise, the compression type is determined based on options and level. +CompressionType GetCompressionType(const Options& options, int level, + const bool enable_compression); + } // namespace rocksdb diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index 58da65abf..27d5c31ed 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -29,11 +29,10 @@ #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/status.h" -#include "rocksdb/table_builder.h" +#include "rocksdb/table.h" #include "port/port.h" #include "table/block.h" #include "table/merger.h" -#include "table/table.h" #include "table/two_level_iterator.h" #include "util/coding.h" #include "util/logging.h" diff --git a/db/db_test.cc b/db/db_test.cc index a99e056e3..27255e56f 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -20,7 +20,7 @@ #include "rocksdb/cache.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/env.h" -#include "table/table.h" +#include "rocksdb/table.h" #include "util/hash.h" #include "util/logging.h" #include "util/mutexlock.h" diff --git a/db/dbformat.h b/db/dbformat.h index a5f1a26a9..64a2c9f05 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -13,7 +13,7 @@ #include "rocksdb/db.h" #include "rocksdb/filter_policy.h" #include "rocksdb/slice.h" -#include "rocksdb/table_builder.h" +#include "rocksdb/table.h" #include "rocksdb/types.h" #include "util/coding.h" #include "util/logging.h" diff --git a/db/simple_table_db_test.cc b/db/simple_table_db_test.cc new file mode 100644 index 000000000..c0fb42c9a --- /dev/null +++ b/db/simple_table_db_test.cc @@ -0,0 +1,793 @@ +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
See the AUTHORS file for names of contributors. +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/filter_policy.h" +#include "db/db_impl.h" +#include "db/filename.h" +#include "db/version_set.h" +#include "db/write_batch_internal.h" +#include "db/db_statistics.h" +#include "rocksdb/cache.h" +#include "rocksdb/compaction_filter.h" +#include "rocksdb/env.h" +#include "rocksdb/table.h" +#include "util/hash.h" +#include "util/logging.h" +#include "util/mutexlock.h" +#include "util/testharness.h" +#include "util/testutil.h" +#include "utilities/merge_operators.h" + +using std::unique_ptr; + +namespace rocksdb { + +// SimpleTable is a simple table format for UNIT TEST ONLY. It is not built +// as production quality. +// SimpleTable requires the input key size to be fixed 16 bytes, value cannot +// be longer than 150000 bytes and stored data on disk in this format: +// +--------------------------------------------+ <= key1 offset +// | key1 | value_size (4 bytes) | | +// +----------------------------------------+ | +// | value1 | +// | | +// +----------------------------------------+---+ <= key2 offset +// | key2 | value_size (4 bytes) | | +// +----------------------------------------+ | +// | value2 | +// | | +// | ...... 
| +// +-----------------+--------------------------+ <= index_block_offset +// | key1 | key1 offset (8 bytes) | +// +-----------------+--------------------------+ +// | key2 | key2 offset (8 bytes) | +// +-----------------+--------------------------+ +// | key3 | key3 offset (8 bytes) | +// +-----------------+--------------------------+ +// | ...... | +// +-----------------+------------+-------------+ +// | index_block_offset (8 bytes) | +// +------------------------------+ + +// SimpleTable is a simple table format for UNIT TEST ONLY. It is not built +// as production quality. +class SimpleTableReader: public TableReader { +public: + // Attempt to open the table that is stored in bytes [0..file_size) + // of "file", and read the metadata entries necessary to allow + // retrieving data from the table. + // + // If successful, returns ok and sets "*table" to the newly opened + // table. The client should delete "*table" when no longer needed. + // If there was an error while initializing the table, sets "*table" + // to nullptr and returns a non-ok status. Does not take ownership of + // "*source", but the client must ensure that "source" remains live + // for the duration of the returned table's lifetime. + // + // *file must remain live while this Table is in use. 
+ static Status Open(const Options& options, const EnvOptions& soptions, + unique_ptr && file, uint64_t file_size, + unique_ptr* table_reader); + + bool PrefixMayMatch(const Slice& internal_prefix) override; + + Iterator* NewIterator(const ReadOptions&) override; + + Status Get( + const ReadOptions&, const Slice& key, void* arg, + bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool), + void (*mark_key_may_exist)(void*) = nullptr) override; + + uint64_t ApproximateOffsetOf(const Slice& key) override; + + bool TEST_KeyInCache(const ReadOptions& options, const Slice& key) override; + + void SetupForCompaction() override; + + TableStats& GetTableStats() override; + + ~SimpleTableReader(); + +private: + struct Rep; + Rep* rep_; + + explicit SimpleTableReader(Rep* rep) { + rep_ = rep; + } + friend class TableCache; + friend class SimpleTableIterator; + + Status GetOffset(const Slice& target, uint64_t* offset); + + // No copying allowed + explicit SimpleTableReader(const TableReader&) = delete; + void operator=(const TableReader&) = delete; +}; + +// Iterator to iterate SimpleTable +class SimpleTableIterator: public Iterator { +public: + explicit SimpleTableIterator(SimpleTableReader* table); + ~SimpleTableIterator(); + + bool Valid() const; + + void SeekToFirst(); + + void SeekToLast(); + + void Seek(const Slice& target); + + void Next(); + + void Prev(); + + Slice key() const; + + Slice value() const; + + Status status() const; + +private: + SimpleTableReader* table_; + uint64_t offset_; + uint64_t next_offset_; + Slice key_; + Slice value_; + char tmp_str_[4]; + char* key_str_; + char* value_str_; + int value_str_len_; + Status status_; + // No copying allowed + SimpleTableIterator(const SimpleTableIterator&) = delete; + void operator=(const Iterator&) = delete; +}; + +struct SimpleTableReader::Rep { + ~Rep() { + } + Rep(const EnvOptions& storage_options, uint64_t index_start_offset, + int num_entries) : + soptions(storage_options), 
index_start_offset(index_start_offset), + num_entries(num_entries) { + } + + Options options; + const EnvOptions& soptions; + Status status; + unique_ptr file; + uint64_t index_start_offset; + int num_entries; + TableStats table_stats; + + const static int user_key_size = 16; + const static int offset_length = 8; + const static int key_footer_len = 8; + + static int GetInternalKeyLength() { + return user_key_size + key_footer_len; + } +}; + +SimpleTableReader::~SimpleTableReader() { + delete rep_; +} + +Status SimpleTableReader::Open(const Options& options, + const EnvOptions& soptions, + unique_ptr && file, + uint64_t size, + unique_ptr* table_reader) { + char footer_space[Rep::offset_length]; + Slice footer_input; + Status s = file->Read(size - Rep::offset_length, Rep::offset_length, + &footer_input, footer_space); + if (s.ok()) { + uint64_t index_start_offset = DecodeFixed64(footer_space); + + int num_entries = (size - Rep::offset_length - index_start_offset) + / (Rep::GetInternalKeyLength() + Rep::offset_length); + SimpleTableReader::Rep* rep = new SimpleTableReader::Rep(soptions, + index_start_offset, + num_entries); + + rep->file = std::move(file); + rep->options = options; + table_reader->reset(new SimpleTableReader(rep)); + } + return s; +} + +void SimpleTableReader::SetupForCompaction() { +} + +TableStats& SimpleTableReader::GetTableStats() { + return rep_->table_stats; +} + +bool SimpleTableReader::PrefixMayMatch(const Slice& internal_prefix) { + return true; +} + +Iterator* SimpleTableReader::NewIterator(const ReadOptions& options) { + return new SimpleTableIterator(this); +} + +Status SimpleTableReader::GetOffset(const Slice& target, uint64_t* offset) { + uint32_t left = 0; + uint32_t right = rep_->num_entries - 1; + char key_chars[Rep::GetInternalKeyLength()]; + Slice tmp_slice; + + uint32_t target_offset = 0; + while (left <= right) { + uint32_t mid = (left + right + 1) / 2; + + uint64_t offset_to_read = rep_->index_start_offset + + 
(Rep::GetInternalKeyLength() + Rep::offset_length) * mid; + Status s = rep_->file->Read(offset_to_read, Rep::GetInternalKeyLength(), + &tmp_slice, key_chars); + if (!s.ok()) { + return s; + } + + int compare_result = rep_->options.comparator->Compare(tmp_slice, target); + + if (compare_result < 0) { + if (left == right) { + target_offset = right + 1; + break; + } + left = mid; + } else { + if (left == right) { + target_offset = left; + break; + } + right = mid - 1; + } + } + + if (target_offset >= (uint32_t) rep_->num_entries) { + *offset = rep_->index_start_offset; + return Status::OK(); + } + + char value_offset_chars[Rep::offset_length]; + + int64_t offset_for_value_offset = rep_->index_start_offset + + (Rep::GetInternalKeyLength() + Rep::offset_length) * target_offset + + Rep::GetInternalKeyLength(); + Status s = rep_->file->Read(offset_for_value_offset, Rep::offset_length, + &tmp_slice, value_offset_chars); + if (s.ok()) { + *offset = DecodeFixed64(value_offset_chars); + } + return s; +} + +Status SimpleTableReader::Get( + const ReadOptions& options, const Slice& k, void* arg, + bool (*saver)(void*, const Slice&, const Slice&, bool), + void (*mark_key_may_exist)(void*)) { + Status s; + SimpleTableIterator* iter = new SimpleTableIterator(this); + for (iter->Seek(k); iter->Valid(); iter->Next()) { + if (!(*saver)(arg, iter->key(), iter->value(), true)) { + break; + } + } + s = iter->status(); + delete iter; + return s; +} + +bool SimpleTableReader::TEST_KeyInCache(const ReadOptions& options, + const Slice& key) { + return false; +} + +uint64_t SimpleTableReader::ApproximateOffsetOf(const Slice& key) { + return 0; +} + +SimpleTableIterator::SimpleTableIterator(SimpleTableReader* table) : + table_(table) { + key_str_ = new char[SimpleTableReader::Rep::GetInternalKeyLength()]; + value_str_len_ = -1; + SeekToFirst(); +} + +SimpleTableIterator::~SimpleTableIterator() { + delete[] key_str_; + if (value_str_len_ >= 0) { + delete[] value_str_; + } +} + +bool 
SimpleTableIterator::Valid() const { + return offset_ < table_->rep_->index_start_offset && offset_ >= 0; +} + +void SimpleTableIterator::SeekToFirst() { + next_offset_ = 0; + Next(); +} + +void SimpleTableIterator::SeekToLast() { + assert(false); +} + +void SimpleTableIterator::Seek(const Slice& target) { + Status s = table_->GetOffset(target, &next_offset_); + if (!s.ok()) { + status_ = s; + } + Next(); +} + +void SimpleTableIterator::Next() { + offset_ = next_offset_; + if (offset_ >= table_->rep_->index_start_offset) { + return; + } + Slice result; + int internal_key_size = SimpleTableReader::Rep::GetInternalKeyLength(); + + Status s = table_->rep_->file->Read(next_offset_, internal_key_size, &result, + key_str_); + next_offset_ += internal_key_size; + key_ = result; + + Slice value_size_slice; + s = table_->rep_->file->Read(next_offset_, 4, &value_size_slice, tmp_str_); + next_offset_ += 4; + uint32_t value_size = DecodeFixed32(tmp_str_); + + Slice value_slice; + if ((int) value_size > value_str_len_) { + if (value_str_len_ >= 0) { + delete[] value_str_; + } + value_str_ = new char[value_size]; + value_str_len_ = value_size; + } + s = table_->rep_->file->Read(next_offset_, value_size, &value_slice, + value_str_); + next_offset_ += value_size; + value_ = value_slice; +} + +void SimpleTableIterator::Prev() { + assert(false); +} + +Slice SimpleTableIterator::key() const { + Log(table_->rep_->options.info_log, "key!!!!"); + return key_; +} + +Slice SimpleTableIterator::value() const { + return value_; +} + +Status SimpleTableIterator::status() const { + return status_; +} + +class SimpleTableBuilder: public TableBuilder { +public: + // Create a builder that will store the contents of the table it is + // building in *file. Does not close the file. It is up to the + // caller to close the file after calling Finish(). The output file + // will be part of level specified by 'level'. 
A value of -1 means + // that the caller does not know which level the output file will reside. + SimpleTableBuilder(const Options& options, WritableFile* file, + CompressionType compression_type); + + // REQUIRES: Either Finish() or Abandon() has been called. + ~SimpleTableBuilder(); + + // Add key,value to the table being constructed. + // REQUIRES: key is after any previously added key according to comparator. + // REQUIRES: Finish(), Abandon() have not been called + void Add(const Slice& key, const Slice& value) override; + + // Return non-ok iff some error has been detected. + Status status() const override; + + // Finish building the table. Stops using the file passed to the + // constructor after this function returns. + // REQUIRES: Finish(), Abandon() have not been called + Status Finish() override; + + // Indicate that the contents of this builder should be abandoned. Stops + // using the file passed to the constructor after this function returns. + // If the caller is not going to call Finish(), it must call Abandon() + // before destroying this builder. + // REQUIRES: Finish(), Abandon() have not been called + void Abandon() override; + + // Number of calls to Add() so far. + uint64_t NumEntries() const override; + + // Size of the file generated so far. If invoked after a successful + // Finish() call, returns the size of the final generated file. + uint64_t FileSize() const override; + +private: + struct Rep; + Rep* rep_; + + // No copying allowed + SimpleTableBuilder(const SimpleTableBuilder&) = delete; + void operator=(const SimpleTableBuilder&) = delete; +}; + +struct SimpleTableBuilder::Rep { + Options options; + WritableFile* file; + uint64_t offset = 0; + Status status; + + uint64_t num_entries = 0; + + bool closed = false; // Either Finish() or Abandon() has been called. 
+ + const static int user_key_size = 16; + const static int offset_length = 8; + const static int key_footer_len = 8; + + static int GetInternalKeyLength() { + return user_key_size + key_footer_len; + } + + std::string index; + + Rep(const Options& opt, WritableFile* f) : + options(opt), file(f) { + } + ~Rep() { + } +}; + +SimpleTableBuilder::SimpleTableBuilder(const Options& options, + WritableFile* file, + CompressionType compression_type) : + rep_(new SimpleTableBuilder::Rep(options, file)) { +} + +SimpleTableBuilder::~SimpleTableBuilder() { + delete (rep_); +} + +void SimpleTableBuilder::Add(const Slice& key, const Slice& value) { + assert((int ) key.size() == Rep::GetInternalKeyLength()); + + // Update index + rep_->index.append(key.data(), key.size()); + PutFixed64(&(rep_->index), rep_->offset); + + // Write key-value pair + rep_->file->Append(key); + rep_->offset += Rep::GetInternalKeyLength(); + + std::string size; + int value_size = value.size(); + PutFixed32(&size, value_size); + Slice sizeSlice(size); + rep_->file->Append(sizeSlice); + rep_->file->Append(value); + rep_->offset += value_size + 4; + + rep_->num_entries++; +} + +Status SimpleTableBuilder::status() const { + return Status::OK(); +} + +Status SimpleTableBuilder::Finish() { + Rep* r = rep_; + assert(!r->closed); + r->closed = true; + + uint64_t index_offset = rep_->offset; + Slice index_slice(rep_->index); + rep_->file->Append(index_slice); + rep_->offset += index_slice.size(); + + std::string index_offset_str; + PutFixed64(&index_offset_str, index_offset); + Slice foot_slice(index_offset_str); + rep_->file->Append(foot_slice); + rep_->offset += foot_slice.size(); + + return Status::OK(); +} + +void SimpleTableBuilder::Abandon() { + rep_->closed = true; +} + +uint64_t SimpleTableBuilder::NumEntries() const { + return rep_->num_entries; +} + +uint64_t SimpleTableBuilder::FileSize() const { + return rep_->offset; +} + +class SimpleTableFactory: public TableFactory { +public: + 
~SimpleTableFactory() { + } + SimpleTableFactory() { + } + const char* Name() const override { + return "SimpleTable"; + } + Status GetTableReader(const Options& options, const EnvOptions& soptions, + unique_ptr && file, + uint64_t file_size, + unique_ptr* table_reader) const; + + TableBuilder* GetTableBuilder(const Options& options, WritableFile* file, + CompressionType compression_type) const; +}; + +Status SimpleTableFactory::GetTableReader( + const Options& options, const EnvOptions& soptions, + unique_ptr && file, uint64_t file_size, + unique_ptr* table_reader) const { + + return SimpleTableReader::Open(options, soptions, std::move(file), file_size, + table_reader); +} + +TableBuilder* SimpleTableFactory::GetTableBuilder( + const Options& options, WritableFile* file, + CompressionType compression_type) const { + return new SimpleTableBuilder(options, file, compression_type); +} + +class SimpleTableDBTest { +protected: +public: + std::string dbname_; + Env* env_; + DB* db_; + + Options last_options_; + + SimpleTableDBTest() : + env_(Env::Default()) { + dbname_ = test::TmpDir() + "/simple_table_db_test"; + ASSERT_OK(DestroyDB(dbname_, Options())); + db_ = nullptr; + Reopen(); + } + + ~SimpleTableDBTest() { + delete db_; + ASSERT_OK(DestroyDB(dbname_, Options())); + } + + // Return the current option configuration. 
+ Options CurrentOptions() { + Options options; + options.table_factory.reset(new SimpleTableFactory()); + return options; + } + + DBImpl* dbfull() { + return reinterpret_cast(db_); + } + + void Reopen(Options* options = nullptr) { + ASSERT_OK(TryReopen(options)); + } + + void Close() { + delete db_; + db_ = nullptr; + } + + void DestroyAndReopen(Options* options = nullptr) { + //Destroy using last options + Destroy(&last_options_); + ASSERT_OK(TryReopen(options)); + } + + void Destroy(Options* options) { + delete db_; + db_ = nullptr; + ASSERT_OK(DestroyDB(dbname_, *options)); + } + + Status PureReopen(Options* options, DB** db) { + return DB::Open(*options, dbname_, db); + } + + Status TryReopen(Options* options = nullptr) { + delete db_; + db_ = nullptr; + Options opts; + if (options != nullptr) { + opts = *options; + } else { + opts = CurrentOptions(); + opts.create_if_missing = true; + } + last_options_ = opts; + + return DB::Open(opts, dbname_, &db_); + } + + Status Put(const Slice& k, const Slice& v) { + return db_->Put(WriteOptions(), k, v); + } + + Status Delete(const std::string& k) { + return db_->Delete(WriteOptions(), k); + } + + std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) { + ReadOptions options; + options.snapshot = snapshot; + std::string result; + Status s = db_->Get(options, k, &result); + if (s.IsNotFound()) { + result = "NOT_FOUND"; + } else if (!s.ok()) { + result = s.ToString(); + } + return result; + } + + + int NumTableFilesAtLevel(int level) { + std::string property; + ASSERT_TRUE( + db_->GetProperty("rocksdb.num-files-at-level" + NumberToString(level), + &property)); + return atoi(property.c_str()); + } + + // Return spread of files per level + std::string FilesPerLevel() { + std::string result; + int last_non_zero_offset = 0; + for (int level = 0; level < db_->NumberLevels(); level++) { + int f = NumTableFilesAtLevel(level); + char buf[100]; + snprintf(buf, sizeof(buf), "%s%d", (level ? 
"," : ""), f); + result += buf; + if (f > 0) { + last_non_zero_offset = result.size(); + } + } + result.resize(last_non_zero_offset); + return result; + } + + std::string IterStatus(Iterator* iter) { + std::string result; + if (iter->Valid()) { + result = iter->key().ToString() + "->" + iter->value().ToString(); + } else { + result = "(invalid)"; + } + return result; + } +}; + +TEST(SimpleTableDBTest, Empty) { + ASSERT_TRUE(db_ != nullptr); + ASSERT_EQ("NOT_FOUND", Get("0000000000000foo")); +} + +TEST(SimpleTableDBTest, ReadWrite) { + ASSERT_OK(Put("0000000000000foo", "v1")); + ASSERT_EQ("v1", Get("0000000000000foo")); + ASSERT_OK(Put("0000000000000bar", "v2")); + ASSERT_OK(Put("0000000000000foo", "v3")); + ASSERT_EQ("v3", Get("0000000000000foo")); + ASSERT_EQ("v2", Get("0000000000000bar")); +} + +TEST(SimpleTableDBTest, Flush) { + ASSERT_OK(Put("0000000000000foo", "v1")); + ASSERT_OK(Put("0000000000000bar", "v2")); + ASSERT_OK(Put("0000000000000foo", "v3")); + dbfull()->TEST_FlushMemTable(); + ASSERT_EQ("v3", Get("0000000000000foo")); + ASSERT_EQ("v2", Get("0000000000000bar")); +} + +TEST(SimpleTableDBTest, Flush2) { + ASSERT_OK(Put("0000000000000bar", "b")); + ASSERT_OK(Put("0000000000000foo", "v1")); + dbfull()->TEST_FlushMemTable(); + + ASSERT_OK(Put("0000000000000foo", "v2")); + dbfull()->TEST_FlushMemTable(); + ASSERT_EQ("v2", Get("0000000000000foo")); + + ASSERT_OK(Put("0000000000000eee", "v3")); + dbfull()->TEST_FlushMemTable(); + ASSERT_EQ("v3", Get("0000000000000eee")); + + ASSERT_OK(Delete("0000000000000bar")); + dbfull()->TEST_FlushMemTable(); + ASSERT_EQ("NOT_FOUND", Get("0000000000000bar")); + + ASSERT_OK(Put("0000000000000eee", "v5")); + dbfull()->TEST_FlushMemTable(); + ASSERT_EQ("v5", Get("0000000000000eee")); +} + +static std::string Key(int i) { + char buf[100]; + snprintf(buf, sizeof(buf), "key_______%06d", i); + return std::string(buf); +} + +static std::string RandomString(Random* rnd, int len) { + std::string r; + test::RandomString(rnd, len, 
&r); + return r; +} + +TEST(SimpleTableDBTest, CompactionTrigger) { + Options options = CurrentOptions(); + options.write_buffer_size = 100 << 10; //100KB + options.num_levels = 3; + options.max_mem_compaction_level = 0; + options.level0_file_num_compaction_trigger = 3; + Reopen(&options); + + Random rnd(301); + + for (int num = 0; num < options.level0_file_num_compaction_trigger - 1; + num++) { + std::vector values; + // Write 120KB (12 values, each 10K) + for (int i = 0; i < 12; i++) { + values.push_back(RandomString(&rnd, 10000)); + ASSERT_OK(Put(Key(i), values[i])); + } + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_EQ(NumTableFilesAtLevel(0), num + 1); + } + + //generate one more file in level-0, and should trigger level-0 compaction + std::vector values; + for (int i = 0; i < 12; i++) { + values.push_back(RandomString(&rnd, 10000)); + ASSERT_OK(Put(Key(i), values[i])); + } + dbfull()->TEST_WaitForCompact(); + + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + ASSERT_EQ(NumTableFilesAtLevel(1), 1); +} + +} // namespace rocksdb + +int main(int argc, char** argv) { + return rocksdb::test::RunAllTests(); +} diff --git a/db/table_cache.cc b/db/table_cache.cc index a51ee7fac..a1f466b5a 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -12,15 +12,15 @@ #include "db/filename.h" #include "rocksdb/statistics.h" -#include "table/table.h" +#include "rocksdb/table.h" #include "util/coding.h" #include "util/stop_watch.h" namespace rocksdb { static void DeleteEntry(const Slice& key, void* value) { - Table* table = reinterpret_cast(value); - delete table; + TableReader* table_reader = reinterpret_cast(value); + delete table_reader; } static void UnrefEntry(void* arg1, void* arg2) { @@ -63,7 +63,7 @@ Status TableCache::FindTable(const EnvOptions& toptions, } std::string fname = TableFileName(dbname_, file_number); unique_ptr file; - unique_ptr table; + unique_ptr table_reader; s = env_->NewRandomAccessFile(fname, &file, toptions); RecordTick(options_->statistics, 
NO_FILE_OPENS); if (s.ok()) { @@ -71,17 +71,19 @@ Status TableCache::FindTable(const EnvOptions& toptions, file->Hint(RandomAccessFile::RANDOM); } StopWatch sw(env_, options_->statistics, TABLE_OPEN_IO_MICROS); - s = Table::Open(*options_, toptions, std::move(file), file_size, &table); + s = options_->table_factory->GetTableReader(*options_, toptions, + std::move(file), file_size, + &table_reader); } if (!s.ok()) { - assert(table == nullptr); + assert(table_reader == nullptr); RecordTick(options_->statistics, NO_FILE_ERRORS); // We do not cache error results so that if the error is transient, // or somebody repairs the file, we recover automatically. } else { assert(file.get() == nullptr); - *handle = cache_->Insert(key, table.release(), 1, &DeleteEntry); + *handle = cache_->Insert(key, table_reader.release(), 1, &DeleteEntry); } } return s; @@ -91,10 +93,10 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, const EnvOptions& toptions, uint64_t file_number, uint64_t file_size, - Table** tableptr, + TableReader** table_reader_ptr, bool for_compaction) { - if (tableptr != nullptr) { - *tableptr = nullptr; + if (table_reader_ptr != nullptr) { + *table_reader_ptr = nullptr; } Cache::Handle* handle = nullptr; @@ -104,16 +106,16 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, return NewErrorIterator(s); } - Table* table = - reinterpret_cast(cache_->Value(handle)); - Iterator* result = table->NewIterator(options); + TableReader* table_reader = + reinterpret_cast(cache_->Value(handle)); + Iterator* result = table_reader->NewIterator(options); result->RegisterCleanup(&UnrefEntry, cache_.get(), handle); - if (tableptr != nullptr) { - *tableptr = table; + if (table_reader_ptr != nullptr) { + *table_reader_ptr = table_reader; } if (for_compaction) { - table->SetupForCompaction(); + table_reader->SetupForCompaction(); } return result; @@ -132,9 +134,9 @@ Status TableCache::Get(const ReadOptions& options, &handle, table_io, options.read_tier == 
kBlockCacheTier); if (s.ok()) { - Table* t = - reinterpret_cast(cache_->Value(handle)); - s = t->InternalGet(options, k, arg, saver, mark_key_may_exist); + TableReader* t = + reinterpret_cast(cache_->Value(handle)); + s = t->Get(options, k, arg, saver, mark_key_may_exist); cache_->Release(handle); } else if (options.read_tier && s.IsIncomplete()) { // Couldnt find Table in cache but treat as kFound if no_io set @@ -154,8 +156,8 @@ bool TableCache::PrefixMayMatch(const ReadOptions& options, file_size, &handle, table_io); bool may_match = true; if (s.ok()) { - Table* t = - reinterpret_cast(cache_->Value(handle)); + TableReader* t = + reinterpret_cast(cache_->Value(handle)); may_match = t->PrefixMayMatch(internal_prefix); cache_->Release(handle); } diff --git a/db/table_cache.h b/db/table_cache.h index 0928d3b32..4b225af9b 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -16,7 +16,7 @@ #include "rocksdb/env.h" #include "rocksdb/cache.h" #include "port/port.h" -#include "table/table.h" +#include "rocksdb/table.h" namespace rocksdb { @@ -39,7 +39,7 @@ class TableCache { const EnvOptions& toptions, uint64_t file_number, uint64_t file_size, - Table** tableptr = nullptr, + TableReader** table_reader_ptr = nullptr, bool for_compaction = false); // If a seek to internal key "k" in specified file finds an entry, diff --git a/db/table_stats_collector_test.cc b/db/table_stats_collector_test.cc index 586b65d40..52f4b4bb8 100644 --- a/db/table_stats_collector_test.cc +++ b/db/table_stats_collector_test.cc @@ -10,9 +10,8 @@ #include "db/dbformat.h" #include "db/db_impl.h" #include "db/table_stats_collector.h" -#include "rocksdb/table_builder.h" #include "rocksdb/table_stats.h" -#include "table/table.h" +#include "rocksdb/table.h" #include "util/coding.h" #include "util/testharness.h" #include "util/testutil.h" @@ -21,7 +20,7 @@ namespace rocksdb { class TableStatsTest { private: - unique_ptr
table_; + unique_ptr table_reader_; }; // TODO(kailiu) the following classes should be moved to some more general @@ -89,21 +88,21 @@ void MakeBuilder( std::unique_ptr* builder) { writable->reset(new FakeWritableFile); builder->reset( - new TableBuilder(options, writable->get()) - ); + options.table_factory->GetTableBuilder(options, writable->get(), + options.compression)); } void OpenTable( const Options& options, const std::string& contents, - std::unique_ptr
* table) { + std::unique_ptr* table_reader) { std::unique_ptr file(new FakeRandomeAccessFile(contents)); - auto s = Table::Open( + auto s = options.table_factory->GetTableReader( options, EnvOptions(), std::move(file), contents.size(), - table + table_reader ); ASSERT_OK(s); } @@ -176,9 +175,9 @@ TEST(TableStatsTest, CustomizedTableStatsCollector) { ASSERT_OK(builder->Finish()); // -- Step 2: Open table - std::unique_ptr
table; - OpenTable(options, writable->contents(), &table); - const auto& stats = table->GetTableStats().user_collected_stats; + std::unique_ptr table_reader; + OpenTable(options, writable->contents(), &table_reader); + const auto& stats = table_reader->GetTableStats().user_collected_stats; ASSERT_EQ("Rocksdb", stats.at("TableStatsTest")); @@ -234,9 +233,9 @@ TEST(TableStatsTest, InternalKeyStatsCollector) { ASSERT_OK(builder->Finish()); - std::unique_ptr
table; - OpenTable(options, writable->contents(), &table); - const auto& stats = table->GetTableStats().user_collected_stats; + std::unique_ptr table_reader; + OpenTable(options, writable->contents(), &table_reader); + const auto& stats = table_reader->GetTableStats().user_collected_stats; uint64_t deleted = 0; Slice key(stats.at(InternalKeyTableStatsNames::kDeletedKeys)); diff --git a/db/version_set.cc b/db/version_set.cc index 5c93550cf..04e5c6753 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -19,7 +19,7 @@ #include "db/table_cache.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" -#include "rocksdb/table_builder.h" +#include "rocksdb/table.h" #include "table/merger.h" #include "table/two_level_iterator.h" #include "util/coding.h" @@ -294,11 +294,11 @@ struct Saver { }; } -// Called from TableCache::Get and InternalGet when file/block in which key may -// exist are not there in TableCache/BlockCache respectively. In this case we -// can't guarantee that key does not exist and are not permitted to do IO to be -// certain.Set the status=kFound and value_found=false to let the caller know -// that key may exist but is not there in memory +// Called from TableCache::Get and Table::Get when file/block in which +// key may exist are not there in TableCache/BlockCache respectively. In this +// case we can't guarantee that key does not exist and are not permitted to do +// IO to be certain.Set the status=kFound and value_found=false to let the +// caller know that key may exist but is not there in memory static void MarkKeyMayExist(void* arg) { Saver* s = reinterpret_cast(arg); s->state = kFound; @@ -1920,12 +1920,12 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { } else { // "ikey" falls in the range for this table. Add the // approximate offset of "ikey" within the table. 
- Table* tableptr; + TableReader* table_reader_ptr; Iterator* iter = table_cache_->NewIterator( ReadOptions(), storage_options_, files[i]->number, - files[i]->file_size, &tableptr); - if (tableptr != nullptr) { - result += tableptr->ApproximateOffsetOf(ikey.Encode()); + files[i]->file_size, &table_reader_ptr); + if (table_reader_ptr != nullptr) { + result += table_reader_ptr->ApproximateOffsetOf(ikey.Encode()); } delete iter; } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 9f7c87f31..907d4c7ca 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -17,6 +17,8 @@ #include "rocksdb/statistics.h" #include "rocksdb/table_stats.h" #include "rocksdb/universal_compaction.h" +#include "rocksdb/memtablerep.h" +#include "rocksdb/slice_transform.h" namespace rocksdb { @@ -29,6 +31,7 @@ class MergeOperator; class Snapshot; class CompactionFilter; class CompactionFilterFactory; +class TableFactory; using std::shared_ptr; @@ -579,6 +582,11 @@ struct Options { // MemTableRep. std::shared_ptr memtable_factory; + // This is a factory that provides TableFactory objects. + // Default: a factory that provides a default implementation of + // Table and TableBuilder. + std::shared_ptr table_factory; + // This is a factory that provides compaction filter objects which allow // an application to modify/delete a key-value during background compaction. // Default: a factory that doesn't provide any object diff --git a/include/rocksdb/table.h b/include/rocksdb/table.h new file mode 100644 index 000000000..f1b3632e8 --- /dev/null +++ b/include/rocksdb/table.h @@ -0,0 +1,180 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. 
+// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include +#include +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/table_stats.h" +#include "rocksdb/options.h" + +namespace rocksdb { + +struct Options; +class RandomAccessFile; +struct ReadOptions; +class TableCache; +class WritableFile; + +using std::unique_ptr; + +// TableBuilder provides the interface used to build a Table +// (an immutable and sorted map from keys to values). +// +// Multiple threads can invoke const methods on a TableBuilder without +// external synchronization, but if any of the threads may call a +// non-const method, all threads accessing the same TableBuilder must use +// external synchronization. +class TableBuilder { + public: + // REQUIRES: Either Finish() or Abandon() has been called. + virtual ~TableBuilder() {} + + // Add key,value to the table being constructed. + // REQUIRES: key is after any previously added key according to comparator. + // REQUIRES: Finish(), Abandon() have not been called + virtual void Add(const Slice& key, const Slice& value) = 0; + + // Return non-ok iff some error has been detected. + virtual Status status() const = 0; + + // Finish building the table. + // REQUIRES: Finish(), Abandon() have not been called + virtual Status Finish() = 0; + + // Indicate that the contents of this builder should be abandoned. + // If the caller is not going to call Finish(), it must call Abandon() + // before destroying this builder. + // REQUIRES: Finish(), Abandon() have not been called + virtual void Abandon() = 0; + + // Number of calls to Add() so far. + virtual uint64_t NumEntries() const = 0; + + // Size of the file generated so far. If invoked after a successful + // Finish() call, returns the size of the final generated file. 
+ virtual uint64_t FileSize() const = 0; +}; + +// A Table is a sorted map from strings to strings. Tables are +// immutable and persistent. A Table may be safely accessed from +// multiple threads without external synchronization. +class TableReader { + public: + virtual ~TableReader() {} + + // Determine whether there is a chance that the current table file + // contains the key a key starting with iternal_prefix. The specific + // table implementation can use bloom filter and/or other heuristic + // to filter out this table as a whole. + virtual bool PrefixMayMatch(const Slice& internal_prefix) = 0; + + // Returns a new iterator over the table contents. + // The result of NewIterator() is initially invalid (caller must + // call one of the Seek methods on the iterator before using it). + virtual Iterator* NewIterator(const ReadOptions&) = 0; + + // Given a key, return an approximate byte offset in the file where + // the data for that key begins (or would begin if the key were + // present in the file). The returned value is in terms of file + // bytes, and so includes effects like compression of the underlying data. + // E.g., the approximate offset of the last key in the table will + // be close to the file length. + virtual uint64_t ApproximateOffsetOf(const Slice& key) = 0; + + // Returns true if the block for the specified key is in cache. + // REQUIRES: key is in this table. + virtual bool TEST_KeyInCache(const ReadOptions& options, + const Slice& key) = 0; + + // Set up the table for Compaction. Might change some parameters with + // posix_fadvise + virtual void SetupForCompaction() = 0; + + virtual TableStats& GetTableStats() = 0; + + // Calls (*result_handler)(handle_context, ...) repeatedly, starting with + // the entry found after a call to Seek(key), until result_handler returns + // false, where k is the actual internal key for a row found and v as the + // value of the key. didIO is true if I/O is involved in the operation. 
May + // not make such a call if filter policy says that key is not present. + // + // mark_key_may_exist_handler needs to be called when it is configured to be + // memory only and the key is not found in the block cache, with + // the parameter to be handle_context. + // + // readOptions is the options for the read + // key is the key to search for + virtual Status Get( + const ReadOptions& readOptions, + const Slice& key, + void* handle_context, + bool (*result_handler)(void* handle_context, const Slice& k, + const Slice& v, bool didIO), + void (*mark_key_may_exist_handler)(void* handle_context) = nullptr) = 0; +}; + +// A base class for table factories +class TableFactory { + public: + virtual ~TableFactory() {} + + // The type of the table. + // + // The client of this package should switch to a new name whenever + // the table format implementation changes. + // + // Names starting with "rocksdb." are reserved and should not be used + // by any clients of this package. + virtual const char* Name() const = 0; + + // Returns a Table object table that can fetch data from file specified + // in parameter file. It's the caller's responsibility to make sure + // file is in the correct format. + // + // GetTableReader() is called in two places: + // (1) TableCache::FindTable() calls the function when table cache miss + // and cache the table object returned. + // (1) SstFileReader (for SST Dump) opens the table and dump the table + // contents using the interator of the table. + // options and soptions are options. options is the general options. + // Multiple configured can be accessed from there, including and not + // limited to block cache and key comparators. 
+ // file is a file handler to handle the file for the table + // file_size is the physical file size of the file + // table_reader is the output table reader + virtual Status GetTableReader( + const Options& options, const EnvOptions& soptions, + unique_ptr && file, uint64_t file_size, + unique_ptr* table_reader) const = 0; + + // Return a table builder to write to a file for this table type. + // + // It is called in several places: + // (1) When flushing memtable to a level-0 output file, it creates a table + // builder (In DBImpl::WriteLevel0Table(), by calling BuildTable()) + // (2) During compaction, it gets the builder for writing compaction output + // files in DBImpl::OpenCompactionOutputFile(). + // (3) When recovering from transaction logs, it creates a table builder to + // write to a level-0 output file (In DBImpl::WriteLevel0TableForRecovery, + // by calling BuildTable()) + // (4) When running Repairer, it creates a table builder to convert logs to + // SST files (In Repairer::ConvertLogToTable() by calling BuildTable()) + // + // options is the general options. Multiple configured can be acceseed from + // there, including and not limited to compression options. + // file is a handle of a writable file. It is the caller's responsibility to + // keep the file open and close the file after closing the table builder. + // compression_type is the compression type to use in this table. + virtual TableBuilder* GetTableBuilder( + const Options& options, WritableFile* file, + CompressionType compression_type) const = 0; +}; +} // namespace rocksdb diff --git a/table/table_builder.cc b/table/block_based_table_builder.cc similarity index 79% rename from table/table_builder.cc rename to table/block_based_table_builder.cc index a9dd21ff6..fde6c81e8 100644 --- a/table/table_builder.cc +++ b/table/block_based_table_builder.cc @@ -7,19 +7,20 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. 
See the AUTHORS file for names of contributors. -#include "rocksdb/table_builder.h" +#include "table/block_based_table_builder.h" #include #include #include "rocksdb/comparator.h" +#include "rocksdb/table.h" #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" #include "rocksdb/options.h" +#include "table/block_based_table_reader.h" #include "table/block_builder.h" #include "table/filter_block.h" #include "table/format.h" -#include "table/table.h" #include "util/coding.h" #include "util/crc32c.h" #include "util/stop_watch.h" @@ -71,7 +72,7 @@ void LogStatsCollectionError( } // anonymous namespace -struct TableBuilder::Rep { +struct BlockBasedTableBuilder::Rep { Options options; Options index_block_options; WritableFile* file; @@ -80,8 +81,7 @@ struct TableBuilder::Rep { BlockBuilder data_block; BlockBuilder index_block; std::string last_key; - // Whether enable compression in this table. - bool enable_compression; + CompressionType compression_type; uint64_t num_entries = 0; uint64_t num_data_blocks = 0; @@ -106,50 +106,35 @@ struct TableBuilder::Rep { std::string compressed_output; - Rep(const Options& opt, WritableFile* f, bool enable_compression) + Rep(const Options& opt, WritableFile* f, CompressionType compression_type) : options(opt), index_block_options(opt), file(f), data_block(&options), index_block(1, index_block_options.comparator), - enable_compression(enable_compression), + compression_type(compression_type), filter_block(opt.filter_policy == nullptr ? 
nullptr : new FilterBlockBuilder(opt)), pending_index_entry(false) { } }; -TableBuilder::TableBuilder(const Options& options, WritableFile* file, - int level, const bool enable_compression) - : rep_(new Rep(options, file, enable_compression)), level_(level) { +BlockBasedTableBuilder::BlockBasedTableBuilder(const Options& options, + WritableFile* file, + CompressionType compression_type) + : rep_(new Rep(options, file, compression_type)) { if (rep_->filter_block != nullptr) { rep_->filter_block->StartBlock(0); } } -TableBuilder::~TableBuilder() { +BlockBasedTableBuilder::~BlockBasedTableBuilder() { assert(rep_->closed); // Catch errors where caller forgot to call Finish() delete rep_->filter_block; delete rep_; } -Status TableBuilder::ChangeOptions(const Options& options) { - // Note: if more fields are added to Options, update - // this function to catch changes that should not be allowed to - // change in the middle of building a Table. - if (options.comparator != rep_->options.comparator) { - return Status::InvalidArgument("changing comparator while building table"); - } - - // Note that any live BlockBuilders point to rep_->options and therefore - // will automatically pick up the updated options. 
- rep_->options = options; - rep_->index_block_options = options; - rep_->index_block_options.block_restart_interval = 1; - return Status::OK(); -} - -void TableBuilder::Add(const Slice& key, const Slice& value) { +void BlockBasedTableBuilder::Add(const Slice& key, const Slice& value) { Rep* r = rep_; assert(!r->closed); if (!ok()) return; @@ -204,7 +189,7 @@ void TableBuilder::Add(const Slice& key, const Slice& value) { } } -void TableBuilder::Flush() { +void BlockBasedTableBuilder::Flush() { Rep* r = rep_; assert(!r->closed); if (!ok()) return; @@ -222,7 +207,8 @@ void TableBuilder::Flush() { ++r->num_data_blocks; } -void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { +void BlockBasedTableBuilder::WriteBlock(BlockBuilder* block, + BlockHandle* handle) { // File format contains a sequence of blocks where each block has: // block_data: uint8[n] // type: uint8 @@ -233,26 +219,7 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { Slice block_contents; std::string* compressed = &r->compressed_output; - CompressionType type; - if (!r->enable_compression) { - // disable compression - type = kNoCompression; - } else { - // If the use has specified a different compression level for each level, - // then pick the compresison for that level. - if (!r->options.compression_per_level.empty()) { - const int n = r->options.compression_per_level.size(); - // It is possible for level_ to be -1; in that case, we use level - // 0's compression. This occurs mostly in backwards compatibility - // situations when the builder doesn't know what level the file - // belongs to. Likewise, if level_ is beyond the end of the - // specified compression levels, use the last value. 
- type = r->options.compression_per_level[std::max(0, - std::min(level_, n))]; - } else { - type = r->options.compression; - } - } + CompressionType type = r->compression_type; switch (type) { case kNoCompression: block_contents = raw; @@ -302,9 +269,9 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { block->Reset(); } -void TableBuilder::WriteRawBlock(const Slice& block_contents, - CompressionType type, - BlockHandle* handle) { +void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents, + CompressionType type, + BlockHandle* handle) { Rep* r = rep_; StopWatch sw(r->options.env, r->options.statistics, WRITE_RAW_BLOCK_MICROS); handle->set_offset(r->offset); @@ -323,11 +290,11 @@ void TableBuilder::WriteRawBlock(const Slice& block_contents, } } -Status TableBuilder::status() const { +Status BlockBasedTableBuilder::status() const { return rep_->status; } -Status TableBuilder::Finish() { +Status BlockBasedTableBuilder::Finish() { Rep* r = rep_; Flush(); assert(!r->closed); @@ -370,7 +337,7 @@ Status TableBuilder::Finish() { if (r->filter_block != nullptr) { // Add mapping from ".Name" to location // of filter data. 
- std::string key = Table::kFilterBlockPrefix; + std::string key = BlockBasedTable::kFilterBlockPrefix; key.append(r->options.filter_policy->Name()); std::string handle_encoding; filter_block_handle.EncodeTo(&handle_encoding); @@ -389,19 +356,21 @@ Status TableBuilder::Finish() { BytewiseSortedMap stats; // Add basic stats - AddStats(stats, TableStatsNames::kRawKeySize, r->raw_key_size); - AddStats(stats, TableStatsNames::kRawValueSize, r->raw_value_size); - AddStats(stats, TableStatsNames::kDataSize, r->data_size); + AddStats(stats, BlockBasedTableStatsNames::kRawKeySize, r->raw_key_size); + AddStats(stats, BlockBasedTableStatsNames::kRawValueSize, + r->raw_value_size); + AddStats(stats, BlockBasedTableStatsNames::kDataSize, r->data_size); AddStats( stats, - TableStatsNames::kIndexSize, + BlockBasedTableStatsNames::kIndexSize, r->index_block.CurrentSizeEstimate() + kBlockTrailerSize ); - AddStats(stats, TableStatsNames::kNumEntries, r->num_entries); - AddStats(stats, TableStatsNames::kNumDataBlocks, r->num_data_blocks); + AddStats(stats, BlockBasedTableStatsNames::kNumEntries, r->num_entries); + AddStats(stats, BlockBasedTableStatsNames::kNumDataBlocks, + r->num_data_blocks); if (r->filter_block != nullptr) { stats.insert(std::make_pair( - TableStatsNames::kFilterPolicy, + BlockBasedTableStatsNames::kFilterPolicy, r->options.filter_policy->Name() )); } @@ -435,7 +404,7 @@ Status TableBuilder::Finish() { std::string handle_encoding; stats_block_handle.EncodeTo(&handle_encoding); meta_block_handles.insert( - std::make_pair(Table::kStatsBlock, handle_encoding) + std::make_pair(BlockBasedTable::kStatsBlock, handle_encoding) ); } // end of stats block writing @@ -466,17 +435,17 @@ Status TableBuilder::Finish() { return r->status; } -void TableBuilder::Abandon() { +void BlockBasedTableBuilder::Abandon() { Rep* r = rep_; assert(!r->closed); r->closed = true; } -uint64_t TableBuilder::NumEntries() const { +uint64_t BlockBasedTableBuilder::NumEntries() const { return 
rep_->num_entries; } -uint64_t TableBuilder::FileSize() const { +uint64_t BlockBasedTableBuilder::FileSize() const { return rep_->offset; } diff --git a/include/rocksdb/table_builder.h b/table/block_based_table_builder.h similarity index 53% rename from include/rocksdb/table_builder.h rename to table/block_based_table_builder.h index da0e07570..b7c82b68f 100644 --- a/include/rocksdb/table_builder.h +++ b/table/block_based_table_builder.h @@ -1,21 +1,17 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -// -// TableBuilder provides the interface used to build a Table -// (an immutable and sorted map from keys to values). -// -// Multiple threads can invoke const methods on a TableBuilder without -// external synchronization, but if any of the threads may call a -// non-const method, all threads accessing the same TableBuilder must use -// external synchronization. - -#ifndef STORAGE_ROCKSDB_INCLUDE_TABLE_BUILDER_H_ -#define STORAGE_ROCKSDB_INCLUDE_TABLE_BUILDER_H_ +#pragma once #include #include "rocksdb/options.h" #include "rocksdb/status.h" +#include "rocksdb/table.h" namespace rocksdb { @@ -23,63 +19,44 @@ class BlockBuilder; class BlockHandle; class WritableFile; -class TableBuilder { + +class BlockBasedTableBuilder : public TableBuilder { public: // Create a builder that will store the contents of the table it is // building in *file. Does not close the file. It is up to the - // caller to close the file after calling Finish(). The output file - // will be part of level specified by 'level'. 
A value of -1 means - // that the caller does not know which level the output file will reside. - // - // If enable_compression=true, this table will follow the compression - // setting given in parameter options. If enable_compression=false, the - // table will not be compressed. - TableBuilder(const Options& options, WritableFile* file, int level=-1, - const bool enable_compression=true); + // caller to close the file after calling Finish(). + BlockBasedTableBuilder(const Options& options, WritableFile* file, + CompressionType compression_type); // REQUIRES: Either Finish() or Abandon() has been called. - ~TableBuilder(); - - // Change the options used by this builder. Note: only some of the - // option fields can be changed after construction. If a field is - // not allowed to change dynamically and its value in the structure - // passed to the constructor is different from its value in the - // structure passed to this method, this method will return an error - // without changing any fields. - Status ChangeOptions(const Options& options); + ~BlockBasedTableBuilder(); // Add key,value to the table being constructed. // REQUIRES: key is after any previously added key according to comparator. // REQUIRES: Finish(), Abandon() have not been called - void Add(const Slice& key, const Slice& value); - - // Advanced operation: flush any buffered key/value pairs to file. - // Can be used to ensure that two adjacent entries never live in - // the same data block. Most clients should not need to use this method. - // REQUIRES: Finish(), Abandon() have not been called - void Flush(); + void Add(const Slice& key, const Slice& value) override; // Return non-ok iff some error has been detected. - Status status() const; + Status status() const override; // Finish building the table. Stops using the file passed to the // constructor after this function returns. 
// REQUIRES: Finish(), Abandon() have not been called - Status Finish(); + Status Finish() override; // Indicate that the contents of this builder should be abandoned. Stops // using the file passed to the constructor after this function returns. // If the caller is not going to call Finish(), it must call Abandon() // before destroying this builder. // REQUIRES: Finish(), Abandon() have not been called - void Abandon(); + void Abandon() override; // Number of calls to Add() so far. - uint64_t NumEntries() const; + uint64_t NumEntries() const override; // Size of the file generated so far. If invoked after a successful // Finish() call, returns the size of the final generated file. - uint64_t FileSize() const; + uint64_t FileSize() const override; private: bool ok() const { return status().ok(); } @@ -88,13 +65,17 @@ class TableBuilder { struct Rep; Rep* rep_; - int level_; + + // Advanced operation: flush any buffered key/value pairs to file. + // Can be used to ensure that two adjacent entries never live in + // the same data block. Most clients should not need to use this method. + // REQUIRES: Finish(), Abandon() have not been called + void Flush(); // No copying allowed - TableBuilder(const TableBuilder&); - void operator=(const TableBuilder&); + BlockBasedTableBuilder(const BlockBasedTableBuilder&) = delete; + void operator=(const BlockBasedTableBuilder&) = delete; }; } // namespace rocksdb -#endif // STORAGE_ROCKSDB_INCLUDE_TABLE_BUILDER_H_ diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc new file mode 100644 index 000000000..0944c7f56 --- /dev/null +++ b/table/block_based_table_factory.cc @@ -0,0 +1,34 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. 
+// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + + +#include "table/block_based_table_factory.h" + +#include +#include +#include "table/block_based_table_builder.h" +#include "table/block_based_table_reader.h" +#include "port/port.h" + +namespace rocksdb { + +Status BlockBasedTableFactory::GetTableReader( + const Options& options, const EnvOptions& soptions, + unique_ptr && file, uint64_t file_size, + unique_ptr* table_reader) const { + return BlockBasedTable::Open(options, soptions, std::move(file), file_size, + table_reader); +} + +TableBuilder* BlockBasedTableFactory::GetTableBuilder( + const Options& options, WritableFile* file, + CompressionType compression_type) const { + return new BlockBasedTableBuilder(options, file, compression_type); +} +} // namespace rocksdb diff --git a/table/block_based_table_factory.h b/table/block_based_table_factory.h new file mode 100644 index 000000000..677979f4e --- /dev/null +++ b/table/block_based_table_factory.h @@ -0,0 +1,49 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#pragma once +#include +#include + +#include "rocksdb/table.h" + +namespace rocksdb { + +struct Options; +struct EnvOptions; + +using std::unique_ptr; +class Status; +class RandomAccessFile; +class WritableFile; +class Table; +class TableBuilder; +class BlockBasedTable; +class BlockBasedTableBuilder; + +class BlockBasedTableFactory: public TableFactory { +public: + ~BlockBasedTableFactory() { + } + BlockBasedTableFactory() { + } + const char* Name() const override { + return "BlockBasedTable"; + } + Status GetTableReader(const Options& options, const EnvOptions& soptions, + unique_ptr && file, + uint64_t file_size, + unique_ptr* table_reader) const override; + + TableBuilder* GetTableBuilder(const Options& options, WritableFile* file, + CompressionType compression_type) const + override; +}; + +} // namespace rocksdb diff --git a/table/table.cc b/table/block_based_table_reader.cc similarity index 79% rename from table/table.cc rename to table/block_based_table_reader.cc index 6243ab144..bec087033 100644 --- a/table/table.cc +++ b/table/block_based_table_reader.cc @@ -7,7 +7,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#include "table/table.h" +#include "table/block_based_table_reader.h" #include "db/dbformat.h" @@ -17,11 +17,11 @@ #include "rocksdb/filter_policy.h" #include "rocksdb/options.h" #include "rocksdb/statistics.h" +#include "rocksdb/table.h" #include "table/block.h" #include "table/filter_block.h" #include "table/format.h" -#include "table/table.h" #include "table/two_level_iterator.h" #include "util/coding.h" @@ -35,7 +35,7 @@ namespace rocksdb { // varints. 
const size_t kMaxCacheKeyPrefixSize = kMaxVarint64Length*3+1; -struct Table::Rep { +struct BlockBasedTable::Rep { ~Rep() { delete filter; delete [] filter_data; @@ -59,8 +59,12 @@ struct Table::Rep { TableStats table_stats; }; +BlockBasedTable::~BlockBasedTable() { + delete rep_; +} + // Helper function to setup the cache key's prefix for the Table. -void Table::SetupCacheKeyPrefix(Rep* rep) { +void BlockBasedTable::SetupCacheKeyPrefix(Rep* rep) { assert(kMaxCacheKeyPrefixSize >= 10); rep->cache_key_prefix_size = 0; if (rep->options.block_cache) { @@ -105,12 +109,12 @@ Status ReadBlock(RandomAccessFile* file, } // end of anonymous namespace -Status Table::Open(const Options& options, - const EnvOptions& soptions, - unique_ptr&& file, - uint64_t size, - unique_ptr
* table) { - table->reset(); +Status BlockBasedTable::Open(const Options& options, + const EnvOptions& soptions, + unique_ptr && file, + uint64_t size, + unique_ptr* table_reader) { + table_reader->reset(); if (size < Footer::kEncodedLength) { return Status::InvalidArgument("file is too short to be an sstable"); } @@ -139,7 +143,7 @@ Status Table::Open(const Options& options, if (s.ok()) { // We've successfully read the footer and the index block: we're // ready to serve requests. - Rep* rep = new Table::Rep(soptions); + BlockBasedTable::Rep* rep = new BlockBasedTable::Rep(soptions); rep->options = options; rep->file = std::move(file); rep->metaindex_handle = footer.metaindex_handle(); @@ -147,8 +151,8 @@ Status Table::Open(const Options& options, SetupCacheKeyPrefix(rep); rep->filter_data = nullptr; rep->filter = nullptr; - table->reset(new Table(rep)); - (*table)->ReadMeta(footer); + table_reader->reset(new BlockBasedTable(rep)); + ((BlockBasedTable*) (table_reader->get()))->ReadMeta(footer); } else { if (index_block) delete index_block; } @@ -156,7 +160,7 @@ Status Table::Open(const Options& options, return s; } -void Table::SetupForCompaction() { +void BlockBasedTable::SetupForCompaction() { switch (rep_->options.access_hint_on_compaction_start) { case Options::NONE: break; @@ -175,11 +179,11 @@ void Table::SetupForCompaction() { compaction_optimized_ = true; } -const TableStats& Table::GetTableStats() const { +TableStats& BlockBasedTable::GetTableStats() { return rep_->table_stats; } -void Table::ReadMeta(const Footer& footer) { +void BlockBasedTable::ReadMeta(const Footer& footer) { // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates // it is an empty block. 
// TODO: we never really verify check sum for meta index block @@ -222,7 +226,7 @@ void Table::ReadMeta(const Footer& footer) { delete meta; } -void Table::ReadFilter(const Slice& filter_handle_value) { +void BlockBasedTable::ReadFilter(const Slice& filter_handle_value) { Slice v = filter_handle_value; BlockHandle filter_handle; if (!filter_handle.DecodeFrom(&v).ok()) { @@ -230,7 +234,7 @@ void Table::ReadFilter(const Slice& filter_handle_value) { } // TODO: We might want to unify with ReadBlock() if we start - // requiring checksum verification in Table::Open. + // requiring checksum verification in BlockBasedTable::Open. ReadOptions opt; BlockContents block; if (!ReadBlockContents(rep_->file.get(), opt, filter_handle, &block, @@ -243,7 +247,7 @@ void Table::ReadFilter(const Slice& filter_handle_value) { rep_->filter = new FilterBlockReader(rep_->options, block.data); } -Status Table::ReadStats(const Slice& handle_value, Rep* rep) { +Status BlockBasedTable::ReadStats(const Slice& handle_value, Rep* rep) { Slice v = handle_value; BlockHandle handle; if (!handle.DecodeFrom(&v).ok()) { @@ -271,12 +275,12 @@ Status Table::ReadStats(const Slice& handle_value, Rep* rep) { auto& table_stats = rep->table_stats; // All pre-defined stats of type uint64_t std::unordered_map predefined_uint64_stats = { - { TableStatsNames::kDataSize, &table_stats.data_size }, - { TableStatsNames::kIndexSize, &table_stats.index_size }, - { TableStatsNames::kRawKeySize, &table_stats.raw_key_size }, - { TableStatsNames::kRawValueSize, &table_stats.raw_value_size }, - { TableStatsNames::kNumDataBlocks, &table_stats.num_data_blocks }, - { TableStatsNames::kNumEntries, &table_stats.num_entries }, + { BlockBasedTableStatsNames::kDataSize, &table_stats.data_size }, + { BlockBasedTableStatsNames::kIndexSize, &table_stats.index_size }, + { BlockBasedTableStatsNames::kRawKeySize, &table_stats.raw_key_size }, + { BlockBasedTableStatsNames::kRawValueSize, &table_stats.raw_value_size }, + { 
BlockBasedTableStatsNames::kNumDataBlocks, &table_stats.num_data_blocks}, + { BlockBasedTableStatsNames::kNumEntries, &table_stats.num_entries }, }; std::string last_key; @@ -309,7 +313,7 @@ Status Table::ReadStats(const Slice& handle_value, Rep* rep) { continue; } *(pos->second) = val; - } else if (key == TableStatsNames::kFilterPolicy) { + } else if (key == BlockBasedTableStatsNames::kFilterPolicy) { table_stats.filter_policy_name = raw_val.ToString(); } else { // handle user-collected @@ -322,10 +326,6 @@ Status Table::ReadStats(const Slice& handle_value, Rep* rep) { return s; } -Table::~Table() { - delete rep_; -} - static void DeleteBlock(void* arg, void* ignored) { delete reinterpret_cast(arg); } @@ -343,13 +343,13 @@ static void ReleaseBlock(void* arg, void* h) { // Convert an index iterator value (i.e., an encoded BlockHandle) // into an iterator over the contents of the corresponding block. -Iterator* Table::BlockReader(void* arg, - const ReadOptions& options, - const Slice& index_value, - bool* didIO, - bool for_compaction) { +Iterator* BlockBasedTable::BlockReader(void* arg, + const ReadOptions& options, + const Slice& index_value, + bool* didIO, + bool for_compaction) { const bool no_io = (options.read_tier == kBlockCacheTier); - Table* table = reinterpret_cast(arg); + BlockBasedTable* table = reinterpret_cast(arg); Cache* block_cache = table->rep_->options.block_cache.get(); std::shared_ptr statistics = table->rep_->options.statistics; Block* block = nullptr; @@ -427,11 +427,11 @@ Iterator* Table::BlockReader(void* arg, return iter; } -Iterator* Table::BlockReader(void* arg, - const ReadOptions& options, - const EnvOptions& soptions, - const Slice& index_value, - bool for_compaction) { +Iterator* BlockBasedTable::BlockReader(void* arg, + const ReadOptions& options, + const EnvOptions& soptions, + const Slice& index_value, + bool for_compaction) { return BlockReader(arg, options, index_value, nullptr, for_compaction); } @@ -448,7 +448,7 @@ Iterator* 
Table::BlockReader(void* arg, // in memory. When blooms may need to be paged in, we should refactor so that // this is only ever called lazily. In particular, this shouldn't be called // while the DB lock is held like it is now. -bool Table::PrefixMayMatch(const Slice& internal_prefix) const { +bool BlockBasedTable::PrefixMayMatch(const Slice& internal_prefix) { FilterBlockReader* filter = rep_->filter; bool may_match = true; Status s; @@ -464,7 +464,7 @@ bool Table::PrefixMayMatch(const Slice& internal_prefix) const { // we're past end of file may_match = false; } else if (ExtractUserKey(iiter->key()).starts_with( - ExtractUserKey(internal_prefix))) { + ExtractUserKey(internal_prefix))) { // we need to check for this subtle case because our only // guarantee is that "the key is a string >= last key in that data // block" according to the doc/table_format.txt spec. @@ -497,7 +497,7 @@ bool Table::PrefixMayMatch(const Slice& internal_prefix) const { return may_match; } -Iterator* Table::NewIterator(const ReadOptions& options) const { +Iterator* BlockBasedTable::NewIterator(const ReadOptions& options) { if (options.prefix) { InternalKey internal_prefix(*options.prefix, 0, kTypeValue); if (!PrefixMayMatch(internal_prefix.Encode())) { @@ -509,24 +509,27 @@ Iterator* Table::NewIterator(const ReadOptions& options) const { return NewTwoLevelIterator( rep_->index_block->NewIterator(rep_->options.comparator), - &Table::BlockReader, const_cast(this), options, rep_->soptions); + &BlockBasedTable::BlockReader, const_cast(this), + options, rep_->soptions); } -Status Table::InternalGet(const ReadOptions& options, const Slice& k, - void* arg, - bool (*saver)(void*, const Slice&, const Slice&, - bool), - void (*mark_key_may_exist)(void*)) { +Status BlockBasedTable::Get( + const ReadOptions& readOptions, + const Slice& key, + void* handle_context, + bool (*result_handler)(void* handle_context, const Slice& k, + const Slice& v, bool didIO), + void (*mark_key_may_exist_handler)(void* 
handle_context)) { Status s; Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator); bool done = false; - for (iiter->Seek(k); iiter->Valid() && !done; iiter->Next()) { + for (iiter->Seek(key); iiter->Valid() && !done; iiter->Next()) { Slice handle_value = iiter->value(); FilterBlockReader* filter = rep_->filter; BlockHandle handle; if (filter != nullptr && handle.DecodeFrom(&handle_value).ok() && - !filter->KeyMayMatch(handle.offset(), k)) { + !filter->KeyMayMatch(handle.offset(), key)) { // Not found // TODO: think about interaction with Merge. If a user key cannot // cross one data block, we should be fine. @@ -535,19 +538,20 @@ Status Table::InternalGet(const ReadOptions& options, const Slice& k, } else { bool didIO = false; std::unique_ptr block_iter( - BlockReader(this, options, iiter->value(), &didIO)); + BlockReader(this, readOptions, iiter->value(), &didIO)); - if (options.read_tier && block_iter->status().IsIncomplete()) { + if (readOptions.read_tier && block_iter->status().IsIncomplete()) { // couldn't get block from block_cache // Update Saver.state to Found because we are only looking for whether // we can guarantee the key is not there when "no_io" is set - (*mark_key_may_exist)(arg); + (*mark_key_may_exist_handler)(handle_context); break; } // Call the *saver function on each entry/block until it returns false - for (block_iter->Seek(k); block_iter->Valid(); block_iter->Next()) { - if (!(*saver)(arg, block_iter->key(), block_iter->value(), didIO)) { + for (block_iter->Seek(key); block_iter->Valid(); block_iter->Next()) { + if (!(*result_handler)(handle_context, block_iter->key(), + block_iter->value(), didIO)) { done = true; break; } @@ -566,16 +570,17 @@ bool SaveDidIO(void* arg, const Slice& key, const Slice& value, bool didIO) { *reinterpret_cast(arg) = didIO; return false; } -bool Table::TEST_KeyInCache(const ReadOptions& options, const Slice& key) { - // We use InternalGet() as it has logic that checks whether we read the 
+bool BlockBasedTable::TEST_KeyInCache(const ReadOptions& options, + const Slice& key) { + // We use Get() as it has logic that checks whether we read the // block from the disk or not. bool didIO = false; - Status s = InternalGet(options, key, &didIO, SaveDidIO); + Status s = Get(options, key, &didIO, SaveDidIO); assert(s.ok()); return !didIO; } -uint64_t Table::ApproximateOffsetOf(const Slice& key) const { +uint64_t BlockBasedTable::ApproximateOffsetOf(const Slice& key) { Iterator* index_iter = rep_->index_block->NewIterator(rep_->options.comparator); index_iter->Seek(key); @@ -602,15 +607,20 @@ uint64_t Table::ApproximateOffsetOf(const Slice& key) const { return result; } -const std::string Table::kFilterBlockPrefix = "filter."; -const std::string Table::kStatsBlock = "rocksdb.stats"; - -const std::string TableStatsNames::kDataSize = "rocksdb.data.size"; -const std::string TableStatsNames::kIndexSize = "rocksdb.index.size"; -const std::string TableStatsNames::kRawKeySize = "rocksdb.raw.key.size"; -const std::string TableStatsNames::kRawValueSize = "rocksdb.raw.value.size"; -const std::string TableStatsNames::kNumDataBlocks = "rocksdb.num.data.blocks"; -const std::string TableStatsNames::kNumEntries = "rocksdb.num.entries"; -const std::string TableStatsNames::kFilterPolicy = "rocksdb.filter.policy"; +const std::string BlockBasedTable::kFilterBlockPrefix = "filter."; +const std::string BlockBasedTable::kStatsBlock = "rocksdb.stats"; + +const std::string BlockBasedTableStatsNames::kDataSize = "rocksdb.data.size"; +const std::string BlockBasedTableStatsNames::kIndexSize = "rocksdb.index.size"; +const std::string BlockBasedTableStatsNames::kRawKeySize = + "rocksdb.raw.key.size"; +const std::string BlockBasedTableStatsNames::kRawValueSize = + "rocksdb.raw.value.size"; +const std::string BlockBasedTableStatsNames::kNumDataBlocks = + "rocksdb.num.data.blocks"; +const std::string BlockBasedTableStatsNames::kNumEntries = + "rocksdb.num.entries"; +const std::string 
BlockBasedTableStatsNames::kFilterPolicy = + "rocksdb.filter.policy"; } // namespace rocksdb diff --git a/table/table.h b/table/block_based_table_reader.h similarity index 74% rename from table/table.h rename to table/block_based_table_reader.h index 13dc3f32b..848c55655 100644 --- a/table/table.h +++ b/table/block_based_table_reader.h @@ -13,6 +13,7 @@ #include "rocksdb/env.h" #include "rocksdb/iterator.h" #include "rocksdb/table_stats.h" +#include "rocksdb/table.h" namespace rocksdb { @@ -23,13 +24,14 @@ struct Options; class RandomAccessFile; struct ReadOptions; class TableCache; +class TableReader; using std::unique_ptr; // A Table is a sorted map from strings to strings. Tables are // immutable and persistent. A Table may be safely accessed from // multiple threads without external synchronization. -class Table { +class BlockBasedTable : public TableReader { public: static const std::string kFilterBlockPrefix; static const std::string kStatsBlock; @@ -38,28 +40,33 @@ class Table { // of "file", and read the metadata entries necessary to allow // retrieving data from the table. // - // If successful, returns ok and sets "*table" to the newly opened - // table. The client should delete "*table" when no longer needed. - // If there was an error while initializing the table, sets "*table" - // to nullptr and returns a non-ok status. Does not take ownership of - // "*source", but the client must ensure that "source" remains live - // for the duration of the returned table's lifetime. + // If successful, returns ok and sets "*table_reader" to the newly opened + // table. The client should delete "*table_reader" when no longer needed. + // If there was an error while initializing the table, sets "*table_reader" + // to nullptr and returns a non-ok status. // // *file must remain live while this Table is in use. static Status Open(const Options& options, const EnvOptions& soptions, unique_ptr&& file, uint64_t file_size, - unique_ptr
* table); + unique_ptr* table_reader); - ~Table(); - - bool PrefixMayMatch(const Slice& internal_prefix) const; + bool PrefixMayMatch(const Slice& internal_prefix) override; // Returns a new iterator over the table contents. // The result of NewIterator() is initially invalid (caller must // call one of the Seek methods on the iterator before using it). - Iterator* NewIterator(const ReadOptions&) const; + Iterator* NewIterator(const ReadOptions&) override; + + Status Get( + const ReadOptions& readOptions, + const Slice& key, + void* handle_context, + bool (*result_handler)(void* handle_context, const Slice& k, + const Slice& v, bool didIO), + void (*mark_key_may_exist_handler)(void* handle_context) = nullptr) + override; // Given a key, return an approximate byte offset in the file where // the data for that key begins (or would begin if the key were @@ -67,24 +74,25 @@ class Table { // bytes, and so includes effects like compression of the underlying data. // E.g., the approximate offset of the last key in the table will // be close to the file length. - uint64_t ApproximateOffsetOf(const Slice& key) const; + uint64_t ApproximateOffsetOf(const Slice& key) override; // Returns true if the block for the specified key is in cache. // REQUIRES: key is in this table. - bool TEST_KeyInCache(const ReadOptions& options, const Slice& key); + bool TEST_KeyInCache(const ReadOptions& options, const Slice& key) override; // Set up the table for Compaction. 
Might change some parameters with // posix_fadvise - void SetupForCompaction(); + void SetupForCompaction() override; + + TableStats& GetTableStats() override; - const TableStats& GetTableStats() const; + ~BlockBasedTable(); private: struct Rep; Rep* rep_; bool compaction_optimized_; - explicit Table(Rep* rep) : compaction_optimized_(false) { rep_ = rep; } static Iterator* BlockReader(void*, const ReadOptions&, const EnvOptions& soptions, const Slice&, bool for_compaction); @@ -95,12 +103,6 @@ class Table { // after a call to Seek(key), until handle_result returns false. // May not make such a call if filter policy says that key is not present. friend class TableCache; - Status InternalGet( - const ReadOptions&, const Slice& key, - void* arg, - bool (*handle_result)(void* arg, const Slice& k, const Slice& v, bool), - void (*mark_key_may_exist)(void*) = nullptr); - void ReadMeta(const Footer& footer); void ReadFilter(const Slice& filter_handle_value); @@ -108,12 +110,17 @@ class Table { static void SetupCacheKeyPrefix(Rep* rep); + explicit BlockBasedTable(Rep* rep) : + compaction_optimized_(false) { + rep_ = rep; + } + // No copying allowed - Table(const Table&); - void operator=(const Table&); + explicit BlockBasedTable(const TableReader&) = delete; + void operator=(const TableReader&) = delete; }; -struct TableStatsNames { +struct BlockBasedTableStatsNames { static const std::string kDataSize; static const std::string kIndexSize; static const std::string kRawKeySize; diff --git a/table/block_builder.cc b/table/block_builder.cc index 592d7136c..0382dfc24 100644 --- a/table/block_builder.cc +++ b/table/block_builder.cc @@ -36,7 +36,6 @@ #include #include #include "rocksdb/comparator.h" -#include "rocksdb/table_builder.h" #include "util/coding.h" namespace rocksdb { diff --git a/table/block_builder.h b/table/block_builder.h index 046b8affb..c5ea6690d 100644 --- a/table/block_builder.h +++ b/table/block_builder.h @@ -20,7 +20,7 @@ class Comparator; class BlockBuilder 
{ public: - BlockBuilder(int block_builder, const Comparator* comparator); + BlockBuilder(int block_restart_interval, const Comparator* comparator); explicit BlockBuilder(const Options* options); // Reset the contents as if the BlockBuilder was just constructed. diff --git a/table/block_test.cc b/table/block_test.cc index eb141160c..f1b16faff 100644 --- a/table/block_test.cc +++ b/table/block_test.cc @@ -10,10 +10,9 @@ #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" -#include "rocksdb/table_builder.h" +#include "rocksdb/table.h" #include "table/block.h" #include "table/block_builder.h" -#include "table/table.h" #include "table/format.h" #include "util/random.h" #include "util/testharness.h" diff --git a/table/format.h b/table/format.h index 91e74638f..82f86884e 100644 --- a/table/format.h +++ b/table/format.h @@ -12,7 +12,7 @@ #include #include "rocksdb/slice.h" #include "rocksdb/status.h" -#include "rocksdb/table_builder.h" +#include "rocksdb/table.h" namespace rocksdb { diff --git a/table/table_test.cc b/table/table_test.cc index b8e25877a..ad4129901 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -17,12 +17,13 @@ #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" -#include "rocksdb/table_builder.h" +#include "rocksdb/table.h" #include "rocksdb/memtablerep.h" #include "table/block.h" #include "table/block_builder.h" #include "table/format.h" -#include "table/table.h" +#include "table/block_based_table_reader.h" +#include "table/block_based_table_builder.h" #include "util/random.h" #include "util/testharness.h" #include "util/testutil.h" @@ -237,19 +238,19 @@ class BlockConstructor: public Constructor { BlockConstructor(); }; -class TableConstructor: public Constructor { +class BlockBasedTableConstructor: public Constructor { public: - explicit TableConstructor( + explicit BlockBasedTableConstructor( const Comparator* cmp) : Constructor(cmp) { } - ~TableConstructor() { + 
~BlockBasedTableConstructor() { Reset(); } virtual Status FinishImpl(const Options& options, const KVMap& data) { Reset(); sink_.reset(new StringSink()); - TableBuilder builder(options, sink_.get()); + BlockBasedTableBuilder builder(options, sink_.get(), options.compression); for (KVMap::const_iterator it = data.begin(); it != data.end(); @@ -265,32 +266,37 @@ class TableConstructor: public Constructor { // Open the table uniq_id_ = cur_uniq_id_++; source_.reset(new StringSource(sink_->contents(), uniq_id_)); - return Table::Open(options, soptions, std::move(source_), - sink_->contents().size(), &table_); + unique_ptr table_factory; + return options.table_factory->GetTableReader(options, soptions, + std::move(source_), + sink_->contents().size(), + &table_reader_); } virtual Iterator* NewIterator() const { - return table_->NewIterator(ReadOptions()); + return table_reader_->NewIterator(ReadOptions()); } uint64_t ApproximateOffsetOf(const Slice& key) const { - return table_->ApproximateOffsetOf(key); + return table_reader_->ApproximateOffsetOf(key); } virtual Status Reopen(const Options& options) { source_.reset(new StringSource(sink_->contents(), uniq_id_)); - return Table::Open(options, soptions, std::move(source_), - sink_->contents().size(), &table_); + return options.table_factory->GetTableReader(options, soptions, + std::move(source_), + sink_->contents().size(), + &table_reader_); } - virtual Table* table() { - return table_.get(); + virtual TableReader* table_reader() { + return table_reader_.get(); } private: void Reset() { uniq_id_ = 0; - table_.reset(); + table_reader_.reset(); sink_.reset(); source_.reset(); } @@ -298,14 +304,14 @@ class TableConstructor: public Constructor { uint64_t uniq_id_; unique_ptr sink_; unique_ptr source_; - unique_ptr
table_; + unique_ptr table_reader_; - TableConstructor(); + BlockBasedTableConstructor(); static uint64_t cur_uniq_id_; const EnvOptions soptions; }; -uint64_t TableConstructor::cur_uniq_id_ = 1; +uint64_t BlockBasedTableConstructor::cur_uniq_id_ = 1; // A helper class that converts internal format keys into user keys class KeyConvertingIterator: public Iterator { @@ -533,7 +539,7 @@ class Harness { } switch (args.type) { case TABLE_TEST: - constructor_ = new TableConstructor(options_.comparator); + constructor_ = new BlockBasedTableConstructor(options_.comparator); break; case BLOCK_TEST: constructor_ = new BlockConstructor(options_.comparator); @@ -858,7 +864,7 @@ class TableTest { }; // This test include all the basic checks except those for index size and block // size, which will be conducted in separated unit tests. TEST(TableTest, BasicTableStats) { - TableConstructor c(BytewiseComparator()); + BlockBasedTableConstructor c(BytewiseComparator()); c.Add("a1", "val1"); c.Add("b2", "val2"); @@ -878,7 +884,7 @@ TEST(TableTest, BasicTableStats) { c.Finish(options, &keys, &kvmap); - auto& stats = c.table()->GetTableStats(); + auto& stats = c.table_reader()->GetTableStats(); ASSERT_EQ(kvmap.size(), stats.num_entries); auto raw_key_size = kvmap.size() * 2ul; @@ -902,7 +908,7 @@ TEST(TableTest, BasicTableStats) { } TEST(TableTest, FilterPolicyNameStats) { - TableConstructor c(BytewiseComparator()); + BlockBasedTableConstructor c(BytewiseComparator()); c.Add("a1", "val1"); std::vector keys; KVMap kvmap; @@ -913,7 +919,7 @@ TEST(TableTest, FilterPolicyNameStats) { options.filter_policy = filter_policy.get(); c.Finish(options, &keys, &kvmap); - auto& stats = c.table()->GetTableStats(); + auto& stats = c.table_reader()->GetTableStats(); ASSERT_EQ("rocksdb.BuiltinBloomFilter", stats.filter_policy_name); } @@ -942,7 +948,7 @@ TEST(TableTest, IndexSizeStat) { // Each time we load one more key to the table. 
the table index block // size is expected to be larger than last time's. for (size_t i = 1; i < keys.size(); ++i) { - TableConstructor c(BytewiseComparator()); + BlockBasedTableConstructor c(BytewiseComparator()); for (size_t j = 0; j < i; ++j) { c.Add(keys[j], "val"); } @@ -955,7 +961,7 @@ TEST(TableTest, IndexSizeStat) { c.Finish(options, &ks, &kvmap); auto index_size = - c.table()->GetTableStats().index_size; + c.table_reader()->GetTableStats().index_size; ASSERT_GT(index_size, last_index_size); last_index_size = index_size; } @@ -963,7 +969,7 @@ TEST(TableTest, IndexSizeStat) { TEST(TableTest, NumBlockStat) { Random rnd(test::RandomSeed()); - TableConstructor c(BytewiseComparator()); + BlockBasedTableConstructor c(BytewiseComparator()); Options options; options.compression = kNoCompression; options.block_restart_interval = 1; @@ -980,12 +986,12 @@ TEST(TableTest, NumBlockStat) { c.Finish(options, &ks, &kvmap); ASSERT_EQ( kvmap.size(), - c.table()->GetTableStats().num_data_blocks + c.table_reader()->GetTableStats().num_data_blocks ); } TEST(TableTest, ApproximateOffsetOfPlain) { - TableConstructor c(BytewiseComparator()); + BlockBasedTableConstructor c(BytewiseComparator()); c.Add("k01", "hello"); c.Add("k02", "hello2"); c.Add("k03", std::string(10000, 'x')); @@ -1016,7 +1022,7 @@ TEST(TableTest, ApproximateOffsetOfPlain) { static void Do_Compression_Test(CompressionType comp) { Random rnd(301); - TableConstructor c(BytewiseComparator()); + BlockBasedTableConstructor c(BytewiseComparator()); std::string tmp; c.Add("k01", "hello"); c.Add("k02", test::CompressibleString(&rnd, 0.25, 10000, &tmp)); @@ -1072,7 +1078,7 @@ TEST(TableTest, BlockCacheLeak) { opt.block_cache = NewLRUCache(16*1024*1024); // big enough so we don't ever // lose cached values. 
- TableConstructor c(BytewiseComparator()); + BlockBasedTableConstructor c(BytewiseComparator()); c.Add("k01", "hello"); c.Add("k02", "hello2"); c.Add("k03", std::string(10000, 'x')); @@ -1095,7 +1101,7 @@ TEST(TableTest, BlockCacheLeak) { ASSERT_OK(c.Reopen(opt)); for (const std::string& key: keys) { - ASSERT_TRUE(c.table()->TEST_KeyInCache(ReadOptions(), key)); + ASSERT_TRUE(c.table_reader()->TEST_KeyInCache(ReadOptions(), key)); } } diff --git a/table/two_level_iterator.cc b/table/two_level_iterator.cc index a5c1e592e..965aa65a1 100644 --- a/table/two_level_iterator.cc +++ b/table/two_level_iterator.cc @@ -9,9 +9,10 @@ #include "table/two_level_iterator.h" +#include "rocksdb/options.h" +#include "rocksdb/table.h" #include "table/block.h" #include "table/format.h" -#include "table/table.h" #include "table/iterator_wrapper.h" namespace rocksdb { diff --git a/tools/blob_store_bench.cc b/tools/blob_store_bench.cc index 0d0751ead..70ece2c5f 100644 --- a/tools/blob_store_bench.cc +++ b/tools/blob_store_bench.cc @@ -8,6 +8,12 @@ #define KB 1024LL #define MB 1024*1024LL +// BlobStore does costly asserts to make sure it's running correctly, which +// significantly impacts benchmark runtime. +// NDEBUG will compile out those asserts. +#ifndef NDEBUG +#define NDEBUG +#endif using namespace rocksdb; using namespace std; diff --git a/tools/sst_dump.cc b/tools/sst_dump.cc index c2a69af7b..c89b8e372 100644 --- a/tools/sst_dump.cc +++ b/tools/sst_dump.cc @@ -3,7 +3,6 @@ // LICENSE file in the root directory of this source tree. An additional grant // of patent rights can be found in the PATENTS file in the same directory. 
// -#include "table/table.h" #include #include @@ -15,7 +14,7 @@ #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/iterator.h" -#include "rocksdb/table_builder.h" +#include "rocksdb/table.h" #include "table/block.h" #include "table/block_builder.h" #include "table/format.h" @@ -64,7 +63,7 @@ Status SstFileReader::ReadSequential(bool print_kv, bool has_to, const std::string& to_key) { - unique_ptr
table; + unique_ptr table_reader; InternalKeyComparator internal_comparator_(BytewiseComparator()); Options table_options; table_options.comparator = &internal_comparator_; @@ -76,12 +75,16 @@ Status SstFileReader::ReadSequential(bool print_kv, } uint64_t file_size; table_options.env->GetFileSize(file_name_, &file_size); - s = Table::Open(table_options, soptions_, std::move(file), file_size, &table); + unique_ptr table_factory; + s = table_options.table_factory->GetTableReader(table_options, soptions_, + std::move(file), file_size, + &table_reader); if(!s.ok()) { return s; } - Iterator* iter = table->NewIterator(ReadOptions(verify_checksum_, false)); + Iterator* iter = table_reader->NewIterator(ReadOptions(verify_checksum_, + false)); uint64_t i = 0; if (has_from) { InternalKey ikey(from_key, kMaxSequenceNumber, kValueTypeForSeek); diff --git a/util/options.cc b/util/options.cc index df1a82e48..f828b026a 100644 --- a/util/options.cc +++ b/util/options.cc @@ -17,6 +17,7 @@ #include "rocksdb/env.h" #include "rocksdb/filter_policy.h" #include "rocksdb/merge_operator.h" +#include "table/block_based_table_factory.h" namespace rocksdb { @@ -91,6 +92,8 @@ Options::Options() filter_deletes(false), max_sequential_skip_in_iterations(8), memtable_factory(std::shared_ptr(new SkipListFactory)), + table_factory( + std::shared_ptr(new BlockBasedTableFactory())), compaction_filter_factory( std::shared_ptr( new DefaultCompactionFilterFactory())), @@ -116,6 +119,7 @@ Options::Dump(Logger* log) const compaction_filter_factory->Name()); Log(log," Options.memtable_factory: %s", memtable_factory->Name()); + Log(log," Options.table_factory: %s", table_factory->Name()); Log(log," Options.error_if_exists: %d", error_if_exists); Log(log," Options.create_if_missing: %d", create_if_missing); Log(log," Options.paranoid_checks: %d", paranoid_checks);