diff --git a/.gitignore b/.gitignore index 0a297b402..cfe2af126 100644 --- a/.gitignore +++ b/.gitignore @@ -32,6 +32,7 @@ build/ ldb manifest_dump sst_dump +column_aware_encoding_exp util/build_version.cc build_tools/VALGRIND_LOGS/ coverage/COVERAGE_REPORT diff --git a/CMakeLists.txt b/CMakeLists.txt index 985694c49..7630aa878 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -282,6 +282,9 @@ set(SOURCES utilities/ttl/db_ttl_impl.cc utilities/write_batch_with_index/write_batch_with_index.cc utilities/write_batch_with_index/write_batch_with_index_internal.cc + utilities/col_buf_encoder.cc + utilities/col_buf_decoder.cc + utilities/column_aware_encoding_util.cc ) # For test util library that is build only in DEBUG mode @@ -327,6 +330,7 @@ set(APPS tools/dump/rocksdb_undump.cc util/cache_bench.cc utilities/persistent_cache/hash_table_bench.cc + utilities/column_aware_encoding_exp.cc ) set(C_TESTS db/c_test.c) @@ -442,6 +446,7 @@ set(TESTS utilities/transactions/transaction_test.cc utilities/ttl/ttl_test.cc utilities/write_batch_with_index/write_batch_with_index_test.cc + utilities/column_aware_encoding_test.cc ) set(EXES ${APPS}) diff --git a/Makefile b/Makefile index 24e1cb087..a1ffa87de 100644 --- a/Makefile +++ b/Makefile @@ -268,6 +268,8 @@ VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full BENCHTOOLOBJECTS = $(BENCH_LIB_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL) +EXPOBJECTS = $(EXP_LIB_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL) + TESTS = \ db_test \ db_test2 \ @@ -367,6 +369,7 @@ TESTS = \ compaction_job_test \ thread_list_test \ sst_dump_test \ + column_aware_encoding_test \ compact_files_test \ perf_context_test \ optimistic_transaction_test \ @@ -416,7 +419,7 @@ TEST_LIBS = \ librocksdb_env_basic_test.a # TODO: add back forward_iterator_bench, after making it build in all environemnts. 
-BENCHMARKS = db_bench table_reader_bench cache_bench memtablerep_bench +BENCHMARKS = db_bench table_reader_bench cache_bench memtablerep_bench column_aware_encoding_exp # if user didn't config LIBNAME, set the default ifeq ($(LIBNAME),) @@ -1176,6 +1179,9 @@ event_logger_test: util/event_logger_test.o $(LIBOBJECTS) $(TESTHARNESS) sst_dump_test: tools/sst_dump_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +column_aware_encoding_test: utilities/column_aware_encoding_test.o $(TESTHARNESS) $(EXPOBJECTS) + $(AM_LINK) + optimistic_transaction_test: utilities/transactions/optimistic_transaction_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) @@ -1206,6 +1212,9 @@ transaction_test: utilities/transactions/transaction_test.o $(LIBOBJECTS) $(TEST sst_dump: tools/sst_dump.o $(LIBOBJECTS) $(AM_LINK) +column_aware_encoding_exp: utilities/column_aware_encoding_exp.o $(EXPOBJECTS) + $(AM_LINK) + repair_test: db/repair_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) @@ -1440,7 +1449,7 @@ endif # Source files dependencies detection # --------------------------------------------------------------------------- -all_sources = $(LIB_SOURCES) $(MAIN_SOURCES) $(MOCK_LIB_SOURCES) $(TOOL_LIB_SOURCES) $(BENCH_LIB_SOURCES) $(TEST_LIB_SOURCES) +all_sources = $(LIB_SOURCES) $(MAIN_SOURCES) $(MOCK_LIB_SOURCES) $(TOOL_LIB_SOURCES) $(BENCH_LIB_SOURCES) $(TEST_LIB_SOURCES) $(EXP_LIB_SOURCES) DEPFILES = $(all_sources:.cc=.d) # Add proper dependency support so changing a .h file forces a .cc file to diff --git a/src.mk b/src.mk index 87dd06a83..9e799c512 100644 --- a/src.mk +++ b/src.mk @@ -192,7 +192,12 @@ MOCK_LIB_SOURCES = \ util/fault_injection_test_env.cc BENCH_LIB_SOURCES = \ - tools/db_bench_tool.cc + tools/db_bench_tool.cc \ + +EXP_LIB_SOURCES = \ + utilities/col_buf_encoder.cc \ + utilities/col_buf_decoder.cc \ + utilities/column_aware_encoding_util.cc TEST_LIB_SOURCES = \ util/testharness.cc \ @@ -296,6 +301,7 @@ MAIN_SOURCES = \ utilities/transactions/transaction_test.cc 
\ utilities/ttl/ttl_test.cc \ utilities/write_batch_with_index/write_batch_with_index_test.cc \ + utilities/column_aware_encoding_test.cc \ util/iostats_context_test.cc \ util/log_write_bench.cc \ util/mock_env_test.cc \ @@ -304,7 +310,8 @@ MAIN_SOURCES = \ util/rate_limiter_test.cc \ util/slice_transform_test.cc \ util/thread_list_test.cc \ - util/thread_local_test.cc + util/thread_local_test.cc \ + utilities/column_aware_encoding_exp.cc JNI_NATIVE_SOURCES = \ java/rocksjni/backupenginejni.cc \ diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index 21720bdb7..c4fddb737 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -312,6 +312,8 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) { return compressed_size < raw_size - (raw_size / 8u); } +} // namespace + // format_version is the block format as defined in include/rocksdb/table.h Slice CompressBlock(const Slice& raw, const CompressionOptions& compression_options, @@ -391,8 +393,6 @@ Slice CompressBlock(const Slice& raw, return raw; } -} // namespace - // kBlockBasedTableMagicNumber was picked by running // echo rocksdb.table.block_based | sha1sum // and taking the leading 64 bits. 
diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h
index 8172c238e..d1d488653 100644
--- a/table/block_based_table_builder.h
+++ b/table/block_based_table_builder.h
@@ -117,4 +117,12 @@ class BlockBasedTableBuilder : public TableBuilder {
   void operator=(const BlockBasedTableBuilder&) = delete;
 };
 
+// Defined in table/block_based_table_builder.cc; declared here so that code
+// outside that translation unit can call it.
+Slice CompressBlock(const Slice& raw,
+                    const CompressionOptions& compression_options,
+                    CompressionType* type, uint32_t format_version,
+                    const Slice& compression_dict,
+                    std::string* compressed_output);
+
 }  // namespace rocksdb
diff --git a/table/block_based_table_reader.cc b/table/block_based_table_reader.cc
index bcb803217..5a1e8ebd3 100644
--- a/table/block_based_table_reader.cc
+++ b/table/block_based_table_reader.cc
@@ -10,6 +10,7 @@
 
 #include <string>
 #include <utility>
+#include <vector>
 
 #include "db/dbformat.h"
 #include "db/pinned_iterators_manager.h"
@@ -1667,6 +1668,54 @@ bool BlockBasedTable::TEST_index_reader_preloaded() const {
   return rep_->index_reader != nullptr;
 }
 
+// Walks the index and appends one KVPairBlock per readable data block to
+// *kv_pair_blocks; keys are copied out as internal keys. Reading data blocks
+// is best effort: a block that cannot be opened is skipped, and a block whose
+// iteration fails part-way contributes the entries read so far. A failure of
+// the index iterator itself is returned to the caller.
+Status BlockBasedTable::GetKVPairsFromDataBlocks(
+    std::vector<KVPairBlock>* kv_pair_blocks) {
+  std::unique_ptr<InternalIterator> blockhandles_iter(
+      NewIndexIterator(ReadOptions()));
+
+  Status s = blockhandles_iter->status();
+  if (!s.ok()) {
+    // Cannot read Index Block
+    return s;
+  }
+
+  for (blockhandles_iter->SeekToFirst(); blockhandles_iter->Valid();
+       blockhandles_iter->Next()) {
+    if (!blockhandles_iter->status().ok()) {
+      break;
+    }
+
+    std::unique_ptr<InternalIterator> datablock_iter(
+        NewDataBlockIterator(rep_, ReadOptions(), blockhandles_iter->value()));
+    if (!datablock_iter->status().ok()) {
+      // Error reading the block - Skipped
+      continue;
+    }
+
+    KVPairBlock kv_pair_block;
+    for (datablock_iter->SeekToFirst(); datablock_iter->Valid();
+         datablock_iter->Next()) {
+      if (!datablock_iter->status().ok()) {
+        // Error reading the block - Skipped
+        break;
+      }
+      const Slice& key = datablock_iter->key();
+      const Slice& value = datablock_iter->value();
+      kv_pair_block.emplace_back(std::string(key.data(), key.size()),
+                                 std::string(value.data(), value.size()));
+    }
+    kv_pair_blocks->push_back(std::move(kv_pair_block));
+  }
+  // The loop also ends when the index iterator stops on an error; report that
+  // instead of unconditionally returning OK.
+  return blockhandles_iter->status();
+}
+
 Status BlockBasedTable::DumpTable(WritableFile* out_file) {
   // Output Footer
   out_file->Append(
diff --git a/table/block_based_table_reader.h b/table/block_based_table_reader.h
index 9c8b1b114..f8c6601bf 100644
--- a/table/block_based_table_reader.h
+++ b/table/block_based_table_reader.h
@@ -11,8 +11,9 @@
 
 #include <stdint.h>
 #include <memory>
-#include <utility>
 #include <string>
+#include <utility>
+#include <vector>
 
 #include "rocksdb/options.h"
 #include "rocksdb/persistent_cache.h"
@@ -48,6 +49,8 @@ class InternalIterator;
 
 using std::unique_ptr;
 
+typedef std::vector<std::pair<std::string, std::string>> KVPairBlock;
+
 // A Table is a sorted map from strings to strings. Tables are
 // immutable and persistent. A Table may be safely accessed from
 // multiple threads without external synchronization.
@@ -138,6 +141,10 @@ class BlockBasedTable : public TableReader {
                            size_t cache_key_prefix_size,
                            const BlockHandle& handle, char* cache_key);
 
+  // Retrieve all key value pairs from data blocks in the table.
+  // The keys retrieved are internal keys.
+  Status GetKVPairsFromDataBlocks(std::vector<KVPairBlock>* kv_pair_blocks);
+
  private:
   template <class TValue>
   struct CachableEntry;
diff --git a/utilities/col_buf_decoder.cc b/utilities/col_buf_decoder.cc
new file mode 100644
index 000000000..c2f0bd861
--- /dev/null
+++ b/utilities/col_buf_decoder.cc
@@ -0,0 +1,240 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#include "utilities/col_buf_decoder.h"
+#include <cassert>  // NOTE(review): <...> targets lost in extraction; confirm
+#include <cstring>
+#include "port/port.h"
+
+namespace rocksdb {
+
+ColBufDecoder::~ColBufDecoder() {}
+
+namespace {
+
+// Converts between host byte order and the column's declared byte order.
+// A byte swap is needed exactly when the two differ.
+inline uint64_t EncodeFixed64WithEndian(uint64_t val, bool big_endian) {
+  if (big_endian == static_cast<bool>(port::kLittleEndian)) {
+    // Portable 64-bit byte reversal; avoids the non-standard <endian.h> API.
+    uint64_t swapped = 0;
+    for (int i = 0; i < 8; ++i) {
+      swapped = (swapped << 8) | ((val >> (8 * i)) & 0xFF);
+    }
+    val = swapped;
+  }
+  return val;
+}
+
+}  // namespace
+ColBufDecoder* ColBufDecoder::NewColBufDecoder(
+    const ColDeclaration& col_declaration) {
+  if (col_declaration.col_type == "FixedLength") {
+    return new FixedLengthColBufDecoder(
+        col_declaration.size, col_declaration.col_compression_type,
+        col_declaration.nullable, col_declaration.big_endian);
+  } else if (col_declaration.col_type == "VariableLength") {
+    return new VariableLengthColBufDecoder();
+  } else if (col_declaration.col_type == "VariableChunk") {
+    return new VariableChunkColBufDecoder(col_declaration.col_compression_type);
+  } else if (col_declaration.col_type == "LongFixedLength") {
+    return new LongFixedLengthColBufDecoder(col_declaration.size,
+                                            col_declaration.nullable);
+  }
+  // Unrecognized column type
+  return nullptr;
+}
+
+namespace {
+
+// Reads one varint64 at *src_ptr into *val_ptr and advances *src_ptr past it.
+// No buffer end is passed in, so the 10-byte maximum varint64 size is the limit.
+void ReadVarint64(const char** src_ptr, uint64_t* val_ptr) {
+  const char* q = GetVarint64Ptr(*src_ptr, *src_ptr + 10, val_ptr);
+  assert(q != nullptr);
+  *src_ptr = q;
+}
+}  // namespace
+
+// Resets the run-length and delta state and, for dictionary encodings, loads
+// the dictionary header. Returns the number of bytes consumed from `src`.
+size_t FixedLengthColBufDecoder::Init(const char* src) {
+  remain_runs_ = 0;
+  last_val_ = 0;
+  // Dictionary initialization
+  dict_vec_.clear();
+  const char* orig_src = src;
+  if (col_compression_type_ == kColDict ||
+      col_compression_type_ == kColRleDict) {
+    // Bypass limit
+    uint64_t dict_size = 0;
+    ReadVarint64(&src, &dict_size);
+
+    for (uint64_t i = 0; i < dict_size; ++i) {
+      // Bypass limit
+      uint64_t dict_key = 0;
+      ReadVarint64(&src, &dict_key);
+      dict_vec_.push_back(EncodeFixed64WithEndian(dict_key, big_endian_));
+    }
+  }
+  return src - orig_src;
+}
+
+// Decodes one column value from `src` and writes size_ bytes to *dest,
+// advancing *dest. A null value (nullable columns only) consumes one byte and
+// writes nothing. Returns the number of bytes consumed from `src`.
+size_t FixedLengthColBufDecoder::Decode(const char* src, char** dest) {
+  uint64_t read_val = 0;
+  const char* orig_src = src;
+  if (nullable_) {
+    const bool not_null = *src;
+    src += 1;
+    if (!not_null) {
+      return 1;
+    }
+  }
+  if (IsRunLength(col_compression_type_)) {
+    if (remain_runs_ == 0) {
+      // Current run is used up: read the next [run_value][run_length] pair.
+      run_val_ = 0;
+      if (col_compression_type_ == kColRle) {
+        memcpy(&run_val_, src, size_);
+        src += size_;
+      } else {
+        ReadVarint64(&src, &run_val_);
+      }
+
+      ReadVarint64(&src, &remain_runs_);
+
+      if (col_compression_type_ != kColRleDeltaVarint &&
+          col_compression_type_ != kColRleDict) {
+        run_val_ = EncodeFixed64WithEndian(run_val_, big_endian_);
+      }
+    }
+    read_val = run_val_;
+  } else {
+    if (col_compression_type_ == kColNoCompression) {
+      memcpy(&read_val, src, size_);
+      src += size_;
+    } else {
+      // Assume a column does not exceed 8 bytes here
+      ReadVarint64(&src, &read_val);
+    }
+    if (col_compression_type_ != kColDeltaVarint &&
+        col_compression_type_ != kColDict) {
+      read_val = EncodeFixed64WithEndian(read_val, big_endian_);
+    }
+  }
+
+  uint64_t write_val = read_val;
+  if (col_compression_type_ == kColDeltaVarint ||
+      col_compression_type_ == kColRleDeltaVarint) {
+    // does not support 64 bit
+
+    // Undo the zigzag encoding of the signed delta. The mask must be a full
+    // 64-bit all-ones value; `~0UL` is only 32 bits wide on LLP64 platforms.
+    const uint64_t mask = (write_val & 1) ? ~uint64_t{0} : 0;
+    const uint64_t delta = (write_val >> 1) ^ mask;
+    write_val = last_val_ + delta;
+
+    last_val_ = write_val;
+    write_val = EncodeFixed64WithEndian(write_val, big_endian_);
+  } else if (col_compression_type_ == kColRleDict ||
+             col_compression_type_ == kColDict) {
+    size_t dict_val = read_val;
+    assert(dict_val < dict_vec_.size());
+    write_val = dict_vec_[dict_val];
+  }
+
+  memcpy(*dest, &write_val, size_);
+  *dest += size_;
+  if (IsRunLength(col_compression_type_)) {
+    --remain_runs_;
+  }
+  return src - orig_src;
+}
+
+// Copies one size_-byte value from `src` to *dest and advances *dest. Returns
+// the bytes consumed: size_, plus the null-flag byte for nullable columns.
+size_t LongFixedLengthColBufDecoder::Decode(const char* src, char** dest) {
+  if (nullable_) {
+    const bool not_null = *src;
+    src += 1;
+    if (!not_null) {
+      return 1;
+    }
+  }
+  memcpy(*dest, src, size_);
+  *dest += size_;
+  // The flag byte is only present in the input when the column is nullable.
+  return size_ + (nullable_ ? 1 : 0);
+}
+
+// Copies a [1-byte length][payload] value from `src` to *dest, advancing
+// *dest. Returns the number of bytes consumed (length byte + payload).
+size_t VariableLengthColBufDecoder::Decode(const char* src, char** dest) {
+  const uint8_t len = *src;
+  // Store the length byte in the output buffer, not over the pointer itself.
+  **dest = *src;
+  *dest += 1;
+  src += 1;
+  memcpy(*dest, src, len);
+  *dest += len;
+  return len + 1;
+}
+
+// Loads the dictionary header, if any; returns the bytes consumed from `src`.
+size_t VariableChunkColBufDecoder::Init(const char* src) {
+  // Dictionary initialization
+  dict_vec_.clear();
+  const char* orig_src = src;
+  if (col_compression_type_ == kColDict) {
+    // Bypass limit
+    uint64_t dict_size = 0;
+    ReadVarint64(&src, &dict_size);
+
+    for (uint64_t i = 0; i < dict_size; ++i) {
+      // Bypass limit
+      uint64_t dict_key = 0;
+      ReadVarint64(&src, &dict_key);
+      dict_vec_.push_back(dict_key);
+    }
+  }
+  return src - orig_src;
+}
+
+// Decodes one value stored as [varint size][chunk data] and re-expands it to
+// 9-byte chunks (8 data bytes + mask byte) at *dest. Returns bytes consumed.
+size_t VariableChunkColBufDecoder::Decode(const char* src, char** dest) {
+  const char* orig_src = src;
+  uint64_t size = 0;
+  ReadVarint64(&src, &size);
+  int64_t full_chunks = size / 8;
+  size_t chunk_size = 8;
+  for (int64_t i = 0; i < full_chunks + 1; ++i) {
+    uint64_t chunk_buf = 0;
+    if (i == full_chunks) {
+      chunk_size = size % 8;
+    }
+    if (col_compression_type_ == kColDict) {
+      uint64_t dict_val = 0;
+      ReadVarint64(&src, &dict_val);
+      assert(dict_val < dict_vec_.size());
+      chunk_buf = dict_vec_[dict_val];
+    } else {
+      memcpy(&chunk_buf, src, chunk_size);
+      src += chunk_size;
+    }
+    memcpy(*dest, &chunk_buf, 8);
+    *dest += 8;
+    uint8_t mask = 0xFF - 8 + chunk_size;
+    memcpy(*dest, &mask, 1);
+    *dest += 1;
+  }
+
+  return src - orig_src;
+}
+
+}  // namespace rocksdb
diff --git a/utilities/col_buf_decoder.h b/utilities/col_buf_decoder.h
new file mode 100644
index 000000000..1351ecbd0
--- /dev/null
+++ b/utilities/col_buf_decoder.h
@@ -0,0 +1,127 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#pragma once
+#include <cstddef>  // NOTE(review): <...> targets lost in extraction; confirm
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+#include "util/coding.h"
+#include "utilities/col_buf_encoder.h"
+
+namespace rocksdb {
+
+struct ColDeclaration;
+
+// ColBufDecoder is a class to decode column buffers. It can be populated from a
+// ColDeclaration. Before starting decoding, a Init() method should be called.
+// Each time it takes a column value into Decode() method.
+class ColBufDecoder {
+ public:
+  virtual ~ColBufDecoder() = 0;
+  // Prepares the decoder for a column buffer starting at `src` and returns the
+  // number of header bytes consumed. The default has no header to read.
+  virtual size_t Init(const char* /*src*/) { return 0; }
+  // Decodes one value from `src`, writes it to *dest and advances *dest.
+  // Returns the number of bytes consumed from `src`.
+  virtual size_t Decode(const char* src, char** dest) = 0;
+  // Creates a heap-allocated decoder matching `col_declaration`. Returns
+  // nullptr when the declared column type is not recognized.
+  static ColBufDecoder* NewColBufDecoder(const ColDeclaration& col_declaration);
+
+ protected:
+  std::string buffer_;
+  static inline bool IsRunLength(ColCompressionType type) {
+    return type == kColRle || type == kColRleVarint ||
+           type == kColRleDeltaVarint || type == kColRleDict;
+  }
+};
+
+// Decoder counterpart of FixedLengthColBufEncoder (values of up to 8 bytes).
+class FixedLengthColBufDecoder : public ColBufDecoder {
+ public:
+  explicit FixedLengthColBufDecoder(
+      size_t size, ColCompressionType col_compression_type = kColNoCompression,
+      bool nullable = false, bool big_endian = false)
+      : size_(size),
+        col_compression_type_(col_compression_type),
+        nullable_(nullable),
+        big_endian_(big_endian) {}
+
+  size_t Init(const char* src) override;
+  size_t Decode(const char* src, char** dest) override;
+  ~FixedLengthColBufDecoder() {}
+
+ private:
+  size_t size_;
+  ColCompressionType col_compression_type_;
+  bool nullable_;
+  bool big_endian_;
+
+  // for decoding
+  std::vector<uint64_t> dict_vec_;
+  uint64_t remain_runs_ = 0;
+  uint64_t run_val_ = 0;
+  uint64_t last_val_ = 0;
+};
+
+// Decoder counterpart of LongFixedLengthColBufEncoder.
+class LongFixedLengthColBufDecoder : public ColBufDecoder {
+ public:
+  LongFixedLengthColBufDecoder(size_t size, bool nullable)
+      : size_(size), nullable_(nullable) {}
+
+  size_t Decode(const char* src, char** dest) override;
+  ~LongFixedLengthColBufDecoder() {}
+
+ private:
+  size_t size_;
+  bool nullable_;
+};
+
+// Decoder counterpart of VariableLengthColBufEncoder.
+class VariableLengthColBufDecoder : public ColBufDecoder {
+ public:
+  size_t Decode(const char* src, char** dest) override;
+  ~VariableLengthColBufDecoder() {}
+};
+
+// Decoder counterpart of VariableChunkColBufEncoder.
+class VariableChunkColBufDecoder : public VariableLengthColBufDecoder {
+ public:
+  size_t Init(const char* src) override;
+  size_t Decode(const char* src, char** dest) override;
+  explicit VariableChunkColBufDecoder(ColCompressionType col_compression_type)
+      : col_compression_type_(col_compression_type) {}
+  VariableChunkColBufDecoder() : col_compression_type_(kColNoCompression) {}
+
+ private:
+  ColCompressionType col_compression_type_;
+  std::unordered_map<uint64_t, uint64_t> dictionary_;
+  std::vector<uint64_t> dict_vec_;
+};
+
+// Holds one decoder per key column and per value column, plus the decoder for
+// the value checksum column, all created from a KVPairColDeclarations.
+struct KVPairColBufDecoders {
+  std::vector<std::unique_ptr<ColBufDecoder>> key_col_bufs;
+  std::vector<std::unique_ptr<ColBufDecoder>> value_col_bufs;
+  std::unique_ptr<ColBufDecoder> value_checksum_buf;
+
+  explicit KVPairColBufDecoders(const KVPairColDeclarations& kvp_cd) {
+    for (const auto& kcd : *kvp_cd.key_col_declarations) {
+      key_col_bufs.emplace_back(ColBufDecoder::NewColBufDecoder(kcd));
+    }
+    for (const auto& vcd : *kvp_cd.value_col_declarations) {
+      value_col_bufs.emplace_back(ColBufDecoder::NewColBufDecoder(vcd));
+    }
+    value_checksum_buf.reset(
+        ColBufDecoder::NewColBufDecoder(*kvp_cd.value_checksum_declaration));
+  }
+};
+}  // namespace rocksdb
diff --git a/utilities/col_buf_encoder.cc b/utilities/col_buf_encoder.cc
new file mode 100644
index 000000000..64d7a43bf
--- /dev/null
+++ b/utilities/col_buf_encoder.cc
@@ -0,0 +1,213 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#include "utilities/col_buf_encoder.h"
+#include <cstring>  // NOTE(review): <...> targets lost in extraction; confirm
+#include <string>
+#include "port/port.h"
+
+namespace rocksdb {
+
+ColBufEncoder::~ColBufEncoder() {}
+
+namespace {
+
+// Byte-swaps `val` when the column's declared byte order differs from the
+// host's. Written portably instead of relying on non-standard <endian.h>.
+inline uint64_t DecodeFixed64WithEndian(uint64_t val, bool big_endian) {
+  if (big_endian == static_cast<bool>(port::kLittleEndian)) {
+    uint64_t swapped = 0;
+    for (int i = 0; i < 8; ++i) {
+      swapped = (swapped << 8) | ((val >> (8 * i)) & 0xFF);
+    }
+    val = swapped;
+  }
+  return val;
+}
+
+}  // namespace
+
+const std::string &ColBufEncoder::GetData() { return buffer_; }
+
+ColBufEncoder *ColBufEncoder::NewColBufEncoder(
+    const ColDeclaration &col_declaration) {
+  if (col_declaration.col_type == "FixedLength") {
+    return new FixedLengthColBufEncoder(
+        col_declaration.size, col_declaration.col_compression_type,
+        col_declaration.nullable, col_declaration.big_endian);
+  } else if (col_declaration.col_type == "VariableLength") {
+    return new VariableLengthColBufEncoder();
+  } else if (col_declaration.col_type == "VariableChunk") {
+    return new VariableChunkColBufEncoder(col_declaration.col_compression_type);
+  } else if (col_declaration.col_type == "LongFixedLength") {
+    return new LongFixedLengthColBufEncoder(col_declaration.size,
+                                            col_declaration.nullable);
+  }
+  // Unrecognized column type
+  return nullptr;
+}
+
+// Encodes one value and appends it to the buffer; nullptr encodes a null for
+// nullable columns. Run-length encodings hold back the current run until the
+// value changes or Finish() is called. Returns the input bytes consumed.
+size_t FixedLengthColBufEncoder::Append(const char *buf) {
+  if (nullable_) {
+    if (buf == nullptr) {
+      buffer_.append(1, 0);
+      return 0;
+    }
+    buffer_.append(1, 1);
+  }
+  uint64_t read_val = 0;
+  memcpy(&read_val, buf, size_);
+  if (big_endian_) {
+    read_val = DecodeFixed64WithEndian(read_val, big_endian_);
+  }
+
+  // Determine write value
+  uint64_t write_val = read_val;
+  if (col_compression_type_ == kColDeltaVarint ||
+      col_compression_type_ == kColRleDeltaVarint) {
+    const uint64_t delta = read_val - last_val_;
+    // Zigzag-encode the signed delta. Done on unsigned values because
+    // left-shifting a negative int64_t is undefined behavior.
+    write_val = (delta << 1) ^ (0 - (delta >> 63));
+    last_val_ = read_val;
+  } else if (col_compression_type_ == kColDict ||
+             col_compression_type_ == kColRleDict) {
+    auto iter = dictionary_.find(read_val);
+    if (iter == dictionary_.end()) {
+      // Add new entry to dictionary
+      iter = dictionary_.emplace(read_val, dictionary_.size()).first;
+      dict_vec_.push_back(read_val);
+    }
+    write_val = iter->second;
+  }
+
+  // Write into buffer
+  if (IsRunLength(col_compression_type_)) {
+    if (run_length_ == -1) {
+      // First element
+      run_val_ = write_val;
+      run_length_ = 1;
+    } else if (write_val != run_val_) {
+      // End of run
+      // Write run value
+      if (col_compression_type_ == kColRle) {
+        buffer_.append(reinterpret_cast<char *>(&run_val_), size_);
+      } else {
+        PutVarint64(&buffer_, run_val_);
+      }
+      // Write run length
+      PutVarint64(&buffer_, run_length_);
+      run_val_ = write_val;
+      run_length_ = 1;
+    } else {
+      run_length_++;
+    }
+  } else {  // non run-length encodings
+    if (col_compression_type_ == kColNoCompression) {
+      buffer_.append(reinterpret_cast<char *>(&write_val), size_);
+    } else {
+      PutVarint64(&buffer_, write_val);
+    }
+  }
+  return size_;
+}
+
+// Prepends the dictionary header, if any, and flushes the pending run.
+void FixedLengthColBufEncoder::Finish() {
+  if (col_compression_type_ == kColDict ||
+      col_compression_type_ == kColRleDict) {
+    std::string header;
+    PutVarint64(&header, dict_vec_.size());
+    // Put dictionary in the header
+    for (auto item : dict_vec_) {
+      PutVarint64(&header, item);
+    }
+    buffer_ = header + buffer_;
+  }
+  // run_length_ stays -1 until a value is appended; nothing to flush then.
+  if (IsRunLength(col_compression_type_) && run_length_ != -1) {
+    if (col_compression_type_ == kColRle) {
+      buffer_.append(reinterpret_cast<char *>(&run_val_), size_);
+    } else {
+      PutVarint64(&buffer_, run_val_);
+    }
+    PutVarint64(&buffer_, run_length_);
+  }
+}
+
+// Appends the optional null flag followed by the raw size_ bytes of `buf`.
+size_t LongFixedLengthColBufEncoder::Append(const char *buf) {
+  if (nullable_) {
+    if (buf == nullptr) {
+      buffer_.append(1, 0);
+      return 0;
+    }
+    buffer_.append(1, 1);
+  }
+  buffer_.append(buf, size_);
+  return size_;
+}
+
+void LongFixedLengthColBufEncoder::Finish() {}
+
+// Appends a [1-byte length][payload] value; returns the bytes consumed.
+size_t VariableLengthColBufEncoder::Append(const char *buf) {
+  const uint8_t length = *buf;
+  buffer_.append(buf, 1);
+  buf += 1;
+  buffer_.append(buf, length);
+  return length + 1;
+}
+
+void VariableLengthColBufEncoder::Finish() {}
+
+// Consumes one value stored as 9-byte chunks (8 data bytes + mask byte) and
+// appends it as [varint payload length][chunk data]. A mask of 0xFF means
+// another chunk follows. Returns the number of input bytes consumed.
+size_t VariableChunkColBufEncoder::Append(const char *buf) {
+  const char *orig_buf = buf;
+  uint8_t mark = 0xFF;
+  size_t length = 0;
+  std::string tmp_buffer;
+  while (mark == 0xFF) {
+    uint64_t val;
+    memcpy(&val, buf, 8);
+    buf += 8;
+    mark = *buf;
+    buf += 1;
+    int8_t chunk_size = 8 - (0xFF - mark);
+    if (col_compression_type_ == kColDict) {
+      auto iter = dictionary_.find(val);
+      if (iter == dictionary_.end()) {
+        iter = dictionary_.emplace(val, dictionary_.size()).first;
+        dict_vec_.push_back(val);
+      }
+      PutVarint64(&tmp_buffer, iter->second);
+    } else {
+      tmp_buffer.append(reinterpret_cast<char *>(&val), chunk_size);
+    }
+    length += chunk_size;
+  }
+
+  PutVarint64(&buffer_, length);
+  buffer_.append(tmp_buffer);
+  return buf - orig_buf;
+}
+
+void VariableChunkColBufEncoder::Finish() {
+  if (col_compression_type_ == kColDict) {
+    std::string header;
+    PutVarint64(&header, dict_vec_.size());
+    for (auto item : dict_vec_) {
+      PutVarint64(&header, item);
+    }
+    buffer_ = header + buffer_;
+  }
+}
+
+}  // namespace rocksdb
diff --git a/utilities/col_buf_encoder.h b/utilities/col_buf_encoder.h
new file mode 100644
index 000000000..441943153
--- /dev/null
+++ b/utilities/col_buf_encoder.h
@@ -0,0 +1,220 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+
+#pragma once
+#include <cstddef>  // NOTE(review): <...> targets lost in extraction; confirm
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+#include "util/coding.h"
+
+namespace rocksdb {
+
+enum ColCompressionType {
+  kColNoCompression,
+  kColRle,
+  kColVarint,
+  kColRleVarint,
+  kColDeltaVarint,
+  kColRleDeltaVarint,
+  kColDict,
+  kColRleDict
+};
+
+struct ColDeclaration;
+
+// ColBufEncoder is a class to encode column buffers. It can be populated from a
+// ColDeclaration. Each time it takes a column value into Append() method to
+// encode the column and store it into an internal buffer. After all rows for
+// this column are consumed, a Finish() should be called to add header and
+// remaining data.
+class ColBufEncoder {
+ public:
+  // Read a column, encode data and append into internal buffer.
+  virtual size_t Append(const char *buf) = 0;
+  virtual ~ColBufEncoder() = 0;
+  // Get the internal column buffer. Should only be called after Finish().
+  const std::string &GetData();
+  // Finish encoding. Add header and remaining data.
+  virtual void Finish() = 0;
+  // Populate a ColBufEncoder from ColDeclaration.
+  static ColBufEncoder *NewColBufEncoder(const ColDeclaration &col_declaration);
+
+ protected:
+  std::string buffer_;
+  static inline bool IsRunLength(ColCompressionType type) {
+    return type == kColRle || type == kColRleVarint ||
+           type == kColRleDeltaVarint || type == kColRleDict;
+  }
+};
+
+// Encoder for fixed length column buffer. In fixed length column buffer, the
+// size of the column should not exceed 8 bytes.
+// The following encodings are supported:
+// Varint: Variable length integer. See util/coding.h for more details
+// Rle (Run length encoding): encode a sequence of contiguous value as
+// [run_value][run_length]. Can be combined with Varint
+// Delta: Encode value to its delta with its adjacent entry. Use varint to
+// possibly reduce stored bytes. Can be combined with Rle.
+// Dictionary: Use a dictionary to record all possible values in the block and
+// encode them with an ID started from 0. IDs are encoded as varint. A column
+// with dictionary encoding will have a header to store all actual values,
+// ordered by their dictionary value, and the data will be replaced by
+// dictionary value. Can be combined with Rle.
+class FixedLengthColBufEncoder : public ColBufEncoder {
+ public:
+  explicit FixedLengthColBufEncoder(
+      size_t size, ColCompressionType col_compression_type = kColNoCompression,
+      bool nullable = false, bool big_endian = false)
+      : size_(size),
+        col_compression_type_(col_compression_type),
+        nullable_(nullable),
+        big_endian_(big_endian),
+        last_val_(0),
+        run_length_(-1),
+        run_val_(0) {}
+
+  size_t Append(const char *buf) override;
+  void Finish() override;
+  ~FixedLengthColBufEncoder() {}
+
+ private:
+  size_t size_;
+  ColCompressionType col_compression_type_;
+  // If set as true, the input value can be null (represented as nullptr). When
+  // nullable is true, use one more byte before actual value to indicate if the
+  // current value is null.
+  bool nullable_;
+  // If set as true, input value will be treated as big endian encoded.
+  bool big_endian_;
+
+  // for encoding
+  uint64_t last_val_;
+  // -1 until the first value arrives; 64-bit so long runs cannot overflow.
+  int64_t run_length_;
+  uint64_t run_val_;
+  // Map to store dictionary for dictionary encoding
+  std::unordered_map<uint64_t, uint64_t> dictionary_;
+  // Vector of dictionary keys.
+  std::vector<uint64_t> dict_vec_;
+};
+
+// Long fixed length column buffer is a variant of fixed length buffer to hold
+// fixed length buffer with more than 8 bytes. We do not support any special
+// encoding schemes in LongFixedLengthColBufEncoder.
+class LongFixedLengthColBufEncoder : public ColBufEncoder {
+ public:
+  LongFixedLengthColBufEncoder(size_t size, bool nullable)
+      : size_(size), nullable_(nullable) {}
+  size_t Append(const char *buf) override;
+  void Finish() override;
+
+  ~LongFixedLengthColBufEncoder() {}
+
+ private:
+  size_t size_;
+  bool nullable_;
+};
+
+// Variable length column buffer holds a format of variable length column. In
+// this format, a column is composed of one byte length k, followed by data with
+// k bytes long data.
+class VariableLengthColBufEncoder : public ColBufEncoder {
+ public:
+  size_t Append(const char *buf) override;
+  void Finish() override;
+
+  ~VariableLengthColBufEncoder() {}
+};
+
+// Variable chunk column buffer holds another format of variable length column.
+// In this format, a column contains multiple chunks of data, each of which is
+// composed of 8 bytes long data, and one byte as a mask to indicate whether we
+// have more data to come. If more data is coming, the mask is set as 0xFF. If
+// the chunk is the last chunk and has only k valid bytes, the mask is set as
+// 0xFF - (8 - k).
+class VariableChunkColBufEncoder : public VariableLengthColBufEncoder {
+ public:
+  size_t Append(const char *buf) override;
+  void Finish() override;
+  explicit VariableChunkColBufEncoder(ColCompressionType col_compression_type)
+      : col_compression_type_(col_compression_type) {}
+  VariableChunkColBufEncoder() : col_compression_type_(kColNoCompression) {}
+
+ private:
+  ColCompressionType col_compression_type_;
+  // Map to store dictionary for dictionary encoding
+  std::unordered_map<uint64_t, uint64_t> dictionary_;
+  // Vector of dictionary keys.
+  std::vector<uint64_t> dict_vec_;
+};
+
+// ColDeclaration declares a column's type, algorithm of column-aware encoding,
+// and other column data like endian and nullability.
+struct ColDeclaration {
+  explicit ColDeclaration(
+      std::string _col_type,
+      ColCompressionType _col_compression_type = kColNoCompression,
+      size_t _size = 0, bool _nullable = false, bool _big_endian = false)
+      : col_type(std::move(_col_type)),
+        col_compression_type(_col_compression_type),
+        size(_size),
+        nullable(_nullable),
+        big_endian(_big_endian) {}
+  std::string col_type;
+  ColCompressionType col_compression_type;
+  size_t size;
+  bool nullable;
+  bool big_endian;
+};
+
+// KVPairColDeclarations is a class to hold column declaration of columns in
+// key and value.
+struct KVPairColDeclarations {
+  std::vector<ColDeclaration> *key_col_declarations;
+  std::vector<ColDeclaration> *value_col_declarations;
+  ColDeclaration *value_checksum_declaration;
+  KVPairColDeclarations(std::vector<ColDeclaration> *_key_col_declarations,
+                        std::vector<ColDeclaration> *_value_col_declarations,
+                        ColDeclaration *_value_checksum_declaration)
+      : key_col_declarations(_key_col_declarations),
+        value_col_declarations(_value_col_declarations),
+        value_checksum_declaration(_value_checksum_declaration) {}
+};
+
+// Similar to KVPairColDeclarations, KVPairColBufEncoders is used to hold column
+// buffer encoders of all columns in key and value.
+struct KVPairColBufEncoders {
+  std::vector<std::unique_ptr<ColBufEncoder>> key_col_bufs;
+  std::vector<std::unique_ptr<ColBufEncoder>> value_col_bufs;
+  std::unique_ptr<ColBufEncoder> value_checksum_buf;
+
+  explicit KVPairColBufEncoders(const KVPairColDeclarations &kvp_cd) {
+    // NewColBufEncoder() yields nullptr for an unrecognized col_type.
+    for (const auto &kcd : *kvp_cd.key_col_declarations) {
+      key_col_bufs.emplace_back(ColBufEncoder::NewColBufEncoder(kcd));
+    }
+    for (const auto &vcd : *kvp_cd.value_col_declarations) {
+      value_col_bufs.emplace_back(ColBufEncoder::NewColBufEncoder(vcd));
+    }
+    value_checksum_buf.reset(
+        ColBufEncoder::NewColBufEncoder(*kvp_cd.value_checksum_declaration));
+  }
+
+  // Helper function to call Finish()
+  void Finish() {
+    for (auto &col_buf : key_col_bufs) {
+      col_buf->Finish();
+    }
+    for (auto &col_buf : value_col_bufs) {
+      col_buf->Finish();
+    }
+    value_checksum_buf->Finish();
+  }
+};
+}  // namespace rocksdb
diff --git a/utilities/column_aware_encoding_exp.cc b/utilities/column_aware_encoding_exp.cc
new file mode 100644
index 000000000..c1cc1a906
--- /dev/null
+++ b/utilities/column_aware_encoding_exp.cc
@@ -0,0 +1,171 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <cstdio>
+#include <cstdlib>
+
+#ifndef ROCKSDB_LITE
+#ifdef GFLAGS
+
+#include <gflags/gflags.h>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+#include "rocksdb/env.h"
+#include "rocksdb/options.h"
+#include "table/block_based_table_builder.h"
+#include "table/block_based_table_reader.h"
+#include "table/format.h"
+#include "tools/sst_dump_tool_imp.h"
+#include "util/compression.h"
+#include "util/stop_watch.h"
+#include "utilities/col_buf_encoder.h"
+#include "utilities/column_aware_encoding_util.h"
+
+using GFLAGS::ParseCommandLineFlags;
+DEFINE_string(encoded_file, "", "file to store encoded data blocks");
+DEFINE_string(decoded_file, "",
+              "file to store decoded data blocks after encoding");
+DEFINE_string(format, "col", "Output Format. Can be 'row' or 'col'");
+// TODO(jhli): option `col` should be removed and replaced by general
+// column specifications.
+// NOTE(review): the default "col" is rejected by the validation in main(),
+// so --index_type currently has to be passed explicitly.
+DEFINE_string(index_type, "col", "Index type. Can be 'primary' or 'secondary'");
+DEFINE_string(dump_file, "",
+              "Dump data blocks separated by columns in human-readable format");
+DEFINE_bool(decode, false, "Decode blocks after they are encoded");
+DEFINE_bool(stat, false,
+            "Print column distribution statistics. Cannot decode in this mode");
+DEFINE_string(compression_type, "kNoCompression",
+              "The compression algorithm used to compress data blocks");
+
+namespace rocksdb {
+
+// Driver for the column-aware encoding experiment: reads the data blocks of
+// one SST file, re-encodes them in row or column format, optionally decodes
+// them again, and prints sizes and timings to stdout.
+class ColumnAwareEncodingExp {
+ public:
+  // Runs the experiment on `sst_file` as configured by the command line
+  // flags. Failures are reported on stdout/stderr; nothing is returned.
+  static void Run(const std::string& sst_file) {
+    // --decoded_file implies decoding; --stat always disables it.
+    const bool decode =
+        !FLAGS_stat && (FLAGS_decode || !FLAGS_decoded_file.empty());
+
+    ColumnAwareEncodingReader reader(sst_file);
+
+    std::vector<ColDeclaration>* raw_key_cols = nullptr;
+    std::vector<ColDeclaration>* raw_value_cols = nullptr;
+    ColDeclaration* raw_value_checksum = nullptr;
+    if (FLAGS_index_type == "primary") {
+      ColumnAwareEncodingReader::GetColDeclarationsPrimary(
+          &raw_key_cols, &raw_value_cols, &raw_value_checksum);
+    } else {
+      ColumnAwareEncodingReader::GetColDeclarationsSecondary(
+          &raw_key_cols, &raw_value_cols, &raw_value_checksum);
+    }
+    // The getters hand out heap objects. Owning them here releases them on
+    // every exit path, including the early return of the dump mode below.
+    const std::unique_ptr<std::vector<ColDeclaration>> key_col_declarations(
+        raw_key_cols);
+    const std::unique_ptr<std::vector<ColDeclaration>> value_col_declarations(
+        raw_value_cols);
+    const std::unique_ptr<ColDeclaration> value_checksum_declaration(
+        raw_value_checksum);
+    KVPairColDeclarations kvp_cd(key_col_declarations.get(),
+                                 value_col_declarations.get(),
+                                 value_checksum_declaration.get());
+
+    if (!FLAGS_dump_file.empty()) {
+      std::vector<KVPairBlock> kv_pair_blocks;
+      reader.GetKVPairsFromDataBlocks(&kv_pair_blocks);
+      reader.DumpDataColumns(FLAGS_dump_file, kvp_cd, kv_pair_blocks);
+      return;
+    }
+
+    static const std::unordered_map<std::string, CompressionType>
+        kCompressions = {
+            {"kNoCompression", CompressionType::kNoCompression},
+            {"kZlibCompression", CompressionType::kZlibCompression},
+            {"kZSTDNotFinalCompression",
+             CompressionType::kZSTDNotFinalCompression}};
+
+    // Find Compression. Use find() rather than operator[] so that an unknown
+    // name is reported instead of silently becoming kNoCompression.
+    const auto compression_it = kCompressions.find(FLAGS_compression_type);
+    if (compression_it == kCompressions.end() ||
+        !CompressionTypeSupported(compression_it->second)) {
+      fprintf(stdout, "Unsupported compression type: %s.\n",
+              FLAGS_compression_type.c_str());
+      return;
+    }
+    const CompressionType compression_type = compression_it->second;
+    fprintf(stdout, "[%s]\n", FLAGS_compression_type.c_str());
+
+    EnvOptions env_options;
+    // Output goes to an in-memory Env; declared before the files it creates
+    // so that the files are destroyed first.
+    std::unique_ptr<Env> env(NewMemEnv(Env::Default()));
+    std::unique_ptr<WritableFile> encoded_out_file;
+    if (!FLAGS_encoded_file.empty()) {
+      Status s = env->NewWritableFile(FLAGS_encoded_file, &encoded_out_file,
+                                      env_options);
+      if (!s.ok()) {
+        fprintf(stderr, "Cannot create %s: %s\n", FLAGS_encoded_file.c_str(),
+                s.ToString().c_str());
+        return;
+      }
+    }
+
+    std::vector<KVPairBlock> kv_pair_blocks;
+    reader.GetKVPairsFromDataBlocks(&kv_pair_blocks);
+
+    std::vector<std::string> encoded_blocks;
+    StopWatchNano sw(env.get(), true);
+    if (FLAGS_format == "col") {
+      reader.EncodeBlocks(kvp_cd, encoded_out_file.get(), compression_type,
+                          kv_pair_blocks, &encoded_blocks, FLAGS_stat);
+    } else {  // row format
+      reader.EncodeBlocksToRowFormat(encoded_out_file.get(), compression_type,
+                                     kv_pair_blocks, &encoded_blocks);
+    }
+    // Take the timing right after encoding so that the file size query
+    // below is not counted as encode time.
+    const uint64_t encode_time = sw.ElapsedNanosSafe(false /* reset */);
+    if (encoded_out_file != nullptr) {
+      uint64_t size = 0;
+      env->GetFileSize(FLAGS_encoded_file, &size);
+      fprintf(stdout, "File size: %" PRIu64 "\n", size);
+    }
+    fprintf(stdout, "Encode time:%" PRIu64 "\n", encode_time);
+
+    if (decode) {
+      std::unique_ptr<WritableFile> decoded_out_file;
+      if (!FLAGS_decoded_file.empty()) {
+        Status s = env->NewWritableFile(FLAGS_decoded_file, &decoded_out_file,
+                                        env_options);
+        if (!s.ok()) {
+          fprintf(stderr, "Cannot create %s: %s\n",
+                  FLAGS_decoded_file.c_str(), s.ToString().c_str());
+          return;
+        }
+      }
+      sw.Start();
+      if (FLAGS_format == "col") {
+        reader.DecodeBlocks(kvp_cd, decoded_out_file.get(), &encoded_blocks);
+      } else {
+        reader.DecodeBlocksFromRowFormat(decoded_out_file.get(),
+                                         &encoded_blocks);
+      }
+      const uint64_t decode_time = sw.ElapsedNanosSafe(true /* reset */);
+      fprintf(stdout, "Decode time:%" PRIu64 "\n", decode_time);
+    }
+  }
+};
+
+}  // namespace rocksdb
+
+int main(int argc, char** argv) {
+  int arg_idx = ParseCommandLineFlags(&argc, &argv, true);
+  if (arg_idx >= argc) {
+    fprintf(stderr, "SST filename required.\n");
+    exit(1);
+  }
+  std::string sst_file(argv[arg_idx]);
+  if (FLAGS_format != "row" && FLAGS_format != "col") {
+    fprintf(stderr, "Format must be 'row' or 'col'\n");
+    exit(1);
+  }
+  if (FLAGS_index_type != "primary" && FLAGS_index_type != "secondary") {
+    fprintf(stderr, "Index type must be 'primary' or 'secondary'\n");
+    exit(1);
+  }
+  rocksdb::ColumnAwareEncodingExp::Run(sst_file);
+  return 0;
+}
+
+#else
+int main() {
+  fprintf(stderr, "Please install gflags to run rocksdb tools\n");
+  return 1;
+}
+#endif  // GFLAGS
+#else
+int main(int argc, char** argv) {
+  fprintf(stderr, "Not supported in lite mode.\n");
+  return 1;
+}
+#endif  // ROCKSDB_LITE
diff --git a/utilities/column_aware_encoding_test.cc b/utilities/column_aware_encoding_test.cc
new file mode 100644
index 000000000..acfc65013
--- /dev/null
+++ b/utilities/column_aware_encoding_test.cc
@@ -0,0 +1,193 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+#ifndef ROCKSDB_LITE
+
+#include <cstring>
+#include <memory>
+#include <string>
+#include <vector>
+#include "util/testharness.h"
+#include "util/testutil.h"
+#include "utilities/col_buf_decoder.h"
+#include "utilities/col_buf_encoder.h"
+
+namespace rocksdb {
+
+class ColumnAwareEncodingTest : public testing::Test {
+ public:
+  ColumnAwareEncodingTest() {}
+
+  ~ColumnAwareEncodingTest() {}
+};
+
+// Four identical 8-byte values, no column compression: the encoded buffer is
+// 32 bytes holding the byte-swapped value, and decoding restores the input.
+TEST_F(ColumnAwareEncodingTest, NoCompressionEncodeDecode) {
+  std::unique_ptr<ColBufEncoder> col_buf_encoder(
+      new FixedLengthColBufEncoder(8, kColNoCompression, false, true));
+  std::string str_buf;
+  uint64_t val = 0x0102030405060708;
+  for (int i = 0; i < 4; ++i) {
+    str_buf.append(reinterpret_cast<char*>(&val), 8);
+  }
+  const char* str_buf_ptr = str_buf.c_str();
+  for (int i = 0; i < 4; ++i) {
+    // All four values are identical, so the pointer is not advanced.
+    col_buf_encoder->Append(str_buf_ptr);
+  }
+  col_buf_encoder->Finish();
+  const std::string& encoded_data = col_buf_encoder->GetData();
+  ASSERT_EQ(encoded_data.size(), 32);
+
+  const char* encoded_data_ptr = encoded_data.c_str();
+  uint64_t encoded_val;
+  for (int i = 0; i < 4; ++i) {
+    memcpy(&encoded_val, encoded_data_ptr, 8);
+    ASSERT_EQ(encoded_val, 0x0807060504030201);
+    encoded_data_ptr += 8;
+  }
+
+  std::unique_ptr<ColBufDecoder> col_buf_decoder(
+      new FixedLengthColBufDecoder(8, kColNoCompression, false, true));
+  encoded_data_ptr = encoded_data.c_str();
+  encoded_data_ptr += col_buf_decoder->Init(encoded_data_ptr);
+  // A vector owns the scratch space so a failing ASSERT cannot leak it.
+  std::vector<char> decoded_buf(100);
+  char* decoded_data = decoded_buf.data();
+  char* const decoded_data_base = decoded_data;
+  for (int i = 0; i < 4; ++i) {
+    encoded_data_ptr +=
+        col_buf_decoder->Decode(encoded_data_ptr, &decoded_data);
+  }
+
+  ASSERT_EQ(4 * 8, decoded_data - decoded_data_base);
+  decoded_data = decoded_data_base;
+  for (int i = 0; i < 4; ++i) {
+    uint64_t decoded_val;
+    decoded_val = 0;
+    memcpy(&decoded_val, decoded_data, 8);
+    ASSERT_EQ(decoded_val, val);
+    decoded_data += 8;
+  }
+}
+
+// Four identical 8-byte values with RLE: the encoded buffer is 9 bytes.
+TEST_F(ColumnAwareEncodingTest, RleEncodeDecode) {
+  std::unique_ptr<ColBufEncoder> col_buf_encoder(
+      new FixedLengthColBufEncoder(8, kColRle, false, true));
+  std::string str_buf;
+  uint64_t val = 0x0102030405060708;
+  for (int i = 0; i < 4; ++i) {
+    str_buf.append(reinterpret_cast<char*>(&val), 8);
+  }
+  const char* str_buf_ptr = str_buf.c_str();
+  for (int i = 0; i < 4; ++i) {
+    str_buf_ptr += col_buf_encoder->Append(str_buf_ptr);
+  }
+  col_buf_encoder->Finish();
+  const std::string& encoded_data = col_buf_encoder->GetData();
+  ASSERT_EQ(encoded_data.size(), 9);
+
+  const char* encoded_data_ptr = encoded_data.c_str();
+  uint64_t encoded_val;
+  memcpy(&encoded_val, encoded_data_ptr, 8);
+  ASSERT_EQ(encoded_val, 0x0807060504030201);
+
+  std::unique_ptr<ColBufDecoder> col_buf_decoder(
+      new FixedLengthColBufDecoder(8, kColRle, false, true));
+  std::vector<char> decoded_buf(100);
+  char* decoded_data = decoded_buf.data();
+  char* const decoded_data_base = decoded_data;
+  encoded_data_ptr += col_buf_decoder->Init(encoded_data_ptr);
+  for (int i = 0; i < 4; ++i) {
+    encoded_data_ptr +=
+        col_buf_decoder->Decode(encoded_data_ptr, &decoded_data);
+  }
+  ASSERT_EQ(4 * 8, decoded_data - decoded_data_base);
+  decoded_data = decoded_data_base;
+  for (int i = 0; i < 4; ++i) {
+    uint64_t decoded_val;
+    decoded_val = 0;
+    memcpy(&decoded_val, decoded_data, 8);
+    ASSERT_EQ(decoded_val, val);
+    decoded_data += 8;
+  }
+}
+
+// Two alternating 8-byte values with delta + varint encoding: the encoded
+// buffer is 9 + 3 bytes and decoding restores the alternating sequence.
+TEST_F(ColumnAwareEncodingTest, DeltaEncodeDecode) {
+  std::unique_ptr<ColBufEncoder> col_buf_encoder(
+      new FixedLengthColBufEncoder(8, kColDeltaVarint, false, true));
+  std::string str_buf;
+  uint64_t val = 0x0102030405060708;
+  uint64_t val2 = 0x0202030405060708;
+  const char* str_buf_ptr;
+  for (int i = 0; i < 2; ++i) {
+    str_buf = std::string(reinterpret_cast<char*>(&val), 8);
+    str_buf_ptr = str_buf.c_str();
+    col_buf_encoder->Append(str_buf_ptr);
+
+    str_buf = std::string(reinterpret_cast<char*>(&val2), 8);
+    str_buf_ptr = str_buf.c_str();
+    col_buf_encoder->Append(str_buf_ptr);
+  }
+  col_buf_encoder->Finish();
+  const std::string& encoded_data = col_buf_encoder->GetData();
+  ASSERT_EQ(encoded_data.size(), 9 + 3);
+
+  std::unique_ptr<ColBufDecoder> col_buf_decoder(
+      new FixedLengthColBufDecoder(8, kColDeltaVarint, false, true));
+  std::vector<char> decoded_buf(100);
+  char* decoded_data = decoded_buf.data();
+  char* const decoded_data_base = decoded_data;
+  const char* encoded_data_ptr = encoded_data.c_str();
+  encoded_data_ptr += col_buf_decoder->Init(encoded_data_ptr);
+  for (int i = 0; i < 4; ++i) {
+    encoded_data_ptr +=
+        col_buf_decoder->Decode(encoded_data_ptr, &decoded_data);
+  }
+  ASSERT_EQ(4 * 8, decoded_data - decoded_data_base);
+  decoded_data = decoded_data_base;
+  for (int i = 0; i < 2; ++i) {
+    uint64_t decoded_val;
+    memcpy(&decoded_val, decoded_data, 8);
+    ASSERT_EQ(decoded_val, val);
+    decoded_data += 8;
+    memcpy(&decoded_val, decoded_data, 8);
+    ASSERT_EQ(decoded_val, val2);
+    decoded_data += 8;
+  }
+}
+
+// Round-trips one 18-byte input through the variable-chunk encoder/decoder
+// with dictionary encoding and compares the result byte by byte.
+TEST_F(ColumnAwareEncodingTest, ChunkBufEncodeDecode) {
+  std::unique_ptr<ColBufEncoder> col_buf_encoder(
+      new VariableChunkColBufEncoder(kColDict));
+  std::string buf("12345678\377\1\0\0\0\0\0\0\0\376", 18);
+  col_buf_encoder->Append(buf.c_str());
+  col_buf_encoder->Finish();
+  const std::string& encoded_data = col_buf_encoder->GetData();
+  const char* str_ptr = encoded_data.c_str();
+
+  std::unique_ptr<ColBufDecoder> col_buf_decoder(
+      new VariableChunkColBufDecoder(kColDict));
+  str_ptr += col_buf_decoder->Init(str_ptr);
+  std::vector<char> decoded_buf(100);
+  char* decoded_data = decoded_buf.data();
+  char* const decoded_data_base = decoded_data;
+  col_buf_decoder->Decode(str_ptr, &decoded_data);
+  for (size_t i = 0; i < buf.size(); ++i) {
+    ASSERT_EQ(buf[i], decoded_data_base[i]);
+  }
+}
+
+}  // namespace rocksdb
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
+
+#else
+
+#include <cstdio>
+
+int main() {
+  fprintf(stderr,
+          "SKIPPED as column aware encoding experiment is not enabled in "
+          "ROCKSDB_LITE\n");
+  return 0;
+}
+#endif  // ROCKSDB_LITE
diff --git a/utilities/column_aware_encoding_util.cc b/utilities/column_aware_encoding_util.cc
new file mode 100644
index 000000000..43f9dfc25
--- /dev/null
+++ b/utilities/column_aware_encoding_util.cc
@@ -0,0 +1,490 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+//
+#ifndef ROCKSDB_LITE
+
+#include "utilities/column_aware_encoding_util.h"
+
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include <inttypes.h>
+#include <stddef.h>
+#include <stdio.h>
+#include <algorithm>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+#include "include/rocksdb/comparator.h"
+#include "include/rocksdb/slice.h"
+#include "rocksdb/env.h"
+#include "rocksdb/status.h"
+#include "table/block_based_table_builder.h"
+#include "table/block_based_table_factory.h"
+#include "table/format.h"
+#include "table/table_reader.h"
+#include "util/coding.h"
+#include "utilities/col_buf_decoder.h"
+#include "utilities/col_buf_encoder.h"
+
+namespace rocksdb {
+
+ColumnAwareEncodingReader::ColumnAwareEncodingReader(
+    const std::string& file_path)
+    : file_name_(file_path),
+      ioptions_(options_),
+      internal_comparator_(BytewiseComparator()) {
+  InitTableReader(file_name_);
+}
+
+// Opens `file_path` and builds a block-based table reader for it. The
+// outcome is recorded in init_result_; on failure table_reader_ stays null.
+void ColumnAwareEncodingReader::InitTableReader(const std::string& file_path) {
+  std::unique_ptr<RandomAccessFile> file;
+  uint64_t file_size = 0;
+  init_result_ = options_.env->NewRandomAccessFile(file_path, &file, soptions_);
+  if (init_result_.ok()) {
+    init_result_ = options_.env->GetFileSize(file_path, &file_size);
+  }
+  if (!init_result_.ok()) {
+    fprintf(stderr, "Cannot open %s: %s\n", file_path.c_str(),
+            init_result_.ToString().c_str());
+    return;
+  }
+
+  file_.reset(new RandomAccessFileReader(std::move(file)));
+
+  options_.comparator = &internal_comparator_;
+  // Keep a typed handle so no downcast from options_.table_factory is needed.
+  auto block_table_factory = std::make_shared<BlockBasedTableFactory>();
+  options_.table_factory = block_table_factory;
+
+  std::unique_ptr<TableReader> table_reader;
+  init_result_ = block_table_factory->NewTableReader(
+      TableReaderOptions(ioptions_, soptions_, internal_comparator_,
+                         /*skip_filters=*/false),
+      std::move(file_), file_size, &table_reader, /*enable_prefetch=*/false);
+  if (!init_result_.ok()) {
+    fprintf(stderr, "Cannot read table %s: %s\n", file_path.c_str(),
+            init_result_.ToString().c_str());
+    return;
+  }
+
+  table_reader_.reset(dynamic_cast<BlockBasedTable*>(table_reader.release()));
+}
+
+// Fills `kv_pair_blocks` from the table's data blocks. Leaves it untouched
+// when the table reader could not be created.
+void ColumnAwareEncodingReader::GetKVPairsFromDataBlocks(
+    std::vector<KVPairBlock>* kv_pair_blocks) {
+  if (table_reader_ == nullptr) {
+    return;
+  }
+  table_reader_->GetKVPairsFromDataBlocks(kv_pair_blocks);
+}
+
+// Decodes blocks produced by EncodeBlocks() (non-stat mode) back into rows
+// and appends each decoded block to `out_file` when it is non-null.
+//
+// Each entry of `blocks` is the (possibly compressed) block followed by one
+// byte holding the CompressionType that was actually used.
+void ColumnAwareEncodingReader::DecodeBlocks(
+    const KVPairColDeclarations& kvp_col_declarations, WritableFile* out_file,
+    const std::vector<std::string>* blocks) {
+  // NOTE(review): the scratch buffer has a fixed size and nothing here checks
+  // that a decoded block fits into it -- confirm against the block size.
+  const size_t kDecodeBufferSize = 16384;
+  std::unique_ptr<char[]> decoded_content_base(new char[kDecodeBufferSize]);
+  Options options;
+  ImmutableCFOptions ioptions(options);
+  for (auto& block : *blocks) {
+    if (block.empty()) {
+      continue;  // not even the trailing type byte is present
+    }
+    KVPairColBufDecoders kvp_col_bufs(kvp_col_declarations);
+    auto& key_col_bufs = kvp_col_bufs.key_col_bufs;
+    auto& value_col_bufs = kvp_col_bufs.value_col_bufs;
+    auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf;
+
+    uint32_t format_version = 2;
+    Slice compression_dict;
+    BlockContents contents;
+    const char* content_ptr;
+
+    const CompressionType type = static_cast<CompressionType>(block.back());
+    if (type != kNoCompression) {
+      Status s = UncompressBlockContents(block.c_str(), block.size() - 1,
+                                         &contents, format_version,
+                                         compression_dict, ioptions);
+      if (!s.ok()) {
+        fprintf(stderr, "Cannot uncompress block: %s\n", s.ToString().c_str());
+        continue;
+      }
+      content_ptr = contents.data.data();
+    } else {
+      content_ptr = block.data();
+    }
+
+    // Header layout: fixed64 row count, then one fixed64 byte size per key
+    // column and per value column. The fields are written with PutFixed64,
+    // so step by sizeof(uint64_t) rather than sizeof(size_t).
+    const char* header_content_ptr = content_ptr;
+    const size_t num_kv_pairs =
+        static_cast<size_t>(DecodeFixed64(header_content_ptr));
+    header_content_ptr += sizeof(uint64_t);
+
+    const size_t num_key_columns = key_col_bufs.size();
+    const size_t num_value_columns = value_col_bufs.size();
+    std::vector<const char*> key_content_ptr(num_key_columns);
+    std::vector<const char*> value_content_ptr(num_value_columns);
+    const char* checksum_content_ptr;
+
+    const size_t num_columns = num_key_columns + num_value_columns;
+    const char* col_content_ptr =
+        header_content_ptr + sizeof(uint64_t) * num_columns;
+
+    // Read headers
+    for (size_t i = 0; i < num_key_columns; ++i) {
+      key_content_ptr[i] = col_content_ptr;
+      key_content_ptr[i] += key_col_bufs[i]->Init(key_content_ptr[i]);
+      const uint64_t column_size = DecodeFixed64(header_content_ptr);
+      header_content_ptr += sizeof(uint64_t);
+      col_content_ptr += column_size;
+    }
+    for (size_t i = 0; i < num_value_columns; ++i) {
+      value_content_ptr[i] = col_content_ptr;
+      value_content_ptr[i] += value_col_bufs[i]->Init(value_content_ptr[i]);
+      const uint64_t column_size = DecodeFixed64(header_content_ptr);
+      header_content_ptr += sizeof(uint64_t);
+      col_content_ptr += column_size;
+    }
+    checksum_content_ptr = col_content_ptr;
+    checksum_content_ptr += value_checksum_buf->Init(checksum_content_ptr);
+
+    // Decode block
+    char* decoded_content = decoded_content_base.get();
+    for (size_t j = 0; j < num_kv_pairs; ++j) {
+      for (size_t i = 0; i < num_key_columns; ++i) {
+        key_content_ptr[i] +=
+            key_col_bufs[i]->Decode(key_content_ptr[i], &decoded_content);
+      }
+      for (size_t i = 0; i < num_value_columns; ++i) {
+        value_content_ptr[i] +=
+            value_col_bufs[i]->Decode(value_content_ptr[i], &decoded_content);
+      }
+      checksum_content_ptr +=
+          value_checksum_buf->Decode(checksum_content_ptr, &decoded_content);
+    }
+
+    // decoded_content now points one past the decoded bytes, so the output
+    // must start at the buffer base.
+    const size_t decoded_size = decoded_content - decoded_content_base.get();
+    Slice output_content(decoded_content_base.get(), decoded_size);
+
+    if (out_file != nullptr) {
+      out_file->Append(output_content);
+    }
+  }
+}
+
+// Undoes the block compression applied by EncodeBlocksToRowFormat() and
+// appends each uncompressed block to `out_file` when it is non-null.
+void ColumnAwareEncodingReader::DecodeBlocksFromRowFormat(
+    WritableFile* out_file, const std::vector<std::string>* blocks) {
+  Options options;
+  ImmutableCFOptions ioptions(options);
+  for (auto& block : *blocks) {
+    if (block.empty()) {
+      continue;  // not even the trailing type byte is present
+    }
+    uint32_t format_version = 2;
+    Slice compression_dict;
+    BlockContents contents;
+    std::string decoded_content;
+
+    const CompressionType type = static_cast<CompressionType>(block.back());
+    if (type != kNoCompression) {
+      Status s = UncompressBlockContents(block.c_str(), block.size() - 1,
+                                         &contents, format_version,
+                                         compression_dict, ioptions);
+      if (!s.ok()) {
+        fprintf(stderr, "Cannot uncompress block: %s\n", s.ToString().c_str());
+        continue;
+      }
+      decoded_content.assign(contents.data.data(), contents.data.size());
+    } else {
+      // Drop the trailing type byte; it is framing, not block data.
+      decoded_content.assign(block.data(), block.size() - 1);
+    }
+
+    if (out_file != nullptr) {
+      out_file->Append(decoded_content);
+    }
+  }
+}
+
+// Writes a human-readable dump to `filename`: one line per key/value pair
+// with each column printed in hex, key columns and value columns separated
+// by '|', grouped under a header line per block.
+void ColumnAwareEncodingReader::DumpDataColumns(
+    const std::string& filename,
+    const KVPairColDeclarations& kvp_col_declarations,
+    const std::vector<KVPairBlock>& kv_pair_blocks) {
+  FILE* fp = fopen(filename.c_str(), "w");
+  if (fp == nullptr) {
+    fprintf(stderr, "Cannot open dump file %s\n", filename.c_str());
+    return;
+  }
+
+  // The encoders are only used to find out how many bytes each column takes.
+  KVPairColBufEncoders kvp_col_bufs(kvp_col_declarations);
+  auto& key_col_bufs = kvp_col_bufs.key_col_bufs;
+  auto& value_col_bufs = kvp_col_bufs.value_col_bufs;
+  auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf;
+
+  uint64_t block_id = 1;
+  for (auto& kv_pairs : kv_pair_blocks) {
+    fprintf(fp, "---------------- Block: %-4" PRIu64 " ----------------\n",
+            block_id);
+    for (auto& kv_pair : kv_pairs) {
+      const auto& key = kv_pair.first;
+      const auto& value = kv_pair.second;
+      size_t value_offset = 0;
+
+      const char* key_ptr = key.data();
+      for (auto& buf : key_col_bufs) {
+        const size_t col_size = buf->Append(key_ptr);
+        fprintf(fp, "%s ", Slice(key_ptr, col_size).ToString(true).c_str());
+        key_ptr += col_size;
+      }
+      fprintf(fp, "|");
+
+      const char* value_ptr = value.data();
+      for (auto& buf : value_col_bufs) {
+        const size_t col_size = buf->Append(value_ptr);
+        fprintf(fp, " %s", Slice(value_ptr, col_size).ToString(true).c_str());
+        value_ptr += col_size;
+        value_offset += col_size;
+      }
+
+      // Bytes left after the value columns are the checksum column.
+      if (value_offset < value.size()) {
+        const size_t col_size = value_checksum_buf->Append(value_ptr);
+        fprintf(fp, "|%s", Slice(value_ptr, col_size).ToString(true).c_str());
+      } else {
+        value_checksum_buf->Append(nullptr);
+      }
+      fprintf(fp, "\n");
+    }
+    block_id++;
+  }
+  fclose(fp);
+}
+
+namespace {
+
+// Compresses `output_content` with `*type`. On return `*type` holds the
+// compression type set by CompressBlock() and `*slice_final` the bytes to
+// store; `compressed_output` is the scratch buffer handed to CompressBlock().
+void CompressDataBlock(const std::string& output_content, Slice* slice_final,
+                       CompressionType* type, std::string* compressed_output) {
+  CompressionOptions compression_opts;
+  uint32_t format_version = 2;  // hard-coded version
+  Slice compression_dict;
+  *slice_final =
+      CompressBlock(output_content, compression_opts, type, format_version,
+                    compression_dict, compressed_output);
+}
+
+}  // namespace
+
+// Re-encodes every block in a prefix-shared row layout, compresses it, and
+// appends the result to `out_file` (when non-null) and to `blocks`. Entries
+// of `blocks` carry one extra trailing byte: the compression type used.
+void ColumnAwareEncodingReader::EncodeBlocksToRowFormat(
+    WritableFile* out_file, CompressionType compression_type,
+    const std::vector<KVPairBlock>& kv_pair_blocks,
+    std::vector<std::string>* blocks) {
+  std::string output_content;
+  for (auto& kv_pairs : kv_pair_blocks) {
+    output_content.clear();
+    std::string last_key;
+    size_t counter = 0;
+    const size_t block_restart_interval = 16;
+    for (auto& kv_pair : kv_pairs) {
+      const auto& key = kv_pair.first;
+      const auto& value = kv_pair.second;
+
+      Slice last_key_piece(last_key);
+      size_t shared = 0;
+      if (counter >= block_restart_interval) {
+        // Restart point: store the full key.
+        counter = 0;
+      } else {
+        const size_t min_length = std::min(last_key_piece.size(), key.size());
+        while ((shared < min_length) && last_key_piece[shared] == key[shared]) {
+          shared++;
+        }
+      }
+      const size_t non_shared = key.size() - shared;
+      output_content.append(key.c_str() + shared, non_shared);
+      output_content.append(value);
+
+      last_key.resize(shared);
+      last_key.append(key.data() + shared, non_shared);
+      counter++;
+    }
+    Slice slice_final;
+    auto type = compression_type;
+    std::string compressed_output;
+    CompressDataBlock(output_content, &slice_final, &type, &compressed_output);
+
+    if (out_file != nullptr) {
+      out_file->Append(slice_final);
+    }
+
+    // Add a byte in the end for decoding
+    std::string slice_final_with_bit(slice_final.data(), slice_final.size());
+    slice_final_with_bit.push_back(static_cast<char>(type));
+    blocks->push_back(std::move(slice_final_with_bit));
+  }
+}
+
+// Encodes every block column by column.
+//
+// print_column_stat == false: each block becomes
+//   fixed64 row count | fixed64 size per key column | fixed64 size per value
+//   column | key column data | value column data | checksum column data,
+// which is compressed, appended to `out_file` (when non-null) and stored in
+// `blocks` followed by one byte holding the compression type used.
+//
+// print_column_stat == true: every column is compressed on its own, the
+// compressed sizes are accumulated and printed at the end; `blocks` is not
+// filled in this mode.
+//
+// Always returns Status::OK().
+Status ColumnAwareEncodingReader::EncodeBlocks(
+    const KVPairColDeclarations& kvp_col_declarations, WritableFile* out_file,
+    CompressionType compression_type,
+    const std::vector<KVPairBlock>& kv_pair_blocks,
+    std::vector<std::string>* blocks, bool print_column_stat) {
+  std::vector<size_t> key_col_sizes(
+      kvp_col_declarations.key_col_declarations->size(), 0);
+  std::vector<size_t> value_col_sizes(
+      kvp_col_declarations.value_col_declarations->size(), 0);
+  size_t value_checksum_size = 0;
+
+  // Compresses one column, writes it out if there is an output file, and
+  // returns the compressed size.
+  auto compress_column = [&](const std::string& column_data) -> size_t {
+    Slice slice_final;
+    auto type = compression_type;
+    std::string compressed_output;
+    CompressDataBlock(column_data, &slice_final, &type, &compressed_output);
+    if (out_file != nullptr) {
+      out_file->Append(slice_final);
+    }
+    return slice_final.size();
+  };
+
+  for (auto& kv_pairs : kv_pair_blocks) {
+    KVPairColBufEncoders kvp_col_bufs(kvp_col_declarations);
+    auto& key_col_bufs = kvp_col_bufs.key_col_bufs;
+    auto& value_col_bufs = kvp_col_bufs.value_col_bufs;
+    auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf;
+
+    size_t num_kv_pairs = 0;
+    for (auto& kv_pair : kv_pairs) {
+      const auto& key = kv_pair.first;
+      const auto& value = kv_pair.second;
+      size_t value_offset = 0;
+      num_kv_pairs++;
+
+      const char* key_ptr = key.data();
+      for (auto& buf : key_col_bufs) {
+        key_ptr += buf->Append(key_ptr);
+      }
+
+      const char* value_ptr = value.data();
+      for (auto& buf : value_col_bufs) {
+        const size_t col_size = buf->Append(value_ptr);
+        value_ptr += col_size;
+        value_offset += col_size;
+      }
+
+      // Bytes left after the value columns are the checksum column.
+      if (value_offset < value.size()) {
+        value_checksum_buf->Append(value_ptr);
+      } else {
+        value_checksum_buf->Append(nullptr);
+      }
+    }
+
+    kvp_col_bufs.Finish();
+    // Get stats
+    // Compress and write a block
+    if (print_column_stat) {
+      for (size_t i = 0; i < key_col_bufs.size(); ++i) {
+        key_col_sizes[i] += compress_column(key_col_bufs[i]->GetData());
+      }
+      for (size_t i = 0; i < value_col_bufs.size(); ++i) {
+        value_col_sizes[i] += compress_column(value_col_bufs[i]->GetData());
+      }
+      value_checksum_size += compress_column(value_checksum_buf->GetData());
+    } else {
+      std::string output_content;
+      // Write column sizes
+      PutFixed64(&output_content, num_kv_pairs);
+      for (auto& buf : key_col_bufs) {
+        PutFixed64(&output_content, buf->GetData().size());
+      }
+      for (auto& buf : value_col_bufs) {
+        PutFixed64(&output_content, buf->GetData().size());
+      }
+      // Write data
+      for (auto& buf : key_col_bufs) {
+        output_content.append(buf->GetData());
+      }
+      for (auto& buf : value_col_bufs) {
+        output_content.append(buf->GetData());
+      }
+      output_content.append(value_checksum_buf->GetData());
+
+      Slice slice_final;
+      auto type = compression_type;
+      std::string compressed_output;
+      CompressDataBlock(output_content, &slice_final, &type,
+                        &compressed_output);
+
+      if (out_file != nullptr) {
+        out_file->Append(slice_final);
+      }
+
+      // Add a byte in the end for decoding. Copy exactly size() bytes and
+      // then append; copying size() + 1 would read past the slice.
+      std::string slice_final_with_bit(slice_final.data(), slice_final.size());
+      slice_final_with_bit.push_back(static_cast<char>(type));
+      blocks->push_back(std::move(slice_final_with_bit));
+    }
+  }
+
+  if (print_column_stat) {
+    size_t total_size = 0;
+    for (size_t i = 0; i < key_col_sizes.size(); ++i)
+      total_size += key_col_sizes[i];
+    for (size_t i = 0; i < value_col_sizes.size(); ++i)
+      total_size += value_col_sizes[i];
+    total_size += value_checksum_size;
+
+    // Report 0% instead of dividing by zero when nothing was encoded.
+    auto percentage = [total_size](size_t part) -> double {
+      return total_size == 0 ? 0.0 : 100.0 * part / total_size;
+    };
+
+    for (size_t i = 0; i < key_col_sizes.size(); ++i)
+      printf("Key col %" PRIu64 " size: %" PRIu64 " percentage %f%%\n",
+             static_cast<uint64_t>(i), static_cast<uint64_t>(key_col_sizes[i]),
+             percentage(key_col_sizes[i]));
+    for (size_t i = 0; i < value_col_sizes.size(); ++i)
+      printf("Value col %" PRIu64 " size: %" PRIu64 " percentage %f%%\n",
+             static_cast<uint64_t>(i),
+             static_cast<uint64_t>(value_col_sizes[i]),
+             percentage(value_col_sizes[i]));
+    printf("Value checksum size: %" PRIu64 " percentage %f%%\n",
+           static_cast<uint64_t>(value_checksum_size),
+           percentage(value_checksum_size));
+  }
+  return Status::OK();
+}
+
+// Hard-coded column layout for the "primary" index type. All three outputs
+// are allocated with new; the caller takes ownership.
+void ColumnAwareEncodingReader::GetColDeclarationsPrimary(
+    std::vector<ColDeclaration>** key_col_declarations,
+    std::vector<ColDeclaration>** value_col_declarations,
+    ColDeclaration** value_checksum_declaration) {
+  *key_col_declarations = new std::vector<ColDeclaration>{
+      ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4, false,
+                     true),
+      ColDeclaration("FixedLength", ColCompressionType::kColRleDeltaVarint, 8,
+                     false, true),
+      ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
+                     false, true),
+      ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
+                     false, true),
+      ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)};
+
+  *value_col_declarations = new std::vector<ColDeclaration>{
+      ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4),
+      ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4),
+      ColDeclaration("FixedLength", ColCompressionType::kColRle, 1),
+      ColDeclaration("VariableLength"),
+      ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 4),
+      ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)};
+  *value_checksum_declaration = new ColDeclaration(
+      "LongFixedLength", ColCompressionType::kColNoCompression, 9,
+      true /* nullable */);
+}
+
+// Hard-coded column layout for the "secondary" index type (no value columns).
+// All three outputs are allocated with new; the caller takes ownership.
+void ColumnAwareEncodingReader::GetColDeclarationsSecondary(
+    std::vector<ColDeclaration>** key_col_declarations,
+    std::vector<ColDeclaration>** value_col_declarations,
+    ColDeclaration** value_checksum_declaration) {
+  *key_col_declarations = new std::vector<ColDeclaration>{
+      ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4, false,
+                     true),
+      ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
+                     false, true),
+      ColDeclaration("FixedLength", ColCompressionType::kColRleDeltaVarint, 8,
+                     false, true),
+      ColDeclaration("FixedLength", ColCompressionType::kColRle, 1),
+      ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 4,
+                     false, true),
+      ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
+                     false, true),
+      ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8, false,
+                     true),
+      ColDeclaration("VariableChunk", ColCompressionType::kColNoCompression),
+      ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)};
+  *value_col_declarations = new std::vector<ColDeclaration>();
+  *value_checksum_declaration = new ColDeclaration(
+      "LongFixedLength", ColCompressionType::kColNoCompression, 9,
+      true /* nullable */);
+}
+
+}  // namespace rocksdb
+
+#endif  // ROCKSDB_LITE
diff --git a/utilities/column_aware_encoding_util.h b/utilities/column_aware_encoding_util.h
new file mode 100644
index 000000000..a1f331393
--- /dev/null
+++ b/utilities/column_aware_encoding_util.h
@@ -0,0 +1,80 @@
+// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
+// This source code is licensed under the BSD-style license found in the
+// LICENSE file in the root directory of this source tree. An additional grant
+// of patent rights can be found in the PATENTS file in the same directory.
+#pragma once
+#ifndef ROCKSDB_LITE
+
+#include <memory>
+#include <string>
+#include <vector>
+#include "db/dbformat.h"
+#include "include/rocksdb/env.h"
+#include "include/rocksdb/immutable_options.h"
+#include "include/rocksdb/listener.h"
+#include "include/rocksdb/options.h"
+#include "include/rocksdb/status.h"
+#include "table/block_based_table_reader.h"
+
+namespace rocksdb {
+
+struct ColDeclaration;
+struct KVPairColDeclarations;
+
+// Reads the data blocks of one SST file and re-encodes / decodes them in row
+// or column-aware format for the encoding experiment.
+class ColumnAwareEncodingReader {
+ public:
+  // Opens `file_name` as a block-based table; check init_status() afterwards.
+  explicit ColumnAwareEncodingReader(const std::string& file_name);
+
+  // Result of opening the SST file in the constructor.
+  const Status& init_status() const { return init_result_; }
+
+  void GetKVPairsFromDataBlocks(std::vector<KVPairBlock>* kv_pair_blocks);
+
+  void EncodeBlocksToRowFormat(WritableFile* out_file,
+                               CompressionType compression_type,
+                               const std::vector<KVPairBlock>& kv_pair_blocks,
+                               std::vector<std::string>* blocks);
+
+  void DecodeBlocksFromRowFormat(WritableFile* out_file,
+                                 const std::vector<std::string>* blocks);
+
+  void DumpDataColumns(const std::string& filename,
+                       const KVPairColDeclarations& kvp_col_declarations,
+                       const std::vector<KVPairBlock>& kv_pair_blocks);
+
+  Status EncodeBlocks(const KVPairColDeclarations& kvp_col_declarations,
+                      WritableFile* out_file, CompressionType compression_type,
+                      const std::vector<KVPairBlock>& kv_pair_blocks,
+                      std::vector<std::string>* blocks, bool print_column_stat);
+
+  void DecodeBlocks(const KVPairColDeclarations& kvp_col_declarations,
+                    WritableFile* out_file,
+                    const std::vector<std::string>* blocks);
+
+  // The two getters below allocate their outputs with new; the caller owns
+  // the returned objects.
+  static void GetColDeclarationsPrimary(
+      std::vector<ColDeclaration>** key_col_declarations,
+      std::vector<ColDeclaration>** value_col_declarations,
+      ColDeclaration** value_checksum_declaration);
+
+  static void GetColDeclarationsSecondary(
+      std::vector<ColDeclaration>** key_col_declarations,
+      std::vector<ColDeclaration>** value_col_declarations,
+      ColDeclaration** value_checksum_declaration);
+
+ private:
+  // Init the TableReader for the sst file
+  void InitTableReader(const std::string& file_path);
+
+  std::string file_name_;
+  EnvOptions soptions_;
+
+  Options options_;
+
+  Status init_result_;
+  std::unique_ptr<BlockBasedTable> table_reader_;
+  std::unique_ptr<RandomAccessFileReader> file_;
+
+  // Declared after options_ because it is constructed from it.
+  const ImmutableCFOptions ioptions_;
+  InternalKeyComparator internal_comparator_;
+  std::unique_ptr<TableProperties> table_properties_;
+};
+
+}  // namespace rocksdb
+
+#endif  // ROCKSDB_LITE