diff --git a/CMakeLists.txt b/CMakeLists.txt index 10f8b1ede..7da02ca79 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -675,6 +675,7 @@ set(SOURCES db/version_set.cc db/wal_edit.cc db/wal_manager.cc + db/wide/wide_column_serialization.cc db/write_batch.cc db/write_batch_base.cc db/write_controller.cc @@ -1294,6 +1295,7 @@ if(WITH_TESTS) db/version_set_test.cc db/wal_manager_test.cc db/wal_edit_test.cc + db/wide/wide_column_serialization_test.cc db/write_batch_test.cc db/write_callback_test.cc db/write_controller_test.cc diff --git a/Makefile b/Makefile index 20c78edba..6e288c0d8 100644 --- a/Makefile +++ b/Makefile @@ -1902,6 +1902,10 @@ db_basic_bench: $(OBJ_DIR)/microbench/db_basic_bench.o $(LIBRARY) cache_reservation_manager_test: $(OBJ_DIR)/cache/cache_reservation_manager_test.o $(TEST_LIBRARY) $(LIBRARY) $(AM_LINK) + +wide_column_serialization_test: $(OBJ_DIR)/db/wide/wide_column_serialization_test.o $(TEST_LIBRARY) $(LIBRARY) + $(AM_LINK) + #------------------------------------------------- # make install related stuff PREFIX ?= /usr/local diff --git a/TARGETS b/TARGETS index 24a88eca9..827d0d17a 100644 --- a/TARGETS +++ b/TARGETS @@ -91,6 +91,7 @@ cpp_library_wrapper(name="rocksdb_lib", srcs=[ "db/version_set.cc", "db/wal_edit.cc", "db/wal_manager.cc", + "db/wide/wide_column_serialization.cc", "db/write_batch.cc", "db/write_batch_base.cc", "db/write_controller.cc", @@ -419,6 +420,7 @@ cpp_library_wrapper(name="rocksdb_whole_archive_lib", srcs=[ "db/version_set.cc", "db/wal_edit.cc", "db/wal_manager.cc", + "db/wide/wide_column_serialization.cc", "db/write_batch.cc", "db/write_batch_base.cc", "db/write_controller.cc", @@ -5814,6 +5816,12 @@ cpp_unittest_wrapper(name="wal_manager_test", extra_compiler_flags=[]) +cpp_unittest_wrapper(name="wide_column_serialization_test", + srcs=["db/wide/wide_column_serialization_test.cc"], + deps=[":rocksdb_test_lib"], + extra_compiler_flags=[]) + + cpp_unittest_wrapper(name="work_queue_test", srcs=["util/work_queue_test.cc"], deps=[":rocksdb_test_lib"], diff --git a/db/wide/wide_column_serialization.cc b/db/wide/wide_column_serialization.cc new file mode 100644 index 000000000..f44b81e2c --- /dev/null +++ b/db/wide/wide_column_serialization.cc @@ -0,0 +1,141 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/wide/wide_column_serialization.h" + +#include +#include +#include + +#include "rocksdb/slice.h" +#include "util/autovector.h" +#include "util/coding.h" + +namespace ROCKSDB_NAMESPACE { + +Status WideColumnSerialization::Serialize(const WideColumns& columns, + std::string& output) { + // Column names should be strictly ascending + assert(std::adjacent_find(columns.cbegin(), columns.cend(), + [](const WideColumn& lhs, const WideColumn& rhs) { + return lhs.name().compare(rhs.name()) > 0; + }) == columns.cend()); + + if (columns.size() > + static_cast(std::numeric_limits::max())) { + return Status::InvalidArgument("Too many wide columns"); + } + + PutVarint32(&output, kCurrentVersion); + + PutVarint32(&output, static_cast(columns.size())); + + for (const auto& column : columns) { + const Slice& name = column.name(); + if (name.size() > + static_cast(std::numeric_limits::max())) { + return Status::InvalidArgument("Wide column name too long"); + } + + const Slice& value = column.value(); + if (value.size() > + static_cast(std::numeric_limits::max())) { + return Status::InvalidArgument("Wide column value too long"); + } + + PutLengthPrefixedSlice(&output, name); + PutVarint32(&output, static_cast(value.size())); + } + + for (const auto& column : columns) { + const Slice& value = column.value(); + + output.append(value.data(), value.size()); + } + + return Status::OK(); +} + +Status WideColumnSerialization::Deserialize(Slice& input, + WideColumns& columns) { + assert(columns.empty()); + + uint32_t version = 0; + if (!GetVarint32(&input, &version)) { + return Status::Corruption("Error decoding wide column version"); + } + + if (version > kCurrentVersion) { + return Status::NotSupported("Unsupported wide column version"); + } + + uint32_t num_columns = 0; + if (!GetVarint32(&input, &num_columns)) { + return Status::Corruption("Error decoding number of wide columns"); + } + + if (!num_columns) { + return Status::OK(); + } + + columns.reserve(num_columns); + + autovector column_value_sizes; + column_value_sizes.reserve(num_columns); + + for (uint32_t i = 0; i < num_columns; ++i) { + Slice name; + if (!GetLengthPrefixedSlice(&input, &name)) { + return Status::Corruption("Error decoding wide column name"); + } + + if (!columns.empty() && columns.back().name().compare(name) >= 0) { + return Status::Corruption("Wide columns out of order"); + } + + columns.emplace_back(name, Slice()); + + uint32_t value_size = 0; + if (!GetVarint32(&input, &value_size)) { + return Status::Corruption("Error decoding wide column value size"); + } + + column_value_sizes.emplace_back(value_size); + } + + const Slice data(input); + size_t pos = 0; + + for (uint32_t i = 0; i < num_columns; ++i) { + const uint32_t value_size = column_value_sizes[i]; + + if (pos + value_size > data.size()) { + return Status::Corruption("Error decoding wide column value payload"); + } + + columns[i].value() = Slice(data.data() + pos, value_size); + + pos += value_size; + } + + return Status::OK(); +} + +WideColumns::const_iterator WideColumnSerialization::Find( + const WideColumns& columns, const Slice& column_name) { + const auto it = + std::lower_bound(columns.cbegin(), columns.cend(), column_name, + [](const WideColumn& lhs, const Slice& rhs) { + return lhs.name().compare(rhs) < 0; + }); + + if (it == columns.cend() || it->name() != column_name) { + return columns.cend(); + } + + return it; +} + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/wide/wide_column_serialization.h b/db/wide/wide_column_serialization.h new file mode 100644 index 000000000..67b771e76 --- /dev/null +++ b/db/wide/wide_column_serialization.h @@ -0,0 +1,55 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include + +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/status.h" +#include "rocksdb/wide_columns.h" + +namespace ROCKSDB_NAMESPACE { + +class Slice; + +// Wide-column serialization/deserialization primitives. +// +// The two main parts of the layout are 1) a sorted index containing the column +// names and column value sizes and 2) the column values themselves. Keeping the +// index and the values separate will enable selectively reading column values +// down the line. Note that currently the index has to be fully parsed in order +// to find out the offset of each column value. +// +// Legend: cn = column name, cv = column value, cns = column name size, cvs = +// column value size. +// +// +----------+--------------+----------+-------+----------+---... +// | version | # of columns | cns 1 | cn 1 | cvs 1 | +// +----------+--------------+------------------+--------- +---... +// | varint32 | varint32 | varint32 | bytes | varint32 | +// +----------+--------------+----------+-------+----------+---... +// +// ... continued ... +// +// ...---+----------+-------+----------+-------+---...---+-------+ +// | cns N | cn N | cvs N | cv 1 | | cv N | +// ...---+----------+-------+----------+-------+---...---+-------+ +// | varint32 | bytes | varint32 | bytes | | bytes | +// ...---+----------+-------+----------+-------+---...---+-------+ + +class WideColumnSerialization { + public: + static Status Serialize(const WideColumns& columns, std::string& output); + static Status Deserialize(Slice& input, WideColumns& columns); + + static WideColumns::const_iterator Find(const WideColumns& columns, + const Slice& column_name); + + static constexpr uint32_t kCurrentVersion = 1; +}; + +} // namespace ROCKSDB_NAMESPACE diff --git a/db/wide/wide_column_serialization_test.cc b/db/wide/wide_column_serialization_test.cc new file mode 100644 index 000000000..8421a8f61 --- /dev/null +++ b/db/wide/wide_column_serialization_test.cc @@ -0,0 +1,292 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "db/wide/wide_column_serialization.h" + +#include "test_util/testharness.h" +#include "util/coding.h" + +namespace ROCKSDB_NAMESPACE { + +TEST(WideColumnSerializationTest, Construct) { + constexpr char foo[] = "foo"; + constexpr char bar[] = "bar"; + + const std::string foo_str(foo); + const std::string bar_str(bar); + + const Slice foo_slice(foo_str); + const Slice bar_slice(bar_str); + + { + WideColumn column(foo, bar); + ASSERT_EQ(column.name(), foo); + ASSERT_EQ(column.value(), bar); + } + + { + WideColumn column(foo_str, bar); + ASSERT_EQ(column.name(), foo_str); + ASSERT_EQ(column.value(), bar); + } + + { + WideColumn column(foo_slice, bar); + ASSERT_EQ(column.name(), foo_slice); + ASSERT_EQ(column.value(), bar); + } + + { + WideColumn column(foo, bar_str); + ASSERT_EQ(column.name(), foo); + ASSERT_EQ(column.value(), bar_str); + } + + { + WideColumn column(foo_str, bar_str); + ASSERT_EQ(column.name(), foo_str); + ASSERT_EQ(column.value(), bar_str); + } + + { + WideColumn column(foo_slice, bar_str); + ASSERT_EQ(column.name(), foo_slice); + ASSERT_EQ(column.value(), bar_str); + } + + { + WideColumn column(foo, bar_slice); + ASSERT_EQ(column.name(), foo); + ASSERT_EQ(column.value(), bar_slice); + } + + { + WideColumn column(foo_str, bar_slice); + ASSERT_EQ(column.name(), foo_str); + ASSERT_EQ(column.value(), bar_slice); + } + + { + WideColumn column(foo_slice, bar_slice); + ASSERT_EQ(column.name(), foo_slice); + ASSERT_EQ(column.value(), bar_slice); + } + + { + constexpr char foo_name[] = "foo_name"; + constexpr char bar_value[] = "bar_value"; + + WideColumn column(std::piecewise_construct, + std::forward_as_tuple(foo_name, sizeof(foo) - 1), + std::forward_as_tuple(bar_value, sizeof(bar) - 1)); + ASSERT_EQ(column.name(), foo); + ASSERT_EQ(column.value(), bar); + } +} + +TEST(WideColumnSerializationTest, SerializeDeserialize) { + WideColumns columns{{"foo", "bar"}, {"hello", "world"}}; + std::string output; + + ASSERT_OK(WideColumnSerialization::Serialize(columns, output)); + + Slice input(output); + WideColumns deserialized_columns; + + ASSERT_OK(WideColumnSerialization::Deserialize(input, deserialized_columns)); + ASSERT_EQ(columns, deserialized_columns); + + { + const auto it = WideColumnSerialization::Find(deserialized_columns, "foo"); + ASSERT_NE(it, deserialized_columns.cend()); + ASSERT_EQ(*it, deserialized_columns.front()); + } + + { + const auto it = + WideColumnSerialization::Find(deserialized_columns, "hello"); + ASSERT_NE(it, deserialized_columns.cend()); + ASSERT_EQ(*it, deserialized_columns.back()); + } + + { + const auto it = + WideColumnSerialization::Find(deserialized_columns, "fubar"); + ASSERT_EQ(it, deserialized_columns.cend()); + } + + { + const auto it = + WideColumnSerialization::Find(deserialized_columns, "snafu"); + ASSERT_EQ(it, deserialized_columns.cend()); + } +} + +TEST(WideColumnSerializationTest, DeserializeVersionError) { + // Can't decode version + + std::string buf; + + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "version")); +} + +TEST(WideColumnSerializationTest, DeserializeUnsupportedVersion) { + // Unsupported version + constexpr uint32_t future_version = 1000; + + std::string buf; + PutVarint32(&buf, future_version); + + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsNotSupported()); + ASSERT_TRUE(std::strstr(s.getState(), "version")); +} + +TEST(WideColumnSerializationTest, DeserializeNumberOfColumnsError) { + // Can't decode number of columns + + std::string buf; + PutVarint32(&buf, WideColumnSerialization::kCurrentVersion); + + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "number")); +} + +TEST(WideColumnSerializationTest, DeserializeColumnsError) { + std::string buf; + + PutVarint32(&buf, WideColumnSerialization::kCurrentVersion); + + constexpr uint32_t num_columns = 2; + PutVarint32(&buf, num_columns); + + // Can't decode the first column name + { + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "name")); + } + + constexpr char first_column_name[] = "foo"; + PutLengthPrefixedSlice(&buf, first_column_name); + + // Can't decode the size of the first column value + { + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "value size")); + } + + constexpr uint32_t first_value_size = 16; + PutVarint32(&buf, first_value_size); + + // Can't decode the second column name + { + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "name")); + } + + constexpr char second_column_name[] = "hello"; + PutLengthPrefixedSlice(&buf, second_column_name); + + // Can't decode the size of the second column value + { + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "value size")); + } + + constexpr uint32_t second_value_size = 64; + PutVarint32(&buf, second_value_size); + + // Can't decode the payload of the first column + { + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "payload")); + } + + buf.append(first_value_size, '0'); + + // Can't decode the payload of the second column + { + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "payload")); + } + + buf.append(second_value_size, 'x'); + + // Success + { + Slice input(buf); + WideColumns columns; + + ASSERT_OK(WideColumnSerialization::Deserialize(input, columns)); + } +} + +TEST(WideColumnSerializationTest, DeserializeColumnsOutOfOrder) { + std::string buf; + + PutVarint32(&buf, WideColumnSerialization::kCurrentVersion); + + constexpr uint32_t num_columns = 2; + PutVarint32(&buf, num_columns); + + constexpr char first_column_name[] = "b"; + PutLengthPrefixedSlice(&buf, first_column_name); + + constexpr uint32_t first_value_size = 16; + PutVarint32(&buf, first_value_size); + + constexpr char second_column_name[] = "a"; + PutLengthPrefixedSlice(&buf, second_column_name); + + Slice input(buf); + WideColumns columns; + + const Status s = WideColumnSerialization::Deserialize(input, columns); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE(std::strstr(s.getState(), "order")); +} + +} // namespace ROCKSDB_NAMESPACE + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/include/rocksdb/wide_columns.h b/include/rocksdb/wide_columns.h new file mode 100644 index 000000000..1974a6951 --- /dev/null +++ b/include/rocksdb/wide_columns.h @@ -0,0 +1,74 @@ +// Copyright (c) Meta Platforms, Inc. and affiliates. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#include +#include +#include + +#include "rocksdb/rocksdb_namespace.h" +#include "rocksdb/slice.h" + +namespace ROCKSDB_NAMESPACE { + +// Class representing a wide column, which is defined as a pair of column name +// and column value. +class WideColumn { + public: + WideColumn() = default; + + // Initializes a WideColumn object by forwarding the name and value + // arguments to the corresponding member Slices. This makes it possible to + // construct a WideColumn using combinations of const char*, const + // std::string&, const Slice& etc., for example: + // + // constexpr char foo[] = "foo"; + // const std::string bar("bar"); + // WideColumn column(foo, bar); + template + WideColumn(N&& name, V&& value) + : name_(std::forward(name)), value_(std::forward(value)) {} + + // Initializes a WideColumn object by forwarding the elements of + // name_tuple and value_tuple to the constructors of the corresponding member + // Slices. This makes it possible to initialize the Slices using the Slice + // constructors that take more than one argument, for example: + // + // constexpr char foo_name[] = "foo_name"; + // constexpr char bar_value[] = "bar_value"; + // WideColumn column(std::piecewise_construct, + // std::forward_as_tuple(foo_name, 3), + // std::forward_as_tuple(bar_value, 3)); + template + WideColumn(std::piecewise_construct_t, NTuple&& name_tuple, + VTuple&& value_tuple) + : name_(std::make_from_tuple(std::forward(name_tuple))), + value_(std::make_from_tuple(std::forward(value_tuple))) { + } + + const Slice& name() const { return name_; } + const Slice& value() const { return value_; } + + Slice& name() { return name_; } + Slice& value() { return value_; } + + private: + Slice name_; + Slice value_; +}; + +// Note: column names and values are compared bytewise. +inline bool operator==(const WideColumn& lhs, const WideColumn& rhs) { + return lhs.name() == rhs.name() && lhs.value() == rhs.value(); +} + +inline bool operator!=(const WideColumn& lhs, const WideColumn& rhs) { + return !(lhs == rhs); +} + +using WideColumns = std::vector; + +} // namespace ROCKSDB_NAMESPACE diff --git a/src.mk b/src.mk index 08b911475..0a77e7f46 100644 --- a/src.mk +++ b/src.mk @@ -82,6 +82,7 @@ LIB_SOURCES = \ db/version_set.cc \ db/wal_edit.cc \ db/wal_manager.cc \ + db/wide/wide_column_serialization.cc \ db/write_batch.cc \ db/write_batch_base.cc \ db/write_controller.cc \ @@ -497,6 +498,7 @@ TEST_MAIN_SOURCES = \ db/version_edit_test.cc \ db/version_set_test.cc \ db/wal_manager_test.cc \ + db/wide/wide_column_serialization_test.cc \ db/write_batch_test.cc \ db/write_callback_test.cc \ db/write_controller_test.cc \ diff --git a/util/autovector.h b/util/autovector.h index 5bda2d2d1..22c9450d7 100644 --- a/util/autovector.h +++ b/util/autovector.h @@ -36,7 +36,7 @@ class autovector : public std::vector { // full-fledged generic container. // // Currently we don't support: -// * reserve()/shrink_to_fit() +// * shrink_to_fit() // If used correctly, in most cases, people should not touch the // underlying vector at all. // * random insert()/erase(), please only use push_back()/pop_back(). @@ -223,6 +223,16 @@ class autovector { bool empty() const { return size() == 0; } + size_type capacity() const { return kSize + vect_.capacity(); } + + void reserve(size_t cap) { + if (cap > kSize) { + vect_.reserve(cap - kSize); + } + + assert(cap <= capacity()); + } + const_reference operator[](size_type n) const { assert(n < size()); if (n < kSize) {