Add wide column serialization primitives (#9915)

Summary:
The patch adds some low-level logic that can be used to serialize/deserialize
a sorted vector of wide columns to/from a simple binary searchable string
representation. Currently, there is no user-facing API; this will be implemented in
subsequent stages.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9915

Test Plan: `make check`

Reviewed By: siying

Differential Revision: D35978076

Pulled By: ltamasi

fbshipit-source-id: 33f5f6628ec3bcd8c8beab363b1978ac047a8788
main
Levi Tamasi 2 years ago committed by Facebook GitHub Bot
parent 3e02c6e05a
commit e9c74bc474
  1. 2
      CMakeLists.txt
  2. 4
      Makefile
  3. 8
      TARGETS
  4. 141
      db/wide/wide_column_serialization.cc
  5. 55
      db/wide/wide_column_serialization.h
  6. 292
      db/wide/wide_column_serialization_test.cc
  7. 74
      include/rocksdb/wide_columns.h
  8. 2
      src.mk
  9. 12
      util/autovector.h

@ -675,6 +675,7 @@ set(SOURCES
db/version_set.cc db/version_set.cc
db/wal_edit.cc db/wal_edit.cc
db/wal_manager.cc db/wal_manager.cc
db/wide/wide_column_serialization.cc
db/write_batch.cc db/write_batch.cc
db/write_batch_base.cc db/write_batch_base.cc
db/write_controller.cc db/write_controller.cc
@ -1294,6 +1295,7 @@ if(WITH_TESTS)
db/version_set_test.cc db/version_set_test.cc
db/wal_manager_test.cc db/wal_manager_test.cc
db/wal_edit_test.cc db/wal_edit_test.cc
db/wide/wide_column_serialization_test.cc
db/write_batch_test.cc db/write_batch_test.cc
db/write_callback_test.cc db/write_callback_test.cc
db/write_controller_test.cc db/write_controller_test.cc

@ -1902,6 +1902,10 @@ db_basic_bench: $(OBJ_DIR)/microbench/db_basic_bench.o $(LIBRARY)
cache_reservation_manager_test: $(OBJ_DIR)/cache/cache_reservation_manager_test.o $(TEST_LIBRARY) $(LIBRARY) cache_reservation_manager_test: $(OBJ_DIR)/cache/cache_reservation_manager_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK) $(AM_LINK)
wide_column_serialization_test: $(OBJ_DIR)/db/wide/wide_column_serialization_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
#------------------------------------------------- #-------------------------------------------------
# make install related stuff # make install related stuff
PREFIX ?= /usr/local PREFIX ?= /usr/local

@ -91,6 +91,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[
"db/version_set.cc", "db/version_set.cc",
"db/wal_edit.cc", "db/wal_edit.cc",
"db/wal_manager.cc", "db/wal_manager.cc",
"db/wide/wide_column_serialization.cc",
"db/write_batch.cc", "db/write_batch.cc",
"db/write_batch_base.cc", "db/write_batch_base.cc",
"db/write_controller.cc", "db/write_controller.cc",
@ -419,6 +420,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[
"db/version_set.cc", "db/version_set.cc",
"db/wal_edit.cc", "db/wal_edit.cc",
"db/wal_manager.cc", "db/wal_manager.cc",
"db/wide/wide_column_serialization.cc",
"db/write_batch.cc", "db/write_batch.cc",
"db/write_batch_base.cc", "db/write_batch_base.cc",
"db/write_controller.cc", "db/write_controller.cc",
@ -5814,6 +5816,12 @@ cpp_unittest_wrapper(name="wal_manager_test",
extra_compiler_flags=[]) extra_compiler_flags=[])
cpp_unittest_wrapper(name="wide_column_serialization_test",
srcs=["db/wide/wide_column_serialization_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])
cpp_unittest_wrapper(name="work_queue_test", cpp_unittest_wrapper(name="work_queue_test",
srcs=["util/work_queue_test.cc"], srcs=["util/work_queue_test.cc"],
deps=[":rocksdb_test_lib"], deps=[":rocksdb_test_lib"],

@ -0,0 +1,141 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/wide/wide_column_serialization.h"
#include <algorithm>
#include <cassert>
#include <limits>
#include "rocksdb/slice.h"
#include "util/autovector.h"
#include "util/coding.h"
namespace ROCKSDB_NAMESPACE {
Status WideColumnSerialization::Serialize(const WideColumns& columns,
std::string& output) {
// Column names should be strictly ascending
assert(std::adjacent_find(columns.cbegin(), columns.cend(),
[](const WideColumn& lhs, const WideColumn& rhs) {
return lhs.name().compare(rhs.name()) > 0;
}) == columns.cend());
if (columns.size() >
static_cast<size_t>(std::numeric_limits<uint32_t>::max())) {
return Status::InvalidArgument("Too many wide columns");
}
PutVarint32(&output, kCurrentVersion);
PutVarint32(&output, static_cast<uint32_t>(columns.size()));
for (const auto& column : columns) {
const Slice& name = column.name();
if (name.size() >
static_cast<size_t>(std::numeric_limits<uint32_t>::max())) {
return Status::InvalidArgument("Wide column name too long");
}
const Slice& value = column.value();
if (value.size() >
static_cast<size_t>(std::numeric_limits<uint32_t>::max())) {
return Status::InvalidArgument("Wide column value too long");
}
PutLengthPrefixedSlice(&output, name);
PutVarint32(&output, static_cast<uint32_t>(value.size()));
}
for (const auto& column : columns) {
const Slice& value = column.value();
output.append(value.data(), value.size());
}
return Status::OK();
}
Status WideColumnSerialization::Deserialize(Slice& input,
WideColumns& columns) {
assert(columns.empty());
uint32_t version = 0;
if (!GetVarint32(&input, &version)) {
return Status::Corruption("Error decoding wide column version");
}
if (version > kCurrentVersion) {
return Status::NotSupported("Unsupported wide column version");
}
uint32_t num_columns = 0;
if (!GetVarint32(&input, &num_columns)) {
return Status::Corruption("Error decoding number of wide columns");
}
if (!num_columns) {
return Status::OK();
}
columns.reserve(num_columns);
autovector<uint32_t, 16> column_value_sizes;
column_value_sizes.reserve(num_columns);
for (uint32_t i = 0; i < num_columns; ++i) {
Slice name;
if (!GetLengthPrefixedSlice(&input, &name)) {
return Status::Corruption("Error decoding wide column name");
}
if (!columns.empty() && columns.back().name().compare(name) >= 0) {
return Status::Corruption("Wide columns out of order");
}
columns.emplace_back(name, Slice());
uint32_t value_size = 0;
if (!GetVarint32(&input, &value_size)) {
return Status::Corruption("Error decoding wide column value size");
}
column_value_sizes.emplace_back(value_size);
}
const Slice data(input);
size_t pos = 0;
for (uint32_t i = 0; i < num_columns; ++i) {
const uint32_t value_size = column_value_sizes[i];
if (pos + value_size > data.size()) {
return Status::Corruption("Error decoding wide column value payload");
}
columns[i].value() = Slice(data.data() + pos, value_size);
pos += value_size;
}
return Status::OK();
}
WideColumns::const_iterator WideColumnSerialization::Find(
const WideColumns& columns, const Slice& column_name) {
const auto it =
std::lower_bound(columns.cbegin(), columns.cend(), column_name,
[](const WideColumn& lhs, const Slice& rhs) {
return lhs.name().compare(rhs) < 0;
});
if (it == columns.cend() || it->name() != column_name) {
return columns.cend();
}
return it;
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,55 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <cstdint>
#include <string>
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/status.h"
#include "rocksdb/wide_columns.h"
namespace ROCKSDB_NAMESPACE {
class Slice;
// Wide-column serialization/deserialization primitives.
//
// The two main parts of the layout are 1) a sorted index containing the column
// names and column value sizes and 2) the column values themselves. Keeping the
// index and the values separate will enable selectively reading column values
// down the line. Note that currently the index has to be fully parsed in order
// to find out the offset of each column value.
//
// Legend: cn = column name, cv = column value, cns = column name size, cvs =
// column value size.
//
// +----------+--------------+----------+-------+----------+---...
// | version | # of columns | cns 1 | cn 1 | cvs 1 |
// +----------+--------------+------------------+--------- +---...
// | varint32 | varint32 | varint32 | bytes | varint32 |
// +----------+--------------+----------+-------+----------+---...
//
// ... continued ...
//
// ...---+----------+-------+----------+-------+---...---+-------+
// | cns N | cn N | cvs N | cv 1 | | cv N |
// ...---+----------+-------+----------+-------+---...---+-------+
// | varint32 | bytes | varint32 | bytes | | bytes |
// ...---+----------+-------+----------+-------+---...---+-------+
class WideColumnSerialization {
public:
static Status Serialize(const WideColumns& columns, std::string& output);
static Status Deserialize(Slice& input, WideColumns& columns);
static WideColumns::const_iterator Find(const WideColumns& columns,
const Slice& column_name);
static constexpr uint32_t kCurrentVersion = 1;
};
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,292 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "db/wide/wide_column_serialization.h"
#include "test_util/testharness.h"
#include "util/coding.h"
namespace ROCKSDB_NAMESPACE {
TEST(WideColumnSerializationTest, Construct) {
constexpr char foo[] = "foo";
constexpr char bar[] = "bar";
const std::string foo_str(foo);
const std::string bar_str(bar);
const Slice foo_slice(foo_str);
const Slice bar_slice(bar_str);
{
WideColumn column(foo, bar);
ASSERT_EQ(column.name(), foo);
ASSERT_EQ(column.value(), bar);
}
{
WideColumn column(foo_str, bar);
ASSERT_EQ(column.name(), foo_str);
ASSERT_EQ(column.value(), bar);
}
{
WideColumn column(foo_slice, bar);
ASSERT_EQ(column.name(), foo_slice);
ASSERT_EQ(column.value(), bar);
}
{
WideColumn column(foo, bar_str);
ASSERT_EQ(column.name(), foo);
ASSERT_EQ(column.value(), bar_str);
}
{
WideColumn column(foo_str, bar_str);
ASSERT_EQ(column.name(), foo_str);
ASSERT_EQ(column.value(), bar_str);
}
{
WideColumn column(foo_slice, bar_str);
ASSERT_EQ(column.name(), foo_slice);
ASSERT_EQ(column.value(), bar_str);
}
{
WideColumn column(foo, bar_slice);
ASSERT_EQ(column.name(), foo);
ASSERT_EQ(column.value(), bar_slice);
}
{
WideColumn column(foo_str, bar_slice);
ASSERT_EQ(column.name(), foo_str);
ASSERT_EQ(column.value(), bar_slice);
}
{
WideColumn column(foo_slice, bar_slice);
ASSERT_EQ(column.name(), foo_slice);
ASSERT_EQ(column.value(), bar_slice);
}
{
constexpr char foo_name[] = "foo_name";
constexpr char bar_value[] = "bar_value";
WideColumn column(std::piecewise_construct,
std::forward_as_tuple(foo_name, sizeof(foo) - 1),
std::forward_as_tuple(bar_value, sizeof(bar) - 1));
ASSERT_EQ(column.name(), foo);
ASSERT_EQ(column.value(), bar);
}
}
TEST(WideColumnSerializationTest, SerializeDeserialize) {
WideColumns columns{{"foo", "bar"}, {"hello", "world"}};
std::string output;
ASSERT_OK(WideColumnSerialization::Serialize(columns, output));
Slice input(output);
WideColumns deserialized_columns;
ASSERT_OK(WideColumnSerialization::Deserialize(input, deserialized_columns));
ASSERT_EQ(columns, deserialized_columns);
{
const auto it = WideColumnSerialization::Find(deserialized_columns, "foo");
ASSERT_NE(it, deserialized_columns.cend());
ASSERT_EQ(*it, deserialized_columns.front());
}
{
const auto it =
WideColumnSerialization::Find(deserialized_columns, "hello");
ASSERT_NE(it, deserialized_columns.cend());
ASSERT_EQ(*it, deserialized_columns.back());
}
{
const auto it =
WideColumnSerialization::Find(deserialized_columns, "fubar");
ASSERT_EQ(it, deserialized_columns.cend());
}
{
const auto it =
WideColumnSerialization::Find(deserialized_columns, "snafu");
ASSERT_EQ(it, deserialized_columns.cend());
}
}
TEST(WideColumnSerializationTest, DeserializeVersionError) {
// Can't decode version
std::string buf;
Slice input(buf);
WideColumns columns;
const Status s = WideColumnSerialization::Deserialize(input, columns);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "version"));
}
TEST(WideColumnSerializationTest, DeserializeUnsupportedVersion) {
// Unsupported version
constexpr uint32_t future_version = 1000;
std::string buf;
PutVarint32(&buf, future_version);
Slice input(buf);
WideColumns columns;
const Status s = WideColumnSerialization::Deserialize(input, columns);
ASSERT_TRUE(s.IsNotSupported());
ASSERT_TRUE(std::strstr(s.getState(), "version"));
}
TEST(WideColumnSerializationTest, DeserializeNumberOfColumnsError) {
// Can't decode number of columns
std::string buf;
PutVarint32(&buf, WideColumnSerialization::kCurrentVersion);
Slice input(buf);
WideColumns columns;
const Status s = WideColumnSerialization::Deserialize(input, columns);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "number"));
}
TEST(WideColumnSerializationTest, DeserializeColumnsError) {
std::string buf;
PutVarint32(&buf, WideColumnSerialization::kCurrentVersion);
constexpr uint32_t num_columns = 2;
PutVarint32(&buf, num_columns);
// Can't decode the first column name
{
Slice input(buf);
WideColumns columns;
const Status s = WideColumnSerialization::Deserialize(input, columns);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "name"));
}
constexpr char first_column_name[] = "foo";
PutLengthPrefixedSlice(&buf, first_column_name);
// Can't decode the size of the first column value
{
Slice input(buf);
WideColumns columns;
const Status s = WideColumnSerialization::Deserialize(input, columns);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "value size"));
}
constexpr uint32_t first_value_size = 16;
PutVarint32(&buf, first_value_size);
// Can't decode the second column name
{
Slice input(buf);
WideColumns columns;
const Status s = WideColumnSerialization::Deserialize(input, columns);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "name"));
}
constexpr char second_column_name[] = "hello";
PutLengthPrefixedSlice(&buf, second_column_name);
// Can't decode the size of the second column value
{
Slice input(buf);
WideColumns columns;
const Status s = WideColumnSerialization::Deserialize(input, columns);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "value size"));
}
constexpr uint32_t second_value_size = 64;
PutVarint32(&buf, second_value_size);
// Can't decode the payload of the first column
{
Slice input(buf);
WideColumns columns;
const Status s = WideColumnSerialization::Deserialize(input, columns);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "payload"));
}
buf.append(first_value_size, '0');
// Can't decode the payload of the second column
{
Slice input(buf);
WideColumns columns;
const Status s = WideColumnSerialization::Deserialize(input, columns);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "payload"));
}
buf.append(second_value_size, 'x');
// Success
{
Slice input(buf);
WideColumns columns;
ASSERT_OK(WideColumnSerialization::Deserialize(input, columns));
}
}
TEST(WideColumnSerializationTest, DeserializeColumnsOutOfOrder) {
std::string buf;
PutVarint32(&buf, WideColumnSerialization::kCurrentVersion);
constexpr uint32_t num_columns = 2;
PutVarint32(&buf, num_columns);
constexpr char first_column_name[] = "b";
PutLengthPrefixedSlice(&buf, first_column_name);
constexpr uint32_t first_value_size = 16;
PutVarint32(&buf, first_value_size);
constexpr char second_column_name[] = "a";
PutLengthPrefixedSlice(&buf, second_column_name);
Slice input(buf);
WideColumns columns;
const Status s = WideColumnSerialization::Deserialize(input, columns);
ASSERT_TRUE(s.IsCorruption());
ASSERT_TRUE(std::strstr(s.getState(), "order"));
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -0,0 +1,74 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <tuple>
#include <utility>
#include <vector>
#include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/slice.h"
namespace ROCKSDB_NAMESPACE {
// Class representing a wide column, which is defined as a pair of column name
// and column value.
class WideColumn {
public:
WideColumn() = default;
// Initializes a WideColumn object by forwarding the name and value
// arguments to the corresponding member Slices. This makes it possible to
// construct a WideColumn using combinations of const char*, const
// std::string&, const Slice& etc., for example:
//
// constexpr char foo[] = "foo";
// const std::string bar("bar");
// WideColumn column(foo, bar);
template <typename N, typename V>
WideColumn(N&& name, V&& value)
: name_(std::forward<N>(name)), value_(std::forward<V>(value)) {}
// Initializes a WideColumn object by forwarding the elements of
// name_tuple and value_tuple to the constructors of the corresponding member
// Slices. This makes it possible to initialize the Slices using the Slice
// constructors that take more than one argument, for example:
//
// constexpr char foo_name[] = "foo_name";
// constexpr char bar_value[] = "bar_value";
// WideColumn column(std::piecewise_construct,
// std::forward_as_tuple(foo_name, 3),
// std::forward_as_tuple(bar_value, 3));
template <typename NTuple, typename VTuple>
WideColumn(std::piecewise_construct_t, NTuple&& name_tuple,
VTuple&& value_tuple)
: name_(std::make_from_tuple<Slice>(std::forward<NTuple>(name_tuple))),
value_(std::make_from_tuple<Slice>(std::forward<VTuple>(value_tuple))) {
}
const Slice& name() const { return name_; }
const Slice& value() const { return value_; }
Slice& name() { return name_; }
Slice& value() { return value_; }
private:
Slice name_;
Slice value_;
};
// Note: column names and values are compared bytewise.
inline bool operator==(const WideColumn& lhs, const WideColumn& rhs) {
return lhs.name() == rhs.name() && lhs.value() == rhs.value();
}
inline bool operator!=(const WideColumn& lhs, const WideColumn& rhs) {
return !(lhs == rhs);
}
using WideColumns = std::vector<WideColumn>;
} // namespace ROCKSDB_NAMESPACE

@ -82,6 +82,7 @@ LIB_SOURCES = \
db/version_set.cc \ db/version_set.cc \
db/wal_edit.cc \ db/wal_edit.cc \
db/wal_manager.cc \ db/wal_manager.cc \
db/wide/wide_column_serialization.cc \
db/write_batch.cc \ db/write_batch.cc \
db/write_batch_base.cc \ db/write_batch_base.cc \
db/write_controller.cc \ db/write_controller.cc \
@ -497,6 +498,7 @@ TEST_MAIN_SOURCES = \
db/version_edit_test.cc \ db/version_edit_test.cc \
db/version_set_test.cc \ db/version_set_test.cc \
db/wal_manager_test.cc \ db/wal_manager_test.cc \
db/wide/wide_column_serialization_test.cc \
db/write_batch_test.cc \ db/write_batch_test.cc \
db/write_callback_test.cc \ db/write_callback_test.cc \
db/write_controller_test.cc \ db/write_controller_test.cc \

@ -36,7 +36,7 @@ class autovector : public std::vector<T> {
// full-fledged generic container. // full-fledged generic container.
// //
// Currently we don't support: // Currently we don't support:
// * reserve()/shrink_to_fit() // * shrink_to_fit()
// If used correctly, in most cases, people should not touch the // If used correctly, in most cases, people should not touch the
// underlying vector at all. // underlying vector at all.
// * random insert()/erase(), please only use push_back()/pop_back(). // * random insert()/erase(), please only use push_back()/pop_back().
@ -223,6 +223,16 @@ class autovector {
bool empty() const { return size() == 0; } bool empty() const { return size() == 0; }
size_type capacity() const { return kSize + vect_.capacity(); }
void reserve(size_t cap) {
if (cap > kSize) {
vect_.reserve(cap - kSize);
}
assert(cap <= capacity());
}
const_reference operator[](size_type n) const { const_reference operator[](size_type n) const {
assert(n < size()); assert(n < size());
if (n < kSize) { if (n < kSize) {

Loading…
Cancel
Save