Add API for writing wide-column entities (#10242)

Summary:
The patch builds on https://github.com/facebook/rocksdb/pull/9915 and adds
a new API called `PutEntity` that can be used to write a wide-column entity
to the database. The new API is added to both `DB` and `WriteBatch`. Note
that currently there is no way to retrieve these entities; more precisely, all
read APIs (`Get`, `MultiGet`, and iterator) return `NotSupported` when they
encounter a wide-column entity that is required to answer a query. Read-side
support (as well as other missing functionality like `Merge`, compaction filter,
and timestamp support) will be added in later PRs.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/10242

Test Plan: `make check`

Reviewed By: riversand963

Differential Revision: D37369748

Pulled By: ltamasi

fbshipit-source-id: 7f5e412359ed7a400fd80b897dae5599dbcd685d
main
Levi Tamasi 3 years ago committed by Facebook GitHub Bot
parent f322f273b0
commit c73d2a9d18
  1. 1
      CMakeLists.txt
  2. 3
      Makefile
  3. 6
      TARGETS
  4. 9
      db/compaction/compaction_iterator.cc
  5. 15
      db/compaction/compaction_iterator_test.cc
  6. 10
      db/db_impl/compacted_db_impl.h
  7. 5
      db/db_impl/db_impl.h
  8. 9
      db/db_impl/db_impl_readonly.h
  9. 8
      db/db_impl/db_impl_secondary.h
  10. 32
      db/db_impl/db_impl_write.cc
  11. 52
      db/db_iter.cc
  12. 2
      db/db_iter.h
  13. 81
      db/db_kv_checksum_test.cc
  14. 9
      db/db_test.cc
  15. 28
      db/memtable.cc
  16. 9
      db/merge_helper.cc
  17. 4
      db/version_set.cc
  18. 5
      db/version_set_sync_and_async.h
  19. 213
      db/wide/db_wide_basic_test.cc
  20. 13
      db/wide/wide_column_serialization.cc
  21. 16
      db/wide/wide_column_serialization_test.cc
  22. 150
      db/write_batch.cc
  23. 3
      db/write_batch_internal.h
  24. 6
      include/rocksdb/db.h
  25. 7
      include/rocksdb/utilities/stackable_db.h
  26. 11
      include/rocksdb/utilities/write_batch_with_index.h
  27. 14
      include/rocksdb/write_batch.h
  28. 5
      include/rocksdb/write_batch_base.h
  29. 1
      src.mk
  30. 8
      table/block_based/block.cc
  31. 33
      table/get_context.cc
  32. 2
      table/get_context.h

@ -1301,6 +1301,7 @@ if(WITH_TESTS)
db/version_set_test.cc db/version_set_test.cc
db/wal_manager_test.cc db/wal_manager_test.cc
db/wal_edit_test.cc db/wal_edit_test.cc
db/wide/db_wide_basic_test.cc
db/wide/wide_column_serialization_test.cc db/wide/wide_column_serialization_test.cc
db/write_batch_test.cc db/write_batch_test.cc
db/write_callback_test.cc db/write_callback_test.cc

@ -1383,6 +1383,9 @@ db_blob_compaction_test: $(OBJ_DIR)/db/blob/db_blob_compaction_test.o $(TEST_LIB
db_readonly_with_timestamp_test: $(OBJ_DIR)/db/db_readonly_with_timestamp_test.o $(TEST_LIBRARY) $(LIBRARY) db_readonly_with_timestamp_test: $(OBJ_DIR)/db/db_readonly_with_timestamp_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK) $(AM_LINK)
db_wide_basic_test: $(OBJ_DIR)/db/wide/db_wide_basic_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK)
db_with_timestamp_basic_test: $(OBJ_DIR)/db/db_with_timestamp_basic_test.o $(TEST_LIBRARY) $(LIBRARY) db_with_timestamp_basic_test: $(OBJ_DIR)/db/db_with_timestamp_basic_test.o $(TEST_LIBRARY) $(LIBRARY)
$(AM_LINK) $(AM_LINK)

@ -5240,6 +5240,12 @@ cpp_unittest_wrapper(name="db_wal_test",
extra_compiler_flags=[]) extra_compiler_flags=[])
cpp_unittest_wrapper(name="db_wide_basic_test",
srcs=["db/wide/db_wide_basic_test.cc"],
deps=[":rocksdb_test_lib"],
extra_compiler_flags=[])
cpp_unittest_wrapper(name="db_with_timestamp_basic_test", cpp_unittest_wrapper(name="db_with_timestamp_basic_test",
srcs=["db/db_with_timestamp_basic_test.cc"], srcs=["db/db_with_timestamp_basic_test.cc"],
deps=[":rocksdb_test_lib"], deps=[":rocksdb_test_lib"],

@ -197,6 +197,7 @@ void CompactionIterator::Next() {
bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip,
Slice* skip_until) { Slice* skip_until) {
// TODO: support compaction filter for wide-column entities
if (!compaction_filter_ || if (!compaction_filter_ ||
(ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex)) { (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex)) {
return true; return true;
@ -519,7 +520,8 @@ void CompactionIterator::NextFromInput() {
// In the previous iteration we encountered a single delete that we could // In the previous iteration we encountered a single delete that we could
// not compact out. We will keep this Put, but can drop it's data. // not compact out. We will keep this Put, but can drop it's data.
// (See Optimization 3, below.) // (See Optimization 3, below.)
if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex) { if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex &&
ikey_.type != kTypeWideColumnEntity) {
ROCKS_LOG_FATAL(info_log_, "Unexpected key %s for compaction output", ROCKS_LOG_FATAL(info_log_, "Unexpected key %s for compaction output",
ikey_.DebugString(allow_data_in_errors_, true).c_str()); ikey_.DebugString(allow_data_in_errors_, true).c_str());
assert(false); assert(false);
@ -533,7 +535,7 @@ void CompactionIterator::NextFromInput() {
assert(false); assert(false);
} }
if (ikey_.type == kTypeBlobIndex) { if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity) {
ikey_.type = kTypeValue; ikey_.type = kTypeValue;
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
} }
@ -689,7 +691,8 @@ void CompactionIterator::NextFromInput() {
// either way. We will maintain counts of how many mismatches // either way. We will maintain counts of how many mismatches
// happened // happened
if (next_ikey.type != kTypeValue && if (next_ikey.type != kTypeValue &&
next_ikey.type != kTypeBlobIndex) { next_ikey.type != kTypeBlobIndex &&
next_ikey.type != kTypeWideColumnEntity) {
++iter_stats_.num_single_del_mismatch; ++iter_stats_.num_single_del_mismatch;
} }

@ -981,6 +981,21 @@ TEST_F(CompactionIteratorWithSnapshotCheckerTest,
2 /*earliest_write_conflict_snapshot*/); 2 /*earliest_write_conflict_snapshot*/);
} }
// Same as above but with a wide-column entity. In addition to the value getting
// trimmed, the type of the KV is changed to kTypeValue.
TEST_F(CompactionIteratorWithSnapshotCheckerTest,
KeepSingleDeletionForWriteConflictChecking_WideColumnEntity) {
AddSnapshot(2, 0);
RunTest({test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeWideColumnEntity)},
{"", "fake_entity"},
{test::KeyStr("a", 2, kTypeSingleDeletion),
test::KeyStr("a", 1, kTypeValue)},
{"", ""}, 2 /* last_committed_seq */, nullptr /* merge_operator */,
nullptr /* compaction_filter */, false /* bottommost_level */,
2 /* earliest_write_conflict_snapshot */);
}
// Compaction filter should keep uncommitted key as-is, and // Compaction filter should keep uncommitted key as-is, and
// * Convert the latest value to deletion, and/or // * Convert the latest value to deletion, and/or
// * if latest value is a merge, apply filter to all subsequent merges. // * if latest value is a merge, apply filter to all subsequent merges.

@ -54,12 +54,22 @@ class CompactedDBImpl : public DBImpl {
const Slice& /*key*/, const Slice& /*value*/) override { const Slice& /*key*/, const Slice& /*value*/) override {
return Status::NotSupported("Not supported in compacted db mode."); return Status::NotSupported("Not supported in compacted db mode.");
} }
using DBImpl::PutEntity;
Status PutEntity(const WriteOptions& /* options */,
ColumnFamilyHandle* /* column_family */,
const Slice& /* key */,
const WideColumns& /* columns */) override {
return Status::NotSupported("Not supported in compacted db mode.");
}
using DBImpl::Merge; using DBImpl::Merge;
virtual Status Merge(const WriteOptions& /*options*/, virtual Status Merge(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/, ColumnFamilyHandle* /*column_family*/,
const Slice& /*key*/, const Slice& /*value*/) override { const Slice& /*key*/, const Slice& /*value*/) override {
return Status::NotSupported("Not supported in compacted db mode."); return Status::NotSupported("Not supported in compacted db mode.");
} }
using DBImpl::Delete; using DBImpl::Delete;
virtual Status Delete(const WriteOptions& /*options*/, virtual Status Delete(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/, ColumnFamilyHandle* /*column_family*/,

@ -217,6 +217,11 @@ class DBImpl : public DB {
Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& ts, const Slice& value) override; const Slice& key, const Slice& ts, const Slice& value) override;
using DB::PutEntity;
Status PutEntity(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns) override;
using DB::Merge; using DB::Merge;
Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) override; const Slice& key, const Slice& value) override;

@ -48,6 +48,15 @@ class DBImplReadOnly : public DBImpl {
const Slice& /*key*/, const Slice& /*value*/) override { const Slice& /*key*/, const Slice& /*value*/) override {
return Status::NotSupported("Not supported operation in read only mode."); return Status::NotSupported("Not supported operation in read only mode.");
} }
using DBImpl::PutEntity;
Status PutEntity(const WriteOptions& /* options */,
ColumnFamilyHandle* /* column_family */,
const Slice& /* key */,
const WideColumns& /* columns */) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::Merge; using DBImpl::Merge;
virtual Status Merge(const WriteOptions& /*options*/, virtual Status Merge(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/, ColumnFamilyHandle* /*column_family*/,

@ -119,6 +119,14 @@ class DBImplSecondary : public DBImpl {
return Status::NotSupported("Not supported operation in secondary mode."); return Status::NotSupported("Not supported operation in secondary mode.");
} }
using DBImpl::PutEntity;
Status PutEntity(const WriteOptions& /* options */,
ColumnFamilyHandle* /* column_family */,
const Slice& /* key */,
const WideColumns& /* columns */) override {
return Status::NotSupported("Not supported operation in secondary mode.");
}
using DBImpl::Merge; using DBImpl::Merge;
Status Merge(const WriteOptions& /*options*/, Status Merge(const WriteOptions& /*options*/,
ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/, ColumnFamilyHandle* /*column_family*/, const Slice& /*key*/,

@ -37,6 +37,17 @@ Status DBImpl::Put(const WriteOptions& o, ColumnFamilyHandle* column_family,
return DB::Put(o, column_family, key, ts, val); return DB::Put(o, column_family, key, ts, val);
} }
Status DBImpl::PutEntity(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns) {
const Status s = FailIfCfHasTs(column_family);
if (!s.ok()) {
return s;
}
return DB::PutEntity(options, column_family, key, columns);
}
Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family, Status DBImpl::Merge(const WriteOptions& o, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& val) { const Slice& key, const Slice& val) {
const Status s = FailIfCfHasTs(column_family); const Status s = FailIfCfHasTs(column_family);
@ -2236,6 +2247,27 @@ Status DB::Put(const WriteOptions& opt, ColumnFamilyHandle* column_family,
return Write(opt, &batch); return Write(opt, &batch);
} }
Status DB::PutEntity(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns) {
const ColumnFamilyHandle* const default_cf = DefaultColumnFamily();
assert(default_cf);
const Comparator* const default_cf_ucmp = default_cf->GetComparator();
assert(default_cf_ucmp);
WriteBatch batch(/* reserved_bytes */ 0, /* max_bytes */ 0,
options.protection_bytes_per_key,
default_cf_ucmp->timestamp_size());
const Status s = batch.PutEntity(column_family, key, columns);
if (!s.ok()) {
return s;
}
return Write(options, &batch);
}
Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family, Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key) { const Slice& key) {
WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */, WriteBatch batch(0 /* reserved_bytes */, 0 /* max_bytes */,

@ -207,6 +207,15 @@ bool DBIter::SetBlobValueIfNeeded(const Slice& user_key,
return true; return true;
} }
bool DBIter::SetWideColumnValueIfNeeded(const Slice& /* wide_columns_slice */) {
assert(!is_blob_);
// TODO: support wide-column entities
status_ = Status::NotSupported("Encountered unexpected wide-column entity");
valid_ = false;
return false;
}
// PRE: saved_key_ has the current user key if skipping_saved_key // PRE: saved_key_ has the current user key if skipping_saved_key
// POST: saved_key_ should have the next user key if valid_, // POST: saved_key_ should have the next user key if valid_,
// if the current entry is a result of merge // if the current entry is a result of merge
@ -341,6 +350,7 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
break; break;
case kTypeValue: case kTypeValue:
case kTypeBlobIndex: case kTypeBlobIndex:
case kTypeWideColumnEntity:
if (timestamp_lb_) { if (timestamp_lb_) {
saved_key_.SetInternalKey(ikey_); saved_key_.SetInternalKey(ikey_);
@ -348,6 +358,10 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) { if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
return false; return false;
} }
} else if (ikey_.type == kTypeWideColumnEntity) {
if (!SetWideColumnValueIfNeeded(iter_.value())) {
return false;
}
} }
valid_ = true; valid_ = true;
@ -369,6 +383,10 @@ bool DBIter::FindNextUserEntryInternal(bool skipping_saved_key,
if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) { if (!SetBlobValueIfNeeded(ikey_.user_key, iter_.value())) {
return false; return false;
} }
} else if (ikey_.type == kTypeWideColumnEntity) {
if (!SetWideColumnValueIfNeeded(iter_.value())) {
return false;
}
} }
valid_ = true; valid_ = true;
@ -580,6 +598,12 @@ bool DBIter::MergeValuesNewToOld() {
return false; return false;
} }
return true; return true;
} else if (kTypeWideColumnEntity == ikey.type) {
// TODO: support wide-column entities
status_ = Status::NotSupported(
"Merge currently not supported for wide-column entities");
valid_ = false;
return false;
} else { } else {
valid_ = false; valid_ = false;
status_ = Status::Corruption( status_ = Status::Corruption(
@ -783,7 +807,8 @@ bool DBIter::FindValueForCurrentKey() {
merge_context_.Clear(); merge_context_.Clear();
current_entry_is_merged_ = false; current_entry_is_merged_ = false;
// last entry before merge (could be kTypeDeletion, // last entry before merge (could be kTypeDeletion,
// kTypeDeletionWithTimestamp, kTypeSingleDeletion or kTypeValue) // kTypeDeletionWithTimestamp, kTypeSingleDeletion, kTypeValue,
// kTypeBlobIndex, or kTypeWideColumnEntity)
ValueType last_not_merge_type = kTypeDeletion; ValueType last_not_merge_type = kTypeDeletion;
ValueType last_key_entry_type = kTypeDeletion; ValueType last_key_entry_type = kTypeDeletion;
@ -831,6 +856,7 @@ bool DBIter::FindValueForCurrentKey() {
switch (last_key_entry_type) { switch (last_key_entry_type) {
case kTypeValue: case kTypeValue:
case kTypeBlobIndex: case kTypeBlobIndex:
case kTypeWideColumnEntity:
if (range_del_agg_.ShouldDelete( if (range_del_agg_.ShouldDelete(
ikey, RangeDelPositioningMode::kBackwardTraversal)) { ikey, RangeDelPositioningMode::kBackwardTraversal)) {
last_key_entry_type = kTypeRangeDeletion; last_key_entry_type = kTypeRangeDeletion;
@ -927,6 +953,12 @@ bool DBIter::FindValueForCurrentKey() {
} }
is_blob_ = false; is_blob_ = false;
return true; return true;
} else if (last_not_merge_type == kTypeWideColumnEntity) {
// TODO: support wide-column entities
status_ = Status::NotSupported(
"Merge currently not supported for wide-column entities");
valid_ = false;
return false;
} else { } else {
assert(last_not_merge_type == kTypeValue); assert(last_not_merge_type == kTypeValue);
s = Merge(&pinned_value_, saved_key_.GetUserKey()); s = Merge(&pinned_value_, saved_key_.GetUserKey());
@ -944,6 +976,11 @@ bool DBIter::FindValueForCurrentKey() {
return false; return false;
} }
break; break;
case kTypeWideColumnEntity:
if (!SetWideColumnValueIfNeeded(pinned_value_)) {
return false;
}
break;
default: default:
valid_ = false; valid_ = false;
status_ = Status::Corruption( status_ = Status::Corruption(
@ -1034,13 +1071,18 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
Slice ts = ExtractTimestampFromUserKey(ikey.user_key, timestamp_size_); Slice ts = ExtractTimestampFromUserKey(ikey.user_key, timestamp_size_);
saved_timestamp_.assign(ts.data(), ts.size()); saved_timestamp_.assign(ts.data(), ts.size());
} }
if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex) { if (ikey.type == kTypeValue || ikey.type == kTypeBlobIndex ||
ikey.type == kTypeWideColumnEntity) {
assert(iter_.iter()->IsValuePinned()); assert(iter_.iter()->IsValuePinned());
pinned_value_ = iter_.value(); pinned_value_ = iter_.value();
if (ikey.type == kTypeBlobIndex) { if (ikey.type == kTypeBlobIndex) {
if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) { if (!SetBlobValueIfNeeded(ikey.user_key, pinned_value_)) {
return false; return false;
} }
} else if (ikey_.type == kTypeWideColumnEntity) {
if (!SetWideColumnValueIfNeeded(pinned_value_)) {
return false;
}
} }
valid_ = true; valid_ = true;
@ -1109,6 +1151,12 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
} }
is_blob_ = false; is_blob_ = false;
return true; return true;
} else if (ikey.type == kTypeWideColumnEntity) {
// TODO: support wide-column entities
status_ = Status::NotSupported(
"Merge currently not supported for wide-column entities");
valid_ = false;
return false;
} else { } else {
valid_ = false; valid_ = false;
status_ = Status::Corruption( status_ = Status::Corruption(

@ -298,6 +298,8 @@ class DBIter final : public Iterator {
// index when using the integrated BlobDB implementation. // index when using the integrated BlobDB implementation.
bool SetBlobValueIfNeeded(const Slice& user_key, const Slice& blob_index); bool SetBlobValueIfNeeded(const Slice& user_key, const Slice& blob_index);
bool SetWideColumnValueIfNeeded(const Slice& wide_columns_slice);
Status Merge(const Slice* val, const Slice& user_key); Status Merge(const Slice* val, const Slice& user_key);
const SliceTransform* prefix_extractor_; const SliceTransform* prefix_extractor_;

@ -15,6 +15,7 @@ enum class WriteBatchOpType {
kSingleDelete, kSingleDelete,
kDeleteRange, kDeleteRange,
kMerge, kMerge,
kPutEntity,
kNum, kNum,
}; };
@ -62,18 +63,38 @@ std::pair<WriteBatch, Status> GetWriteBatch(ColumnFamilyHandle* cf_handle,
case WriteBatchOpType::kMerge: case WriteBatchOpType::kMerge:
s = wb.Merge(cf_handle, "key", "val"); s = wb.Merge(cf_handle, "key", "val");
break; break;
case WriteBatchOpType::kPutEntity:
s = wb.PutEntity(cf_handle, "key",
{{"attr_name1", "foo"}, {"attr_name2", "bar"}});
break;
case WriteBatchOpType::kNum: case WriteBatchOpType::kNum:
assert(false); assert(false);
} }
return {std::move(wb), std::move(s)}; return {std::move(wb), std::move(s)};
} }
class DbKvChecksumTest : public DBTestBase, class DbKvChecksumTestBase : public DBTestBase {
public:
DbKvChecksumTestBase(const std::string& path, bool env_do_fsync)
: DBTestBase(path, env_do_fsync) {}
ColumnFamilyHandle* GetCFHandleToUse(ColumnFamilyHandle* column_family,
WriteBatchOpType op_type) const {
// Note: PutEntity cannot be called without column family
if (op_type == WriteBatchOpType::kPutEntity && !column_family) {
return db_->DefaultColumnFamily();
}
return column_family;
}
};
class DbKvChecksumTest : public DbKvChecksumTestBase,
public ::testing::WithParamInterface< public ::testing::WithParamInterface<
std::tuple<WriteBatchOpType, char, WriteMode>> { std::tuple<WriteBatchOpType, char, WriteMode>> {
public: public:
DbKvChecksumTest() DbKvChecksumTest()
: DBTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) { : DbKvChecksumTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) {
op_type_ = std::get<0>(GetParam()); op_type_ = std::get<0>(GetParam());
corrupt_byte_addend_ = std::get<1>(GetParam()); corrupt_byte_addend_ = std::get<1>(GetParam());
write_mode_ = std::get<2>(GetParam()); write_mode_ = std::get<2>(GetParam());
@ -82,14 +103,16 @@ class DbKvChecksumTest : public DBTestBase,
Status ExecuteWrite(ColumnFamilyHandle* cf_handle) { Status ExecuteWrite(ColumnFamilyHandle* cf_handle) {
switch (write_mode_) { switch (write_mode_) {
case WriteMode::kWriteProtectedBatch: { case WriteMode::kWriteProtectedBatch: {
auto batch_and_status = GetWriteBatch( auto batch_and_status =
cf_handle, 8 /* protection_bytes_per_key */, op_type_); GetWriteBatch(GetCFHandleToUse(cf_handle, op_type_),
8 /* protection_bytes_per_key */, op_type_);
assert(batch_and_status.second.ok()); assert(batch_and_status.second.ok());
return db_->Write(WriteOptions(), &batch_and_status.first); return db_->Write(WriteOptions(), &batch_and_status.first);
} }
case WriteMode::kWriteUnprotectedBatch: { case WriteMode::kWriteUnprotectedBatch: {
auto batch_and_status = GetWriteBatch( auto batch_and_status =
cf_handle, 0 /* protection_bytes_per_key */, op_type_); GetWriteBatch(GetCFHandleToUse(cf_handle, op_type_),
0 /* protection_bytes_per_key */, op_type_);
assert(batch_and_status.second.ok()); assert(batch_and_status.second.ok());
WriteOptions write_opts; WriteOptions write_opts;
write_opts.protection_bytes_per_key = 8; write_opts.protection_bytes_per_key = 8;
@ -135,10 +158,10 @@ std::string GetOpTypeString(const WriteBatchOpType& op_type) {
return "SingleDelete"; return "SingleDelete";
case WriteBatchOpType::kDeleteRange: case WriteBatchOpType::kDeleteRange:
return "DeleteRange"; return "DeleteRange";
break;
case WriteBatchOpType::kMerge: case WriteBatchOpType::kMerge:
return "Merge"; return "Merge";
break; case WriteBatchOpType::kPutEntity:
return "PutEntity";
case WriteBatchOpType::kNum: case WriteBatchOpType::kNum:
assert(false); assert(false);
} }
@ -242,7 +265,8 @@ TEST_P(DbKvChecksumTest, MemTableAddWithColumnFamilyCorrupted) {
TEST_P(DbKvChecksumTest, NoCorruptionCase) { TEST_P(DbKvChecksumTest, NoCorruptionCase) {
// If this test fails, we may have found a piece of malfunctioned hardware // If this test fails, we may have found a piece of malfunctioned hardware
auto batch_and_status = auto batch_and_status =
GetWriteBatch(nullptr, 8 /* protection_bytes_per_key */, op_type_); GetWriteBatch(GetCFHandleToUse(nullptr, op_type_),
8 /* protection_bytes_per_key */, op_type_);
ASSERT_OK(batch_and_status.second); ASSERT_OK(batch_and_status.second);
ASSERT_OK(batch_and_status.first.VerifyChecksum()); ASSERT_OK(batch_and_status.first.VerifyChecksum());
} }
@ -323,12 +347,12 @@ TEST_P(DbKvChecksumTest, WriteToWALWithColumnFamilyCorrupted) {
} }
class DbKvChecksumTestMergedBatch class DbKvChecksumTestMergedBatch
: public DBTestBase, : public DbKvChecksumTestBase,
public ::testing::WithParamInterface< public ::testing::WithParamInterface<
std::tuple<WriteBatchOpType, WriteBatchOpType, char>> { std::tuple<WriteBatchOpType, WriteBatchOpType, char>> {
public: public:
DbKvChecksumTestMergedBatch() DbKvChecksumTestMergedBatch()
: DBTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) { : DbKvChecksumTestBase("db_kv_checksum_test", /*env_do_fsync=*/false) {
op_type1_ = std::get<0>(GetParam()); op_type1_ = std::get<0>(GetParam());
op_type2_ = std::get<1>(GetParam()); op_type2_ = std::get<1>(GetParam());
corrupt_byte_addend_ = std::get<2>(GetParam()); corrupt_byte_addend_ = std::get<2>(GetParam());
@ -349,10 +373,10 @@ void CorruptWriteBatch(Slice* content, size_t offset,
TEST_P(DbKvChecksumTestMergedBatch, NoCorruptionCase) { TEST_P(DbKvChecksumTestMergedBatch, NoCorruptionCase) {
// Veirfy write batch checksum after write batch append // Veirfy write batch checksum after write batch append
auto batch1 = GetWriteBatch(nullptr /* cf_handle */, auto batch1 = GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_),
8 /* protection_bytes_per_key */, op_type1_); 8 /* protection_bytes_per_key */, op_type1_);
ASSERT_OK(batch1.second); ASSERT_OK(batch1.second);
auto batch2 = GetWriteBatch(nullptr /* cf_handle */, auto batch2 = GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_),
8 /* protection_bytes_per_key */, op_type2_); 8 /* protection_bytes_per_key */, op_type2_);
ASSERT_OK(batch2.second); ASSERT_OK(batch2.second);
ASSERT_OK(WriteBatchInternal::Append(&batch1.first, &batch2.first)); ASSERT_OK(WriteBatchInternal::Append(&batch1.first, &batch2.first));
@ -374,11 +398,13 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) {
options.merge_operator = MergeOperators::CreateStringAppendOperator(); options.merge_operator = MergeOperators::CreateStringAppendOperator();
} }
auto leader_batch_and_status = GetWriteBatch( auto leader_batch_and_status =
nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type1_); GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_),
8 /* protection_bytes_per_key */, op_type1_);
ASSERT_OK(leader_batch_and_status.second); ASSERT_OK(leader_batch_and_status.second);
auto follower_batch_and_status = GetWriteBatch( auto follower_batch_and_status =
nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type2_); GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_),
8 /* protection_bytes_per_key */, op_type2_);
size_t leader_batch_size = leader_batch_and_status.first.GetDataSize(); size_t leader_batch_size = leader_batch_and_status.first.GetDataSize();
size_t total_bytes = size_t total_bytes =
leader_batch_size + follower_batch_and_status.first.GetDataSize(); leader_batch_size + follower_batch_and_status.first.GetDataSize();
@ -419,7 +445,7 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) {
// follower // follower
follower_thread = port::Thread([&]() { follower_thread = port::Thread([&]() {
follower_batch_and_status = follower_batch_and_status =
GetWriteBatch(nullptr /* cf_handle */, GetWriteBatch(GetCFHandleToUse(nullptr, op_type2_),
8 /* protection_bytes_per_key */, op_type2_); 8 /* protection_bytes_per_key */, op_type2_);
ASSERT_OK(follower_batch_and_status.second); ASSERT_OK(follower_batch_and_status.second);
ASSERT_TRUE( ASSERT_TRUE(
@ -443,8 +469,9 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALCorrupted) {
Reopen(options); Reopen(options);
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
auto log_size_pre_write = dbfull()->TEST_total_log_size(); auto log_size_pre_write = dbfull()->TEST_total_log_size();
leader_batch_and_status = GetWriteBatch( leader_batch_and_status =
nullptr /* cf_handle */, 8 /* protection_bytes_per_key */, op_type1_); GetWriteBatch(GetCFHandleToUse(nullptr, op_type1_),
8 /* protection_bytes_per_key */, op_type1_);
ASSERT_OK(leader_batch_and_status.second); ASSERT_OK(leader_batch_and_status.second);
ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first) ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first)
.IsCorruption()); .IsCorruption());
@ -484,10 +511,12 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) {
CreateAndReopenWithCF({"ramen"}, options); CreateAndReopenWithCF({"ramen"}, options);
auto leader_batch_and_status = auto leader_batch_and_status =
GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type1_); GetWriteBatch(GetCFHandleToUse(handles_[1], op_type1_),
8 /* protection_bytes_per_key */, op_type1_);
ASSERT_OK(leader_batch_and_status.second); ASSERT_OK(leader_batch_and_status.second);
auto follower_batch_and_status = auto follower_batch_and_status =
GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type2_); GetWriteBatch(GetCFHandleToUse(handles_[1], op_type2_),
8 /* protection_bytes_per_key */, op_type2_);
size_t leader_batch_size = leader_batch_and_status.first.GetDataSize(); size_t leader_batch_size = leader_batch_and_status.first.GetDataSize();
size_t total_bytes = size_t total_bytes =
leader_batch_size + follower_batch_and_status.first.GetDataSize(); leader_batch_size + follower_batch_and_status.first.GetDataSize();
@ -527,8 +556,9 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) {
// Start the other writer thread which will join the write group as // Start the other writer thread which will join the write group as
// follower // follower
follower_thread = port::Thread([&]() { follower_thread = port::Thread([&]() {
follower_batch_and_status = GetWriteBatch( follower_batch_and_status =
handles_[1], 8 /* protection_bytes_per_key */, op_type2_); GetWriteBatch(GetCFHandleToUse(handles_[1], op_type2_),
8 /* protection_bytes_per_key */, op_type2_);
ASSERT_OK(follower_batch_and_status.second); ASSERT_OK(follower_batch_and_status.second);
ASSERT_TRUE( ASSERT_TRUE(
db_->Write(WriteOptions(), &follower_batch_and_status.first) db_->Write(WriteOptions(), &follower_batch_and_status.first)
@ -553,7 +583,8 @@ TEST_P(DbKvChecksumTestMergedBatch, WriteToWALWithColumnFamilyCorrupted) {
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
auto log_size_pre_write = dbfull()->TEST_total_log_size(); auto log_size_pre_write = dbfull()->TEST_total_log_size();
leader_batch_and_status = leader_batch_and_status =
GetWriteBatch(handles_[1], 8 /* protection_bytes_per_key */, op_type1_); GetWriteBatch(GetCFHandleToUse(handles_[1], op_type1_),
8 /* protection_bytes_per_key */, op_type1_);
ASSERT_OK(leader_batch_and_status.second); ASSERT_OK(leader_batch_and_status.second);
ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first) ASSERT_TRUE(db_->Write(WriteOptions(), &leader_batch_and_status.first)
.IsCorruption()); .IsCorruption());

@ -2897,6 +2897,15 @@ class ModelDB : public DB {
const Slice& /*v*/) override { const Slice& /*v*/) override {
return Status::NotSupported(); return Status::NotSupported();
} }
using DB::PutEntity;
Status PutEntity(const WriteOptions& /* options */,
ColumnFamilyHandle* /* column_family */,
const Slice& /* key */,
const WideColumns& /* columns */) override {
return Status::NotSupported();
}
using DB::Close; using DB::Close;
Status Close() override { return Status::OK(); } Status Close() override { return Status::OK(); }
using DB::Delete; using DB::Delete;

@ -740,21 +740,33 @@ static bool SaveValue(void* arg, const char* entry) {
s->seq = seq; s->seq = seq;
if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) && if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
type == kTypeWideColumnEntity) &&
max_covering_tombstone_seq > seq) { max_covering_tombstone_seq > seq) {
type = kTypeRangeDeletion; type = kTypeRangeDeletion;
} }
switch (type) { switch (type) {
case kTypeBlobIndex: case kTypeBlobIndex:
if (s->is_blob_index == nullptr) { case kTypeWideColumnEntity:
ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index."); if (*(s->merge_in_progress)) {
*(s->status) = Status::NotSupported( *(s->status) = Status::NotSupported("Merge operator not supported");
"Encounter unsupported blob value. Please open DB with " } else if (!s->do_merge) {
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); *(s->status) = Status::NotSupported("GetMergeOperands not supported");
} else if (*(s->merge_in_progress)) { } else if (type == kTypeBlobIndex) {
if (s->is_blob_index == nullptr) {
ROCKS_LOG_ERROR(s->logger, "Encounter unexpected blob index.");
*(s->status) = Status::NotSupported(
"Encounter unsupported blob value. Please open DB with "
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
}
} else {
assert(type == kTypeWideColumnEntity);
// TODO: support wide-column entities
*(s->status) = *(s->status) =
Status::NotSupported("Blob DB does not support merge operator."); Status::NotSupported("Encountered unexpected wide-column entity");
} }
if (!s->status->ok()) { if (!s->status->ok()) {
*(s->found_final_value) = true; *(s->found_final_value) = true;
return false; return false;

@ -212,11 +212,16 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
const Slice val = iter->value(); const Slice val = iter->value();
PinnableSlice blob_value; PinnableSlice blob_value;
const Slice* val_ptr; const Slice* val_ptr;
if ((kTypeValue == ikey.type || kTypeBlobIndex == ikey.type) && if ((kTypeValue == ikey.type || kTypeBlobIndex == ikey.type ||
kTypeWideColumnEntity == ikey.type) &&
(range_del_agg == nullptr || (range_del_agg == nullptr ||
!range_del_agg->ShouldDelete( !range_del_agg->ShouldDelete(
ikey, RangeDelPositioningMode::kForwardTraversal))) { ikey, RangeDelPositioningMode::kForwardTraversal))) {
if (ikey.type == kTypeBlobIndex) { if (ikey.type == kTypeWideColumnEntity) {
// TODO: support wide-column entities
return Status::NotSupported(
"Merge currently not supported for wide-column entities");
} else if (ikey.type == kTypeBlobIndex) {
BlobIndex blob_index; BlobIndex blob_index;
s = blob_index.DecodeFrom(val); s = blob_index.DecodeFrom(val);

@ -2161,6 +2161,10 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
"Encounter unexpected blob index. Please open DB with " "Encounter unexpected blob index. Please open DB with "
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
return; return;
case GetContext::kUnexpectedWideColumnEntity:
*status =
Status::NotSupported("Encountered unexpected wide-column entity");
return;
} }
f = fp.GetNextFile(); f = fp.GetNextFile();
} }

@ -141,6 +141,11 @@ DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
"ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
file_range.MarkKeyDone(iter); file_range.MarkKeyDone(iter);
continue; continue;
case GetContext::kUnexpectedWideColumnEntity:
*status =
Status::NotSupported("Encountered unexpected wide-column entity");
file_range.MarkKeyDone(iter);
continue;
} }
} }

@ -0,0 +1,213 @@
// Copyright (c) Meta Platforms, Inc. and affiliates.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <array>
#include <memory>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "test_util/testutil.h"
#include "utilities/merge_operators.h"
namespace ROCKSDB_NAMESPACE {
class DBWideBasicTest : public DBTestBase {
protected:
explicit DBWideBasicTest()
: DBTestBase("db_wide_basic_test", /* env_do_fsync */ false) {}
};
TEST_F(DBWideBasicTest, PutEntity) {
Options options = GetDefaultOptions();
// Use the DB::PutEntity API
constexpr char first_key[] = "first";
WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}};
ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
first_key, first_columns));
// Use WriteBatch
constexpr char second_key[] = "second";
WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}};
WriteBatch batch;
ASSERT_OK(
batch.PutEntity(db_->DefaultColumnFamily(), second_key, second_columns));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
// Note: currently, read APIs are supposed to return NotSupported
auto verify = [&]() {
{
PinnableSlice result;
ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(), first_key,
&result)
.IsNotSupported());
}
{
PinnableSlice result;
ASSERT_TRUE(db_->Get(ReadOptions(), db_->DefaultColumnFamily(),
second_key, &result)
.IsNotSupported());
}
{
constexpr size_t num_keys = 2;
std::array<Slice, num_keys> keys{{first_key, second_key}};
std::array<PinnableSlice, num_keys> values;
std::array<Status, num_keys> statuses;
db_->MultiGet(ReadOptions(), db_->DefaultColumnFamily(), num_keys,
&keys[0], &values[0], &statuses[0]);
ASSERT_TRUE(values[0].empty());
ASSERT_TRUE(statuses[0].IsNotSupported());
ASSERT_TRUE(values[1].empty());
ASSERT_TRUE(statuses[1].IsNotSupported());
}
{
std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
iter->SeekToFirst();
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsNotSupported());
iter->SeekToLast();
ASSERT_FALSE(iter->Valid());
ASSERT_TRUE(iter->status().IsNotSupported());
}
};
// Try reading from memtable
verify();
// Try reading after recovery
Close();
options.avoid_flush_during_recovery = true;
Reopen(options);
verify();
// Try reading from storage
ASSERT_OK(Flush());
verify();
// Add a couple of merge operands
Close();
options.merge_operator = MergeOperators::CreateStringAppendOperator();
Reopen(options);
constexpr char merge_operand[] = "bla";
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key,
merge_operand));
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key,
merge_operand));
// Try reading from memtable
verify();
// Try reading from storage
ASSERT_OK(Flush());
verify();
// Do it again, with the Put and the Merge in the same memtable
ASSERT_OK(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
first_key, first_columns));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), first_key,
merge_operand));
ASSERT_OK(db_->Merge(WriteOptions(), db_->DefaultColumnFamily(), second_key,
merge_operand));
// Try reading from memtable
verify();
}
TEST_F(DBWideBasicTest, PutEntityColumnFamily) {
Options options = GetDefaultOptions();
CreateAndReopenWithCF({"corinthian"}, options);
// Use the DB::PutEntity API
constexpr char first_key[] = "first";
WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}};
ASSERT_OK(
db_->PutEntity(WriteOptions(), handles_[1], first_key, first_columns));
// Use WriteBatch
constexpr char second_key[] = "second";
WideColumns second_columns{{"attr_one", "two"}, {"attr_three", "four"}};
WriteBatch batch;
ASSERT_OK(batch.PutEntity(handles_[1], second_key, second_columns));
ASSERT_OK(db_->Write(WriteOptions(), &batch));
}
TEST_F(DBWideBasicTest, PutEntityTimestampError) {
// Note: timestamps are currently not supported
Options options = GetDefaultOptions();
options.comparator = test::BytewiseComparatorWithU64TsWrapper();
ColumnFamilyHandle* handle = nullptr;
ASSERT_OK(db_->CreateColumnFamily(options, "corinthian", &handle));
std::unique_ptr<ColumnFamilyHandle> handle_guard(handle);
// Use the DB::PutEntity API
constexpr char first_key[] = "first";
WideColumns first_columns{{"attr_name1", "foo"}, {"attr_name2", "bar"}};
ASSERT_TRUE(db_->PutEntity(WriteOptions(), handle, first_key, first_columns)
.IsInvalidArgument());
// Use WriteBatch
constexpr char second_key[] = "second";
WideColumns second_columns{{"doric", "column"}, {"ionic", "column"}};
WriteBatch batch;
ASSERT_TRUE(
batch.PutEntity(handle, second_key, second_columns).IsInvalidArgument());
ASSERT_OK(db_->Write(WriteOptions(), &batch));
}
TEST_F(DBWideBasicTest, PutEntitySerializationError) {
// Make sure duplicate columns are caught
Options options = GetDefaultOptions();
// Use the DB::PutEntity API
constexpr char first_key[] = "first";
WideColumns first_columns{{"foo", "bar"}, {"foo", "baz"}};
ASSERT_TRUE(db_->PutEntity(WriteOptions(), db_->DefaultColumnFamily(),
first_key, first_columns)
.IsCorruption());
// Use WriteBatch
constexpr char second_key[] = "second";
WideColumns second_columns{{"column", "doric"}, {"column", "ionic"}};
WriteBatch batch;
ASSERT_TRUE(
batch.PutEntity(db_->DefaultColumnFamily(), second_key, second_columns)
.IsCorruption());
ASSERT_OK(db_->Write(WriteOptions(), &batch));
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
RegisterCustomObjects(argc, argv);
return RUN_ALL_TESTS();
}

@ -17,12 +17,6 @@ namespace ROCKSDB_NAMESPACE {
Status WideColumnSerialization::Serialize(const WideColumns& columns, Status WideColumnSerialization::Serialize(const WideColumns& columns,
std::string& output) { std::string& output) {
// Column names should be strictly ascending
assert(std::adjacent_find(columns.cbegin(), columns.cend(),
[](const WideColumn& lhs, const WideColumn& rhs) {
return lhs.name().compare(rhs.name()) > 0;
}) == columns.cend());
if (columns.size() > if (columns.size() >
static_cast<size_t>(std::numeric_limits<uint32_t>::max())) { static_cast<size_t>(std::numeric_limits<uint32_t>::max())) {
return Status::InvalidArgument("Too many wide columns"); return Status::InvalidArgument("Too many wide columns");
@ -32,12 +26,17 @@ Status WideColumnSerialization::Serialize(const WideColumns& columns,
PutVarint32(&output, static_cast<uint32_t>(columns.size())); PutVarint32(&output, static_cast<uint32_t>(columns.size()));
for (const auto& column : columns) { for (size_t i = 0; i < columns.size(); ++i) {
const WideColumn& column = columns[i];
const Slice& name = column.name(); const Slice& name = column.name();
if (name.size() > if (name.size() >
static_cast<size_t>(std::numeric_limits<uint32_t>::max())) { static_cast<size_t>(std::numeric_limits<uint32_t>::max())) {
return Status::InvalidArgument("Wide column name too long"); return Status::InvalidArgument("Wide column name too long");
} }
if (i > 0 && columns[i - 1].name().compare(name) >= 0) {
return Status::Corruption("Wide columns out of order");
}
const Slice& value = column.value(); const Slice& value = column.value();
if (value.size() > if (value.size() >

@ -124,6 +124,22 @@ TEST(WideColumnSerializationTest, SerializeDeserialize) {
} }
} }
TEST(WideColumnSerializationTest, SerializeDuplicateError) {
WideColumns columns{{"foo", "bar"}, {"foo", "baz"}};
std::string output;
ASSERT_TRUE(
WideColumnSerialization::Serialize(columns, output).IsCorruption());
}
TEST(WideColumnSerializationTest, SerializeOutOfOrderError) {
WideColumns columns{{"hello", "world"}, {"foo", "bar"}};
std::string output;
ASSERT_TRUE(
WideColumnSerialization::Serialize(columns, output).IsCorruption());
}
TEST(WideColumnSerializationTest, DeserializeVersionError) { TEST(WideColumnSerializationTest, DeserializeVersionError) {
// Can't decode version // Can't decode version

@ -29,6 +29,8 @@
// kTypeRollbackXID varstring // kTypeRollbackXID varstring
// kTypeBeginPersistedPrepareXID // kTypeBeginPersistedPrepareXID
// kTypeBeginUnprepareXID // kTypeBeginUnprepareXID
// kTypeWideColumnEntity varstring varstring
// kTypeColumnFamilyWideColumnEntity varint32 varstring varstring
// kTypeNoop // kTypeNoop
// varstring := // varstring :=
// len: varint32 // len: varint32
@ -36,6 +38,8 @@
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
#include <algorithm>
#include <limits>
#include <map> #include <map>
#include <stack> #include <stack>
#include <stdexcept> #include <stdexcept>
@ -52,6 +56,7 @@
#include "db/merge_context.h" #include "db/merge_context.h"
#include "db/snapshot_impl.h" #include "db/snapshot_impl.h"
#include "db/trim_history_scheduler.h" #include "db/trim_history_scheduler.h"
#include "db/wide/wide_column_serialization.h"
#include "db/write_batch_internal.h" #include "db/write_batch_internal.h"
#include "monitoring/perf_context_imp.h" #include "monitoring/perf_context_imp.h"
#include "monitoring/statistics.h" #include "monitoring/statistics.h"
@ -82,6 +87,7 @@ enum ContentFlags : uint32_t {
HAS_DELETE_RANGE = 1 << 9, HAS_DELETE_RANGE = 1 << 9,
HAS_BLOB_INDEX = 1 << 10, HAS_BLOB_INDEX = 1 << 10,
HAS_BEGIN_UNPREPARE = 1 << 11, HAS_BEGIN_UNPREPARE = 1 << 11,
HAS_PUT_ENTITY = 1 << 12,
}; };
struct BatchContentClassifier : public WriteBatch::Handler { struct BatchContentClassifier : public WriteBatch::Handler {
@ -92,6 +98,12 @@ struct BatchContentClassifier : public WriteBatch::Handler {
return Status::OK(); return Status::OK();
} }
Status PutEntityCF(uint32_t /* column_family_id */, const Slice& /* key */,
const Slice& /* entity */) override {
content_flags |= ContentFlags::HAS_PUT_ENTITY;
return Status::OK();
}
Status DeleteCF(uint32_t, const Slice&) override { Status DeleteCF(uint32_t, const Slice&) override {
content_flags |= ContentFlags::HAS_DELETE; content_flags |= ContentFlags::HAS_DELETE;
return Status::OK(); return Status::OK();
@ -287,6 +299,10 @@ bool WriteBatch::HasPut() const {
return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0; return (ComputeContentFlags() & ContentFlags::HAS_PUT) != 0;
} }
bool WriteBatch::HasPutEntity() const {
return (ComputeContentFlags() & ContentFlags::HAS_PUT_ENTITY) != 0;
}
bool WriteBatch::HasDelete() const { bool WriteBatch::HasDelete() const {
return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0; return (ComputeContentFlags() & ContentFlags::HAS_DELETE) != 0;
} }
@ -435,6 +451,17 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
return Status::Corruption("bad Rollback XID"); return Status::Corruption("bad Rollback XID");
} }
break; break;
case kTypeColumnFamilyWideColumnEntity:
if (!GetVarint32(input, column_family)) {
return Status::Corruption("bad WriteBatch PutEntity");
}
FALLTHROUGH_INTENDED;
case kTypeWideColumnEntity:
if (!GetLengthPrefixedSlice(input, key) ||
!GetLengthPrefixedSlice(input, value)) {
return Status::Corruption("bad WriteBatch PutEntity");
}
break;
default: default:
return Status::Corruption("unknown WriteBatch tag"); return Status::Corruption("unknown WriteBatch tag");
} }
@ -462,6 +489,7 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
(begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size()); (begin == WriteBatchInternal::kHeader) && (end == wb->rep_.size());
Slice key, value, blob, xid; Slice key, value, blob, xid;
// Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as // Sometimes a sub-batch starts with a Noop. We want to exclude such Noops as
// the batch boundary symbols otherwise we would mis-count the number of // the batch boundary symbols otherwise we would mis-count the number of
// batches. We do that by checking whether the accumulated batch is empty // batches. We do that by checking whether the accumulated batch is empty
@ -661,6 +689,16 @@ Status WriteBatchInternal::Iterate(const WriteBatch* wb,
assert(s.ok()); assert(s.ok());
empty_batch = true; empty_batch = true;
break; break;
case kTypeWideColumnEntity:
case kTypeColumnFamilyWideColumnEntity:
assert(wb->content_flags_.load(std::memory_order_relaxed) &
(ContentFlags::DEFERRED | ContentFlags::HAS_PUT_ENTITY));
s = handler->PutEntityCF(column_family, key, value);
if (LIKELY(s.ok())) {
empty_batch = false;
++found;
}
break;
default: default:
return Status::Corruption("unknown WriteBatch tag"); return Status::Corruption("unknown WriteBatch tag");
} }
@ -891,6 +929,86 @@ Status WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
"Cannot call this method on column family enabling timestamp"); "Cannot call this method on column family enabling timestamp");
} }
Status WriteBatchInternal::PutEntity(WriteBatch* b, uint32_t column_family_id,
const Slice& key,
const WideColumns& columns) {
assert(b);
if (key.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
return Status::InvalidArgument("key is too large");
}
WideColumns sorted_columns(columns);
std::sort(sorted_columns.begin(), sorted_columns.end(),
[](const WideColumn& lhs, const WideColumn& rhs) {
return lhs.name().compare(rhs.name()) < 0;
});
std::string entity;
const Status s = WideColumnSerialization::Serialize(sorted_columns, entity);
if (!s.ok()) {
return s;
}
if (entity.size() > size_t{std::numeric_limits<uint32_t>::max()}) {
return Status::InvalidArgument("wide column entity is too large");
}
LocalSavePoint save(b);
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeWideColumnEntity));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyWideColumnEntity));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&b->rep_, key);
PutLengthPrefixedSlice(&b->rep_, entity);
b->content_flags_.store(b->content_flags_.load(std::memory_order_relaxed) |
ContentFlags::HAS_PUT_ENTITY,
std::memory_order_relaxed);
if (b->prot_info_ != nullptr) {
b->prot_info_->entries_.emplace_back(
ProtectionInfo64()
.ProtectKVO(key, entity, kTypeWideColumnEntity)
.ProtectC(column_family_id));
}
return save.commit();
}
Status WriteBatch::PutEntity(ColumnFamilyHandle* column_family,
const Slice& key, const WideColumns& columns) {
if (!column_family) {
return Status::InvalidArgument(
"Cannot call this method without a column family handle");
}
Status s;
uint32_t cf_id = 0;
size_t ts_sz = 0;
std::tie(s, cf_id, ts_sz) =
WriteBatchInternal::GetColumnFamilyIdAndTimestampSize(this,
column_family);
if (!s.ok()) {
return s;
}
if (ts_sz) {
return Status::InvalidArgument(
"Cannot call this method on column family enabling timestamp");
}
return WriteBatchInternal::PutEntity(this, cf_id, key, columns);
}
Status WriteBatchInternal::InsertNoop(WriteBatch* b) { Status WriteBatchInternal::InsertNoop(WriteBatch* b) {
b->rep_.push_back(static_cast<char>(kTypeNoop)); b->rep_.push_back(static_cast<char>(kTypeNoop));
return Status::OK(); return Status::OK();
@ -1556,6 +1674,10 @@ Status WriteBatch::VerifyChecksum() const {
case kTypeCommitXIDAndTimestamp: case kTypeCommitXIDAndTimestamp:
checksum_protected = false; checksum_protected = false;
break; break;
case kTypeColumnFamilyWideColumnEntity:
case kTypeWideColumnEntity:
tag = kTypeWideColumnEntity;
break;
default: default:
return Status::Corruption( return Status::Corruption(
"unknown WriteBatch tag", "unknown WriteBatch tag",
@ -1994,6 +2116,29 @@ class MemTableInserter : public WriteBatch::Handler {
return ret_status; return ret_status;
} }
Status PutEntityCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
const auto* kv_prot_info = NextProtectionInfo();
Status s;
if (kv_prot_info) {
// Memtable needs seqno, doesn't need CF ID
auto mem_kv_prot_info =
kv_prot_info->StripC(column_family_id).ProtectS(sequence_);
s = PutCFImpl(column_family_id, key, value, kTypeWideColumnEntity,
&mem_kv_prot_info);
} else {
s = PutCFImpl(column_family_id, key, value, kTypeWideColumnEntity,
/* kv_prot_info */ nullptr);
}
if (UNLIKELY(s.IsTryAgain())) {
DecrementProtectionInfoIdxForTryAgain();
}
return s;
}
Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key, Status DeleteImpl(uint32_t /*column_family_id*/, const Slice& key,
const Slice& value, ValueType delete_type, const Slice& value, ValueType delete_type,
const ProtectionInfoKVOS64* kv_prot_info) { const ProtectionInfoKVOS64* kv_prot_info) {
@ -2780,6 +2925,11 @@ class ProtectionInfoUpdater : public WriteBatch::Handler {
return UpdateProtInfo(cf, key, val, kTypeValue); return UpdateProtInfo(cf, key, val, kTypeValue);
} }
Status PutEntityCF(uint32_t cf, const Slice& key,
const Slice& entity) override {
return UpdateProtInfo(cf, key, entity, kTypeWideColumnEntity);
}
Status DeleteCF(uint32_t cf, const Slice& key) override { Status DeleteCF(uint32_t cf, const Slice& key) override {
return UpdateProtInfo(cf, key, "", kTypeDeletion); return UpdateProtInfo(cf, key, "", kTypeDeletion);
} }

@ -88,6 +88,9 @@ class WriteBatchInternal {
static Status Put(WriteBatch* batch, uint32_t column_family_id, static Status Put(WriteBatch* batch, uint32_t column_family_id,
const SliceParts& key, const SliceParts& value); const SliceParts& key, const SliceParts& value);
static Status PutEntity(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const WideColumns& columns);
static Status Delete(WriteBatch* batch, uint32_t column_family_id, static Status Delete(WriteBatch* batch, uint32_t column_family_id,
const SliceParts& key); const SliceParts& key);

@ -27,6 +27,7 @@
#include "rocksdb/transaction_log.h" #include "rocksdb/transaction_log.h"
#include "rocksdb/types.h" #include "rocksdb/types.h"
#include "rocksdb/version.h" #include "rocksdb/version.h"
#include "rocksdb/wide_columns.h"
#ifdef _WIN32 #ifdef _WIN32
// Windows API macro interference // Windows API macro interference
@ -406,6 +407,11 @@ class DB {
return Put(options, DefaultColumnFamily(), key, ts, value); return Put(options, DefaultColumnFamily(), key, ts, value);
} }
// UNDER CONSTRUCTION -- DO NOT USE
virtual Status PutEntity(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns) = 0;
// Remove the database entry (if any) for "key". Returns OK on // Remove the database entry (if any) for "key". Returns OK on
// success, and a non-OK status on error. It is not an error if "key" // success, and a non-OK status on error. It is not an error if "key"
// did not exist in the database. // did not exist in the database.

@ -85,6 +85,13 @@ class StackableDB : public DB {
return db_->Put(options, column_family, key, ts, val); return db_->Put(options, column_family, key, ts, val);
} }
using DB::PutEntity;
Status PutEntity(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns) override {
return db_->PutEntity(options, column_family, key, columns);
}
using DB::Get; using DB::Get;
virtual Status Get(const ReadOptions& options, virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key, ColumnFamilyHandle* column_family, const Slice& key,

@ -113,6 +113,17 @@ class WriteBatchWithIndex : public WriteBatchBase {
Status Put(ColumnFamilyHandle* column_family, const Slice& key, Status Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& ts, const Slice& value) override; const Slice& ts, const Slice& value) override;
Status PutEntity(ColumnFamilyHandle* column_family, const Slice& /* key */,
const WideColumns& /* columns */) override {
if (!column_family) {
return Status::InvalidArgument(
"Cannot call this method without a column family handle");
}
return Status::NotSupported(
"PutEntity not supported by WriteBatchWithIndex");
}
using WriteBatchBase::Merge; using WriteBatchBase::Merge;
Status Merge(ColumnFamilyHandle* column_family, const Slice& key, Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override; const Slice& value) override;

@ -100,6 +100,11 @@ class WriteBatch : public WriteBatchBase {
return Put(nullptr, key, value); return Put(nullptr, key, value);
} }
// UNDER CONSTRUCTION -- DO NOT USE
using WriteBatchBase::PutEntity;
Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns) override;
using WriteBatchBase::Delete; using WriteBatchBase::Delete;
// If the database contains a mapping for "key", erase it. Else do nothing. // If the database contains a mapping for "key", erase it. Else do nothing.
// The following Delete(..., const Slice& key) can be used when user-defined // The following Delete(..., const Slice& key) can be used when user-defined
@ -240,6 +245,12 @@ class WriteBatch : public WriteBatchBase {
} }
virtual void Put(const Slice& /*key*/, const Slice& /*value*/) {} virtual void Put(const Slice& /*key*/, const Slice& /*value*/) {}
virtual Status PutEntityCF(uint32_t /* column_family_id */,
const Slice& /* key */,
const Slice& /* entity */) {
return Status::NotSupported("PutEntityCF not implemented");
}
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) { virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
if (column_family_id == 0) { if (column_family_id == 0) {
Delete(key); Delete(key);
@ -346,6 +357,9 @@ class WriteBatch : public WriteBatchBase {
// Returns true if PutCF will be called during Iterate // Returns true if PutCF will be called during Iterate
bool HasPut() const; bool HasPut() const;
// Returns true if PutEntityCF will be called during Iterate
bool HasPutEntity() const;
// Returns true if DeleteCF will be called during Iterate // Returns true if DeleteCF will be called during Iterate
bool HasDelete() const; bool HasDelete() const;

@ -11,6 +11,7 @@
#include <cstddef> #include <cstddef>
#include "rocksdb/rocksdb_namespace.h" #include "rocksdb/rocksdb_namespace.h"
#include "rocksdb/wide_columns.h"
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
@ -41,6 +42,10 @@ class WriteBatchBase {
const SliceParts& value); const SliceParts& value);
virtual Status Put(const SliceParts& key, const SliceParts& value); virtual Status Put(const SliceParts& key, const SliceParts& value);
// UNDER CONSTRUCTION -- DO NOT USE
virtual Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key,
const WideColumns& columns) = 0;
// Merge "value" with the existing value of "key" in the database. // Merge "value" with the existing value of "key" in the database.
// "key->merge(existing, value)" // "key->merge(existing, value)"
virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key, virtual Status Merge(ColumnFamilyHandle* column_family, const Slice& key,

@ -503,6 +503,7 @@ TEST_MAIN_SOURCES = \
db/version_edit_test.cc \ db/version_edit_test.cc \
db/version_set_test.cc \ db/version_set_test.cc \
db/wal_manager_test.cc \ db/wal_manager_test.cc \
db/wide/db_wide_basic_test.cc \
db/wide/wide_column_serialization_test.cc \ db/wide/wide_column_serialization_test.cc \
db/write_batch_test.cc \ db/write_batch_test.cc \
db/write_callback_test.cc \ db/write_callback_test.cc \

@ -307,10 +307,11 @@ void MetaBlockIter::SeekImpl(const Slice& target) {
// target = "seek_user_key @ type | seqno". // target = "seek_user_key @ type | seqno".
// //
// For any type other than kTypeValue, kTypeDeletion, kTypeSingleDeletion, // For any type other than kTypeValue, kTypeDeletion, kTypeSingleDeletion,
// or kTypeBlobIndex, this function behaves identically as Seek(). // kTypeBlobIndex, or kTypeWideColumnEntity, this function behaves identically
// to Seek().
// //
// For any type in kTypeValue, kTypeDeletion, kTypeSingleDeletion, // For any type in kTypeValue, kTypeDeletion, kTypeSingleDeletion,
// or kTypeBlobIndex: // kTypeBlobIndex, or kTypeWideColumnEntity:
// //
// If the return value is FALSE, iter location is undefined, and it means: // If the return value is FALSE, iter location is undefined, and it means:
// 1) there is no key in this block falling into the range: // 1) there is no key in this block falling into the range:
@ -412,7 +413,8 @@ bool DataBlockIter::SeekForGetImpl(const Slice& target) {
if (value_type != ValueType::kTypeValue && if (value_type != ValueType::kTypeValue &&
value_type != ValueType::kTypeDeletion && value_type != ValueType::kTypeDeletion &&
value_type != ValueType::kTypeSingleDeletion && value_type != ValueType::kTypeSingleDeletion &&
value_type != ValueType::kTypeBlobIndex) { value_type != ValueType::kTypeBlobIndex &&
value_type != ValueType::kTypeWideColumnEntity) {
SeekImpl(target); SeekImpl(target);
return true; return true;
} }

@ -241,7 +241,8 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
auto type = parsed_key.type; auto type = parsed_key.type;
// Key matches. Process it // Key matches. Process it
if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex) && if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
type == kTypeWideColumnEntity) &&
max_covering_tombstone_seq_ != nullptr && max_covering_tombstone_seq_ != nullptr &&
*max_covering_tombstone_seq_ > parsed_key.sequence) { *max_covering_tombstone_seq_ > parsed_key.sequence) {
type = kTypeRangeDeletion; type = kTypeRangeDeletion;
@ -249,15 +250,24 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
switch (type) { switch (type) {
case kTypeValue: case kTypeValue:
case kTypeBlobIndex: case kTypeBlobIndex:
case kTypeWideColumnEntity:
assert(state_ == kNotFound || state_ == kMerge); assert(state_ == kNotFound || state_ == kMerge);
if (type == kTypeBlobIndex && is_blob_index_ == nullptr) { if (type == kTypeBlobIndex) {
// Blob value not supported. Stop. if (is_blob_index_ == nullptr) {
state_ = kUnexpectedBlobIndex; // Blob value not supported. Stop.
state_ = kUnexpectedBlobIndex;
return false;
}
} else if (type == kTypeWideColumnEntity) {
// TODO: support wide-column entities
state_ = kUnexpectedWideColumnEntity;
return false; return false;
} }
if (is_blob_index_ != nullptr) { if (is_blob_index_ != nullptr) {
*is_blob_index_ = (type == kTypeBlobIndex); *is_blob_index_ = (type == kTypeBlobIndex);
} }
if (kNotFound == state_) { if (kNotFound == state_) {
state_ = kFound; state_ = kFound;
if (do_merge_) { if (do_merge_) {
@ -276,20 +286,25 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
// It means this function is called as part of DB GetMergeOperands // It means this function is called as part of DB GetMergeOperands
// API and the current value should be part of // API and the current value should be part of
// merge_context_->operand_list // merge_context_->operand_list
if (is_blob_index_ != nullptr && *is_blob_index_) { if (type == kTypeBlobIndex) {
PinnableSlice pin_val; PinnableSlice pin_val;
if (GetBlobValue(value, &pin_val) == false) { if (GetBlobValue(value, &pin_val) == false) {
return false; return false;
} }
Slice blob_value(pin_val); Slice blob_value(pin_val);
push_operand(blob_value, nullptr); push_operand(blob_value, nullptr);
} else if (type == kTypeWideColumnEntity) {
// TODO: support wide-column entities
state_ = kUnexpectedWideColumnEntity;
return false;
} else { } else {
assert(type == kTypeValue);
push_operand(value, value_pinner); push_operand(value, value_pinner);
} }
} }
} else if (kMerge == state_) { } else if (kMerge == state_) {
assert(merge_operator_ != nullptr); assert(merge_operator_ != nullptr);
if (is_blob_index_ != nullptr && *is_blob_index_) { if (type == kTypeBlobIndex) {
PinnableSlice pin_val; PinnableSlice pin_val;
if (GetBlobValue(value, &pin_val) == false) { if (GetBlobValue(value, &pin_val) == false) {
return false; return false;
@ -304,7 +319,13 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
// merge_context_->operand_list // merge_context_->operand_list
push_operand(blob_value, nullptr); push_operand(blob_value, nullptr);
} }
} else if (type == kTypeWideColumnEntity) {
// TODO: support wide-column entities
state_ = kUnexpectedWideColumnEntity;
return false;
} else { } else {
assert(type == kTypeValue);
state_ = kFound; state_ = kFound;
if (do_merge_) { if (do_merge_) {
Merge(&value); Merge(&value);

@ -74,6 +74,8 @@ class GetContext {
kCorrupt, kCorrupt,
kMerge, // saver contains the current merge result (the operands) kMerge, // saver contains the current merge result (the operands)
kUnexpectedBlobIndex, kUnexpectedBlobIndex,
// TODO: remove once wide-column entities are supported by Get/MultiGet
kUnexpectedWideColumnEntity,
}; };
GetContextStats get_context_stats_; GetContextStats get_context_stats_;

Loading…
Cancel
Save