Experiments on column-aware encodings

Summary:
Experiments on column-aware encodings. Supported features: 1) extract data blocks from SST file and encode with specified encodings; 2) Decode encoded data back into row format; 3) Directly extract data blocks and write in row format (without prefix encoding); 4) Get column distribution statistics for column format; 5) Dump data blocks separated by columns in human-readable format.

There is still on-going work on this diff. More refactoring is necessary.

Test Plan: Wrote tests in `column_aware_encoding_test.cc`. More tests should be added.

Reviewers: sdong

Reviewed By: sdong

Subscribers: arahut, andrewkr, dhruba

Differential Revision: https://reviews.facebook.net/D60027
main
omegaga 9 years ago
parent c116b47804
commit d51dc96a79
  1. 1
      .gitignore
  2. 5
      CMakeLists.txt
  3. 13
      Makefile
  4. 11
      src.mk
  5. 4
      table/block_based_table_builder.cc
  6. 6
      table/block_based_table_builder.h
  7. 51
      table/block_based_table_reader.cc
  8. 9
      table/block_based_table_reader.h
  9. 240
      utilities/col_buf_decoder.cc
  10. 117
      utilities/col_buf_decoder.h
  11. 213
      utilities/col_buf_encoder.cc
  12. 220
      utilities/col_buf_encoder.h
  13. 171
      utilities/column_aware_encoding_exp.cc
  14. 193
      utilities/column_aware_encoding_test.cc
  15. 490
      utilities/column_aware_encoding_util.cc
  16. 80
      utilities/column_aware_encoding_util.h

1
.gitignore vendored

@ -32,6 +32,7 @@ build/
ldb
manifest_dump
sst_dump
column_aware_encoding_exp
util/build_version.cc
build_tools/VALGRIND_LOGS/
coverage/COVERAGE_REPORT

@ -282,6 +282,9 @@ set(SOURCES
utilities/ttl/db_ttl_impl.cc
utilities/write_batch_with_index/write_batch_with_index.cc
utilities/write_batch_with_index/write_batch_with_index_internal.cc
utilities/col_buf_encoder.cc
utilities/col_buf_decoder.cc
utilities/column_aware_encoding_util.cc
)
# For test util library that is build only in DEBUG mode
@ -327,6 +330,7 @@ set(APPS
tools/dump/rocksdb_undump.cc
util/cache_bench.cc
utilities/persistent_cache/hash_table_bench.cc
utilities/column_aware_encoding_exp.cc
)
set(C_TESTS db/c_test.c)
@ -442,6 +446,7 @@ set(TESTS
utilities/transactions/transaction_test.cc
utilities/ttl/ttl_test.cc
utilities/write_batch_with_index/write_batch_with_index_test.cc
utilities/column_aware_encoding_test.cc
)
set(EXES ${APPS})

@ -268,6 +268,8 @@ VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full
BENCHTOOLOBJECTS = $(BENCH_LIB_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL)
EXPOBJECTS = $(EXP_LIB_SOURCES:.cc=.o) $(LIBOBJECTS) $(TESTUTIL)
TESTS = \
db_test \
db_test2 \
@ -367,6 +369,7 @@ TESTS = \
compaction_job_test \
thread_list_test \
sst_dump_test \
column_aware_encoding_test \
compact_files_test \
perf_context_test \
optimistic_transaction_test \
@ -416,7 +419,7 @@ TEST_LIBS = \
librocksdb_env_basic_test.a
# TODO: add back forward_iterator_bench, after making it build in all environemnts.
BENCHMARKS = db_bench table_reader_bench cache_bench memtablerep_bench
BENCHMARKS = db_bench table_reader_bench cache_bench memtablerep_bench column_aware_encoding_exp
# if user didn't config LIBNAME, set the default
ifeq ($(LIBNAME),)
@ -1176,6 +1179,9 @@ event_logger_test: util/event_logger_test.o $(LIBOBJECTS) $(TESTHARNESS)
sst_dump_test: tools/sst_dump_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
column_aware_encoding_test: utilities/column_aware_encoding_test.o $(TESTHARNESS) $(EXPOBJECTS)
$(AM_LINK)
optimistic_transaction_test: utilities/transactions/optimistic_transaction_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
@ -1206,6 +1212,9 @@ transaction_test: utilities/transactions/transaction_test.o $(LIBOBJECTS) $(TEST
sst_dump: tools/sst_dump.o $(LIBOBJECTS)
$(AM_LINK)
column_aware_encoding_exp: utilities/column_aware_encoding_exp.o $(EXPOBJECTS)
$(AM_LINK)
repair_test: db/repair_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
@ -1440,7 +1449,7 @@ endif
# Source files dependencies detection
# ---------------------------------------------------------------------------
all_sources = $(LIB_SOURCES) $(MAIN_SOURCES) $(MOCK_LIB_SOURCES) $(TOOL_LIB_SOURCES) $(BENCH_LIB_SOURCES) $(TEST_LIB_SOURCES)
all_sources = $(LIB_SOURCES) $(MAIN_SOURCES) $(MOCK_LIB_SOURCES) $(TOOL_LIB_SOURCES) $(BENCH_LIB_SOURCES) $(TEST_LIB_SOURCES) $(EXP_LIB_SOURCES)
DEPFILES = $(all_sources:.cc=.d)
# Add proper dependency support so changing a .h file forces a .cc file to

@ -192,7 +192,12 @@ MOCK_LIB_SOURCES = \
util/fault_injection_test_env.cc
BENCH_LIB_SOURCES = \
tools/db_bench_tool.cc
tools/db_bench_tool.cc \
EXP_LIB_SOURCES = \
utilities/col_buf_encoder.cc \
utilities/col_buf_decoder.cc \
utilities/column_aware_encoding_util.cc
TEST_LIB_SOURCES = \
util/testharness.cc \
@ -296,6 +301,7 @@ MAIN_SOURCES = \
utilities/transactions/transaction_test.cc \
utilities/ttl/ttl_test.cc \
utilities/write_batch_with_index/write_batch_with_index_test.cc \
utilities/column_aware_encoding_test.cc \
util/iostats_context_test.cc \
util/log_write_bench.cc \
util/mock_env_test.cc \
@ -304,7 +310,8 @@ MAIN_SOURCES = \
util/rate_limiter_test.cc \
util/slice_transform_test.cc \
util/thread_list_test.cc \
util/thread_local_test.cc
util/thread_local_test.cc \
utilities/column_aware_encoding_exp.cc
JNI_NATIVE_SOURCES = \
java/rocksjni/backupenginejni.cc \

@ -312,6 +312,8 @@ bool GoodCompressionRatio(size_t compressed_size, size_t raw_size) {
return compressed_size < raw_size - (raw_size / 8u);
}
} // namespace
// format_version is the block format as defined in include/rocksdb/table.h
Slice CompressBlock(const Slice& raw,
const CompressionOptions& compression_options,
@ -391,8 +393,6 @@ Slice CompressBlock(const Slice& raw,
return raw;
}
} // namespace
// kBlockBasedTableMagicNumber was picked by running
// echo rocksdb.table.block_based | sha1sum
// and taking the leading 64 bits.

@ -117,4 +117,10 @@ class BlockBasedTableBuilder : public TableBuilder {
void operator=(const BlockBasedTableBuilder&) = delete;
};
Slice CompressBlock(const Slice& raw,
const CompressionOptions& compression_options,
CompressionType* type, uint32_t format_version,
const Slice& compression_dict,
std::string* compressed_output);
} // namespace rocksdb

@ -10,6 +10,7 @@
#include <string>
#include <utility>
#include <vector>
#include "db/dbformat.h"
#include "db/pinned_iterators_manager.h"
@ -1667,6 +1668,56 @@ bool BlockBasedTable::TEST_index_reader_preloaded() const {
return rep_->index_reader != nullptr;
}
Status BlockBasedTable::GetKVPairsFromDataBlocks(
std::vector<KVPairBlock>* kv_pair_blocks) {
std::unique_ptr<InternalIterator> blockhandles_iter(
NewIndexIterator(ReadOptions()));
Status s = blockhandles_iter->status();
if (!s.ok()) {
// Cannot read Index Block
return s;
}
for (blockhandles_iter->SeekToFirst(); blockhandles_iter->Valid();
blockhandles_iter->Next()) {
s = blockhandles_iter->status();
if (!s.ok()) {
break;
}
std::unique_ptr<InternalIterator> datablock_iter;
datablock_iter.reset(
NewDataBlockIterator(rep_, ReadOptions(), blockhandles_iter->value()));
s = datablock_iter->status();
if (!s.ok()) {
// Error reading the block - Skipped
continue;
}
KVPairBlock kv_pair_block;
for (datablock_iter->SeekToFirst(); datablock_iter->Valid();
datablock_iter->Next()) {
s = datablock_iter->status();
if (!s.ok()) {
// Error reading the block - Skipped
break;
}
const Slice& key = datablock_iter->key();
const Slice& value = datablock_iter->value();
std::string key_copy = std::string(key.data(), key.size());
std::string value_copy = std::string(value.data(), value.size());
kv_pair_block.push_back(
std::make_pair(std::move(key_copy), std::move(value_copy)));
}
kv_pair_blocks->push_back(std::move(kv_pair_block));
}
return Status::OK();
}
Status BlockBasedTable::DumpTable(WritableFile* out_file) {
// Output Footer
out_file->Append(

@ -11,8 +11,9 @@
#include <stdint.h>
#include <memory>
#include <utility>
#include <string>
#include <utility>
#include <vector>
#include "rocksdb/options.h"
#include "rocksdb/persistent_cache.h"
@ -48,6 +49,8 @@ class InternalIterator;
using std::unique_ptr;
typedef std::vector<std::pair<std::string, std::string>> KVPairBlock;
// A Table is a sorted map from strings to strings. Tables are
// immutable and persistent. A Table may be safely accessed from
// multiple threads without external synchronization.
@ -138,6 +141,10 @@ class BlockBasedTable : public TableReader {
size_t cache_key_prefix_size,
const BlockHandle& handle, char* cache_key);
// Retrieve all key value pairs from data blocks in the table.
// The key retrieved are internal keys.
Status GetKVPairsFromDataBlocks(std::vector<KVPairBlock>* kv_pair_blocks);
private:
template <class TValue>
struct CachableEntry;

@ -0,0 +1,240 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "utilities/col_buf_decoder.h"
#include <cstring>
#include <string>
#include "port/port.h"
namespace rocksdb {
ColBufDecoder::~ColBufDecoder() {}
namespace {
inline uint64_t EncodeFixed64WithEndian(uint64_t val, bool big_endian) {
if (big_endian && port::kLittleEndian) {
val = le64toh(val);
val = htobe64(val);
} else if (!big_endian && !port::kLittleEndian) {
val = be64toh(val);
val = htole64(val);
}
return val;
}
} // namespace
ColBufDecoder* ColBufDecoder::NewColBufDecoder(
const ColDeclaration& col_declaration) {
if (col_declaration.col_type == "FixedLength") {
return new FixedLengthColBufDecoder(
col_declaration.size, col_declaration.col_compression_type,
col_declaration.nullable, col_declaration.big_endian);
} else if (col_declaration.col_type == "VariableLength") {
return new VariableLengthColBufDecoder();
} else if (col_declaration.col_type == "VariableChunk") {
return new VariableChunkColBufDecoder(col_declaration.col_compression_type);
} else if (col_declaration.col_type == "LongFixedLength") {
return new LongFixedLengthColBufDecoder(col_declaration.size,
col_declaration.nullable);
}
// Unrecognized column type
return nullptr;
}
namespace {
void ReadVarint64(const char** src_ptr, uint64_t* val_ptr) {
const char* q = GetVarint64Ptr(*src_ptr, *src_ptr + 10, val_ptr);
assert(q != nullptr);
*src_ptr = q;
}
} // namespace
size_t FixedLengthColBufDecoder::Init(const char* src) {
remain_runs_ = 0;
last_val_ = 0;
// Dictionary initialization
dict_vec_.clear();
const char* orig_src = src;
if (col_compression_type_ == kColDict ||
col_compression_type_ == kColRleDict) {
const char* q;
size_t dict_size;
// Bypass limit
q = GetVarint64Ptr(src, src + 10, &dict_size);
assert(q != nullptr);
src = q;
uint64_t dict_key;
for (size_t i = 0; i < dict_size; ++i) {
// Bypass limit
ReadVarint64(&src, &dict_key);
dict_key = EncodeFixed64WithEndian(dict_key, big_endian_);
dict_vec_.push_back(dict_key);
}
}
return src - orig_src;
}
size_t FixedLengthColBufDecoder::Decode(const char* src, char** dest) {
uint64_t read_val = 0;
const char* orig_src = src;
const char* src_limit = src + 20;
if (nullable_) {
bool not_null;
not_null = *src;
src += 1;
if (!not_null) {
return 1;
}
}
if (IsRunLength(col_compression_type_)) {
if (remain_runs_ == 0) {
const char* q;
run_val_ = 0;
if (col_compression_type_ == kColRle) {
memcpy(&run_val_, src, size_);
src += size_;
} else {
q = GetVarint64Ptr(src, src_limit, &run_val_);
assert(q != nullptr);
src = q;
}
q = GetVarint64Ptr(src, src_limit, &remain_runs_);
assert(q != nullptr);
src = q;
if (col_compression_type_ != kColRleDeltaVarint &&
col_compression_type_ != kColRleDict) {
run_val_ = EncodeFixed64WithEndian(run_val_, big_endian_);
}
}
read_val = run_val_;
} else {
if (col_compression_type_ == kColNoCompression) {
memcpy(&read_val, src, size_);
src += size_;
} else {
// Assume a column does not exceed 8 bytes here
const char* q = GetVarint64Ptr(src, src_limit, &read_val);
assert(q != nullptr);
src = q;
}
if (col_compression_type_ != kColDeltaVarint &&
col_compression_type_ != kColDict) {
read_val = EncodeFixed64WithEndian(read_val, big_endian_);
}
}
uint64_t write_val = read_val;
if (col_compression_type_ == kColDeltaVarint ||
col_compression_type_ == kColRleDeltaVarint) {
// does not support 64 bit
uint64_t mask = (write_val & 1) ? (~0UL) : 0;
int64_t delta = (write_val >> 1) ^ mask;
write_val = last_val_ + delta;
uint64_t tmp = write_val;
write_val = EncodeFixed64WithEndian(write_val, big_endian_);
last_val_ = tmp;
} else if (col_compression_type_ == kColRleDict ||
col_compression_type_ == kColDict) {
size_t dict_val = read_val;
assert(dict_val < dict_vec_.size());
write_val = dict_vec_[dict_val];
}
// dest->append(reinterpret_cast<char*>(&write_val), size_);
memcpy(*dest, reinterpret_cast<char*>(&write_val), size_);
*dest += size_;
if (IsRunLength(col_compression_type_)) {
--remain_runs_;
}
return src - orig_src;
}
size_t LongFixedLengthColBufDecoder::Decode(const char* src, char** dest) {
if (nullable_) {
bool not_null;
not_null = *src;
src += 1;
if (!not_null) {
return 1;
}
}
memcpy(*dest, src, size_);
*dest += size_;
return size_ + 1;
}
size_t VariableLengthColBufDecoder::Decode(const char* src, char** dest) {
uint8_t len;
len = *src;
memcpy(dest, reinterpret_cast<char*>(&len), 1);
*dest += 1;
src += 1;
memcpy(*dest, src, len);
*dest += len;
return len + 1;
}
size_t VariableChunkColBufDecoder::Init(const char* src) {
// Dictionary initialization
dict_vec_.clear();
const char* orig_src = src;
if (col_compression_type_ == kColDict) {
const char* q;
size_t dict_size;
// Bypass limit
q = GetVarint64Ptr(src, src + 10, &dict_size);
assert(q != nullptr);
src = q;
uint64_t dict_key;
for (size_t i = 0; i < dict_size; ++i) {
// Bypass limit
ReadVarint64(&src, &dict_key);
dict_vec_.push_back(dict_key);
}
}
return src - orig_src;
}
size_t VariableChunkColBufDecoder::Decode(const char* src, char** dest) {
const char* orig_src = src;
uint64_t size = 0;
ReadVarint64(&src, &size);
int64_t full_chunks = size / 8;
uint64_t chunk_buf;
size_t chunk_size = 8;
for (int64_t i = 0; i < full_chunks + 1; ++i) {
chunk_buf = 0;
if (i == full_chunks) {
chunk_size = size % 8;
}
if (col_compression_type_ == kColDict) {
size_t dict_val;
ReadVarint64(&src, &dict_val);
assert(dict_val < dict_vec_.size());
chunk_buf = dict_vec_[dict_val];
} else {
memcpy(&chunk_buf, src, chunk_size);
src += chunk_size;
}
memcpy(*dest, reinterpret_cast<char*>(&chunk_buf), 8);
*dest += 8;
uint8_t mask = 0xFF - 8 + chunk_size;
memcpy(*dest, reinterpret_cast<char*>(&mask), 1);
*dest += 1;
}
return src - orig_src;
}
} // namespace rocksdb

@ -0,0 +1,117 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <endian.h>
#include <cstdio>
#include <cstring>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "util/coding.h"
#include "utilities/col_buf_encoder.h"
namespace rocksdb {
struct ColDeclaration;
// ColBufDecoder is a class to decode column buffers. It can be populated from a
// ColDeclaration. Before starting decoding, a Init() method should be called.
// Each time it takes a column value into Decode() method.
class ColBufDecoder {
public:
virtual ~ColBufDecoder() = 0;
virtual size_t Init(const char* src) { return 0; }
virtual size_t Decode(const char* src, char** dest) = 0;
static ColBufDecoder* NewColBufDecoder(const ColDeclaration& col_declaration);
protected:
std::string buffer_;
static inline bool IsRunLength(ColCompressionType type) {
return type == kColRle || type == kColRleVarint ||
type == kColRleDeltaVarint || type == kColRleDict;
}
};
class FixedLengthColBufDecoder : public ColBufDecoder {
public:
explicit FixedLengthColBufDecoder(
size_t size, ColCompressionType col_compression_type = kColNoCompression,
bool nullable = false, bool big_endian = false)
: size_(size),
col_compression_type_(col_compression_type),
nullable_(nullable),
big_endian_(big_endian) {}
size_t Init(const char* src) override;
size_t Decode(const char* src, char** dest) override;
~FixedLengthColBufDecoder() {}
private:
size_t size_;
ColCompressionType col_compression_type_;
bool nullable_;
bool big_endian_;
// for decoding
std::vector<uint64_t> dict_vec_;
uint64_t remain_runs_;
uint64_t run_val_;
uint64_t last_val_;
};
class LongFixedLengthColBufDecoder : public ColBufDecoder {
public:
LongFixedLengthColBufDecoder(size_t size, bool nullable)
: size_(size), nullable_(nullable) {}
size_t Decode(const char* src, char** dest) override;
~LongFixedLengthColBufDecoder() {}
private:
size_t size_;
bool nullable_;
};
class VariableLengthColBufDecoder : public ColBufDecoder {
public:
size_t Decode(const char* src, char** dest) override;
~VariableLengthColBufDecoder() {}
};
class VariableChunkColBufDecoder : public VariableLengthColBufDecoder {
public:
size_t Init(const char* src) override;
size_t Decode(const char* src, char** dest) override;
explicit VariableChunkColBufDecoder(ColCompressionType col_compression_type)
: col_compression_type_(col_compression_type) {}
VariableChunkColBufDecoder() : col_compression_type_(kColNoCompression) {}
private:
ColCompressionType col_compression_type_;
std::unordered_map<uint64_t, uint64_t> dictionary_;
std::vector<uint64_t> dict_vec_;
};
struct KVPairColBufDecoders {
std::vector<std::unique_ptr<ColBufDecoder>> key_col_bufs;
std::vector<std::unique_ptr<ColBufDecoder>> value_col_bufs;
std::unique_ptr<ColBufDecoder> value_checksum_buf;
explicit KVPairColBufDecoders(const KVPairColDeclarations& kvp_cd) {
for (auto kcd : *kvp_cd.key_col_declarations) {
key_col_bufs.emplace_back(
std::move(ColBufDecoder::NewColBufDecoder(kcd)));
}
for (auto vcd : *kvp_cd.value_col_declarations) {
value_col_bufs.emplace_back(
std::move(ColBufDecoder::NewColBufDecoder(vcd)));
}
value_checksum_buf.reset(
ColBufDecoder::NewColBufDecoder(*kvp_cd.value_checksum_declaration));
}
};
} // namespace rocksdb

@ -0,0 +1,213 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "utilities/col_buf_encoder.h"
#include <cstring>
#include <string>
#include "port/port.h"
namespace rocksdb {
ColBufEncoder::~ColBufEncoder() {}
namespace {
inline uint64_t DecodeFixed64WithEndian(uint64_t val, bool big_endian) {
if (big_endian && port::kLittleEndian) {
val = be64toh(val);
val = htole64(val);
} else if (!big_endian && !port::kLittleEndian) {
val = le64toh(val);
val = htobe64(val);
}
return val;
}
} // namespace
const std::string &ColBufEncoder::GetData() { return buffer_; }
ColBufEncoder *ColBufEncoder::NewColBufEncoder(
const ColDeclaration &col_declaration) {
if (col_declaration.col_type == "FixedLength") {
return new FixedLengthColBufEncoder(
col_declaration.size, col_declaration.col_compression_type,
col_declaration.nullable, col_declaration.big_endian);
} else if (col_declaration.col_type == "VariableLength") {
return new VariableLengthColBufEncoder();
} else if (col_declaration.col_type == "VariableChunk") {
return new VariableChunkColBufEncoder(col_declaration.col_compression_type);
} else if (col_declaration.col_type == "LongFixedLength") {
return new LongFixedLengthColBufEncoder(col_declaration.size,
col_declaration.nullable);
}
// Unrecognized column type
return nullptr;
}
size_t FixedLengthColBufEncoder::Append(const char *buf) {
if (nullable_) {
if (buf == nullptr) {
buffer_.append(1, 0);
return 0;
} else {
buffer_.append(1, 1);
}
}
uint64_t read_val = 0;
memcpy(&read_val, buf, size_);
if (big_endian_) {
read_val = DecodeFixed64WithEndian(read_val, big_endian_);
}
// Determine write value
uint64_t write_val = read_val;
if (col_compression_type_ == kColDeltaVarint ||
col_compression_type_ == kColRleDeltaVarint) {
int64_t delta = read_val - last_val_;
// Encode signed delta value
delta = (delta << 1) ^ (delta >> 63);
write_val = delta;
last_val_ = read_val;
} else if (col_compression_type_ == kColDict ||
col_compression_type_ == kColRleDict) {
auto iter = dictionary_.find(read_val);
uint64_t dict_val;
if (iter == dictionary_.end()) {
// Add new entry to dictionary
dict_val = dictionary_.size();
dictionary_.insert(std::make_pair(read_val, dict_val));
dict_vec_.push_back(read_val);
} else {
dict_val = iter->second;
}
write_val = dict_val;
}
// Write into buffer
if (IsRunLength(col_compression_type_)) {
if (run_length_ == -1) {
// First element
run_val_ = write_val;
run_length_ = 1;
} else if (write_val != run_val_) {
// End of run
// Write run value
if (col_compression_type_ == kColRle) {
buffer_.append(reinterpret_cast<char *>(&run_val_), size_);
} else {
PutVarint64(&buffer_, run_val_);
}
// Write run length
PutVarint64(&buffer_, run_length_);
run_val_ = write_val;
run_length_ = 1;
} else {
run_length_++;
}
} else { // non run-length encodings
if (col_compression_type_ == kColNoCompression) {
buffer_.append(reinterpret_cast<char *>(&write_val), size_);
} else {
PutVarint64(&buffer_, write_val);
}
}
return size_;
}
void FixedLengthColBufEncoder::Finish() {
if (col_compression_type_ == kColDict ||
col_compression_type_ == kColRleDict) {
std::string header;
PutVarint64(&header, dict_vec_.size());
// Put dictionary in the header
for (auto item : dict_vec_) {
PutVarint64(&header, item);
}
buffer_ = header + buffer_;
}
if (IsRunLength(col_compression_type_)) {
// Finish last run value
if (col_compression_type_ == kColRle) {
buffer_.append(reinterpret_cast<char *>(&run_val_), size_);
} else {
PutVarint64(&buffer_, run_val_);
}
PutVarint64(&buffer_, run_length_);
}
}
size_t LongFixedLengthColBufEncoder::Append(const char *buf) {
if (nullable_) {
if (buf == nullptr) {
buffer_.append(1, 0);
return 0;
} else {
buffer_.append(1, 1);
}
}
buffer_.append(buf, size_);
return size_;
}
void LongFixedLengthColBufEncoder::Finish() {}
size_t VariableLengthColBufEncoder::Append(const char *buf) {
uint8_t length = 0;
length = *buf;
buffer_.append(buf, 1);
buf += 1;
buffer_.append(buf, length);
return length + 1;
}
void VariableLengthColBufEncoder::Finish() {}
size_t VariableChunkColBufEncoder::Append(const char *buf) {
const char *orig_buf = buf;
uint8_t mark = 0xFF;
size_t length = 0;
std::string tmp_buffer;
while (mark == 0xFF) {
uint64_t val;
memcpy(&val, buf, 8);
buf += 8;
mark = *buf;
buf += 1;
int8_t chunk_size = 8 - (0xFF - mark);
if (col_compression_type_ == kColDict) {
auto iter = dictionary_.find(val);
size_t dict_val;
if (iter == dictionary_.end()) {
dict_val = dictionary_.size();
dictionary_.insert(std::make_pair(val, dict_val));
dict_vec_.push_back(val);
} else {
dict_val = iter->second;
}
PutVarint64(&tmp_buffer, dict_val);
} else {
tmp_buffer.append(reinterpret_cast<char *>(&val), chunk_size);
}
length += chunk_size;
}
PutVarint64(&buffer_, length);
buffer_.append(tmp_buffer);
return buf - orig_buf;
}
void VariableChunkColBufEncoder::Finish() {
if (col_compression_type_ == kColDict) {
std::string header;
PutVarint64(&header, dict_vec_.size());
for (auto item : dict_vec_) {
PutVarint64(&header, item);
}
buffer_ = header + buffer_;
}
}
} // namespace rocksdb

@ -0,0 +1,220 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#include <endian.h>
#include <cstdio>
#include <cstring>
#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
#include "util/coding.h"
namespace rocksdb {
enum ColCompressionType {
kColNoCompression,
kColRle,
kColVarint,
kColRleVarint,
kColDeltaVarint,
kColRleDeltaVarint,
kColDict,
kColRleDict
};
struct ColDeclaration;
// ColBufEncoder is a class to encode column buffers. It can be populated from a
// ColDeclaration. Each time it takes a column value into Append() method to
// encode the column and store it into an internal buffer. After all rows for
// this column are consumed, a Finish() should be called to add header and
// remaining data.
class ColBufEncoder {
public:
// Read a column, encode data and append into internal buffer.
virtual size_t Append(const char *buf) = 0;
virtual ~ColBufEncoder() = 0;
// Get the internal column buffer. Should only be called after Finish().
const std::string &GetData();
// Finish encoding. Add header and remaining data.
virtual void Finish() = 0;
// Populate a ColBufEncoder from ColDeclaration.
static ColBufEncoder *NewColBufEncoder(const ColDeclaration &col_declaration);
protected:
std::string buffer_;
static inline bool IsRunLength(ColCompressionType type) {
return type == kColRle || type == kColRleVarint ||
type == kColRleDeltaVarint || type == kColRleDict;
}
};
// Encoder for fixed length column buffer. In fixed length column buffer, the
// size of the column should not exceed 8 bytes.
// The following encodings are supported:
// Varint: Variable length integer. See util/coding.h for more details
// Rle (Run length encoding): encode a sequence of contiguous value as
// [run_value][run_length]. Can be combined with Varint
// Delta: Encode value to its delta with its adjacent entry. Use varint to
// possibly reduce stored bytes. Can be combined with Rle.
// Dictionary: Use a dictionary to record all possible values in the block and
// encode them with an ID started from 0. IDs are encoded as varint. A column
// with dictionary encoding will have a header to store all actual values,
// ordered by their dictionary value, and the data will be replaced by
// dictionary value. Can be combined with Rle.
class FixedLengthColBufEncoder : public ColBufEncoder {
public:
explicit FixedLengthColBufEncoder(
size_t size, ColCompressionType col_compression_type = kColNoCompression,
bool nullable = false, bool big_endian = false)
: size_(size),
col_compression_type_(col_compression_type),
nullable_(nullable),
big_endian_(big_endian),
last_val_(0),
run_length_(-1),
run_val_(0) {}
size_t Append(const char *buf) override;
void Finish() override;
~FixedLengthColBufEncoder() {}
private:
size_t size_;
ColCompressionType col_compression_type_;
// If set as true, the input value can be null (represented as nullptr). When
// nullable is true, use one more byte before actual value to indicate if the
// current value is null.
bool nullable_;
// If set as true, input value will be treated as big endian encoded.
bool big_endian_;
// for encoding
uint64_t last_val_;
int16_t run_length_;
uint64_t run_val_;
// Map to store dictionary for dictionary encoding
std::unordered_map<uint64_t, uint64_t> dictionary_;
// Vector of dictionary keys.
std::vector<uint64_t> dict_vec_;
};
// Long fixed length column buffer is a variant of fixed length buffer to hold
// fixed length buffer with more than 8 bytes. We do not support any special
// encoding schemes in LongFixedLengthColBufEncoder.
class LongFixedLengthColBufEncoder : public ColBufEncoder {
public:
LongFixedLengthColBufEncoder(size_t size, bool nullable)
: size_(size), nullable_(nullable) {}
size_t Append(const char *buf) override;
void Finish() override;
~LongFixedLengthColBufEncoder() {}
private:
size_t size_;
bool nullable_;
};
// Variable length column buffer holds a format of variable length column. In
// this format, a column is composed of one byte length k, followed by data with
// k bytes long data.
class VariableLengthColBufEncoder : public ColBufEncoder {
public:
size_t Append(const char *buf) override;
void Finish() override;
~VariableLengthColBufEncoder() {}
};
// Variable chunk column buffer holds another format of variable length column.
// In this format, a column contains multiple chunks of data, each of which is
// composed of 8 bytes long data, and one byte as a mask to indicate whether we
// have more data to come. If no more data coming, the mask is set as 0xFF. If
// the chunk is the last chunk and has only k valid bytes, the mask is set as
// 0xFF - (8 - k).
class VariableChunkColBufEncoder : public VariableLengthColBufEncoder {
public:
size_t Append(const char *buf) override;
void Finish() override;
explicit VariableChunkColBufEncoder(ColCompressionType col_compression_type)
: col_compression_type_(col_compression_type) {}
VariableChunkColBufEncoder() : col_compression_type_(kColNoCompression) {}
private:
ColCompressionType col_compression_type_;
// Map to store dictionary for dictionary encoding
std::unordered_map<uint64_t, uint64_t> dictionary_;
// Vector of dictionary keys.
std::vector<uint64_t> dict_vec_;
};
// ColDeclaration declares a column's type, algorithm of column-aware encoding,
// and other column data like endian and nullability.
struct ColDeclaration {
explicit ColDeclaration(
std::string _col_type,
ColCompressionType _col_compression_type = kColNoCompression,
size_t _size = 0, bool _nullable = false, bool _big_endian = false)
: col_type(_col_type),
col_compression_type(_col_compression_type),
size(_size),
nullable(_nullable),
big_endian(_big_endian) {}
std::string col_type;
ColCompressionType col_compression_type;
size_t size;
bool nullable;
bool big_endian;
};
// KVPairColDeclarations is a class to hold column declaration of columns in
// key and value.
struct KVPairColDeclarations {
std::vector<ColDeclaration> *key_col_declarations;
std::vector<ColDeclaration> *value_col_declarations;
ColDeclaration *value_checksum_declaration;
KVPairColDeclarations(std::vector<ColDeclaration> *_key_col_declarations,
std::vector<ColDeclaration> *_value_col_declarations,
ColDeclaration *_value_checksum_declaration)
: key_col_declarations(_key_col_declarations),
value_col_declarations(_value_col_declarations),
value_checksum_declaration(_value_checksum_declaration) {}
};
// Similar to KVPairDeclarations, KVPairColBufEncoders is used to hold column
// buffer encoders of all columns in key and value.
struct KVPairColBufEncoders {
std::vector<std::unique_ptr<ColBufEncoder>> key_col_bufs;
std::vector<std::unique_ptr<ColBufEncoder>> value_col_bufs;
std::unique_ptr<ColBufEncoder> value_checksum_buf;
explicit KVPairColBufEncoders(const KVPairColDeclarations &kvp_cd) {
for (auto kcd : *kvp_cd.key_col_declarations) {
key_col_bufs.emplace_back(
std::move(ColBufEncoder::NewColBufEncoder(kcd)));
}
for (auto vcd : *kvp_cd.value_col_declarations) {
value_col_bufs.emplace_back(
std::move(ColBufEncoder::NewColBufEncoder(vcd)));
}
value_checksum_buf.reset(
ColBufEncoder::NewColBufEncoder(*kvp_cd.value_checksum_declaration));
}
// Helper function to call Finish()
void Finish() {
for (auto &col_buf : key_col_bufs) {
col_buf->Finish();
}
for (auto &col_buf : value_col_bufs) {
col_buf->Finish();
}
value_checksum_buf->Finish();
}
};
} // namespace rocksdb

@ -0,0 +1,171 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#include <cstdio>
#ifndef ROCKSDB_LITE
#ifdef GFLAGS
#include <gflags/gflags.h>
#include <vector>
#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "table/block_based_table_builder.h"
#include "table/block_based_table_reader.h"
#include "table/format.h"
#include "tools/sst_dump_tool_imp.h"
#include "util/compression.h"
#include "util/stop_watch.h"
#include "utilities/col_buf_encoder.h"
#include "utilities/column_aware_encoding_util.h"
using GFLAGS::ParseCommandLineFlags;
DEFINE_string(encoded_file, "", "file to store encoded data blocks");
DEFINE_string(decoded_file, "",
"file to store decoded data blocks after encoding");
DEFINE_string(format, "col", "Output Format. Can be 'row' or 'col'");
// TODO(jhli): option `col` should be removed and replaced by general
// column specifications.
DEFINE_string(index_type, "col", "Index type. Can be 'primary' or 'secondary'");
DEFINE_string(dump_file, "",
"Dump data blocks separated by columns in human-readable format");
DEFINE_bool(decode, false, "Deocde blocks after they are encoded");
DEFINE_bool(stat, false,
"Print column distribution statistics. Cannot decode in this mode");
DEFINE_string(compression_type, "kNoCompression",
"The compression algorithm used to compress data blocks");
namespace rocksdb {
class ColumnAwareEncodingExp {
public:
static void Run(const std::string& sst_file) {
bool decode = FLAGS_decode;
if (FLAGS_decoded_file.size() > 0) {
decode = true;
}
if (FLAGS_stat) {
decode = false;
}
ColumnAwareEncodingReader reader(sst_file);
std::vector<ColDeclaration>* key_col_declarations;
std::vector<ColDeclaration>* value_col_declarations;
ColDeclaration* value_checksum_declaration;
if (FLAGS_index_type == "primary") {
ColumnAwareEncodingReader::GetColDeclarationsPrimary(
&key_col_declarations, &value_col_declarations,
&value_checksum_declaration);
} else {
ColumnAwareEncodingReader::GetColDeclarationsSecondary(
&key_col_declarations, &value_col_declarations,
&value_checksum_declaration);
}
KVPairColDeclarations kvp_cd(key_col_declarations, value_col_declarations,
value_checksum_declaration);
if (!FLAGS_dump_file.empty()) {
std::vector<KVPairBlock> kv_pair_blocks;
reader.GetKVPairsFromDataBlocks(&kv_pair_blocks);
reader.DumpDataColumns(FLAGS_dump_file, kvp_cd, kv_pair_blocks);
return;
}
std::unordered_map<std::string, CompressionType> compressions = {
{"kNoCompression", CompressionType::kNoCompression},
{"kZlibCompression", CompressionType::kZlibCompression},
{"kZSTDNotFinalCompression",
CompressionType::kZSTDNotFinalCompression}};
// Find Compression
CompressionType compression_type = compressions[FLAGS_compression_type];
EnvOptions env_options;
if (CompressionTypeSupported(compression_type)) {
fprintf(stdout, "[%s]\n", FLAGS_compression_type.c_str());
unique_ptr<WritableFile> encoded_out_file;
std::unique_ptr<Env> env(NewMemEnv(Env::Default()));
if (!FLAGS_encoded_file.empty()) {
env->NewWritableFile(FLAGS_encoded_file, &encoded_out_file,
env_options);
}
std::vector<KVPairBlock> kv_pair_blocks;
reader.GetKVPairsFromDataBlocks(&kv_pair_blocks);
std::vector<std::string> encoded_blocks;
StopWatchNano sw(env.get(), true);
if (FLAGS_format == "col") {
reader.EncodeBlocks(kvp_cd, encoded_out_file.get(), compression_type,
kv_pair_blocks, &encoded_blocks, FLAGS_stat);
} else { // row format
reader.EncodeBlocksToRowFormat(encoded_out_file.get(), compression_type,
kv_pair_blocks, &encoded_blocks);
}
if (encoded_out_file != nullptr) {
uint64_t size = 0;
env->GetFileSize(FLAGS_encoded_file, &size);
fprintf(stdout, "File size: %lu\n", size);
}
uint64_t encode_time = sw.ElapsedNanosSafe(false /* reset */);
fprintf(stdout, "Encode time:%lu\n", encode_time);
if (decode) {
unique_ptr<WritableFile> decoded_out_file;
if (!FLAGS_decoded_file.empty()) {
env->NewWritableFile(FLAGS_decoded_file, &decoded_out_file,
env_options);
}
sw.Start();
if (FLAGS_format == "col") {
reader.DecodeBlocks(kvp_cd, decoded_out_file.get(), &encoded_blocks);
} else {
reader.DecodeBlocksFromRowFormat(decoded_out_file.get(),
&encoded_blocks);
}
uint64_t decode_time = sw.ElapsedNanosSafe(true /* reset */);
fprintf(stdout, "Decode time:%lu\n", decode_time);
}
} else {
fprintf(stdout, "Unsupported compression type: %s.\n",
FLAGS_compression_type.c_str());
}
delete key_col_declarations;
delete value_col_declarations;
delete value_checksum_declaration;
}
};
} // namespace rocksdb
int main(int argc, char** argv) {
int arg_idx = ParseCommandLineFlags(&argc, &argv, true);
if (arg_idx >= argc) {
fprintf(stdout, "SST filename required.\n");
exit(1);
}
std::string sst_file(argv[arg_idx]);
if (FLAGS_format != "row" && FLAGS_format != "col") {
fprintf(stderr, "Format must be 'row' or 'col'\n");
exit(1);
}
if (FLAGS_index_type != "primary" && FLAGS_index_type != "secondary") {
fprintf(stderr, "Format must be 'primary' or 'secondary'\n");
exit(1);
}
rocksdb::ColumnAwareEncodingExp::Run(sst_file);
return 0;
}
#else
int main() {
fprintf(stderr, "Please install gflags to run rocksdb tools\n");
return 1;
}
#endif // GFLAGS
#else
int main(int argc, char** argv) {
fprintf(stderr, "Not supported in lite mode.\n");
return 1;
}
#endif // ROCKSDB_LITE

@ -0,0 +1,193 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#ifndef ROCKSDB_LITE
#include <vector>
#include "util/testharness.h"
#include "util/testutil.h"
#include "utilities/col_buf_decoder.h"
#include "utilities/col_buf_encoder.h"
namespace rocksdb {
class ColumnAwareEncodingTest : public testing::Test {
public:
ColumnAwareEncodingTest() {}
~ColumnAwareEncodingTest() {}
};
TEST_F(ColumnAwareEncodingTest, NoCompressionEncodeDecode) {
std::unique_ptr<ColBufEncoder> col_buf_encoder(
new FixedLengthColBufEncoder(8, kColNoCompression, false, true));
std::string str_buf;
uint64_t val = 0x0102030405060708;
for (int i = 0; i < 4; ++i) {
str_buf.append(reinterpret_cast<char*>(&val), 8);
}
const char* str_buf_ptr = str_buf.c_str();
for (int i = 0; i < 4; ++i) {
col_buf_encoder->Append(str_buf_ptr);
}
col_buf_encoder->Finish();
const std::string& encoded_data = col_buf_encoder->GetData();
ASSERT_EQ(encoded_data.size(), 32);
const char* encoded_data_ptr = encoded_data.c_str();
uint64_t encoded_val;
for (int i = 0; i < 4; ++i) {
memcpy(&encoded_val, encoded_data_ptr, 8);
ASSERT_EQ(encoded_val, 0x0807060504030201);
encoded_data_ptr += 8;
}
std::unique_ptr<ColBufDecoder> col_buf_decoder(
new FixedLengthColBufDecoder(8, kColNoCompression, false, true));
encoded_data_ptr = encoded_data.c_str();
encoded_data_ptr += col_buf_decoder->Init(encoded_data_ptr);
char* decoded_data = new char[100];
char* decoded_data_base = decoded_data;
for (int i = 0; i < 4; ++i) {
encoded_data_ptr +=
col_buf_decoder->Decode(encoded_data_ptr, &decoded_data);
}
ASSERT_EQ(4 * 8, decoded_data - decoded_data_base);
decoded_data = decoded_data_base;
for (int i = 0; i < 4; ++i) {
uint64_t decoded_val;
decoded_val = 0;
memcpy(&decoded_val, decoded_data, 8);
ASSERT_EQ(decoded_val, val);
decoded_data += 8;
}
delete[] decoded_data_base;
}
TEST_F(ColumnAwareEncodingTest, RleEncodeDecode) {
std::unique_ptr<ColBufEncoder> col_buf_encoder(
new FixedLengthColBufEncoder(8, kColRle, false, true));
std::string str_buf;
uint64_t val = 0x0102030405060708;
for (int i = 0; i < 4; ++i) {
str_buf.append(reinterpret_cast<char*>(&val), 8);
}
const char* str_buf_ptr = str_buf.c_str();
for (int i = 0; i < 4; ++i) {
str_buf_ptr += col_buf_encoder->Append(str_buf_ptr);
}
col_buf_encoder->Finish();
const std::string& encoded_data = col_buf_encoder->GetData();
ASSERT_EQ(encoded_data.size(), 9);
const char* encoded_data_ptr = encoded_data.c_str();
uint64_t encoded_val;
memcpy(&encoded_val, encoded_data_ptr, 8);
ASSERT_EQ(encoded_val, 0x0807060504030201);
std::unique_ptr<ColBufDecoder> col_buf_decoder(
new FixedLengthColBufDecoder(8, kColRle, false, true));
char* decoded_data = new char[100];
char* decoded_data_base = decoded_data;
encoded_data_ptr += col_buf_decoder->Init(encoded_data_ptr);
for (int i = 0; i < 4; ++i) {
encoded_data_ptr +=
col_buf_decoder->Decode(encoded_data_ptr, &decoded_data);
}
ASSERT_EQ(4 * 8, decoded_data - decoded_data_base);
decoded_data = decoded_data_base;
for (int i = 0; i < 4; ++i) {
uint64_t decoded_val;
decoded_val = 0;
memcpy(&decoded_val, decoded_data, 8);
ASSERT_EQ(decoded_val, val);
decoded_data += 8;
}
delete[] decoded_data_base;
}
TEST_F(ColumnAwareEncodingTest, DeltaEncodeDecode) {
std::unique_ptr<ColBufEncoder> col_buf_encoder(
new FixedLengthColBufEncoder(8, kColDeltaVarint, false, true));
std::string str_buf;
uint64_t val = 0x0102030405060708;
uint64_t val2 = 0x0202030405060708;
const char* str_buf_ptr;
for (int i = 0; i < 2; ++i) {
str_buf = std::string(reinterpret_cast<char*>(&val), 8);
str_buf_ptr = str_buf.c_str();
col_buf_encoder->Append(str_buf_ptr);
str_buf = std::string(reinterpret_cast<char*>(&val2), 8);
str_buf_ptr = str_buf.c_str();
col_buf_encoder->Append(str_buf_ptr);
}
col_buf_encoder->Finish();
const std::string& encoded_data = col_buf_encoder->GetData();
ASSERT_EQ(encoded_data.size(), 9 + 3);
std::unique_ptr<ColBufDecoder> col_buf_decoder(
new FixedLengthColBufDecoder(8, kColDeltaVarint, false, true));
char* decoded_data = new char[100];
char* decoded_data_base = decoded_data;
const char* encoded_data_ptr = encoded_data.c_str();
encoded_data_ptr += col_buf_decoder->Init(encoded_data_ptr);
for (int i = 0; i < 4; ++i) {
encoded_data_ptr +=
col_buf_decoder->Decode(encoded_data_ptr, &decoded_data);
}
ASSERT_EQ(4 * 8, decoded_data - decoded_data_base);
decoded_data = decoded_data_base;
for (int i = 0; i < 2; ++i) {
uint64_t decoded_val;
memcpy(&decoded_val, decoded_data, 8);
ASSERT_EQ(decoded_val, val);
decoded_data += 8;
memcpy(&decoded_val, decoded_data, 8);
ASSERT_EQ(decoded_val, val2);
decoded_data += 8;
}
delete[] decoded_data_base;
}
TEST_F(ColumnAwareEncodingTest, ChunkBufEncodeDecode) {
std::unique_ptr<ColBufEncoder> col_buf_encoder(
new VariableChunkColBufEncoder(kColDict));
std::string buf("12345678\377\1\0\0\0\0\0\0\0\376", 18);
col_buf_encoder->Append(buf.c_str());
col_buf_encoder->Finish();
const std::string& encoded_data = col_buf_encoder->GetData();
const char* str_ptr = encoded_data.c_str();
std::unique_ptr<ColBufDecoder> col_buf_decoder(
new VariableChunkColBufDecoder(kColDict));
str_ptr += col_buf_decoder->Init(str_ptr);
char* decoded_data = new char[100];
char* decoded_data_base = decoded_data;
col_buf_decoder->Decode(str_ptr, &decoded_data);
for (size_t i = 0; i < buf.size(); ++i) {
ASSERT_EQ(buf[i], decoded_data_base[i]);
}
delete[] decoded_data_base;
}
} // namespace rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else
#include <cstdio>
int main() {
fprintf(stderr,
"SKIPPED as column aware encoding experiment is not enabled in "
"ROCKSDB_LITE\n");
}
#endif // ROCKSDB_LITE

@ -0,0 +1,490 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
//
#ifndef ROCKSDB_LITE
#include "utilities/column_aware_encoding_util.h"
#ifndef __STDC_FORMAT_MACROS
#define __STDC_FORMAT_MACROS
#endif
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <algorithm>
#include <utility>
#include <vector>
#include "include/rocksdb/comparator.h"
#include "include/rocksdb/slice.h"
#include "rocksdb/env.h"
#include "rocksdb/status.h"
#include "table/block_based_table_builder.h"
#include "table/block_based_table_factory.h"
#include "table/format.h"
#include "table/table_reader.h"
#include "util/coding.h"
#include "utilities/col_buf_decoder.h"
#include "utilities/col_buf_encoder.h"
namespace rocksdb {
ColumnAwareEncodingReader::ColumnAwareEncodingReader(
const std::string& file_path)
: file_name_(file_path),
ioptions_(options_),
internal_comparator_(BytewiseComparator()) {
InitTableReader(file_name_);
}
void ColumnAwareEncodingReader::InitTableReader(const std::string& file_path) {
std::unique_ptr<RandomAccessFile> file;
uint64_t file_size;
options_.env->NewRandomAccessFile(file_path, &file, soptions_);
options_.env->GetFileSize(file_path, &file_size);
file_.reset(new RandomAccessFileReader(std::move(file)));
options_.comparator = &internal_comparator_;
options_.table_factory = std::make_shared<BlockBasedTableFactory>();
shared_ptr<BlockBasedTableFactory> block_table_factory =
std::dynamic_pointer_cast<BlockBasedTableFactory>(options_.table_factory);
std::unique_ptr<TableReader> table_reader;
block_table_factory->NewTableReader(
TableReaderOptions(ioptions_, soptions_, internal_comparator_,
/*skip_filters=*/false),
std::move(file_), file_size, &table_reader, /*enable_prefetch=*/false);
table_reader_.reset(dynamic_cast<BlockBasedTable*>(table_reader.release()));
}
void ColumnAwareEncodingReader::GetKVPairsFromDataBlocks(
std::vector<KVPairBlock>* kv_pair_blocks) {
table_reader_->GetKVPairsFromDataBlocks(kv_pair_blocks);
}
void ColumnAwareEncodingReader::DecodeBlocks(
const KVPairColDeclarations& kvp_col_declarations, WritableFile* out_file,
const std::vector<std::string>* blocks) {
char* decoded_content_base = new char[16384];
Options options;
ImmutableCFOptions ioptions(options);
for (auto& block : *blocks) {
KVPairColBufDecoders kvp_col_bufs(kvp_col_declarations);
auto& key_col_bufs = kvp_col_bufs.key_col_bufs;
auto& value_col_bufs = kvp_col_bufs.value_col_bufs;
auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf;
auto& slice_final_with_bit = block;
uint32_t format_version = 2;
Slice compression_dict;
BlockContents contents;
const char* content_ptr;
CompressionType type =
(CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
if (type != kNoCompression) {
UncompressBlockContents(slice_final_with_bit.c_str(),
slice_final_with_bit.size() - 1, &contents,
format_version, compression_dict, ioptions);
content_ptr = contents.data.data();
} else {
content_ptr = slice_final_with_bit.data();
}
size_t num_kv_pairs;
const char* header_content_ptr = content_ptr;
num_kv_pairs = DecodeFixed64(header_content_ptr);
header_content_ptr += sizeof(size_t);
size_t num_key_columns = key_col_bufs.size();
size_t num_value_columns = value_col_bufs.size();
std::vector<const char*> key_content_ptr(num_key_columns);
std::vector<const char*> value_content_ptr(num_value_columns);
const char* checksum_content_ptr;
size_t num_columns = num_key_columns + num_value_columns;
const char* col_content_ptr =
header_content_ptr + sizeof(size_t) * num_columns;
// Read headers
for (size_t i = 0; i < num_key_columns; ++i) {
key_content_ptr[i] = col_content_ptr;
key_content_ptr[i] += key_col_bufs[i]->Init(key_content_ptr[i]);
size_t offset;
offset = DecodeFixed64(header_content_ptr);
header_content_ptr += sizeof(size_t);
col_content_ptr += offset;
}
for (size_t i = 0; i < num_value_columns; ++i) {
value_content_ptr[i] = col_content_ptr;
value_content_ptr[i] += value_col_bufs[i]->Init(value_content_ptr[i]);
size_t offset;
offset = DecodeFixed64(header_content_ptr);
header_content_ptr += sizeof(size_t);
col_content_ptr += offset;
}
checksum_content_ptr = col_content_ptr;
checksum_content_ptr += value_checksum_buf->Init(checksum_content_ptr);
// Decode block
char* decoded_content = decoded_content_base;
for (size_t j = 0; j < num_kv_pairs; ++j) {
for (size_t i = 0; i < num_key_columns; ++i) {
key_content_ptr[i] +=
key_col_bufs[i]->Decode(key_content_ptr[i], &decoded_content);
}
for (size_t i = 0; i < num_value_columns; ++i) {
value_content_ptr[i] +=
value_col_bufs[i]->Decode(value_content_ptr[i], &decoded_content);
}
checksum_content_ptr +=
value_checksum_buf->Decode(checksum_content_ptr, &decoded_content);
}
size_t offset = decoded_content - decoded_content_base;
Slice output_content(decoded_content, offset);
if (out_file != nullptr) {
out_file->Append(output_content);
}
}
delete[] decoded_content_base;
}
void ColumnAwareEncodingReader::DecodeBlocksFromRowFormat(
WritableFile* out_file, const std::vector<std::string>* blocks) {
Options options;
ImmutableCFOptions ioptions(options);
for (auto& block : *blocks) {
auto& slice_final_with_bit = block;
uint32_t format_version = 2;
Slice compression_dict;
BlockContents contents;
std::string decoded_content;
CompressionType type =
(CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1];
if (type != kNoCompression) {
UncompressBlockContents(slice_final_with_bit.c_str(),
slice_final_with_bit.size() - 1, &contents,
format_version, compression_dict, ioptions);
decoded_content = std::string(contents.data.data(), contents.data.size());
} else {
decoded_content = std::move(slice_final_with_bit);
}
if (out_file != nullptr) {
out_file->Append(decoded_content);
}
}
}
void ColumnAwareEncodingReader::DumpDataColumns(
const std::string& filename,
const KVPairColDeclarations& kvp_col_declarations,
const std::vector<KVPairBlock>& kv_pair_blocks) {
KVPairColBufEncoders kvp_col_bufs(kvp_col_declarations);
auto& key_col_bufs = kvp_col_bufs.key_col_bufs;
auto& value_col_bufs = kvp_col_bufs.value_col_bufs;
auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf;
FILE* fp = fopen(filename.c_str(), "w");
size_t block_id = 1;
for (auto& kv_pairs : kv_pair_blocks) {
fprintf(fp, "---------------- Block: %-4lu ----------------\n", block_id);
for (auto& kv_pair : kv_pairs) {
const auto& key = kv_pair.first;
const auto& value = kv_pair.second;
size_t value_offset = 0;
const char* key_ptr = key.data();
for (auto& buf : key_col_bufs) {
size_t col_size = buf->Append(key_ptr);
std::string tmp_buf(key_ptr, col_size);
Slice col(tmp_buf);
fprintf(fp, "%s ", col.ToString(true).c_str());
key_ptr += col_size;
}
fprintf(fp, "|");
const char* value_ptr = value.data();
for (auto& buf : value_col_bufs) {
size_t col_size = buf->Append(value_ptr);
std::string tmp_buf(value_ptr, col_size);
Slice col(tmp_buf);
fprintf(fp, " %s", col.ToString(true).c_str());
value_ptr += col_size;
value_offset += col_size;
}
if (value_offset < value.size()) {
size_t col_size = value_checksum_buf->Append(value_ptr);
std::string tmp_buf(value_ptr, col_size);
Slice col(tmp_buf);
fprintf(fp, "|%s", col.ToString(true).c_str());
} else {
value_checksum_buf->Append(nullptr);
}
fprintf(fp, "\n");
}
block_id++;
}
fclose(fp);
}
namespace {
void CompressDataBlock(const std::string& output_content, Slice* slice_final,
CompressionType* type, std::string* compressed_output) {
CompressionOptions compression_opts;
uint32_t format_version = 2; // hard-coded version
Slice compression_dict;
*slice_final =
CompressBlock(output_content, compression_opts, type, format_version,
compression_dict, compressed_output);
}
} // namespace
void ColumnAwareEncodingReader::EncodeBlocksToRowFormat(
WritableFile* out_file, CompressionType compression_type,
const std::vector<KVPairBlock>& kv_pair_blocks,
std::vector<std::string>* blocks) {
std::string output_content;
for (auto& kv_pairs : kv_pair_blocks) {
output_content.clear();
std::string last_key;
size_t counter = 0;
const size_t block_restart_interval = 16;
for (auto& kv_pair : kv_pairs) {
const auto& key = kv_pair.first;
const auto& value = kv_pair.second;
Slice last_key_piece(last_key);
size_t shared = 0;
if (counter >= block_restart_interval) {
counter = 0;
} else {
const size_t min_length = std::min(last_key_piece.size(), key.size());
while ((shared < min_length) && last_key_piece[shared] == key[shared]) {
shared++;
}
}
const size_t non_shared = key.size() - shared;
output_content.append(key.c_str() + shared, non_shared);
output_content.append(value);
last_key.resize(shared);
last_key.append(key.data() + shared, non_shared);
counter++;
}
Slice slice_final;
auto type = compression_type;
std::string compressed_output;
CompressDataBlock(output_content, &slice_final, &type, &compressed_output);
if (out_file != nullptr) {
out_file->Append(slice_final);
}
// Add a bit in the end for decoding
std::string slice_final_with_bit(slice_final.data(), slice_final.size());
slice_final_with_bit.append(reinterpret_cast<char*>(&type), 1);
blocks->push_back(
std::string(slice_final_with_bit.data(), slice_final_with_bit.size()));
}
}
Status ColumnAwareEncodingReader::EncodeBlocks(
const KVPairColDeclarations& kvp_col_declarations, WritableFile* out_file,
CompressionType compression_type,
const std::vector<KVPairBlock>& kv_pair_blocks,
std::vector<std::string>* blocks, bool print_column_stat) {
std::vector<size_t> key_col_sizes(
kvp_col_declarations.key_col_declarations->size(), 0);
std::vector<size_t> value_col_sizes(
kvp_col_declarations.value_col_declarations->size(), 0);
size_t value_checksum_size = 0;
for (auto& kv_pairs : kv_pair_blocks) {
KVPairColBufEncoders kvp_col_bufs(kvp_col_declarations);
auto& key_col_bufs = kvp_col_bufs.key_col_bufs;
auto& value_col_bufs = kvp_col_bufs.value_col_bufs;
auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf;
size_t num_kv_pairs = 0;
for (auto& kv_pair : kv_pairs) {
const auto& key = kv_pair.first;
const auto& value = kv_pair.second;
size_t value_offset = 0;
num_kv_pairs++;
const char* key_ptr = key.data();
for (auto& buf : key_col_bufs) {
size_t col_size = buf->Append(key_ptr);
key_ptr += col_size;
}
const char* value_ptr = value.data();
for (auto& buf : value_col_bufs) {
size_t col_size = buf->Append(value_ptr);
value_ptr += col_size;
value_offset += col_size;
}
if (value_offset < value.size()) {
value_checksum_buf->Append(value_ptr);
} else {
value_checksum_buf->Append(nullptr);
}
}
kvp_col_bufs.Finish();
// Get stats
// Compress and write a block
if (print_column_stat) {
for (size_t i = 0; i < key_col_bufs.size(); ++i) {
Slice slice_final;
auto type = compression_type;
std::string compressed_output;
CompressDataBlock(key_col_bufs[i]->GetData(), &slice_final, &type,
&compressed_output);
out_file->Append(slice_final);
key_col_sizes[i] += slice_final.size();
}
for (size_t i = 0; i < value_col_bufs.size(); ++i) {
Slice slice_final;
auto type = compression_type;
std::string compressed_output;
CompressDataBlock(value_col_bufs[i]->GetData(), &slice_final, &type,
&compressed_output);
out_file->Append(slice_final);
value_col_sizes[i] += slice_final.size();
}
Slice slice_final;
auto type = compression_type;
std::string compressed_output;
CompressDataBlock(value_checksum_buf->GetData(), &slice_final, &type,
&compressed_output);
out_file->Append(slice_final);
value_checksum_size += slice_final.size();
} else {
std::string output_content;
// Write column sizes
PutFixed64(&output_content, num_kv_pairs);
for (auto& buf : key_col_bufs) {
size_t size = buf->GetData().size();
PutFixed64(&output_content, size);
}
for (auto& buf : value_col_bufs) {
size_t size = buf->GetData().size();
PutFixed64(&output_content, size);
}
// Write data
for (auto& buf : key_col_bufs) {
output_content.append(buf->GetData());
}
for (auto& buf : value_col_bufs) {
output_content.append(buf->GetData());
}
output_content.append(value_checksum_buf->GetData());
Slice slice_final;
auto type = compression_type;
std::string compressed_output;
CompressDataBlock(output_content, &slice_final, &type,
&compressed_output);
if (out_file != nullptr) {
out_file->Append(slice_final);
}
// Add a bit in the end for decoding
std::string slice_final_with_bit(slice_final.data(),
slice_final.size() + 1);
slice_final_with_bit[slice_final.size()] = static_cast<char>(type);
blocks->push_back(std::string(slice_final_with_bit.data(),
slice_final_with_bit.size()));
}
}
if (print_column_stat) {
size_t total_size = 0;
for (size_t i = 0; i < key_col_sizes.size(); ++i)
total_size += key_col_sizes[i];
for (size_t i = 0; i < value_col_sizes.size(); ++i)
total_size += value_col_sizes[i];
total_size += value_checksum_size;
for (size_t i = 0; i < key_col_sizes.size(); ++i)
printf("Key col %lu size: %lu percentage %lf%%\n", i, key_col_sizes[i],
100.0 * key_col_sizes[i] / total_size);
for (size_t i = 0; i < value_col_sizes.size(); ++i)
printf("Value col %lu size: %lu percentage %lf%%\n", i,
value_col_sizes[i], 100.0 * value_col_sizes[i] / total_size);
printf("Value checksum size: %lu percentage %lf%%\n", value_checksum_size,
100.0 * value_checksum_size / total_size);
}
return Status::OK();
}
void ColumnAwareEncodingReader::GetColDeclarationsPrimary(
std::vector<ColDeclaration>** key_col_declarations,
std::vector<ColDeclaration>** value_col_declarations,
ColDeclaration** value_checksum_declaration) {
*key_col_declarations = new std::vector<ColDeclaration>{
ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4, false,
true),
ColDeclaration("FixedLength", ColCompressionType::kColRleDeltaVarint, 8,
false, true),
ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
false, true),
ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
false, true),
ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)};
*value_col_declarations = new std::vector<ColDeclaration>{
ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4),
ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4),
ColDeclaration("FixedLength", ColCompressionType::kColRle, 1),
ColDeclaration("VariableLength"),
ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 4),
ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)};
*value_checksum_declaration = new ColDeclaration(
"LongFixedLength", ColCompressionType::kColNoCompression, 9,
true /* nullable */);
}
void ColumnAwareEncodingReader::GetColDeclarationsSecondary(
std::vector<ColDeclaration>** key_col_declarations,
std::vector<ColDeclaration>** value_col_declarations,
ColDeclaration** value_checksum_declaration) {
*key_col_declarations = new std::vector<ColDeclaration>{
ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4, false,
true),
ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
false, true),
ColDeclaration("FixedLength", ColCompressionType::kColRleDeltaVarint, 8,
false, true),
ColDeclaration("FixedLength", ColCompressionType::kColRle, 1),
ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 4,
false, true),
ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8,
false, true),
ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8, false,
true),
ColDeclaration("VariableChunk", ColCompressionType::kColNoCompression),
ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)};
*value_col_declarations = new std::vector<ColDeclaration>();
*value_checksum_declaration = new ColDeclaration(
"LongFixedLength", ColCompressionType::kColNoCompression, 9,
true /* nullable */);
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,80 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include <string>
#include <vector>
#include "db/dbformat.h"
#include "include/rocksdb/env.h"
#include "include/rocksdb/immutable_options.h"
#include "include/rocksdb/listener.h"
#include "include/rocksdb/options.h"
#include "include/rocksdb/status.h"
#include "table/block_based_table_reader.h"
namespace rocksdb {
struct ColDeclaration;
struct KVPairColDeclarations;
class ColumnAwareEncodingReader {
public:
explicit ColumnAwareEncodingReader(const std::string& file_name);
void GetKVPairsFromDataBlocks(std::vector<KVPairBlock>* kv_pair_blocks);
void EncodeBlocksToRowFormat(WritableFile* out_file,
CompressionType compression_type,
const std::vector<KVPairBlock>& kv_pair_blocks,
std::vector<std::string>* blocks);
void DecodeBlocksFromRowFormat(WritableFile* out_file,
const std::vector<std::string>* blocks);
void DumpDataColumns(const std::string& filename,
const KVPairColDeclarations& kvp_col_declarations,
const std::vector<KVPairBlock>& kv_pair_blocks);
Status EncodeBlocks(const KVPairColDeclarations& kvp_col_declarations,
WritableFile* out_file, CompressionType compression_type,
const std::vector<KVPairBlock>& kv_pair_blocks,
std::vector<std::string>* blocks, bool print_column_stat);
void DecodeBlocks(const KVPairColDeclarations& kvp_col_declarations,
WritableFile* out_file,
const std::vector<std::string>* blocks);
static void GetColDeclarationsPrimary(
std::vector<ColDeclaration>** key_col_declarations,
std::vector<ColDeclaration>** value_col_declarations,
ColDeclaration** value_checksum_declaration);
static void GetColDeclarationsSecondary(
std::vector<ColDeclaration>** key_col_declarations,
std::vector<ColDeclaration>** value_col_declarations,
ColDeclaration** value_checksum_declaration);
private:
// Init the TableReader for the sst file
void InitTableReader(const std::string& file_path);
std::string file_name_;
EnvOptions soptions_;
Options options_;
Status init_result_;
std::unique_ptr<BlockBasedTable> table_reader_;
std::unique_ptr<RandomAccessFileReader> file_;
const ImmutableCFOptions ioptions_;
InternalKeyComparator internal_comparator_;
std::unique_ptr<TableProperties> table_properties_;
};
} // namespace rocksdb
#endif // ROCKSDB_LITE
Loading…
Cancel
Save