You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
rocksdb/utilities/col_buf_encoder.cc

217 lines
6.0 KiB

// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "utilities/col_buf_encoder.h"
#include <cstring>
#include <string>
#include "port/port.h"
namespace rocksdb {
ColBufEncoder::~ColBufEncoder() {}
namespace {
inline uint64_t DecodeFixed64WithEndian(uint64_t val, bool big_endian,
size_t size) {
if (big_endian && port::kLittleEndian) {
val = EndianTransform(val, size);
} else if (!big_endian && !port::kLittleEndian) {
val = EndianTransform(val, size);
}
return val;
}
} // namespace
const std::string &ColBufEncoder::GetData() { return buffer_; }
ColBufEncoder *ColBufEncoder::NewColBufEncoder(
const ColDeclaration &col_declaration) {
if (col_declaration.col_type == "FixedLength") {
return new FixedLengthColBufEncoder(
col_declaration.size, col_declaration.col_compression_type,
col_declaration.nullable, col_declaration.big_endian);
} else if (col_declaration.col_type == "VariableLength") {
return new VariableLengthColBufEncoder();
} else if (col_declaration.col_type == "VariableChunk") {
return new VariableChunkColBufEncoder(col_declaration.col_compression_type);
} else if (col_declaration.col_type == "LongFixedLength") {
return new LongFixedLengthColBufEncoder(col_declaration.size,
col_declaration.nullable);
}
// Unrecognized column type
return nullptr;
}
#ifdef ROCKSDB_UBSAN_RUN
#if defined(__clang__)
__attribute__((__no_sanitize__("shift")))
#elif defined(__GNUC__)
__attribute__((__no_sanitize_undefined__))
#endif
#endif
size_t FixedLengthColBufEncoder::Append(const char *buf) {
if (nullable_) {
if (buf == nullptr) {
buffer_.append(1, 0);
return 0;
} else {
buffer_.append(1, 1);
}
}
uint64_t read_val = 0;
memcpy(&read_val, buf, size_);
read_val = DecodeFixed64WithEndian(read_val, big_endian_, size_);
// Determine write value
uint64_t write_val = read_val;
if (col_compression_type_ == kColDeltaVarint ||
col_compression_type_ == kColRleDeltaVarint) {
int64_t delta = read_val - last_val_;
// Encode signed delta value
delta = (delta << 1) ^ (delta >> 63);
write_val = delta;
last_val_ = read_val;
} else if (col_compression_type_ == kColDict ||
col_compression_type_ == kColRleDict) {
auto iter = dictionary_.find(read_val);
uint64_t dict_val;
if (iter == dictionary_.end()) {
// Add new entry to dictionary
dict_val = dictionary_.size();
dictionary_.insert(std::make_pair(read_val, dict_val));
dict_vec_.push_back(read_val);
} else {
dict_val = iter->second;
}
write_val = dict_val;
}
// Write into buffer
if (IsRunLength(col_compression_type_)) {
if (run_length_ == -1) {
// First element
run_val_ = write_val;
run_length_ = 1;
} else if (write_val != run_val_) {
// End of run
// Write run value
if (col_compression_type_ == kColRle) {
buffer_.append(reinterpret_cast<char *>(&run_val_), size_);
} else {
PutVarint64(&buffer_, run_val_);
}
// Write run length
PutVarint64(&buffer_, run_length_);
run_val_ = write_val;
run_length_ = 1;
} else {
run_length_++;
}
} else { // non run-length encodings
if (col_compression_type_ == kColNoCompression) {
buffer_.append(reinterpret_cast<char *>(&write_val), size_);
} else {
PutVarint64(&buffer_, write_val);
}
}
return size_;
}
void FixedLengthColBufEncoder::Finish() {
if (col_compression_type_ == kColDict ||
col_compression_type_ == kColRleDict) {
std::string header;
PutVarint64(&header, dict_vec_.size());
// Put dictionary in the header
for (auto item : dict_vec_) {
PutVarint64(&header, item);
}
buffer_ = header + buffer_;
}
if (IsRunLength(col_compression_type_)) {
// Finish last run value
if (col_compression_type_ == kColRle) {
buffer_.append(reinterpret_cast<char *>(&run_val_), size_);
} else {
PutVarint64(&buffer_, run_val_);
}
PutVarint64(&buffer_, run_length_);
}
}
size_t LongFixedLengthColBufEncoder::Append(const char *buf) {
if (nullable_) {
if (buf == nullptr) {
buffer_.append(1, 0);
return 0;
} else {
buffer_.append(1, 1);
}
}
buffer_.append(buf, size_);
return size_;
}
void LongFixedLengthColBufEncoder::Finish() {}
size_t VariableLengthColBufEncoder::Append(const char *buf) {
uint8_t length = 0;
length = *buf;
buffer_.append(buf, 1);
buf += 1;
buffer_.append(buf, length);
return length + 1;
}
void VariableLengthColBufEncoder::Finish() {}
size_t VariableChunkColBufEncoder::Append(const char *buf) {
const char *orig_buf = buf;
uint8_t mark = 0xFF;
size_t length = 0;
std::string tmp_buffer;
while (mark == 0xFF) {
uint64_t val;
memcpy(&val, buf, 8);
buf += 8;
mark = *buf;
buf += 1;
int8_t chunk_size = 8 - (0xFF - mark);
if (col_compression_type_ == kColDict) {
auto iter = dictionary_.find(val);
uint64_t dict_val;
if (iter == dictionary_.end()) {
dict_val = dictionary_.size();
dictionary_.insert(std::make_pair(val, dict_val));
dict_vec_.push_back(val);
} else {
dict_val = iter->second;
}
PutVarint64(&tmp_buffer, dict_val);
} else {
tmp_buffer.append(reinterpret_cast<char *>(&val), chunk_size);
}
length += chunk_size;
}
PutVarint64(&buffer_, length);
buffer_.append(tmp_buffer);
return buf - orig_buf;
}
void VariableChunkColBufEncoder::Finish() {
if (col_compression_type_ == kColDict) {
std::string header;
PutVarint64(&header, dict_vec_.size());
for (auto item : dict_vec_) {
PutVarint64(&header, item);
}
buffer_ = header + buffer_;
}
}
} // namespace rocksdb