diff --git a/db/db_impl_add_file.cc b/db/db_impl_add_file.cc index e2e2ae188..4277e93c0 100644 --- a/db/db_impl_add_file.cc +++ b/db/db_impl_add_file.cc @@ -11,12 +11,10 @@ #include +#include "db/builder.h" #include "rocksdb/db.h" #include "rocksdb/env.h" #include "rocksdb/sst_file_writer.h" - -#include "db/builder.h" -#include "table/sst_file_writer_collectors.h" #include "table/table_builder.h" #include "util/file_reader_writer.h" #include "util/file_util.h" @@ -69,12 +67,7 @@ Status DBImpl::ReadExternalSstFileInfo(ColumnFamilyHandle* column_family, file_info->version = DecodeFixed32(external_sst_file_version_iter->second.c_str()); - if (file_info->version == 2) { - // version 2 imply that we have global sequence number - - // TODO(tec): Implement version 2 ingestion - file_info->sequence_number = 0; - } else if (file_info->version == 1) { + if (file_info->version == 1) { // version 1 imply that all sequence numbers in table equal 0 file_info->sequence_number = 0; } else { @@ -172,20 +165,12 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, if (file_info_list[i].num_entries == 0) { return Status::InvalidArgument("File contain no entries"); } - - if (file_info_list[i].version == 2) { - // version 2 imply that file have only Put Operations - // with global Sequence Number - - // TODO(tec): Implement changing file global sequence number - } else if (file_info_list[i].version == 1) { - // version 1 imply that file have only Put Operations - // with Sequence Number = 0 - } else { - // Unknown version ! + if (file_info_list[i].version != 1) { return Status::InvalidArgument( "Generated table version is not supported"); } + // version 1 imply that file have only Put Operations with Sequence Number = + // 0 meta_list[i].smallest = InternalKey(file_info_list[i].smallest_key, @@ -279,7 +264,7 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family, for (size_t i = 0; i < num_files; i++) { StopWatch sw(env_, nullptr, 0, µ_list[i], false); InternalKey range_start(file_info_list[i].smallest_key, - kMaxSequenceNumber, kValueTypeForSeek); + kMaxSequenceNumber, kTypeValue); iter->Seek(range_start.Encode()); status = iter->status(); diff --git a/db/dbformat.h b/db/dbformat.h index 550e93606..29d17b0ec 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -71,8 +71,6 @@ inline bool IsExtendedValueType(ValueType t) { static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1); -static const SequenceNumber kDisableGlobalSequenceNumber = port::kMaxUint64; - struct ParsedInternalKey { Slice user_key; SequenceNumber sequence; @@ -359,15 +357,6 @@ class IterKey { return Slice(key_, key_n); } - // Copy the key into IterKey own buf_ - void OwnKey() { - assert(IsKeyPinned() == true); - - EnlargeBufferIfNeeded(key_size_); - memcpy(buf_, key_, key_size_); - key_ = buf_; - } - // Update the sequence number in the internal key. Guarantees not to // invalidate slices to the key (and the user key). void UpdateInternalKey(uint64_t seq, ValueType t) { diff --git a/db/external_sst_file_test.cc b/db/external_sst_file_test.cc index fc71dc059..b79f1d957 100644 --- a/db/external_sst_file_test.cc +++ b/db/external_sst_file_test.cc @@ -1091,7 +1091,7 @@ TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) { std::vector threads; while (range_id < 5000) { - int range_start = range_id * 10; + int range_start = (range_id * 20); int range_end = range_start + 10; file_keys.clear(); @@ -1114,18 +1114,6 @@ TEST_F(ExternalSSTFileTest, CompactDuringAddFileRandom) { range_id++; } - - for (int rid = 0; rid < 5000; rid++) { - int range_start = rid * 10; - int range_end = range_start + 10; - - ASSERT_EQ(Get(Key(range_start)), Key(range_start)) << rid; - ASSERT_EQ(Get(Key(range_end)), Key(range_end)) << rid; - for (int k = range_start + 1; k < range_end; k++) { - std::string v = Key(k) + ToString(rid); - ASSERT_EQ(Get(Key(k)), v) << rid; - } - } } TEST_F(ExternalSSTFileTest, PickedLevelDynamic) { diff --git a/include/rocksdb/sst_file_writer.h b/include/rocksdb/sst_file_writer.h index ace372545..0f8bf4a30 100644 --- a/include/rocksdb/sst_file_writer.h +++ b/include/rocksdb/sst_file_writer.h @@ -13,6 +13,12 @@ namespace rocksdb { class Comparator; +// Table Properties that are specific to tables created by SstFileWriter. +struct ExternalSstFilePropertyNames { + // value of this property is a fixed int32 number. + static const std::string kVersion; +}; + // ExternalSstFileInfo include information about sst files created // using SstFileWriter struct ExternalSstFileInfo { @@ -62,6 +68,8 @@ class SstFileWriter { Status Finish(ExternalSstFileInfo* file_info = nullptr); private: + class SstFileWriterPropertiesCollectorFactory; + class SstFileWriterPropertiesCollector; struct Rep; Rep* rep_; }; diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index 7e8e9ad95..c3f691a96 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -184,9 +184,6 @@ struct TableProperties { UserCollectedProperties user_collected_properties; UserCollectedProperties readable_properties; - // The offset of the value of each property in the file. - std::map properties_offsets; - // convert this object to a human readable form // @prop_delim: delimiter for each property. std::string ToString(const std::string& prop_delim = "; ", diff --git a/table/block.cc b/table/block.cc index 6ef46f4ce..dacd53d50 100644 --- a/table/block.cc +++ b/table/block.cc @@ -241,27 +241,6 @@ bool BlockIter::ParseNextKey() { key_.TrimAppend(shared, p, non_shared); key_pinned_ = false; } - - if (global_seqno_ != kDisableGlobalSequenceNumber) { - // If we are reading a file with a global sequence number we should - // expect that all encoded sequence numbers are zeros and all value - // types are kTypeValue - assert(GetInternalKeySeqno(key_.GetKey()) == 0); - assert(ExtractValueType(key_.GetKey()) == ValueType::kTypeValue); - - if (key_pinned_) { - // TODO(tec): Investigate updating the seqno in the loaded block - // directly instead of doing a copy and update. - - // We cannot use the key address in the block directly because - // we have a global_seqno_ that will overwrite the encoded one. - key_.OwnKey(); - key_pinned_ = false; - } - - key_.UpdateInternalKey(global_seqno_, ValueType::kTypeValue); - } - value_ = Slice(p + non_shared, value_length); while (restart_index_ + 1 < num_restarts_ && GetRestartPoint(restart_index_ + 1) < current_) { @@ -393,12 +372,11 @@ uint32_t Block::NumRestarts() const { return DecodeFixed32(data_ + size_ - sizeof(uint32_t)); } -Block::Block(BlockContents&& contents, SequenceNumber _global_seqno, - size_t read_amp_bytes_per_bit, Statistics* statistics) +Block::Block(BlockContents&& contents, size_t read_amp_bytes_per_bit, + Statistics* statistics) : contents_(std::move(contents)), data_(contents_.data.data()), - size_(contents_.data.size()), - global_seqno_(_global_seqno) { + size_(contents_.data.size()) { if (size_ < sizeof(uint32_t)) { size_ = 0; // Error marker } else { @@ -440,11 +418,10 @@ InternalIterator* Block::NewIterator(const Comparator* cmp, BlockIter* iter, if (iter != nullptr) { iter->Initialize(cmp, data_, restart_offset_, num_restarts, - prefix_index_ptr, global_seqno_, read_amp_bitmap_.get()); + prefix_index_ptr, read_amp_bitmap_.get()); } else { iter = new BlockIter(cmp, data_, restart_offset_, num_restarts, - prefix_index_ptr, global_seqno_, - read_amp_bitmap_.get()); + prefix_index_ptr, read_amp_bitmap_.get()); } if (read_amp_bitmap_) { diff --git a/table/block.h b/table/block.h index 4332bf235..b70ee8daf 100644 --- a/table/block.h +++ b/table/block.h @@ -147,8 +147,7 @@ class BlockReadAmpBitmap { class Block { public: // Initialize the block with the specified contents. - explicit Block(BlockContents&& contents, SequenceNumber _global_seqno, - size_t read_amp_bytes_per_bit = 0, + explicit Block(BlockContents&& contents, size_t read_amp_bytes_per_bit = 0, Statistics* statistics = nullptr); ~Block() = default; @@ -191,8 +190,6 @@ class Block { // Report an approximation of how much memory has been used. size_t ApproximateMemoryUsage() const; - SequenceNumber global_seqno() const { return global_seqno_; } - private: BlockContents contents_; const char* data_; // contents_.data.data() @@ -200,9 +197,6 @@ class Block { uint32_t restart_offset_; // Offset in data_ of restart array std::unique_ptr prefix_index_; std::unique_ptr read_amp_bitmap_; - // All keys in the block will have seqno = global_seqno_, regardless of - // the encoded value (kDisableGlobalSequenceNumber means disabled) - const SequenceNumber global_seqno_; // No copying allowed Block(const Block&); @@ -221,21 +215,20 @@ class BlockIter : public InternalIterator { status_(Status::OK()), prefix_index_(nullptr), key_pinned_(false), - global_seqno_(kDisableGlobalSequenceNumber), read_amp_bitmap_(nullptr), last_bitmap_offset_(0) {} BlockIter(const Comparator* comparator, const char* data, uint32_t restarts, uint32_t num_restarts, BlockPrefixIndex* prefix_index, - SequenceNumber global_seqno, BlockReadAmpBitmap* read_amp_bitmap) + BlockReadAmpBitmap* read_amp_bitmap) : BlockIter() { Initialize(comparator, data, restarts, num_restarts, prefix_index, - global_seqno, read_amp_bitmap); + read_amp_bitmap); } void Initialize(const Comparator* comparator, const char* data, uint32_t restarts, uint32_t num_restarts, - BlockPrefixIndex* prefix_index, SequenceNumber global_seqno, + BlockPrefixIndex* prefix_index, BlockReadAmpBitmap* read_amp_bitmap) { assert(data_ == nullptr); // Ensure it is called only once assert(num_restarts > 0); // Ensure the param is valid @@ -247,7 +240,6 @@ class BlockIter : public InternalIterator { current_ = restarts_; restart_index_ = num_restarts_; prefix_index_ = prefix_index; - global_seqno_ = global_seqno; read_amp_bitmap_ = read_amp_bitmap; last_bitmap_offset_ = current_ + 1; } @@ -304,10 +296,6 @@ class BlockIter : public InternalIterator { size_t TEST_CurrentEntrySize() { return NextEntryOffset() - current_; } - uint32_t ValueOffset() const { - return static_cast(value_.data() - data_); - } - private: const Comparator* comparator_; const char* data_; // underlying block contents @@ -322,7 +310,6 @@ class BlockIter : public InternalIterator { Status status_; BlockPrefixIndex* prefix_index_; bool key_pinned_; - SequenceNumber global_seqno_; // read-amp bitmap BlockReadAmpBitmap* read_amp_bitmap_; diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 344d8cac1..724d69f17 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -806,7 +806,7 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents, BlockContents results(std::move(ubuf), size, true, type); - Block* block = new Block(std::move(results), kDisableGlobalSequenceNumber); + Block* block = new Block(std::move(results)); // make cache key by appending the file offset to the cache prefix id char* end = EncodeVarint64( diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc index 4eff1bbc2..ef31c7012 100644 --- a/table/block_based_table_reader.cc +++ b/table/block_based_table_reader.cc @@ -38,7 +38,6 @@ #include "table/internal_iterator.h" #include "table/meta_blocks.h" #include "table/persistent_cache_helper.h" -#include "table/sst_file_writer_collectors.h" #include "table/two_level_iterator.h" #include "util/coding.h" @@ -70,14 +69,13 @@ Status ReadBlockFromFile(RandomAccessFileReader* file, const Footer& footer, const ImmutableCFOptions& ioptions, bool do_uncompress, const Slice& compression_dict, const PersistentCacheOptions& cache_options, - SequenceNumber global_seqno, size_t read_amp_bytes_per_bit) { BlockContents contents; Status s = ReadBlockContents(file, footer, options, handle, &contents, ioptions, do_uncompress, compression_dict, cache_options); if (s.ok()) { - result->reset(new Block(std::move(contents), global_seqno, - read_amp_bytes_per_bit, ioptions.statistics)); + result->reset(new Block(std::move(contents), read_amp_bytes_per_bit, + ioptions.statistics)); } return s; @@ -190,10 +188,10 @@ class BinarySearchIndexReader : public IndexReader { const Comparator* comparator, IndexReader** index_reader, const PersistentCacheOptions& cache_options) { std::unique_ptr index_block; - auto s = ReadBlockFromFile( - file, footer, ReadOptions(), index_handle, &index_block, ioptions, - true /* decompress */, Slice() /*compression dict*/, cache_options, - kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */); + auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle, + &index_block, ioptions, true /* decompress */, + Slice() /*compression dict*/, cache_options, + 0 /* read_amp_bytes_per_bit */); if (s.ok()) { *index_reader = new BinarySearchIndexReader( @@ -242,10 +240,10 @@ class HashIndexReader : public IndexReader { bool hash_index_allow_collision, const PersistentCacheOptions& cache_options) { std::unique_ptr index_block; - auto s = ReadBlockFromFile( - file, footer, ReadOptions(), index_handle, &index_block, ioptions, - true /* decompress */, Slice() /*compression dict*/, cache_options, - kDisableGlobalSequenceNumber, 0 /* read_amp_bytes_per_bit */); + auto s = ReadBlockFromFile(file, footer, ReadOptions(), index_handle, + &index_block, ioptions, true /* decompress */, + Slice() /*compression dict*/, cache_options, + 0 /* read_amp_bytes_per_bit */); if (!s.ok()) { return s; @@ -371,8 +369,7 @@ struct BlockBasedTable::Rep { filter_type(FilterType::kNoFilter), whole_key_filtering(_table_opt.whole_key_filtering), prefix_filtering(true), - range_del_block(nullptr), - global_seqno(kDisableGlobalSequenceNumber) {} + range_del_block(nullptr) {} const ImmutableCFOptions& ioptions; const EnvOptions& env_options; @@ -431,13 +428,6 @@ struct BlockBasedTable::Rep { CachableEntry filter_entry; CachableEntry index_entry; unique_ptr range_del_block; - - // If global_seqno is used, all Keys in this file will have the same - // seqno with value `global_seqno`. - // - // A value of kDisableGlobalSequenceNumber means that this feature is disabled - // and every key have it's own seqno. - SequenceNumber global_seqno; }; BlockBasedTable::~BlockBasedTable() { @@ -516,50 +506,6 @@ bool IsFeatureSupported(const TableProperties& table_properties, } return true; } - -SequenceNumber GetGlobalSequenceNumber(const TableProperties& table_properties, - Logger* info_log) { - auto& props = table_properties.user_collected_properties; - - auto version_pos = props.find(ExternalSstFilePropertyNames::kVersion); - auto seqno_pos = props.find(ExternalSstFilePropertyNames::kGlobalSeqno); - - if (version_pos == props.end()) { - if (seqno_pos != props.end()) { - // This is not an external sst file, global_seqno is not supported. - assert(false); - Log(InfoLogLevel::ERROR_LEVEL, info_log, - "A non-external sst file have global seqno property with value %s", - seqno_pos->second.c_str()); - } - return kDisableGlobalSequenceNumber; - } - - uint32_t version = DecodeFixed32(version_pos->second.c_str()); - if (version < 2) { - if (seqno_pos != props.end() || version != 1) { - // This is a v1 external sst file, global_seqno is not supported. - assert(false); - Log(InfoLogLevel::ERROR_LEVEL, info_log, - "An external sst file with version %u have global seqno property " - "with value %s", - version, seqno_pos->second.c_str()); - } - return kDisableGlobalSequenceNumber; - } - - SequenceNumber global_seqno = DecodeFixed64(seqno_pos->second.c_str()); - - if (global_seqno > kMaxSequenceNumber) { - assert(false); - Log(InfoLogLevel::ERROR_LEVEL, info_log, - "An external sst file with version %u have global seqno property " - "with value %llu, which is greater than kMaxSequenceNumber", - version, global_seqno); - } - - return global_seqno; -} } // namespace Slice BlockBasedTable::GetCacheKey(const char* cache_key_prefix, @@ -723,8 +669,8 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, "Encountered error while reading data from range del block %s", s.ToString().c_str()); } else { - rep->range_del_block.reset(new Block( - std::move(range_del_block_contents), kDisableGlobalSequenceNumber)); + rep->range_del_block.reset( + new Block(std::move(range_del_block_contents))); } } } @@ -738,9 +684,6 @@ Status BlockBasedTable::Open(const ImmutableCFOptions& ioptions, rep->prefix_filtering &= IsFeatureSupported( *(rep->table_properties), BlockBasedTablePropertyNames::kPrefixFiltering, rep->ioptions.info_log); - - rep->global_seqno = GetGlobalSequenceNumber(*(rep->table_properties), - rep->ioptions.info_log); } // pre-fetching of blocks is turned on @@ -854,8 +797,7 @@ Status BlockBasedTable::ReadMetaBlock(Rep* rep, rep->file.get(), rep->footer, ReadOptions(), rep->footer.metaindex_handle(), &meta, rep->ioptions, true /* decompress */, Slice() /*compression dict*/, - rep->persistent_cache_options, kDisableGlobalSequenceNumber, - 0 /* read_amp_bytes_per_bit */); + rep->persistent_cache_options, 0 /* read_amp_bytes_per_bit */); if (!s.ok()) { Log(InfoLogLevel::ERROR_LEVEL, rep->ioptions.info_log, @@ -925,10 +867,8 @@ Status BlockBasedTable::GetDataBlockFromCache( // Insert uncompressed block into block cache if (s.ok()) { - block->value = - new Block(std::move(contents), compressed_block->global_seqno(), - read_amp_bytes_per_bit, - statistics); // uncompressed block + block->value = new Block(std::move(contents), read_amp_bytes_per_bit, + statistics); // uncompressed block assert(block->value->compression_type() == kNoCompression); if (block_cache != nullptr && block->value->cachable() && read_options.fill_cache) { @@ -973,9 +913,8 @@ Status BlockBasedTable::PutDataBlockToCache( } if (raw_block->compression_type() != kNoCompression) { - block->value = new Block(std::move(contents), raw_block->global_seqno(), - read_amp_bytes_per_bit, - statistics); // uncompressed block + block->value = new Block(std::move(contents), read_amp_bytes_per_bit, + statistics); // compressed block } else { block->value = raw_block; raw_block = nullptr; @@ -1283,11 +1222,11 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( std::unique_ptr raw_block; { StopWatch sw(rep->ioptions.env, statistics, READ_BLOCK_GET_MICROS); - s = ReadBlockFromFile( - rep->file.get(), rep->footer, ro, handle, &raw_block, rep->ioptions, - block_cache_compressed == nullptr, compression_dict, - rep->persistent_cache_options, rep->global_seqno, - rep->table_options.read_amp_bytes_per_bit); + s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle, + &raw_block, rep->ioptions, + block_cache_compressed == nullptr, + compression_dict, rep->persistent_cache_options, + rep->table_options.read_amp_bytes_per_bit); } if (s.ok()) { @@ -1311,10 +1250,10 @@ InternalIterator* BlockBasedTable::NewDataBlockIterator( } } std::unique_ptr block_value; - s = ReadBlockFromFile( - rep->file.get(), rep->footer, ro, handle, &block_value, rep->ioptions, - true /* compress */, compression_dict, rep->persistent_cache_options, - rep->global_seqno, rep->table_options.read_amp_bytes_per_bit); + s = ReadBlockFromFile(rep->file.get(), rep->footer, ro, handle, + &block_value, rep->ioptions, true /* compress */, + compression_dict, rep->persistent_cache_options, + rep->table_options.read_amp_bytes_per_bit); if (s.ok()) { block.value = block_value.release(); } diff --git a/table/block_test.cc b/table/block_test.cc index 7d54b7665..b720b8dad 100644 --- a/table/block_test.cc +++ b/table/block_test.cc @@ -95,7 +95,7 @@ TEST_F(BlockTest, SimpleTest) { BlockContents contents; contents.data = rawblock; contents.cachable = false; - Block reader(std::move(contents), kDisableGlobalSequenceNumber); + Block reader(std::move(contents)); // read contents of block sequentially int count = 0; @@ -156,8 +156,8 @@ void CheckBlockContents(BlockContents contents, const int max_key, // create block reader BlockContents contents_ref(contents.data, contents.cachable, contents.compression_type); - Block reader1(std::move(contents), kDisableGlobalSequenceNumber); - Block reader2(std::move(contents_ref), kDisableGlobalSequenceNumber); + Block reader1(std::move(contents)); + Block reader2(std::move(contents_ref)); std::unique_ptr prefix_extractor( NewFixedPrefixTransform(prefix_size)); @@ -358,8 +358,7 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) { BlockContents contents; contents.data = rawblock; contents.cachable = true; - Block reader(std::move(contents), kDisableGlobalSequenceNumber, - kBytesPerBit, stats.get()); + Block reader(std::move(contents), kBytesPerBit, stats.get()); // read contents of block sequentially size_t read_bytes = 0; @@ -392,8 +391,7 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) { BlockContents contents; contents.data = rawblock; contents.cachable = true; - Block reader(std::move(contents), kDisableGlobalSequenceNumber, - kBytesPerBit, stats.get()); + Block reader(std::move(contents), kBytesPerBit, stats.get()); size_t read_bytes = 0; BlockIter *iter = static_cast( @@ -428,8 +426,7 @@ TEST_F(BlockTest, BlockWithReadAmpBitmap) { BlockContents contents; contents.data = rawblock; contents.cachable = true; - Block reader(std::move(contents), kDisableGlobalSequenceNumber, - kBytesPerBit, stats.get()); + Block reader(std::move(contents), kBytesPerBit, stats.get()); size_t read_bytes = 0; BlockIter *iter = static_cast( diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index d851f73da..22c9ff5db 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -175,10 +175,9 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, return s; } - Block properties_block(std::move(block_contents), - kDisableGlobalSequenceNumber); - BlockIter iter; - properties_block.NewIterator(BytewiseComparator(), &iter); + Block properties_block(std::move(block_contents)); + std::unique_ptr iter( + properties_block.NewIterator(BytewiseComparator())); auto new_table_properties = new TableProperties(); // All pre-defined properties of type uint64_t @@ -201,24 +200,21 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, }; std::string last_key; - for (iter.SeekToFirst(); iter.Valid(); iter.Next()) { - s = iter.status(); + for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { + s = iter->status(); if (!s.ok()) { break; } - auto key = iter.key().ToString(); + auto key = iter->key().ToString(); // properties block is strictly sorted with no duplicate key. assert(last_key.empty() || BytewiseComparator()->Compare(key, last_key) > 0); last_key = key; - auto raw_val = iter.value(); + auto raw_val = iter->value(); auto pos = predefined_uint64_properties.find(key); - new_table_properties->properties_offsets.insert( - {key, handle.offset() + iter.ValueOffset()}); - if (pos != predefined_uint64_properties.end()) { // handle predefined rocksdb properties uint64_t val; @@ -281,8 +277,7 @@ Status ReadTableProperties(RandomAccessFileReader* file, uint64_t file_size, if (!s.ok()) { return s; } - Block metaindex_block(std::move(metaindex_contents), - kDisableGlobalSequenceNumber); + Block metaindex_block(std::move(metaindex_contents)); std::unique_ptr meta_iter( metaindex_block.NewIterator(BytewiseComparator())); @@ -336,8 +331,7 @@ Status FindMetaBlock(RandomAccessFileReader* file, uint64_t file_size, if (!s.ok()) { return s; } - Block metaindex_block(std::move(metaindex_contents), - kDisableGlobalSequenceNumber); + Block metaindex_block(std::move(metaindex_contents)); std::unique_ptr meta_iter; meta_iter.reset(metaindex_block.NewIterator(BytewiseComparator())); @@ -370,8 +364,7 @@ Status ReadMetaBlock(RandomAccessFileReader* file, uint64_t file_size, } // Finding metablock - Block metaindex_block(std::move(metaindex_contents), - kDisableGlobalSequenceNumber); + Block metaindex_block(std::move(metaindex_contents)); std::unique_ptr meta_iter; meta_iter.reset(metaindex_block.NewIterator(BytewiseComparator())); diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc index 413d9b8b4..b6450431c 100644 --- a/table/sst_file_writer.cc +++ b/table/sst_file_writer.cc @@ -9,15 +9,66 @@ #include "db/dbformat.h" #include "rocksdb/table.h" #include "table/block_based_table_builder.h" -#include "table/sst_file_writer_collectors.h" #include "util/file_reader_writer.h" +#include "util/string_util.h" namespace rocksdb { const std::string ExternalSstFilePropertyNames::kVersion = "rocksdb.external_sst_file.version"; -const std::string ExternalSstFilePropertyNames::kGlobalSeqno = - "rocksdb.external_sst_file.global_seqno"; + +// PropertiesCollector used to add properties specific to tables +// generated by SstFileWriter +class SstFileWriter::SstFileWriterPropertiesCollector + : public IntTblPropCollector { + public: + explicit SstFileWriterPropertiesCollector(int32_t version) + : version_(version) {} + + virtual Status InternalAdd(const Slice& key, const Slice& value, + uint64_t file_size) override { + // Intentionally left blank. Have no interest in collecting stats for + // individual key/value pairs. + return Status::OK(); + } + + virtual Status Finish(UserCollectedProperties* properties) override { + std::string version_val; + PutFixed32(&version_val, static_cast(version_)); + properties->insert({ExternalSstFilePropertyNames::kVersion, version_val}); + return Status::OK(); + } + + virtual const char* Name() const override { + return "SstFileWriterPropertiesCollector"; + } + + virtual UserCollectedProperties GetReadableProperties() const override { + return {{ExternalSstFilePropertyNames::kVersion, ToString(version_)}}; + } + + private: + int32_t version_; +}; + +class SstFileWriter::SstFileWriterPropertiesCollectorFactory + : public IntTblPropCollectorFactory { + public: + explicit SstFileWriterPropertiesCollectorFactory(int32_t version) + : version_(version) {} + + virtual IntTblPropCollector* CreateIntTblPropCollector( + uint32_t column_family_id) override { + return new SstFileWriterPropertiesCollector(version_); + } + + virtual const char* Name() const override { + return "SstFileWriterPropertiesCollector"; + } + + private: + int32_t version_; +}; struct SstFileWriter::Rep { Rep(const EnvOptions& _env_options, const Options& options, @@ -69,8 +120,7 @@ Status SstFileWriter::Open(const std::string& file_path) { // SstFileWriter properties collector to add SstFileWriter version. int_tbl_prop_collector_factories.emplace_back( - new SstFileWriterPropertiesCollectorFactory(2 /* version */, - 0 /* global_seqno*/)); + new SstFileWriterPropertiesCollectorFactory(1 /* version */)); // User collector factories auto user_collector_factories = @@ -88,9 +138,6 @@ Status SstFileWriter::Open(const std::string& file_path) { r->column_family_name, unknown_level); r->file_writer.reset( new WritableFileWriter(std::move(sst_file), r->env_options)); - - // TODO(tec) : If table_factory is using compressed block cache, we will - // be adding the external sst file blocks into it, which is wasteful. r->builder.reset(r->ioptions.table_factory->NewTableBuilder( table_builder_options, TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, @@ -100,7 +147,7 @@ Status SstFileWriter::Open(const std::string& file_path) { r->file_info.file_size = 0; r->file_info.num_entries = 0; r->file_info.sequence_number = 0; - r->file_info.version = 2; + r->file_info.version = 1; return s; } @@ -125,7 +172,6 @@ Status SstFileWriter::Add(const Slice& user_key, const Slice& value) { r->file_info.largest_key.assign(user_key.data(), user_key.size()); r->file_info.file_size = r->builder->FileSize(); - // TODO(tec) : For external SST files we could omit the seqno and type. r->ikey.Set(user_key, 0 /* Sequence Number */, ValueType::kTypeValue /* Put */); r->builder->Add(r->ikey.Encode(), value); diff --git a/table/sst_file_writer_collectors.h b/table/sst_file_writer_collectors.h deleted file mode 100644 index 7375f5e64..000000000 --- a/table/sst_file_writer_collectors.h +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. -// This source code is licensed under the BSD-style license found in the -// LICENSE file in the root directory of this source tree. An additional grant -// of patent rights can be found in the PATENTS file in the same directory. - -#pragma once -#include -#include "rocksdb/types.h" -#include "util/string_util.h" - -namespace rocksdb { - -// Table Properties that are specific to tables created by SstFileWriter. -struct ExternalSstFilePropertyNames { - // value of this property is a fixed uint32 number. - static const std::string kVersion; - // value of this property is a fixed uint64 number. - static const std::string kGlobalSeqno; -}; - -// PropertiesCollector used to add properties specific to tables -// generated by SstFileWriter -class SstFileWriterPropertiesCollector : public IntTblPropCollector { - public: - explicit SstFileWriterPropertiesCollector(int32_t version, - SequenceNumber global_seqno) - : version_(version), global_seqno_(global_seqno) {} - - virtual Status InternalAdd(const Slice& key, const Slice& value, - uint64_t file_size) override { - // Intentionally left blank. Have no interest in collecting stats for - // individual key/value pairs. - return Status::OK(); - } - - virtual Status Finish(UserCollectedProperties* properties) override { - // File version - std::string version_val; - PutFixed32(&version_val, static_cast(version_)); - properties->insert({ExternalSstFilePropertyNames::kVersion, version_val}); - - // Global Sequence number - std::string seqno_val; - PutFixed64(&seqno_val, static_cast(global_seqno_)); - properties->insert({ExternalSstFilePropertyNames::kGlobalSeqno, seqno_val}); - - return Status::OK(); - } - - virtual const char* Name() const override { - return "SstFileWriterPropertiesCollector"; - } - - virtual UserCollectedProperties GetReadableProperties() const override { - return {{ExternalSstFilePropertyNames::kVersion, ToString(version_)}}; - } - - private: - int32_t version_; - SequenceNumber global_seqno_; -}; - -class SstFileWriterPropertiesCollectorFactory - : public IntTblPropCollectorFactory { - public: - explicit SstFileWriterPropertiesCollectorFactory(int32_t version, - SequenceNumber global_seqno) - : version_(version), global_seqno_(global_seqno) {} - - virtual IntTblPropCollector* CreateIntTblPropCollector( - uint32_t column_family_id) override { - return new SstFileWriterPropertiesCollector(version_, global_seqno_); - } - - virtual const char* Name() const override { - return "SstFileWriterPropertiesCollector"; - } - - private: - int32_t version_; - SequenceNumber global_seqno_; -}; - -} // namespace rocksdb diff --git a/table/table_test.cc b/table/table_test.cc index 5a1906f42..96c568970 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -41,7 +41,6 @@ #include "table/meta_blocks.h" #include "table/plain_table_factory.h" #include "table/scoped_arena_iterator.h" -#include "table/sst_file_writer_collectors.h" #include "util/compression.h" #include "util/random.h" #include "util/statistics.h" @@ -223,7 +222,7 @@ class BlockConstructor: public Constructor { BlockContents contents; contents.data = data_; contents.cachable = false; - block_ = new Block(std::move(contents), kDisableGlobalSequenceNumber); + block_ = new Block(std::move(contents)); return Status::OK(); } virtual InternalIterator* NewIterator() const override { @@ -2746,183 +2745,6 @@ TEST_F(PrefixTest, PrefixAndWholeKeyTest) { // rocksdb still works. } -TEST_F(BlockBasedTableTest, TableWithGlobalSeqno) { - BlockBasedTableOptions bbto; - test::StringSink* sink = new test::StringSink(); - unique_ptr file_writer(test::GetWritableFileWriter(sink)); - Options options; - options.table_factory.reset(NewBlockBasedTableFactory(bbto)); - const ImmutableCFOptions ioptions(options); - InternalKeyComparator ikc(options.comparator); - std::vector> - int_tbl_prop_collector_factories; - int_tbl_prop_collector_factories.emplace_back( - new SstFileWriterPropertiesCollectorFactory(2 /* version */, - 0 /* global_seqno*/)); - std::string column_family_name; - std::unique_ptr builder(options.table_factory->NewTableBuilder( - TableBuilderOptions(ioptions, ikc, &int_tbl_prop_collector_factories, - kNoCompression, CompressionOptions(), - nullptr /* compression_dict */, - false /* skip_filters */, column_family_name, -1), - TablePropertiesCollectorFactory::Context::kUnknownColumnFamily, - file_writer.get())); - - for (char c = 'a'; c <= 'z'; ++c) { - std::string key(8, c); - std::string value = key; - InternalKey ik(key, 0, kTypeValue); - - builder->Add(ik.Encode(), value); - } - ASSERT_OK(builder->Finish()); - file_writer->Flush(); - - test::RandomRWStringSink ss_rw(sink); - uint32_t version; - uint64_t global_seqno; - uint64_t global_seqno_offset; - - // Helper function to get version, global_seqno, global_seqno_offset - std::function GetVersionAndGlobalSeqno = [&]() { - unique_ptr file_reader( - test::GetRandomAccessFileReader( - new test::StringSource(ss_rw.contents(), 73342, true))); - - TableProperties* props = nullptr; - ASSERT_OK(ReadTableProperties(file_reader.get(), ss_rw.contents().size(), - kBlockBasedTableMagicNumber, ioptions, - &props)); - - UserCollectedProperties user_props = props->user_collected_properties; - version = DecodeFixed32( - user_props[ExternalSstFilePropertyNames::kVersion].c_str()); - global_seqno = DecodeFixed64( - user_props[ExternalSstFilePropertyNames::kGlobalSeqno].c_str()); - global_seqno_offset = - props->properties_offsets[ExternalSstFilePropertyNames::kGlobalSeqno]; - - delete props; - }; - - // Helper function to update the value of the global seqno in the file - std::function SetGlobalSeqno = [&](uint64_t val) { - std::string new_global_seqno; - PutFixed64(&new_global_seqno, val); - - ASSERT_OK(ss_rw.Write(global_seqno_offset, new_global_seqno)); - }; - - // Helper function to get the contents of the table InternalIterator - unique_ptr table_reader; - std::function GetTableInternalIter = [&]() { - unique_ptr file_reader( - test::GetRandomAccessFileReader( - new test::StringSource(ss_rw.contents(), 73342, true))); - - options.table_factory->NewTableReader( - TableReaderOptions(ioptions, EnvOptions(), ikc), std::move(file_reader), - ss_rw.contents().size(), &table_reader); - - return table_reader->NewIterator(ReadOptions()); - }; - - GetVersionAndGlobalSeqno(); - ASSERT_EQ(2, version); - ASSERT_EQ(0, global_seqno); - - InternalIterator* iter = GetTableInternalIter(); - char current_c = 'a'; - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - ParsedInternalKey pik; - ASSERT_TRUE(ParseInternalKey(iter->key(), &pik)); - - ASSERT_EQ(pik.type, ValueType::kTypeValue); - ASSERT_EQ(pik.sequence, 0); - ASSERT_EQ(pik.user_key, iter->value()); - ASSERT_EQ(pik.user_key.ToString(), std::string(8, current_c)); - current_c++; - } - ASSERT_EQ(current_c, 'z' + 1); - delete iter; - - // Update global sequence number to 10 - SetGlobalSeqno(10); - GetVersionAndGlobalSeqno(); - ASSERT_EQ(2, version); - ASSERT_EQ(10, global_seqno); - - iter = GetTableInternalIter(); - current_c = 'a'; - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - ParsedInternalKey pik; - ASSERT_TRUE(ParseInternalKey(iter->key(), &pik)); - - ASSERT_EQ(pik.type, ValueType::kTypeValue); - ASSERT_EQ(pik.sequence, 10); - ASSERT_EQ(pik.user_key, iter->value()); - ASSERT_EQ(pik.user_key.ToString(), std::string(8, current_c)); - current_c++; - } - ASSERT_EQ(current_c, 'z' + 1); - - // Verify Seek - for (char c = 'a'; c <= 'z'; c++) { - std::string k = std::string(8, c); - InternalKey ik(k, 10, kValueTypeForSeek); - iter->Seek(ik.Encode()); - ASSERT_TRUE(iter->Valid()); - - ParsedInternalKey pik; - ASSERT_TRUE(ParseInternalKey(iter->key(), &pik)); - - ASSERT_EQ(pik.type, ValueType::kTypeValue); - ASSERT_EQ(pik.sequence, 10); - ASSERT_EQ(pik.user_key.ToString(), k); - ASSERT_EQ(iter->value().ToString(), k); - } - delete iter; - - // Update global sequence number to 3 - SetGlobalSeqno(3); - GetVersionAndGlobalSeqno(); - ASSERT_EQ(2, version); - ASSERT_EQ(3, global_seqno); - - iter = GetTableInternalIter(); - current_c = 'a'; - for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { - ParsedInternalKey pik; - ASSERT_TRUE(ParseInternalKey(iter->key(), &pik)); - - ASSERT_EQ(pik.type, ValueType::kTypeValue); - ASSERT_EQ(pik.sequence, 3); - ASSERT_EQ(pik.user_key, iter->value()); - ASSERT_EQ(pik.user_key.ToString(), std::string(8, current_c)); - current_c++; - } - ASSERT_EQ(current_c, 'z' + 1); - - // Verify Seek - for (char c = 'a'; c <= 'z'; c++) { - std::string k = std::string(8, c); - // seqno=4 is less than 3 so we still should get our key - InternalKey ik(k, 4, kValueTypeForSeek); - iter->Seek(ik.Encode()); - ASSERT_TRUE(iter->Valid()); - - ParsedInternalKey pik; - ASSERT_TRUE(ParseInternalKey(iter->key(), &pik)); - - ASSERT_EQ(pik.type, ValueType::kTypeValue); - ASSERT_EQ(pik.sequence, 3); - ASSERT_EQ(pik.user_key.ToString(), k); - ASSERT_EQ(iter->value().ToString(), k); - } - - delete iter; -} - } // namespace rocksdb int main(int argc, char** argv) { diff --git a/util/delete_scheduler_test.cc b/util/delete_scheduler_test.cc index 845c7ef1d..32f71aacf 100644 --- a/util/delete_scheduler_test.cc +++ b/util/delete_scheduler_test.cc @@ -18,7 +18,6 @@ #include "util/string_util.h" #include "util/sync_point.h" #include "util/testharness.h" -#include "util/testutil.h" namespace rocksdb { @@ -35,11 +34,26 @@ class DeleteSchedulerTest : public testing::Test { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->LoadDependency({}); rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); - test::DestroyDir(env_, dummy_files_dir_); + DestroyDir(dummy_files_dir_); + } + + void DestroyDir(const std::string& dir) { + if (env_->FileExists(dir).IsNotFound()) { + return; + } + std::vector files_in_dir; + EXPECT_OK(env_->GetChildren(dir, &files_in_dir)); + for (auto& file_in_dir : files_in_dir) { + if (file_in_dir == "." || file_in_dir == "..") { + continue; + } + EXPECT_OK(env_->DeleteFile(dir + "/" + file_in_dir)); + } + EXPECT_OK(env_->DeleteDir(dir)); } void DestroyAndCreateDir(const std::string& dir) { - ASSERT_OK(test::DestroyDir(env_, dir)); + DestroyDir(dir); EXPECT_OK(env_->CreateDir(dir)); } @@ -409,7 +423,7 @@ TEST_F(DeleteSchedulerTest, MoveToTrashError) { // We will delete the trash directory, that mean that DeleteScheduler wont // be able to move files to trash and will delete files them immediately. - ASSERT_OK(test::DestroyDir(env_, trash_dir_)); + DestroyDir(trash_dir_); for (int i = 0; i < 10; i++) { std::string file_name = "data_" + ToString(i) + ".data"; ASSERT_OK(delete_scheduler_->DeleteFile(NewDummyFile(file_name))); diff --git a/util/testutil.h b/util/testutil.h index dd3a07451..fb1a26c62 100644 --- a/util/testutil.h +++ b/util/testutil.h @@ -240,43 +240,6 @@ class StringSink: public WritableFile { size_t last_flush_; }; -// A wrapper around a StringSink to give it a RandomRWFile interface -class RandomRWStringSink : public RandomRWFile { - public: - explicit RandomRWStringSink(StringSink* ss) : ss_(ss) {} - - Status Write(uint64_t offset, const Slice& data) { - if (offset + data.size() > ss_->contents_.size()) { - ss_->contents_.resize(offset + data.size(), '\0'); - } - - char* pos = const_cast(ss_->contents_.data() + offset); - memcpy(pos, data.data(), data.size()); - return Status::OK(); - } - - Status Read(uint64_t offset, size_t n, Slice* result, char* scratch) const { - *result = Slice(nullptr, 0); - if (offset < ss_->contents_.size()) { - size_t str_res_sz = - std::min(static_cast(ss_->contents_.size() - offset), n); - *result = Slice(ss_->contents_.data() + offset, str_res_sz); - } - return Status::OK(); - } - - Status Flush() { return Status::OK(); } - - Status Sync() { return Status::OK(); } - - Status Close() { return Status::OK(); } - - const std::string& contents() const { return ss_->contents(); } - - private: - StringSink* ss_; -}; - // Like StringSink, this writes into a string. Unlink StringSink, it // has some initial content and overwrites it, just like a recycled // log file. diff --git a/utilities/blob_db/blob_db.cc b/utilities/blob_db/blob_db.cc index d1e116ae1..c843f9510 100644 --- a/utilities/blob_db/blob_db.cc +++ b/utilities/blob_db/blob_db.cc @@ -190,7 +190,7 @@ Status BlobDB::Get(const ReadOptions& options, const Slice& key, if (!s.ok()) { return s; } - Block block(std::move(contents), kDisableGlobalSequenceNumber); + Block block(std::move(contents)); BlockIter bit; InternalIterator* it = block.NewIterator(nullptr, &bit); it->SeekToFirst();