Support for SingleDelete()

Summary:
This patch fixes #7460559. It introduces SingleDelete as a new database
operation. This operation can be used to delete keys that were never
overwritten (no put following another put of the same key). If an overwritten
key is single deleted the behavior is undefined. Single deletion of a
non-existent key has no effect but multiple consecutive single deletions are
not allowed (see limitations).

In contrast to the conventional Delete() operation, the deletion entry is
removed along with the value when the two are lined up in a compaction. Note:
The semantics are similar to @igor's prototype, which allowed this behavior at
the granularity of a column family ( https://reviews.facebook.net/D42093 ).
This new patch, however, is more aggressive when it comes to removing
tombstones: it removes the SingleDelete together with the value whenever there
is no snapshot between them, while the older patch only did this when the
sequence number of the deletion was older than the earliest snapshot.

Most of the complex additions are in the Compaction Iterator, all other changes
should be relatively straightforward. The patch also includes basic support for
single deletions in db_stress and db_bench.

Limitations:
- Not compatible with cuckoo hash tables
- Single deletions cannot be used in combination with merges and normal
  deletions on the same key (other keys are not affected by this)
- Consecutive single deletions are currently not allowed (an older version of
  this patch supported this, so it could be resurrected if needed)

Test Plan: make all check

Reviewers: yhchiang, sdong, rven, anthony, yoshinorim, igor

Reviewed By: igor

Subscribers: maykov, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D43179
main
Andres Noetzli 9 years ago
parent f35560d00d
commit 014fd55adc
  1. 7
      HISTORY.md
  2. 4
      Makefile
  3. 29
      db/builder.cc
  4. 117
      db/compaction_iterator.cc
  5. 22
      db/compaction_iterator.h
  6. 70
      db/compaction_iterator_test.cc
  7. 170
      db/compaction_job_test.cc
  8. 75
      db/db_bench.cc
  9. 13
      db/db_impl.cc
  10. 6
      db/db_impl.h
  11. 6
      db/db_impl_readonly.h
  12. 30
      db/db_iter.cc
  13. 66
      db/db_iter_test.cc
  14. 170
      db/db_test.cc
  15. 4
      db/dbformat.cc
  16. 53
      db/dbformat.h
  17. 2
      db/internal_stats.cc
  18. 5
      db/memtable.cc
  19. 9
      db/merge_helper.cc
  20. 4
      db/merge_helper.h
  21. 23
      db/merge_helper_test.cc
  22. 8
      db/table_properties_collector.cc
  23. 82
      db/table_properties_collector_test.cc
  24. 137
      db/write_batch.cc
  25. 13
      db/write_batch_base.cc
  26. 6
      db/write_batch_internal.h
  27. 182
      db/write_batch_test.cc
  28. 11
      include/rocksdb/db.h
  29. 2
      include/rocksdb/perf_context.h
  30. 1
      include/rocksdb/table_properties.h
  31. 7
      include/rocksdb/utilities/stackable_db.h
  32. 13
      include/rocksdb/utilities/write_batch_with_index.h
  33. 63
      include/rocksdb/write_batch.h
  34. 11
      include/rocksdb/write_batch_base.h
  35. 3
      table/get_context.cc
  36. 169
      tools/db_stress.cc
  37. 11
      util/db_test_util.cc
  38. 4
      util/db_test_util.h
  39. 9
      util/testutil.cc
  40. 4
      util/testutil.h
  41. 39
      utilities/write_batch_with_index/write_batch_with_index.cc
  42. 7
      utilities/write_batch_with_index/write_batch_with_index_internal.cc
  43. 2
      utilities/write_batch_with_index/write_batch_with_index_internal.h

@ -1,5 +1,12 @@
# Rocksdb Change Log
## Unreleased
### New Features
* Added single delete operation as a more efficient way to delete keys that have not been overwritten.
### Public API Changes
* Added SingleDelete() to the DB interface.
## 4.0.0 (9/9/2015)
### New Features
* Added support for transactions. See include/rocksdb/utilities/transaction.h for more info.

@ -295,6 +295,7 @@ TESTS = \
flush_job_test \
wal_manager_test \
listener_test \
compaction_iterator_test \
compaction_job_test \
thread_list_test \
sst_dump_test \
@ -773,6 +774,9 @@ write_batch_with_index_test: utilities/write_batch_with_index/write_batch_with_i
flush_job_test: db/flush_job_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
compaction_iterator_test: db/compaction_iterator_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
compaction_job_test: db/compaction_job_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

@ -110,13 +110,15 @@ Status BuildTable(
}
// Finish and check for builder errors
bool empty = builder->NumEntries() == 0;
s = c_iter.status();
if (s.ok()) {
s = builder->Finish();
} else {
if (!s.ok() || empty) {
builder->Abandon();
} else {
s = builder->Finish();
}
if (s.ok()) {
if (s.ok() && !empty) {
meta->fd.file_size = builder->FileSize();
meta->marked_for_compaction = builder->NeedCompact();
assert(meta->fd.GetFileSize() > 0);
@ -127,28 +129,27 @@ Status BuildTable(
delete builder;
// Finish and check for file errors
if (s.ok() && !ioptions.disable_data_sync) {
if (s.ok() && !empty && !ioptions.disable_data_sync) {
StopWatch sw(env, ioptions.statistics, TABLE_SYNC_MICROS);
file_writer->Sync(ioptions.use_fsync);
}
if (s.ok()) {
if (s.ok() && !empty) {
s = file_writer->Close();
}
if (s.ok()) {
if (s.ok() && !empty) {
// Verify that the table is usable
Iterator* it = table_cache->NewIterator(
std::unique_ptr<Iterator> it(table_cache->NewIterator(
ReadOptions(), env_options, internal_comparator, meta->fd, nullptr,
(internal_stats == nullptr) ? nullptr
: internal_stats->GetFileReadHist(0),
false);
false));
s = it->status();
if (s.ok() && paranoid_file_checks) {
for (it->SeekToFirst(); it->Valid(); it->Next()) {}
for (it->SeekToFirst(); it->Valid(); it->Next()) {
}
s = it->status();
}
delete it;
}
}
@ -157,9 +158,7 @@ Status BuildTable(
s = iter->status();
}
if (s.ok() && meta->fd.GetFileSize() > 0) {
// Keep it
} else {
if (!s.ok() || meta->fd.GetFileSize() == 0) {
env->DeleteFile(fname);
}
return s;

@ -71,6 +71,10 @@ void CompactionIterator::Next() {
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to be valid.
assert(valid_key);
// Keep current_key_ in sync.
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetKey();
ikey_.user_key = current_key_.GetUserKey();
valid_ = true;
} else {
// MergeHelper moves the iterator to the first record after the merged
@ -79,8 +83,11 @@ void CompactionIterator::Next() {
NextFromInput();
}
} else {
// Only advance the input iterator if there is no merge output.
input_->Next();
// Only advance the input iterator if there is no merge output and the
// iterator is not already at the next record.
if (!at_next_) {
input_->Next();
}
NextFromInput();
}
@ -88,9 +95,10 @@ void CompactionIterator::Next() {
}
void CompactionIterator::NextFromInput() {
at_next_ = false;
valid_ = false;
while (input_->Valid()) {
while (!valid_ && input_->Valid()) {
key_ = input_->key();
value_ = input_->value();
iter_stats_.num_input_records++;
@ -100,10 +108,11 @@ void CompactionIterator::NextFromInput() {
// and let the caller decide what to do with it.
// TODO(noetzli): We should have a more elegant solution for this.
if (expect_valid_internal_key_) {
assert(!"corrupted internal key is not expected");
assert(!"Corrupted internal key not expected.");
status_ = Status::Corruption("Corrupted internal key not expected.");
break;
}
current_user_key_.Clear();
key_ = current_key_.SetKey(key_);
has_current_user_key_ = false;
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
@ -113,16 +122,20 @@ void CompactionIterator::NextFromInput() {
}
// Update input statistics
if (ikey_.type == kTypeDeletion) {
if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion) {
iter_stats_.num_input_deletion_records++;
}
iter_stats_.total_input_raw_key_bytes += key_.size();
iter_stats_.total_input_raw_value_bytes += value_.size();
// Check whether the user key changed. After this if statement current_key_
// is a copy of the current input key (maybe converted to a delete by the
// compaction filter). ikey_.user_key is pointing to the copy.
if (!has_current_user_key_ ||
cmp_->Compare(ikey_.user_key, current_user_key_.GetKey()) != 0) {
!cmp_->Equal(ikey_.user_key, current_user_key_)) {
// First occurrence of this user key
current_user_key_.SetKey(ikey_.user_key);
key_ = current_key_.SetKey(key_, &ikey_);
current_user_key_ = ikey_.user_key;
has_current_user_key_ = true;
current_user_key_sequence_ = kMaxSequenceNumber;
current_user_key_snapshot_ = 0;
@ -145,13 +158,9 @@ void CompactionIterator::NextFromInput() {
env_ != nullptr ? timer.ElapsedNanos() : 0;
}
if (to_delete) {
// make a copy of the original key and convert it to a delete
delete_key_.SetInternalKey(ExtractUserKey(key_), ikey_.sequence,
kTypeDeletion);
// anchor the key again
key_ = delete_key_.GetKey();
// needed because ikey_ is backed by key
ParseInternalKey(key_, &ikey_);
// convert the current key to a delete
ikey_.type = kTypeDeletion;
current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion);
// no value associated with delete
value_.clear();
iter_stats_.num_record_drop_user++;
@ -159,6 +168,12 @@ void CompactionIterator::NextFromInput() {
value_ = compaction_filter_value_;
}
}
} else {
// Update the current key to reflect the new sequence number/type without
// copying the user key.
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetKey();
ikey_.user_key = current_key_.GetUserKey();
}
// If there are no snapshots, then this kv affect visibility at tip.
@ -173,7 +188,58 @@ void CompactionIterator::NextFromInput() {
visible_at_tip_ ? visible_at_tip_ : findEarliestVisibleSnapshot(
ikey_.sequence, &prev_snapshot);
if (last_snapshot == current_user_key_snapshot_) {
if (ikey_.type == kTypeSingleDeletion) {
ParsedInternalKey next_ikey;
input_->Next();
// Check whether the current key is valid, not corrupt and the same
// as the single delete.
if (input_->Valid() && ParseInternalKey(input_->key(), &next_ikey) &&
cmp_->Equal(ikey_.user_key, next_ikey.user_key)) {
// Mixing single deletes and merges is not supported. Consecutive
// single deletes are not valid.
if (next_ikey.type != kTypeValue) {
assert(false);
status_ =
Status::InvalidArgument("Put expected after single delete.");
break;
}
// Check whether the current key belongs to the same snapshot as the
// single delete.
if (prev_snapshot == 0 || next_ikey.sequence > prev_snapshot) {
// Found the matching value, we can drop the single delete and the
// value.
++iter_stats_.num_record_drop_hidden;
++iter_stats_.num_record_drop_obsolete;
input_->Next();
} else {
// We hit the next snapshot without hitting a put, so the iterator
// returns the single delete.
valid_ = true;
}
} else {
// We are at the end of the input, could not parse the next key, or hit
// the next key. The iterator returns the single delete if the key
// possibly exists beyond the current output level. We set
// has_current_user_key to false so that if the iterator is at the next
// key, we do not compare it again against the previous key at the next
// iteration. If the next key is corrupt, we return before the
// comparison, so the value of has_current_user_key does not matter.
has_current_user_key_ = false;
if (compaction_ != nullptr &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
&level_ptrs_)) {
++iter_stats_.num_record_drop_obsolete;
} else {
valid_ = true;
}
}
if (valid_) {
at_next_ = true;
}
} else if (last_snapshot == current_user_key_snapshot_) {
// If the earliest snapshot in which this key is visible is the same as
// the earliest snapshot in which a previous instance of the same key is
// visible, then this kv is not visible in any snapshot.
@ -181,6 +247,7 @@ void CompactionIterator::NextFromInput() {
// TODO: why not > ?
assert(last_sequence >= current_user_key_sequence_);
++iter_stats_.num_record_drop_hidden; // (A)
input_->Next();
} else if (compaction_ != nullptr && ikey_.type == kTypeDeletion &&
ikey_.sequence <= earliest_snapshot_ &&
compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key,
@ -197,6 +264,7 @@ void CompactionIterator::NextFromInput() {
// few iterations of this loop (by rule (A) above).
// Therefore this deletion marker is obsolete and can be dropped.
++iter_stats_.num_record_drop_obsolete;
input_->Next();
} else if (ikey_.type == kTypeMerge) {
if (!merge_helper_->HasOperator()) {
LogToBuffer(log_buffer_, "Options::merge_operator is null.");
@ -222,14 +290,14 @@ void CompactionIterator::NextFromInput() {
// MergeUntil stops when it encounters a corrupt key and does not
// include them in the result, so we expect the keys here to be valid.
assert(valid_key);
// Keep current_key_ in sync.
current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type);
key_ = current_key_.GetKey();
ikey_.user_key = current_key_.GetUserKey();
valid_ = true;
break;
} else {
valid_ = true;
break;
}
input_->Next();
}
}
@ -240,12 +308,9 @@ void CompactionIterator::PrepareOutput() {
// then we can squash the seqno to zero.
if (bottommost_level_ && valid_ && ikey_.sequence < earliest_snapshot_ &&
ikey_.type != kTypeMerge) {
assert(ikey_.type != kTypeDeletion);
// make a copy because updating in place would cause problems
// with the priority queue that is managing the input key iterator
updated_key_.assign(key_.data(), key_.size());
UpdateInternalKey(&updated_key_, (uint64_t)0, ikey_.type);
key_ = Slice(updated_key_);
assert(ikey_.type != kTypeDeletion && ikey_.type != kTypeSingleDeletion);
ikey_.sequence = 0;
current_key_.UpdateInternalKey(0, ikey_.type);
}
}

@ -64,7 +64,7 @@ class CompactionIterator {
const Status& status() const { return status_; }
const ParsedInternalKey& ikey() const { return ikey_; }
bool Valid() const { return valid_; }
Slice user_key() const { return current_user_key_.GetKey(); }
const Slice& user_key() const { return current_user_key_; }
const CompactionIteratorStats& iter_stats() const { return iter_stats_; }
private:
@ -102,18 +102,32 @@ class CompactionIterator {
SequenceNumber latest_snapshot_;
// State
//
// Points to a copy of the current compaction iterator output (current_key_)
// if valid_.
Slice key_;
// Points to the value in the underlying iterator that corresponds to the
// current output.
Slice value_;
// The status is OK unless compaction iterator encounters a merge operand
// while not having a merge operator defined.
Status status_;
// Stores the user key, sequence number and type of the current compaction
// iterator output (or current key in the underlying iterator during
// NextFromInput()).
ParsedInternalKey ikey_;
// Stores whether ikey_.user_key is valid. If set to false, the user key is
// not compared against the current key in the underlying iterator.
bool has_current_user_key_ = false;
IterKey current_user_key_;
// If true, input_ has already been advanced to the record that follows the
// current output (this happens when a single delete is returned), so Next()
// must not advance input_ again before calling NextFromInput().
bool at_next_ = false;
// Holds a copy of the current compaction iterator output (or current key in
// the underlying iterator during NextFromInput()).
IterKey current_key_;
Slice current_user_key_;
SequenceNumber current_user_key_sequence_;
SequenceNumber current_user_key_snapshot_;
MergeOutputIterator merge_out_iter_;
std::string updated_key_;
std::string compaction_filter_value_;
IterKey delete_key_;
// "level_ptrs" holds indices that remember which file of an associated
// level we were last checking during the last call to compaction->
// KeyNotExistsBeyondOutputLevel(). This allows future calls to the function

@ -0,0 +1,70 @@
// Copyright (c) 2013, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#include "db/compaction_iterator.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace rocksdb {
// Fixture for CompactionIterator tests: wires a CompactionIterator to an
// in-memory test::VectorIterator so that tests can feed it hand-built keys.
class CompactionIteratorTest : public testing::Test {
public:
// Uses the bytewise comparator and starts with an empty snapshot list.
CompactionIteratorTest() : cmp_(BytewiseComparator()), snapshots_({}) {}
// (Re)creates merge_helper_, iter_ and c_iter_ for the given input.
//   ks: keys handed to test::VectorIterator (the tests build them with
//       test::KeyStr).
//   vs: values handed to test::VectorIterator together with ks.
//   last_sequence: forwarded to the CompactionIterator constructor.
// The input iterator is positioned with SeekToFirst() before the compaction
// iterator is constructed on top of it.
void InitIterator(const std::vector<std::string>& ks,
const std::vector<std::string>& vs,
SequenceNumber last_sequence) {
// NOTE(review): the null/0U/false arguments are presumably "no merge
// operator, no compaction filter, ..." -- confirm against MergeHelper.
merge_helper_.reset(new MergeHelper(cmp_, nullptr, nullptr, 0U, false));
iter_.reset(new test::VectorIterator(ks, vs));
iter_->SeekToFirst();
// The compaction iterator only receives raw pointers to iter_,
// merge_helper_ and snapshots_; the fixture keeps ownership of all three.
c_iter_.reset(new CompactionIterator(iter_.get(), cmp_, merge_helper_.get(),
last_sequence, &snapshots_,
Env::Default(), false));
}
const Comparator* cmp_;
// Passed to the CompactionIterator by pointer; empty unless a test fills it.
std::vector<SequenceNumber> snapshots_;
// merge_helper_ and iter_ are declared before c_iter_, so they are destroyed
// after the compaction iterator that points at them.
std::unique_ptr<MergeHelper> merge_helper_;
std::unique_ptr<test::VectorIterator> iter_;
std::unique_ptr<CompactionIterator> c_iter_;
};
// It is possible that the output of the compaction iterator is empty even if
// the input is not.
// Input: a single delete of "a" (seq 5) directly followed by a put of "a"
// (seq 3), with no snapshots. The test expects nothing to be emitted at all.
TEST_F(CompactionIteratorTest, EmptyResult) {
InitIterator({test::KeyStr("a", 5, kTypeSingleDeletion),
test::KeyStr("a", 3, kTypeValue)},
{"", "val"}, 5);
c_iter_->SeekToFirst();
// Both input records were consumed without producing an output record.
ASSERT_FALSE(c_iter_->Valid());
}
// If there is a corruption after a single deletion, the corrupted key should
// be preserved.
// NOTE(review): the trailing `true` passed to test::KeyStr presumably makes
// the generated key corrupt -- confirm in util/testutil.
TEST_F(CompactionIteratorTest, CorruptionAfterSingleDeletion) {
InitIterator({test::KeyStr("a", 5, kTypeSingleDeletion),
test::KeyStr("a", 3, kTypeValue, true),
test::KeyStr("b", 10, kTypeValue)},
{"", "val", "val2"}, 10);
c_iter_->SeekToFirst();
// 1st output: the single delete itself is returned, not dropped.
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("a", 5, kTypeSingleDeletion),
c_iter_->key().ToString());
c_iter_->Next();
// 2nd output: the key built with the extra `true` argument, unchanged.
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("a", 3, kTypeValue, true), c_iter_->key().ToString());
c_iter_->Next();
// 3rd output: the unrelated put of "b".
ASSERT_TRUE(c_iter_->Valid());
ASSERT_EQ(test::KeyStr("b", 10, kTypeValue), c_iter_->key().ToString());
c_iter_->Next();
// Input exhausted.
ASSERT_FALSE(c_iter_->Valid());
}
} // namespace rocksdb
// Test binary entry point: lets GoogleTest consume its command-line flags and
// then runs every registered test, returning the aggregate result.
int main(int argc, char** argv) {
  ::testing::InitGoogleTest(&argc, argv);
  const int exit_code = RUN_ALL_TESTS();
  return exit_code;
}

@ -215,7 +215,8 @@ class CompactionJobTest : public testing::Test {
}
void RunCompaction(const std::vector<std::vector<FileMetaData*>>& input_files,
const stl_wrappers::KVMap& expected_results) {
const stl_wrappers::KVMap& expected_results,
const std::vector<SequenceNumber>& snapshots = {}) {
auto cfd = versions_->GetColumnFamilySet()->GetDefault();
size_t num_input_files = 0;
@ -241,9 +242,9 @@ class CompactionJobTest : public testing::Test {
EventLogger event_logger(db_options_.info_log.get());
CompactionJob compaction_job(0, &compaction, db_options_, env_options_,
versions_.get(), &shutting_down_, &log_buffer,
nullptr, nullptr, nullptr, {}, table_cache_,
&event_logger, false, false, dbname_,
&compaction_job_stats_);
nullptr, nullptr, nullptr, snapshots,
table_cache_, &event_logger, false, false,
dbname_, &compaction_job_stats_);
VerifyInitializationOfCompactionJobStats(compaction_job_stats_);
@ -419,6 +420,167 @@ TEST_F(CompactionJobTest, NonAssocMerge) {
RunCompaction({ files }, expected_results);
}
// Compacts a regular Delete (key "a") and a SingleDelete (key "b") together
// with the Puts they cover. Expected output: the SingleDelete and the Put of
// "b" are both gone, while the Delete of "a" is still written out.
TEST_F(CompactionJobTest, SimpleSingleDelete) {
NewDB();
auto file1 = mock::MakeMockFile({
{KeyStr("a", 5U, kTypeDeletion), ""},
{KeyStr("b", 6U, kTypeSingleDeletion), ""},
});
AddMockFile(file1);
auto file2 = mock::MakeMockFile({{KeyStr("a", 3U, kTypeValue), "val"},
{KeyStr("b", 4U, kTypeValue), "val"}});
AddMockFile(file2);
// NOTE(review): the second argument (2) is presumably the level the file is
// added to -- confirm against AddMockFile. This file is not among the
// level-0 files compacted below; it holds an older Put of "a".
auto file3 = mock::MakeMockFile({
{KeyStr("a", 1U, kTypeValue), "val"},
});
AddMockFile(file3, 2);
auto expected_results =
mock::MakeMockFile({{KeyStr("a", 5U, kTypeDeletion), ""}});
SetLastSequence(6U);
// Only the level-0 files are used as compaction input.
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
}
// Exercises single deletes in the presence of snapshots at sequence numbers
// 10 and 20. Per key, the input and the expected outcome are:
//   "0": SDel@2 only, key absent from file3            -> dropped
//   "A": SDel@12, Put@1 only in file3                  -> SDel kept
//   "a": SDel@12 + Put@11, no snapshot between them    -> both dropped
//   "b": SDel@21 + Put@11, snapshot 20 between them    -> both kept
//   "c": SDel@22 + Put@21, no snapshot between them    -> both dropped
//   "d": SDel@9  + Put@8,  no snapshot between them    -> both dropped
//   "e": SDel@2, Put@1 only in file3                   -> SDel kept
TEST_F(CompactionJobTest, SingleDeleteSnapshots) {
NewDB();
auto file1 = mock::MakeMockFile({{KeyStr("A", 12U, kTypeSingleDeletion), ""},
{KeyStr("a", 12U, kTypeSingleDeletion), ""},
{KeyStr("b", 21U, kTypeSingleDeletion), ""},
{KeyStr("c", 22U, kTypeSingleDeletion), ""},
{KeyStr("d", 9U, kTypeSingleDeletion), ""}});
AddMockFile(file1);
auto file2 = mock::MakeMockFile({{KeyStr("0", 2U, kTypeSingleDeletion), ""},
{KeyStr("a", 11U, kTypeValue), "val1"},
{KeyStr("b", 11U, kTypeValue), "val2"},
{KeyStr("c", 21U, kTypeValue), "val3"},
{KeyStr("d", 8U, kTypeValue), "val4"},
{KeyStr("e", 2U, kTypeSingleDeletion), ""}});
AddMockFile(file2);
// NOTE(review): added with second argument 2 (presumably the level) and not
// part of the level-0 input selected below -- confirm against AddMockFile.
auto file3 = mock::MakeMockFile({{KeyStr("A", 1U, kTypeValue), "val"},
{KeyStr("e", 1U, kTypeValue), "val"}});
AddMockFile(file3, 2);
auto expected_results =
mock::MakeMockFile({{KeyStr("A", 12U, kTypeSingleDeletion), ""},
{KeyStr("b", 21U, kTypeSingleDeletion), ""},
{KeyStr("b", 11U, kTypeValue), "val2"},
{KeyStr("e", 2U, kTypeSingleDeletion), ""}});
SetLastSequence(22U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
// The third argument is the list of live snapshot sequence numbers.
RunCompaction({files}, expected_results, {10U, 20U});
}
// Checks that a single delete also cancels a Put whose sequence number is 0:
// SDel("A")@10 and Put("A")@0 are both expected to disappear. The unrelated
// "dummy" entry goes in with sequence number 5 and is expected to come out
// with sequence number 0.
TEST_F(CompactionJobTest, SingleDeleteZeroSeq) {
NewDB();
auto file1 = mock::MakeMockFile({
{KeyStr("A", 10U, kTypeSingleDeletion), ""},
{KeyStr("dummy", 5U, kTypeValue), "val2"},
});
AddMockFile(file1);
auto file2 = mock::MakeMockFile({
{KeyStr("A", 0U, kTypeValue), "val"},
});
AddMockFile(file2);
auto expected_results = mock::MakeMockFile({
{KeyStr("dummy", 0U, kTypeValue), "val2"},
});
SetLastSequence(22U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
// Run without any snapshots.
RunCompaction({files}, expected_results, {});
}
TEST_F(CompactionJobTest, MultiSingleDelete) {
// Tests four scenarios involving multiple single delete/put pairs. Each line
// lists the operations on one key from oldest to newest, followed by the
// expected compaction output; "Snapshot" marks the snapshot at sequence
// number 10 and "(Put)" a put that lives in file3, outside the level-0 input:
//
// A: Put Snapshot SDel Put SDel -> Put Snapshot SDel
// B: Put SDel Put SDel -> (Removed)
// C: SDel Put SDel Snapshot Put -> Snapshot Put
// D: (Put) SDel Snapshot Put SDel -> (Put) SDel Snapshot
NewDB();
auto file1 = mock::MakeMockFile({
{KeyStr("A", 14U, kTypeSingleDeletion), ""},
{KeyStr("A", 13U, kTypeValue), "val5"},
{KeyStr("A", 12U, kTypeSingleDeletion), ""},
{KeyStr("B", 14U, kTypeSingleDeletion), ""},
{KeyStr("B", 13U, kTypeValue), "val2"},
{KeyStr("C", 14U, kTypeValue), "val3"},
{KeyStr("D", 12U, kTypeSingleDeletion), ""},
{KeyStr("D", 11U, kTypeValue), "val4"},
});
AddMockFile(file1);
auto file2 = mock::MakeMockFile({
{KeyStr("A", 10U, kTypeValue), "val"},
{KeyStr("B", 12U, kTypeSingleDeletion), ""},
{KeyStr("B", 11U, kTypeValue), "val2"},
{KeyStr("C", 10U, kTypeSingleDeletion), ""},
{KeyStr("C", 9U, kTypeValue), "val6"},
{KeyStr("C", 8U, kTypeSingleDeletion), ""},
{KeyStr("D", 10U, kTypeSingleDeletion), ""},
});
AddMockFile(file2);
// NOTE(review): added with second argument 2 (presumably the level) and not
// part of the level-0 input selected below -- confirm against AddMockFile.
auto file3 = mock::MakeMockFile({
{KeyStr("D", 11U, kTypeValue), "val"},
});
AddMockFile(file3, 2);
auto expected_results = mock::MakeMockFile({
{KeyStr("A", 12U, kTypeSingleDeletion), ""},
{KeyStr("A", 10U, kTypeValue), "val"},
{KeyStr("C", 14U, kTypeValue), "val3"},
{KeyStr("D", 10U, kTypeSingleDeletion), ""},
});
SetLastSequence(22U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results, {10U});
}
// This test documents the behavior where a corrupt key follows a deletion or a
// single deletion and the (single) deletion gets removed while the corrupt key
// gets written out. TODO(noetzli): We probably want a better way to treat
// corrupt keys.
// NOTE(review): the trailing `true` passed to test::KeyStr presumably makes
// the generated key corrupt -- confirm in util/testutil.
TEST_F(CompactionJobTest, CorruptionAfterDeletion) {
NewDB();
// Key "a": a regular Delete followed by a key built with the `true` flag.
auto file1 =
mock::MakeMockFile({{test::KeyStr("A", 6U, kTypeValue), "val3"},
{test::KeyStr("a", 5U, kTypeDeletion), ""},
{test::KeyStr("a", 4U, kTypeValue, true), "val"}});
AddMockFile(file1);
// Key "b": a SingleDelete followed by a key built with the `true` flag.
auto file2 =
mock::MakeMockFile({{test::KeyStr("b", 3U, kTypeSingleDeletion), ""},
{test::KeyStr("b", 2U, kTypeValue, true), "val"},
{test::KeyStr("c", 1U, kTypeValue), "val2"}});
AddMockFile(file2);
// Neither deletion appears in the expected output, and every surviving entry
// is expected with sequence number 0.
auto expected_results =
mock::MakeMockFile({{test::KeyStr("A", 0U, kTypeValue), "val3"},
{test::KeyStr("a", 0U, kTypeValue, true), "val"},
{test::KeyStr("b", 0U, kTypeValue, true), "val"},
{test::KeyStr("c", 0U, kTypeValue), "val2"}});
SetLastSequence(6U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
RunCompaction({files}, expected_results);
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -110,7 +110,8 @@ DEFINE_string(benchmarks,
"uncompress,"
"acquireload,"
"fillseekseq,"
"randomtransaction",
"randomtransaction,"
"randomreplacekeys",
"Comma-separated list of operations to run in the specified"
" order. Available benchmarks:\n"
@ -161,6 +162,8 @@ DEFINE_string(benchmarks,
"them by seeking to each key\n"
"\trandomtransaction -- execute N random transactions and "
"verify correctness\n"
"\trandomreplacekeys -- randomly replaces N keys by deleting "
"the old version and putting the new version\n\n"
"Meta operations:\n"
"\tcompact -- Compact the entire DB\n"
"\tstats -- Print DB stats\n"
@ -688,6 +691,13 @@ DEFINE_uint64(wal_bytes_per_sync, rocksdb::Options().wal_bytes_per_sync,
DEFINE_bool(filter_deletes, false, " On true, deletes use bloom-filter and drop"
" the delete if key not present");
DEFINE_bool(use_single_deletes, true,
"Use single deletes (used in RandomReplaceKeys only).");
DEFINE_double(stddev, 2000.0,
"Standard deviation of normal distribution used for picking keys"
" (used in RandomReplaceKeys only).");
DEFINE_int32(max_successive_merges, 0, "Maximum number of successive merge"
" operations on a key in the memtable");
@ -1925,6 +1935,9 @@ class Benchmark {
} else if (name == "randomtransaction") {
method = &Benchmark::RandomTransaction;
post_process_method = &Benchmark::RandomTransactionVerify;
} else if (name == "randomreplacekeys") {
fresh_db = true;
method = &Benchmark::RandomReplaceKeys;
} else if (name == "stats") {
PrintStats("rocksdb.stats");
} else if (name == "levelstats") {
@ -3846,6 +3859,66 @@ class Benchmark {
fprintf(stdout, "RandomTransactionVerify Success!\n");
}
// Writes and deletes random keys without overwriting keys.
//
// This benchmark is intended to partially replicate the behavior of MyRocks
// secondary indices: All data is stored in keys and updates happen by
// deleting the old version of the key and inserting the new version.
//
// Each of the FLAGS_numdistinct logical keys owns a range of max_counter
// consecutive key ids, and counters[i] selects the id within range i that is
// currently live. One operation deletes the live id (with SingleDelete() or
// Delete(), depending on FLAGS_use_single_deletes) and puts the next id of the
// same range, wrapping around after max_counter updates.
//
// Any failed DB operation prints the status to stderr and terminates the
// process with exit code 1.
void RandomReplaceKeys(ThreadState* thread) {
  std::unique_ptr<const char[]> key_guard;
  Slice key = AllocateKey(&key_guard);
  std::vector<uint32_t> counters(FLAGS_numdistinct, 0);
  size_t max_counter = 50;
  RandomGenerator gen;

  Status s;
  DB* db = SelectDB(thread);
  // Insert the initial version (counter == 0) of every logical key.
  for (int64_t i = 0; i < FLAGS_numdistinct; i++) {
    GenerateKeyFromInt(i * max_counter, FLAGS_num, &key);
    s = db->Put(write_options_, key, gen.Generate(value_size_));
    if (!s.ok()) {
      fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
      exit(1);
    }
  }

  // Keep a snapshot of the initial state alive while the updates run. The
  // handle is retained because every GetSnapshot() has to be paired with a
  // ReleaseSnapshot(); discarding it would leak the snapshot.
  const Snapshot* snapshot = db->GetSnapshot();

  // Pick logical keys from a normal distribution centered on the middle of
  // the key space; out-of-range draws are clamped to the first/last key.
  std::default_random_engine generator;
  std::normal_distribution<double> distribution(FLAGS_numdistinct / 2.0,
                                                FLAGS_stddev);
  Duration duration(FLAGS_duration, FLAGS_num);
  while (!duration.Done(1)) {
    int64_t rnd_id = static_cast<int64_t>(distribution(generator));
    int64_t key_id = std::max(std::min(FLAGS_numdistinct - 1, rnd_id),
                              static_cast<int64_t>(0));
    // Delete the version of the key that is currently live ...
    GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
                       &key);
    s = FLAGS_use_single_deletes ? db->SingleDelete(write_options_, key)
                                 : db->Delete(write_options_, key);
    if (s.ok()) {
      // ... and insert the next version under a different key id.
      counters[key_id] = (counters[key_id] + 1) % max_counter;
      GenerateKeyFromInt(key_id * max_counter + counters[key_id], FLAGS_num,
                         &key);
      s = db->Put(write_options_, key, Slice());
    }

    if (!s.ok()) {
      fprintf(stderr, "Operation failed: %s\n", s.ToString().c_str());
      exit(1);
    }

    thread->stats.FinishedOps(nullptr, db, 1);
  }

  // The benchmark is done; give the snapshot back to the DB.
  db->ReleaseSnapshot(snapshot);

  char msg[200];
  snprintf(msg, sizeof(msg),
           "use single deletes: %d, "
           "standard deviation: %lf\n",
           FLAGS_use_single_deletes, FLAGS_stddev);
  thread->stats.AddMessage(msg);
}
void Compact(ThreadState* thread) {
DB* db = SelectDB(thread);
db->CompactRange(CompactRangeOptions(), nullptr, nullptr);

@ -3432,6 +3432,12 @@ Status DBImpl::Delete(const WriteOptions& write_options,
return DB::Delete(write_options, column_family, key);
}
// Single-deletes `key` in `column_family` by forwarding, unchanged, to the
// base-class implementation DB::SingleDelete and returning its status.
Status DBImpl::SingleDelete(const WriteOptions& write_options,
ColumnFamilyHandle* column_family,
const Slice& key) {
return DB::SingleDelete(write_options, column_family, key);
}
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
return WriteImpl(write_options, my_batch, nullptr);
}
@ -4315,6 +4321,13 @@ Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
return Write(opt, &batch);
}
// Default SingleDelete implementation: records the single deletion of `key`
// in `column_family` in a one-entry WriteBatch and submits that batch through
// Write() with the caller's options, returning the status of the write.
Status DB::SingleDelete(const WriteOptions& opt,
                        ColumnFamilyHandle* column_family, const Slice& key) {
  WriteBatch single_delete_batch;
  single_delete_batch.SingleDelete(column_family, key);
  return Write(opt, &single_delete_batch);
}
Status DB::Merge(const WriteOptions& opt, ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
WriteBatch batch;

@ -12,7 +12,6 @@
#include <deque>
#include <limits>
#include <list>
#include <list>
#include <set>
#include <string>
#include <utility>
@ -40,7 +39,6 @@
#include "util/autovector.h"
#include "util/event_logger.h"
#include "util/hash.h"
#include "util/hash.h"
#include "util/instrumented_mutex.h"
#include "util/scoped_arena_iterator.h"
#include "util/stop_watch.h"
@ -76,6 +74,10 @@ class DBImpl : public DB {
virtual Status Delete(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key) override;
// Keep the other DB::SingleDelete overloads visible next to the override.
using DB::SingleDelete;
// Single-deletes `key` in `column_family`; overrides DB::SingleDelete.
virtual Status SingleDelete(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key) override;
using DB::Write;
virtual Status Write(const WriteOptions& options,
WriteBatch* updates) override;

@ -53,6 +53,12 @@ class DBImplReadOnly : public DBImpl {
const Slice& key) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
using DBImpl::SingleDelete;
// A read-only DB rejects single deletions: this override ignores its
// arguments and always returns Status::NotSupported.
virtual Status SingleDelete(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key) override {
return Status::NotSupported("Not supported operation in read only mode.");
}
virtual Status Write(const WriteOptions& options,
WriteBatch* updates) override {
return Status::NotSupported("Not supported operation in read only mode.");

@ -235,6 +235,7 @@ void DBIter::FindNextUserEntryInternal(bool skipping) {
} else {
switch (ikey.type) {
case kTypeDeletion:
case kTypeSingleDeletion:
// Arrange to skip all upcoming entries for this key since
// they are hidden by this deletion.
saved_key_.SetKey(ikey.user_key);
@ -308,16 +309,12 @@ void DBIter::MergeValuesNewToOld() {
if (!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey())) {
// hit the next user key, stop right here
break;
}
if (kTypeDeletion == ikey.type) {
} else if (kTypeDeletion == ikey.type || kTypeSingleDeletion == ikey.type) {
// hit a delete with the same user key, stop right here
// iter_ is positioned after delete
iter_->Next();
break;
}
if (kTypeValue == ikey.type) {
} else if (kTypeValue == ikey.type) {
// hit a put, merge the put value with operands and store the
// final result in saved_value_. We are done!
// ignore corruption if there is any.
@ -333,13 +330,13 @@ void DBIter::MergeValuesNewToOld() {
// iter_ is positioned after put
iter_->Next();
return;
}
if (kTypeMerge == ikey.type) {
} else if (kTypeMerge == ikey.type) {
// hit a merge, add the value as an operand and run associative merge.
// when complete, add result to operands and continue.
const Slice& val = iter_->value();
operands.push_front(val.ToString());
} else {
assert(false);
}
}
@ -433,12 +430,14 @@ void DBIter::PrevInternal() {
}
// This function checks, if the entry with biggest sequence_number <= sequence_
// is non kTypeDeletion. If it's not, we save value in saved_value_
// is non kTypeDeletion or kTypeSingleDeletion. If it's not, we save value in
// saved_value_
bool DBIter::FindValueForCurrentKey() {
assert(iter_->Valid());
// Contains operands for merge operator.
std::deque<std::string> operands;
// last entry before merge (could be kTypeDeletion or kTypeValue)
// last entry before merge (could be kTypeDeletion, kTypeSingleDeletion or
// kTypeValue)
ValueType last_not_merge_type = kTypeDeletion;
ValueType last_key_entry_type = kTypeDeletion;
@ -461,8 +460,9 @@ bool DBIter::FindValueForCurrentKey() {
last_not_merge_type = kTypeValue;
break;
case kTypeDeletion:
case kTypeSingleDeletion:
operands.clear();
last_not_merge_type = kTypeDeletion;
last_not_merge_type = last_key_entry_type;
PERF_COUNTER_ADD(internal_delete_skipped_count, 1);
break;
case kTypeMerge:
@ -482,6 +482,7 @@ bool DBIter::FindValueForCurrentKey() {
switch (last_key_entry_type) {
case kTypeDeletion:
case kTypeSingleDeletion:
valid_ = false;
return false;
case kTypeMerge:
@ -530,7 +531,8 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
ParsedInternalKey ikey;
FindParseableKey(&ikey, kForward);
if (ikey.type == kTypeValue || ikey.type == kTypeDeletion) {
if (ikey.type == kTypeValue || ikey.type == kTypeDeletion ||
ikey.type == kTypeSingleDeletion) {
if (ikey.type == kTypeValue) {
saved_value_ = iter_->value().ToString();
valid_ = true;
@ -553,7 +555,7 @@ bool DBIter::FindValueForCurrentKeyUsingSeek() {
if (!iter_->Valid() ||
!user_comparator_->Equal(ikey.user_key, saved_key_.GetKey()) ||
ikey.type == kTypeDeletion) {
ikey.type == kTypeDeletion || ikey.type == kTypeSingleDeletion) {
{
StopWatchNano timer(env_, statistics_ != nullptr);
PERF_TIMER_GUARD(merge_operator_time_nanos);

@ -38,16 +38,20 @@ class TestIterator : public Iterator {
iter_(0),
cmp(comparator) {}
void AddMerge(std::string argkey, std::string argvalue) {
Add(argkey, kTypeMerge, argvalue);
void AddPut(std::string argkey, std::string argvalue) {
Add(argkey, kTypeValue, argvalue);
}
void AddDeletion(std::string argkey) {
Add(argkey, kTypeDeletion, std::string());
}
void AddPut(std::string argkey, std::string argvalue) {
Add(argkey, kTypeValue, argvalue);
void AddSingleDeletion(std::string argkey) {
Add(argkey, kTypeSingleDeletion, std::string());
}
void AddMerge(std::string argkey, std::string argvalue) {
Add(argkey, kTypeMerge, argvalue);
}
void Add(std::string argkey, ValueType type, std::string argvalue) {
@ -891,6 +895,8 @@ TEST_F(DBIteratorTest, DBIterator1) {
db_iter->Next();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
db_iter->Next();
ASSERT_FALSE(db_iter->Valid());
}
TEST_F(DBIteratorTest, DBIterator2) {
@ -1787,6 +1793,58 @@ TEST_F(DBIteratorTest, SeekToLastOccurrenceSeq0) {
ASSERT_FALSE(db_iter->Valid());
}
// Forward iteration through a DB iterator created with sequence argument 1
// over an internal iterator holding puts for "a" and "b", a single deletion
// of "b", and merge operands for both keys. Expects "a" -> "0", then "b",
// then the end of the iteration.
// NOTE(review): presumably TestIterator assigns increasing sequence numbers in
// insertion order, so only the two puts are visible at sequence 1 - confirm
// against TestIterator::Add.
TEST_F(DBIteratorTest, DBIterator11) {
Options options;
options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("a", "0");
internal_iter->AddPut("b", "0");
internal_iter->AddSingleDeletion("b");
internal_iter->AddMerge("a", "1");
internal_iter->AddMerge("b", "2");
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(NewDBIterator(
env_, ImmutableCFOptions(options), BytewiseComparator(), internal_iter, 1,
options.max_sequential_skip_in_iterations));
// First visible key is "a" with the value of its put.
db_iter->SeekToFirst();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
ASSERT_EQ(db_iter->value().ToString(), "0");
// "b" must still be reported; only its key is checked here.
db_iter->Next();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "b");
// Nothing follows "b".
db_iter->Next();
ASSERT_FALSE(db_iter->Valid());
}
// Backward iteration without a merge operator: "b" receives a put and then a
// single deletion, so walking back from the last key is expected to yield
// "c" and "a" only. The DB iterator is created with sequence argument 10 and
// 0 as its final argument (the position DBIterator11 uses for
// max_sequential_skip_in_iterations).
TEST_F(DBIteratorTest, DBIterator12) {
Options options;
options.merge_operator = nullptr;
TestIterator* internal_iter = new TestIterator(BytewiseComparator());
internal_iter->AddPut("a", "1");
internal_iter->AddPut("b", "2");
internal_iter->AddPut("c", "3");
internal_iter->AddSingleDeletion("b");
internal_iter->Finish();
std::unique_ptr<Iterator> db_iter(
NewDBIterator(env_, ImmutableCFOptions(options), BytewiseComparator(),
internal_iter, 10, 0));
// Last key is "c".
db_iter->SeekToLast();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "c");
ASSERT_EQ(db_iter->value().ToString(), "3");
// Stepping back skips the single-deleted "b" and lands on "a".
db_iter->Prev();
ASSERT_TRUE(db_iter->Valid());
ASSERT_EQ(db_iter->key().ToString(), "a");
ASSERT_EQ(db_iter->value().ToString(), "1");
// No key precedes "a".
db_iter->Prev();
ASSERT_FALSE(db_iter->Valid());
}
class DBIterWithMergeIterTest : public testing::Test {
public:
DBIterWithMergeIterTest()

@ -854,6 +854,107 @@ TEST_F(DBTest, PutDeleteGet) {
} while (ChangeOptions());
}
// Basic read-your-writes check for SingleDelete: after putting "foo" and
// "foo2" into column family 1 and single-deleting "foo", a Get of "foo" must
// report NOT_FOUND. The scenario is repeated for every option configuration
// that ChangeOptions() yields with the skip mask below.
TEST_F(DBTest, PutSingleDeleteGet) {
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_EQ("v1", Get(1, "foo"));
ASSERT_OK(Put(1, "foo2", "v2"));
ASSERT_EQ("v2", Get(1, "foo2"));
ASSERT_OK(SingleDelete(1, "foo"));
ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
// Skip HashCuckooRep as it does not support single delete. FIFO and
// universal compaction do not apply to the test case. Skip MergePut
// because single delete does not get removed when it encounters a merge.
} while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction |
kSkipUniversalCompaction | kSkipMergePut));
}
// Checks that flushing preserves a single delete that is hidden behind a
// newer put, so that a later compaction still removes the older value that
// lives on a lower level.
TEST_F(DBTest, SingleDeleteFlush) {
  do {
    Options options = CurrentOptions();
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    // Put values on second level (so that they will not be in the same
    // compaction as the other operations).
    ASSERT_OK(Put(1, "foo", "first"));
    ASSERT_OK(Put(1, "bar", "one"));
    ASSERT_OK(Flush(1));
    MoveFilesToLevel(2, 1);

    // (Single) delete hidden by a put
    ASSERT_OK(SingleDelete(1, "foo"));
    ASSERT_OK(Put(1, "foo", "second"));
    ASSERT_OK(Delete(1, "bar"));
    ASSERT_OK(Put(1, "bar", "two"));
    ASSERT_OK(Flush(1));

    // Remove the newer values again; the hidden (single) deletes written
    // above are what must cancel the values on the lower level.
    ASSERT_OK(SingleDelete(1, "foo"));
    ASSERT_OK(Delete(1, "bar"));
    ASSERT_OK(Flush(1));

    // A failed compaction would make the NOT_FOUND checks below
    // meaningless, so its status is verified as well.
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));

    ASSERT_EQ("NOT_FOUND", Get(1, "bar"));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));
    // Skip HashCuckooRep as it does not support single delete. FIFO and
    // universal compaction do not apply to the test case. Skip MergePut
    // because merges cannot be combined with single deletions.
  } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction |
                         kSkipUniversalCompaction | kSkipMergePut));
}
// Single deletes that encounter the matching put in a flush should get
// removed: after the flush no entry for "a" may remain, while the unrelated
// key "foo" keeps the flush output non-empty.
TEST_F(DBTest, SingleDeletePutFlush) {
  do {
    Options options = CurrentOptions();
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    // Every write is checked so that a failed setup cannot masquerade as a
    // correctly removed put/single-delete pair.
    ASSERT_OK(Put(1, "foo", Slice()));
    ASSERT_OK(Put(1, "a", Slice()));
    ASSERT_OK(SingleDelete(1, "a"));
    ASSERT_OK(Flush(1));

    ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
    // Skip HashCuckooRep as it does not support single delete. FIFO and
    // universal compaction do not apply to the test case. Skip MergePut
    // because merges cannot be combined with single deletions.
  } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction |
                         kSkipUniversalCompaction | kSkipMergePut));
}
// It is possible to produce empty flushes when using single deletes: here the
// memtable holds only a put and its matching single delete. Tests whether
// such an empty flush causes issues.
TEST_F(DBTest, EmptyFlush) {
  do {
    Options options = CurrentOptions();
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    // Check the writes so a failed setup is reported at its source rather
    // than as a confusing flush result.
    ASSERT_OK(Put(1, "a", Slice()));
    ASSERT_OK(SingleDelete(1, "a"));
    ASSERT_OK(Flush(1));

    ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
    // Skip HashCuckooRep as it does not support single delete. FIFO and
    // universal compaction do not apply to the test case. Skip MergePut
    // because merges cannot be combined with single deletions.
  } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction |
                         kSkipUniversalCompaction | kSkipMergePut));
}
TEST_F(DBTest, GetFromImmutableLayer) {
do {
Options options;
@ -3548,6 +3649,54 @@ TEST_F(DBTest, CompactBetweenSnapshots) {
} while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction));
}
TEST_F(DBTest, UnremovableSingleDelete) {
  // If we compact:
  //
  // Put(A, v1) Snapshot SingleDelete(A) Put(A, v2)
  //
  // We do not want to end up with:
  //
  // Put(A, v1) Snapshot Put(A, v2)
  //
  // Because a subsequent SingleDelete(A) would delete the Put(A, v2)
  // but not Put(A, v1), so Get(A) would return v1.
  anon::OptionsOverride options_override;
  options_override.skip_policy = kSkipNoSnapshot;
  do {
    Options options = CurrentOptions(options_override);
    options.disable_auto_compactions = true;
    CreateAndReopenWithCF({"pikachu"}, options);

    ASSERT_OK(Put(1, "foo", "first"));
    const Snapshot* snapshot = db_->GetSnapshot();
    ASSERT_OK(SingleDelete(1, "foo"));
    ASSERT_OK(Put(1, "foo", "second"));
    ASSERT_OK(Flush(1));

    ASSERT_EQ("first", Get(1, "foo", snapshot));
    ASSERT_EQ("second", Get(1, "foo"));

    // The snapshot sits between the first put and the single delete, so the
    // compaction must keep all three entries.
    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));
    ASSERT_EQ("[ second, SDEL, first ]", AllEntriesFor("foo", 1));

    ASSERT_OK(SingleDelete(1, "foo"));

    ASSERT_EQ("first", Get(1, "foo", snapshot));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));

    ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
                                     nullptr, nullptr));

    ASSERT_EQ("first", Get(1, "foo", snapshot));
    ASSERT_EQ("NOT_FOUND", Get(1, "foo"));

    // Hand the snapshot back before the next iteration reopens the DB with
    // different options; otherwise one snapshot is leaked per iteration.
    db_->ReleaseSnapshot(snapshot);
    // Skip HashCuckooRep as it does not support single delete. FIFO and
    // universal compaction do not apply to the test case. Skip MergePut
    // because single delete does not get removed when it encounters a merge.
  } while (ChangeOptions(kSkipHashCuckoo | kSkipFIFOCompaction |
                         kSkipUniversalCompaction | kSkipMergePut));
}
TEST_F(DBTest, DeletionMarkers1) {
Options options = CurrentOptions();
options.max_background_flushes = 0;
@ -5361,13 +5510,6 @@ class ModelDB: public DB {
batch.Put(cf, k, v);
return Write(o, &batch);
}
using DB::Merge;
virtual Status Merge(const WriteOptions& o, ColumnFamilyHandle* cf,
const Slice& k, const Slice& v) override {
WriteBatch batch;
batch.Merge(cf, k, v);
return Write(o, &batch);
}
using DB::Delete;
virtual Status Delete(const WriteOptions& o, ColumnFamilyHandle* cf,
const Slice& key) override {
@ -5375,6 +5517,20 @@ class ModelDB: public DB {
batch.Delete(cf, key);
return Write(o, &batch);
}
using DB::SingleDelete;
// Records a single deletion of `key` in column family `cf` by wrapping it in
// a one-entry WriteBatch and handing that batch to Write().
virtual Status SingleDelete(const WriteOptions& o, ColumnFamilyHandle* cf,
                            const Slice& key) override {
  WriteBatch single_delete_batch;
  single_delete_batch.SingleDelete(cf, key);
  Status write_status = Write(o, &single_delete_batch);
  return write_status;
}
using DB::Merge;
virtual Status Merge(const WriteOptions& o, ColumnFamilyHandle* cf,
const Slice& k, const Slice& v) override {
WriteBatch batch;
batch.Merge(cf, k, v);
return Write(o, &batch);
}
using DB::Get;
virtual Status Get(const ReadOptions& options, ColumnFamilyHandle* cf,
const Slice& key, std::string* value) override {

@ -22,7 +22,7 @@ namespace rocksdb {
uint64_t PackSequenceAndType(uint64_t seq, ValueType t) {
assert(seq <= kMaxSequenceNumber);
assert(t <= kValueTypeForSeek);
assert(IsValueType(t));
return (seq << 8) | t;
}
@ -31,7 +31,7 @@ void UnPackSequenceAndType(uint64_t packed, uint64_t* seq, ValueType* t) {
*t = static_cast<ValueType>(packed & 0xff);
assert(*seq <= kMaxSequenceNumber);
assert(*t <= kValueTypeForSeek);
assert(IsValueType(*t));
}
void AppendInternalKey(std::string* result, const ParsedInternalKey& key) {

@ -33,13 +33,13 @@ enum ValueType : unsigned char {
kTypeDeletion = 0x0,
kTypeValue = 0x1,
kTypeMerge = 0x2,
// Following types are used only in write ahead logs. They are not used in
// memtables or sst files:
kTypeLogData = 0x3,
kTypeColumnFamilyDeletion = 0x4,
kTypeColumnFamilyValue = 0x5,
kTypeColumnFamilyMerge = 0x6,
kMaxValue = 0x7F
kTypeLogData = 0x3, // WAL only.
kTypeColumnFamilyDeletion = 0x4, // WAL only.
kTypeColumnFamilyValue = 0x5, // WAL only.
kTypeColumnFamilyMerge = 0x6, // WAL only.
kTypeSingleDeletion = 0x7,
kTypeColumnFamilySingleDeletion = 0x8, // WAL only.
kMaxValue = 0x7F // Not used for storing records.
};
// kValueTypeForSeek defines the ValueType that should be passed when
@ -48,7 +48,13 @@ enum ValueType : unsigned char {
// and the value type is embedded as the low 8 bits in the sequence
// number in internal keys, we need to use the highest-numbered
// ValueType, not the lowest).
static const ValueType kValueTypeForSeek = kTypeMerge;
static const ValueType kValueTypeForSeek = kTypeSingleDeletion;
// Returns true when `t` is a value type, i.e. a record type that is stored in
// memtables and sst files: everything up to and including kTypeMerge, plus
// kTypeSingleDeletion.
inline bool IsValueType(ValueType t) {
  if (t == kTypeSingleDeletion) {
    return true;
  }
  return t <= kTypeMerge;
}
// We leave eight bits empty at the bottom so a type and sequence#
// can be packed together into 64-bits.
@ -193,13 +199,12 @@ inline bool ParseInternalKey(const Slice& internal_key,
result->type = static_cast<ValueType>(c);
assert(result->type <= ValueType::kMaxValue);
result->user_key = Slice(internal_key.data(), n - 8);
return (c <= static_cast<unsigned char>(kValueTypeForSeek));
return IsValueType(result->type);
}
// Update the sequence number in the internal key.
// Guarantees not to invalidate ikey.data().
inline void UpdateInternalKey(std::string* ikey,
uint64_t seq, ValueType t) {
inline void UpdateInternalKey(std::string* ikey, uint64_t seq, ValueType t) {
size_t ikey_sz = ikey->size();
assert(ikey_sz >= 8);
uint64_t newval = (seq << 8) | t;
@ -272,6 +277,11 @@ class IterKey {
Slice GetKey() const { return Slice(key_, key_size_); }
// Returns the stored key without its trailing 8 bytes. Requires that at
// least 8 bytes are stored.
Slice GetUserKey() const {
  assert(key_size_ >= 8);
  const size_t user_key_size = key_size_ - 8;
  return Slice(key_, user_key_size);
}
size_t Size() { return key_size_; }
void Clear() { key_size_ = 0; }
@ -304,11 +314,30 @@ class IterKey {
memcpy(key_ + shared_len, non_shared_data, non_shared_len);
}
void SetKey(const Slice& key) {
Slice SetKey(const Slice& key) {
size_t size = key.size();
EnlargeBufferIfNeeded(size);
memcpy(key_, key.data(), size);
key_size_ = size;
return Slice(key_, key_size_);
}
// Stores a copy of the internal key `key` in this IterKey, re-points
// ikey->user_key at the copied bytes (everything except the trailing 8
// bytes) and returns a Slice covering the whole copy. `key` must be at least
// 8 bytes long.
Slice SetKey(const Slice& key, ParsedInternalKey* ikey) {
  const size_t internal_size = key.size();
  assert(internal_size >= 8);
  SetKey(key);
  const size_t user_size = internal_size - 8;
  ikey->user_key = Slice(key_, user_size);
  return Slice(key_, internal_size);
}
// Overwrites the trailing 8 bytes of the stored internal key with the packed
// (seq << 8) | t value. The buffer is modified in place, so slices that
// reference the key (and the user key) stay valid.
void UpdateInternalKey(uint64_t seq, ValueType t) {
  assert(key_size_ >= 8);
  char* const footer = &key_[key_size_ - 8];
  EncodeFixed64(footer, (seq << 8) | t);
}
void SetInternalKey(const Slice& key_prefix, const Slice& user_key,

@ -57,7 +57,7 @@ void PrintLevelStats(char* buf, size_t len, const std::string& name,
NumberToHumanString(stats.num_dropped_records);
snprintf(buf, len,
"%4s %6d/%-3d %8.0f %5.1f " /* Level, Files, Size(MB), Score */
"%4s %6d/%-3d %8.2f %5.1f " /* Level, Files, Size(MB), Score */
"%8.1f " /* Read(GB) */
"%7.1f " /* Rn(GB) */
"%8.1f " /* Rnp1(GB) */

@ -444,9 +444,10 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->found_final_value) = true;
return false;
}
case kTypeDeletion: {
case kTypeDeletion:
case kTypeSingleDeletion: {
if (*(s->merge_in_progress)) {
assert(merge_operator);
assert(merge_operator != nullptr);
*(s->status) = Status::OK();
bool merge_success = false;
{

@ -88,7 +88,8 @@ Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before,
if (!ParseInternalKey(iter->key(), &ikey)) {
// stop at corrupted key
if (assert_valid_internal_key_) {
assert(!"corrupted internal key is not expected");
assert(!"Corrupted internal key not expected.");
return Status::Corruption("Corrupted internal key not expected.");
}
break;
} else if (!user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)) {
@ -102,8 +103,12 @@ Status MergeHelper::MergeUntil(Iterator* iter, const SequenceNumber stop_before,
// At this point we are guaranteed that we need to process this key.
assert(ikey.type <= kValueTypeForSeek);
assert(IsValueType(ikey.type));
if (ikey.type != kTypeMerge) {
// Merges operands can only be used with puts and deletions, single
// deletions are not supported.
assert(ikey.type == kTypeValue || ikey.type == kTypeDeletion);
// hit a put/delete
// => merge the put value or a nullptr with operands_
// => store result in operands_.back() (and update keys_.back())

@ -65,7 +65,9 @@ class MergeHelper {
// Returns one of the following statuses:
// - OK: Entries were successfully merged.
// - MergeInProgress: Put/Delete not encountered and unable to merge operands.
// - Corruption: Merge operator reported unsuccessful merge.
// - Corruption: Merge operator reported unsuccessful merge or a corrupted
// key has been encountered and not expected (applies only when compiling
// with asserts removed).
//
// REQUIRED: The first key in the input is not corrupted.
Status MergeUntil(Iterator* iter, const SequenceNumber stop_before = 0,

@ -40,15 +40,10 @@ class MergeHelperTest : public testing::Test {
nullptr, Env::Default());
}
std::string Key(const std::string& user_key, const SequenceNumber& seq,
const ValueType& t) {
return InternalKey(user_key, seq, t).Encode().ToString();
}
void AddKeyVal(const std::string& user_key, const SequenceNumber& seq,
const ValueType& t, const std::string& val,
bool corrupt = false) {
InternalKey ikey = InternalKey(user_key, seq, t);
InternalKey ikey(user_key, seq, t);
if (corrupt) {
test::CorruptKeyType(&ikey);
}
@ -83,7 +78,7 @@ TEST_F(MergeHelperTest, MergeAtBottomSuccess) {
ASSERT_TRUE(RunUInt64MergeHelper(0, true).ok());
ASSERT_EQ(ks_[2], iter_->key());
ASSERT_EQ(Key("a", 20, kTypeValue), merge_helper_->keys()[0]);
ASSERT_EQ(test::KeyStr("a", 20, kTypeValue), merge_helper_->keys()[0]);
ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]);
ASSERT_EQ(1U, merge_helper_->keys().size());
ASSERT_EQ(1U, merge_helper_->values().size());
@ -98,7 +93,7 @@ TEST_F(MergeHelperTest, MergeValue) {
ASSERT_TRUE(RunUInt64MergeHelper(0, false).ok());
ASSERT_EQ(ks_[3], iter_->key());
ASSERT_EQ(Key("a", 40, kTypeValue), merge_helper_->keys()[0]);
ASSERT_EQ(test::KeyStr("a", 40, kTypeValue), merge_helper_->keys()[0]);
ASSERT_EQ(EncodeInt(8U), merge_helper_->values()[0]);
ASSERT_EQ(1U, merge_helper_->keys().size());
ASSERT_EQ(1U, merge_helper_->values().size());
@ -114,7 +109,7 @@ TEST_F(MergeHelperTest, SnapshotBeforeValue) {
ASSERT_TRUE(RunUInt64MergeHelper(31, true).IsMergeInProgress());
ASSERT_EQ(ks_[2], iter_->key());
ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[0]);
ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[0]);
ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]);
ASSERT_EQ(1U, merge_helper_->keys().size());
ASSERT_EQ(1U, merge_helper_->values().size());
@ -129,9 +124,9 @@ TEST_F(MergeHelperTest, NoPartialMerge) {
ASSERT_TRUE(RunStringAppendMergeHelper(31, true).IsMergeInProgress());
ASSERT_EQ(ks_[2], iter_->key());
ASSERT_EQ(Key("a", 40, kTypeMerge), merge_helper_->keys()[0]);
ASSERT_EQ(test::KeyStr("a", 40, kTypeMerge), merge_helper_->keys()[0]);
ASSERT_EQ("v", merge_helper_->values()[0]);
ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[1]);
ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[1]);
ASSERT_EQ("v2", merge_helper_->values()[1]);
ASSERT_EQ(2U, merge_helper_->keys().size());
ASSERT_EQ(2U, merge_helper_->values().size());
@ -143,7 +138,7 @@ TEST_F(MergeHelperTest, SingleOperand) {
ASSERT_TRUE(RunUInt64MergeHelper(31, true).IsMergeInProgress());
ASSERT_FALSE(iter_->Valid());
ASSERT_EQ(Key("a", 50, kTypeMerge), merge_helper_->keys()[0]);
ASSERT_EQ(test::KeyStr("a", 50, kTypeMerge), merge_helper_->keys()[0]);
ASSERT_EQ(EncodeInt(1U), merge_helper_->values()[0]);
ASSERT_EQ(1U, merge_helper_->keys().size());
ASSERT_EQ(1U, merge_helper_->values().size());
@ -156,7 +151,7 @@ TEST_F(MergeHelperTest, MergeDeletion) {
ASSERT_TRUE(RunUInt64MergeHelper(15, false).ok());
ASSERT_FALSE(iter_->Valid());
ASSERT_EQ(Key("a", 30, kTypeValue), merge_helper_->keys()[0]);
ASSERT_EQ(test::KeyStr("a", 30, kTypeValue), merge_helper_->keys()[0]);
ASSERT_EQ(EncodeInt(3U), merge_helper_->values()[0]);
ASSERT_EQ(1U, merge_helper_->keys().size());
ASSERT_EQ(1U, merge_helper_->values().size());
@ -171,7 +166,7 @@ TEST_F(MergeHelperTest, CorruptKey) {
ASSERT_TRUE(RunUInt64MergeHelper(15, false).IsMergeInProgress());
ASSERT_EQ(ks_[2], iter_->key());
ASSERT_EQ(Key("a", 30, kTypeMerge), merge_helper_->keys()[0]);
ASSERT_EQ(test::KeyStr("a", 30, kTypeMerge), merge_helper_->keys()[0]);
ASSERT_EQ(EncodeInt(4U), merge_helper_->values()[0]);
ASSERT_EQ(1U, merge_helper_->keys().size());
ASSERT_EQ(1U, merge_helper_->values().size());

@ -19,7 +19,9 @@ Status InternalKeyPropertiesCollector::InternalAdd(const Slice& key,
return Status::InvalidArgument("Invalid internal key");
}
if (ikey.type == ValueType::kTypeDeletion) {
// Note: We count both, deletions and single deletions here.
if (ikey.type == ValueType::kTypeDeletion ||
ikey.type == ValueType::kTypeSingleDeletion) {
++deleted_keys_;
}
@ -47,18 +49,22 @@ InternalKeyPropertiesCollector::GetReadableProperties() const {
}
namespace {
// Translates an internal ValueType into the EntryType reported to table
// properties collectors. Types without a dedicated entry kind map to
// kEntryOther.
EntryType GetEntryType(ValueType value_type) {
  if (value_type == kTypeValue) {
    return kEntryPut;
  }
  if (value_type == kTypeDeletion) {
    return kEntryDelete;
  }
  if (value_type == kTypeSingleDeletion) {
    return kEntrySingleDelete;
  }
  if (value_type == kTypeMerge) {
    return kEntryMerge;
  }
  return kEntryOther;
}
} // namespace
Status UserKeyTablePropertiesCollector::InternalAdd(const Slice& key,

@ -6,6 +6,7 @@
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "db/db_impl.h"
@ -58,16 +59,19 @@ class RegularKeysStartWithA: public TablePropertiesCollector {
std::string encoded;
std::string encoded_num_puts;
std::string encoded_num_deletes;
std::string encoded_num_single_deletes;
std::string encoded_num_size_changes;
PutVarint32(&encoded, count_);
PutVarint32(&encoded_num_puts, num_puts_);
PutVarint32(&encoded_num_deletes, num_deletes_);
PutVarint32(&encoded_num_single_deletes, num_single_deletes_);
PutVarint32(&encoded_num_size_changes, num_size_changes_);
*properties = UserCollectedProperties{
{"TablePropertiesTest", message_},
{"Count", encoded},
{"NumPuts", encoded_num_puts},
{"NumDeletes", encoded_num_deletes},
{"NumSingleDeletes", encoded_num_single_deletes},
{"NumSizeChanges", encoded_num_size_changes},
};
return Status::OK();
@ -83,6 +87,8 @@ class RegularKeysStartWithA: public TablePropertiesCollector {
num_puts_++;
} else if (type == kEntryDelete) {
num_deletes_++;
} else if (type == kEntrySingleDelete) {
num_single_deletes_++;
}
if (file_size < file_size_) {
message_ = "File size should not decrease.";
@ -102,6 +108,7 @@ class RegularKeysStartWithA: public TablePropertiesCollector {
uint32_t count_ = 0;
uint32_t num_puts_ = 0;
uint32_t num_deletes_ = 0;
uint32_t num_single_deletes_ = 0;
uint32_t num_size_changes_ = 0;
uint64_t file_size_ = 0;
};
@ -217,18 +224,18 @@ namespace {
void TestCustomizedTablePropertiesCollector(
bool backward_mode, uint64_t magic_number, bool test_int_tbl_prop_collector,
const Options& options, const InternalKeyComparator& internal_comparator) {
const std::string kDeleteFlag = "D";
// make sure the entries will be inserted with order.
std::map<std::string, std::string> kvs = {
{"About ", "val5"}, // starts with 'A'
{"Abstract", "val2"}, // starts with 'A'
{"Around ", "val7"}, // starts with 'A'
{"Beyond ", "val3"},
{"Builder ", "val1"},
{"Love ", kDeleteFlag},
{"Cancel ", "val4"},
{"Find ", "val6"},
{"Rocks ", kDeleteFlag},
std::map<std::pair<std::string, ValueType>, std::string> kvs = {
{{"About ", kTypeValue}, "val5"}, // starts with 'A'
{{"Abstract", kTypeValue}, "val2"}, // starts with 'A'
{{"Around ", kTypeValue}, "val7"}, // starts with 'A'
{{"Beyond ", kTypeValue}, "val3"},
{{"Builder ", kTypeValue}, "val1"},
{{"Love ", kTypeDeletion}, ""},
{{"Cancel ", kTypeValue}, "val4"},
{{"Find ", kTypeValue}, "val6"},
{{"Rocks ", kTypeDeletion}, ""},
{{"Foo ", kTypeSingleDeletion}, ""},
};
// -- Step 1: build table
@ -248,9 +255,7 @@ void TestCustomizedTablePropertiesCollector(
SequenceNumber seqNum = 0U;
for (const auto& kv : kvs) {
InternalKey ikey(kv.first, seqNum++, (kv.second != kDeleteFlag)
? ValueType::kTypeValue
: ValueType::kTypeDeletion);
InternalKey ikey(kv.first.first, seqNum++, kv.first.second);
builder->Add(ikey.Encode(), kv.second);
}
ASSERT_OK(builder->Finish());
@ -270,31 +275,36 @@ void TestCustomizedTablePropertiesCollector(
auto user_collected = props->user_collected_properties;
ASSERT_TRUE(user_collected.find("TablePropertiesTest") !=
user_collected.end());
ASSERT_NE(user_collected.find("TablePropertiesTest"), user_collected.end());
ASSERT_EQ("Rocksdb", user_collected.at("TablePropertiesTest"));
uint32_t starts_with_A = 0;
ASSERT_TRUE(user_collected.find("Count") != user_collected.end());
ASSERT_NE(user_collected.find("Count"), user_collected.end());
Slice key(user_collected.at("Count"));
ASSERT_TRUE(GetVarint32(&key, &starts_with_A));
ASSERT_EQ(3u, starts_with_A);
if (!backward_mode && !test_int_tbl_prop_collector) {
uint32_t num_puts;
ASSERT_NE(user_collected.find("NumPuts"), user_collected.end());
Slice key_puts(user_collected.at("NumPuts"));
ASSERT_TRUE(GetVarint32(&key_puts, &num_puts));
ASSERT_EQ(7u, num_puts);
uint32_t num_deletes;
ASSERT_TRUE(user_collected.find("NumDeletes") != user_collected.end());
ASSERT_NE(user_collected.find("NumDeletes"), user_collected.end());
Slice key_deletes(user_collected.at("NumDeletes"));
ASSERT_TRUE(GetVarint32(&key_deletes, &num_deletes));
ASSERT_EQ(2u, num_deletes);
uint32_t num_puts;
ASSERT_TRUE(user_collected.find("NumPuts") != user_collected.end());
Slice key_puts(user_collected.at("NumPuts"));
ASSERT_TRUE(GetVarint32(&key_puts, &num_puts));
ASSERT_EQ(7u, num_puts);
uint32_t num_single_deletes;
ASSERT_NE(user_collected.find("NumSingleDeletes"), user_collected.end());
Slice key_single_deletes(user_collected.at("NumSingleDeletes"));
ASSERT_TRUE(GetVarint32(&key_single_deletes, &num_single_deletes));
ASSERT_EQ(1u, num_single_deletes);
uint32_t num_size_changes;
ASSERT_TRUE(user_collected.find("NumSizeChanges") != user_collected.end());
ASSERT_NE(user_collected.find("NumSizeChanges"), user_collected.end());
Slice key_size_changes(user_collected.at("NumSizeChanges"));
ASSERT_TRUE(GetVarint32(&key_size_changes, &num_size_changes));
ASSERT_GE(num_size_changes, 2u);
@ -350,6 +360,7 @@ void TestInternalKeyPropertiesCollector(
InternalKey("X ", 4, ValueType::kTypeDeletion),
InternalKey("Y ", 5, ValueType::kTypeDeletion),
InternalKey("Z ", 6, ValueType::kTypeDeletion),
InternalKey("a ", 7, ValueType::kTypeSingleDeletion),
};
std::unique_ptr<TableBuilder> builder;
@ -403,27 +414,34 @@ void TestInternalKeyPropertiesCollector(
std::unique_ptr<TableProperties> props_guard(props);
auto user_collected = props->user_collected_properties;
uint64_t deleted = GetDeletedKeys(user_collected);
ASSERT_EQ(4u, deleted);
ASSERT_EQ(5u, deleted); // deletes + single-deletes
if (sanitized) {
uint32_t starts_with_A = 0;
ASSERT_TRUE(user_collected.find("Count") != user_collected.end());
ASSERT_NE(user_collected.find("Count"), user_collected.end());
Slice key(user_collected.at("Count"));
ASSERT_TRUE(GetVarint32(&key, &starts_with_A));
ASSERT_EQ(1u, starts_with_A);
if (!backward_mode) {
uint32_t num_puts;
ASSERT_NE(user_collected.find("NumPuts"), user_collected.end());
Slice key_puts(user_collected.at("NumPuts"));
ASSERT_TRUE(GetVarint32(&key_puts, &num_puts));
ASSERT_EQ(3u, num_puts);
uint32_t num_deletes;
ASSERT_TRUE(user_collected.find("NumDeletes") != user_collected.end());
ASSERT_NE(user_collected.find("NumDeletes"), user_collected.end());
Slice key_deletes(user_collected.at("NumDeletes"));
ASSERT_TRUE(GetVarint32(&key_deletes, &num_deletes));
ASSERT_EQ(4u, num_deletes);
uint32_t num_puts;
ASSERT_TRUE(user_collected.find("NumPuts") != user_collected.end());
Slice key_puts(user_collected.at("NumPuts"));
ASSERT_TRUE(GetVarint32(&key_puts, &num_puts));
ASSERT_EQ(3u, num_puts);
uint32_t num_single_deletes;
ASSERT_NE(user_collected.find("NumSingleDeletes"),
user_collected.end());
Slice key_single_deletes(user_collected.at("NumSingleDeletes"));
ASSERT_TRUE(GetVarint32(&key_single_deletes, &num_single_deletes));
ASSERT_EQ(1u, num_single_deletes);
}
}
}

@ -13,11 +13,13 @@
// data: record[count]
// record :=
// kTypeValue varstring varstring
// kTypeMerge varstring varstring
// kTypeDeletion varstring
// kTypeSingleDeletion varstring
// kTypeMerge varstring varstring
// kTypeColumnFamilyValue varint32 varstring varstring
// kTypeColumnFamilyMerge varint32 varstring varstring
// kTypeColumnFamilyDeletion varint32 varstring
// kTypeColumnFamilySingleDeletion varint32 varstring
// kTypeColumnFamilyMerge varint32 varstring varstring
// varstring :=
// len: varint32
// data: uint8[len]
@ -110,11 +112,13 @@ Status ReadRecordFromWriteBatch(Slice* input, char* tag,
}
break;
case kTypeColumnFamilyDeletion:
case kTypeColumnFamilySingleDeletion:
if (!GetVarint32(input, column_family)) {
return Status::Corruption("bad WriteBatch Delete");
}
// intentional fallthrough
case kTypeDeletion:
case kTypeSingleDeletion:
if (!GetLengthPrefixedSlice(input, key)) {
return Status::Corruption("bad WriteBatch Delete");
}
@ -173,6 +177,11 @@ Status WriteBatch::Iterate(Handler* handler) const {
s = handler->DeleteCF(column_family, key);
found++;
break;
case kTypeColumnFamilySingleDeletion:
case kTypeSingleDeletion:
s = handler->SingleDeleteCF(column_family, key);
found++;
break;
case kTypeColumnFamilyMerge:
case kTypeMerge:
s = handler->MergeCF(column_family, key, value);
@ -282,6 +291,40 @@ void WriteBatch::Delete(ColumnFamilyHandle* column_family,
WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key);
}
// Appends a single-deletion record for `key` to the batch and bumps the
// batch's entry count. The default column family (id 0) uses the short
// kTypeSingleDeletion tag; any other column family uses
// kTypeColumnFamilySingleDeletion followed by the varint32-encoded id.
void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id,
                                      const Slice& key) {
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  const bool is_default_cf = (column_family_id == 0);
  b->rep_.push_back(static_cast<char>(
      is_default_cf ? kTypeSingleDeletion : kTypeColumnFamilySingleDeletion));
  if (!is_default_cf) {
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSlice(&b->rep_, key);
}
// Public entry point: resolves the column family handle to its numeric id and
// appends a single-deletion record for `key` to this batch.
void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                              const Slice& key) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  WriteBatchInternal::SingleDelete(this, cf_id, key);
}
// SliceParts flavour of SingleDelete: appends a single-deletion record whose
// key is given as SliceParts and bumps the batch's entry count. The record tag
// depends on whether the default column family (id 0) is targeted; non-default
// families additionally store the varint32-encoded id.
void WriteBatchInternal::SingleDelete(WriteBatch* b, uint32_t column_family_id,
                                      const SliceParts& key) {
  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
  const bool is_default_cf = (column_family_id == 0);
  b->rep_.push_back(static_cast<char>(
      is_default_cf ? kTypeSingleDeletion : kTypeColumnFamilySingleDeletion));
  if (!is_default_cf) {
    PutVarint32(&b->rep_, column_family_id);
  }
  PutLengthPrefixedSliceParts(&b->rep_, key);
}
// Public entry point for keys supplied as SliceParts: resolves the column
// family handle to its numeric id and appends the single-deletion record.
void WriteBatch::SingleDelete(ColumnFamilyHandle* column_family,
                              const SliceParts& key) {
  const uint32_t cf_id = GetColumnFamilyID(column_family);
  WriteBatchInternal::SingleDelete(this, cf_id, key);
}
void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
@ -466,6 +509,66 @@ class MemTableInserter : public WriteBatch::Handler {
return Status::OK();
}
// Shared implementation of DeleteCF() and SingleDeleteCF(). The two handlers
// differ only in the record type written to the memtable, which is passed in
// as `delete_type` (kTypeDeletion or kTypeSingleDeletion).
//
// Returns the status reported by SeekToColumnFamily() when the seek fails,
// and OK otherwise.
Status DeleteImpl(uint32_t column_family_id, const Slice& key,
                  ValueType delete_type) {
  Status seek_status;
  if (!SeekToColumnFamily(column_family_id, &seek_status)) {
    // The record is not applied, but it still consumes a sequence number.
    ++sequence_;
    return seek_status;
  }
  MemTable* mem = cf_mems_->GetMemTable();
  auto* moptions = mem->GetMemTableOptions();
  if (!dont_filter_deletes_ && moptions->filter_deletes) {
    // Look the key up as of the current sequence number; if it cannot exist,
    // the deletion is dropped instead of being written.
    SnapshotImpl read_from_snapshot;
    read_from_snapshot.number_ = sequence_;
    ReadOptions ropts;
    ropts.snapshot = &read_from_snapshot;

    std::string value;
    auto cf_handle = cf_mems_->GetColumnFamilyHandle();
    if (cf_handle == nullptr) {
      cf_handle = db_->DefaultColumnFamily();
    }
    if (!db_->KeyMayExist(ropts, cf_handle, key, &value)) {
      RecordTick(moptions->statistics, NUMBER_FILTERED_DELETES);
      // NOTE(review): sequence_ is not advanced on this path, unlike the
      // seek-failure path above - kept as in the original; confirm intended.
      return Status::OK();
    }
  }
  mem->Add(sequence_, delete_type, key, Slice());
  sequence_++;
  cf_mems_->CheckMemtableFull();
  return Status::OK();
}

// Applies a Delete record of the write batch to the memtable of the given
// column family.
virtual Status DeleteCF(uint32_t column_family_id,
                        const Slice& key) override {
  return DeleteImpl(column_family_id, key, kTypeDeletion);
}

// Applies a SingleDelete record of the write batch to the memtable of the
// given column family.
virtual Status SingleDeleteCF(uint32_t column_family_id,
                              const Slice& key) override {
  return DeleteImpl(column_family_id, key, kTypeSingleDeletion);
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
Status seek_status;
@ -545,36 +648,6 @@ class MemTableInserter : public WriteBatch::Handler {
cf_mems_->CheckMemtableFull();
return Status::OK();
}
// Inserts a kTypeDeletion tombstone for `key` into the memtable of column
// family `column_family_id`. Returns the SeekToColumnFamily() status if the
// column family cannot be selected, OK otherwise (also when the delete is
// dropped by the filter_deletes check).
virtual Status DeleteCF(uint32_t column_family_id,
                        const Slice& key) override {
  Status cf_status;
  const bool cf_selected = SeekToColumnFamily(column_family_id, &cf_status);
  if (!cf_selected) {
    ++sequence_;
    return cf_status;
  }

  MemTable* memtable = cf_mems_->GetMemTable();
  auto* mem_opts = memtable->GetMemTableOptions();
  const bool filter_enabled = !dont_filter_deletes_ && mem_opts->filter_deletes;
  if (filter_enabled) {
    // Probe the DB as of the current sequence number.
    SnapshotImpl probe_snapshot;
    probe_snapshot.number_ = sequence_;
    ReadOptions probe_opts;
    probe_opts.snapshot = &probe_snapshot;

    auto handle = cf_mems_->GetColumnFamilyHandle();
    if (handle == nullptr) {
      handle = db_->DefaultColumnFamily();
    }

    std::string ignored_value;
    const bool may_exist =
        db_->KeyMayExist(probe_opts, handle, key, &ignored_value);
    if (!may_exist) {
      RecordTick(mem_opts->statistics, NUMBER_FILTERED_DELETES);
      return Status::OK();
    }
  }

  memtable->Add(sequence_, kTypeDeletion, key, Slice());
  sequence_++;
  cf_mems_->CheckMemtableFull();
  return Status::OK();
}
};
} // namespace

@ -43,6 +43,19 @@ void WriteBatchBase::Delete(const SliceParts& key) {
Delete(key_slice);
}
// SliceParts overload: builds one Slice from `key`, using a local string as
// backing storage, and forwards to the Slice-based SingleDelete() for
// `column_family`.
void WriteBatchBase::SingleDelete(ColumnFamilyHandle* column_family,
                                  const SliceParts& key) {
  std::string scratch;
  Slice contiguous_key(key, &scratch);
  SingleDelete(column_family, contiguous_key);
}
void WriteBatchBase::SingleDelete(const SliceParts& key) {
std::string key_buf;
Slice key_slice(key, &key_buf);
SingleDelete(key_slice);
}
void WriteBatchBase::Merge(ColumnFamilyHandle* column_family,
const SliceParts& key, const SliceParts& value) {
std::string key_buf, value_buf;

@ -73,6 +73,12 @@ class WriteBatchInternal {
static void Delete(WriteBatch* batch, uint32_t column_family_id,
const Slice& key);
static void SingleDelete(WriteBatch* batch, uint32_t column_family_id,
const SliceParts& key);
static void SingleDelete(WriteBatch* batch, uint32_t column_family_id,
const Slice& key);
static void Merge(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value);

@ -54,17 +54,23 @@ static std::string PrintContents(WriteBatch* b) {
state.append(")");
count++;
break;
case kTypeMerge:
state.append("Merge(");
case kTypeDeletion:
state.append("Delete(");
state.append(ikey.user_key.ToString());
state.append(", ");
state.append(iter->value().ToString());
state.append(")");
count++;
break;
case kTypeDeletion:
state.append("Delete(");
case kTypeSingleDeletion:
state.append("SingleDelete(");
state.append(ikey.user_key.ToString());
state.append(")");
count++;
break;
case kTypeMerge:
state.append("Merge(");
state.append(ikey.user_key.ToString());
state.append(", ");
state.append(iter->value().ToString());
state.append(")");
count++;
break;
@ -151,6 +157,22 @@ TEST_F(WriteBatchTest, Append) {
ASSERT_EQ(4, b1.Count());
}
// Checks that a SingleDelete appended to a WriteBatch is counted by Count()
// and reported by PrintContents() together with an earlier Put of the same
// key.
TEST_F(WriteBatchTest, SingleDeletion) {
WriteBatch batch;
// Entries of this batch are numbered starting at sequence 100.
WriteBatchInternal::SetSequence(&batch, 100);
// A fresh batch prints as empty and holds no entries.
ASSERT_EQ("", PrintContents(&batch));
ASSERT_EQ(0, batch.Count());
batch.Put("a", "va");
ASSERT_EQ("Put(a, va)@100", PrintContents(&batch));
ASSERT_EQ(1, batch.Count());
batch.SingleDelete("a");
// The expected output lists the SingleDelete (sequence 101) ahead of the Put
// (sequence 100) for the same key.
ASSERT_EQ(
"SingleDelete(a)@101"
"Put(a, va)@100",
PrintContents(&batch));
ASSERT_EQ(2, batch.Count());
}
namespace {
struct TestHandler : public WriteBatch::Handler {
std::string seen;
@ -164,6 +186,26 @@ namespace {
}
return Status::OK();
}
// Records the delete in `seen`: "Delete(key)" for column family 0,
// "DeleteCF(id, key)" for any other column family. Always returns OK.
virtual Status DeleteCF(uint32_t column_family_id,
                        const Slice& key) override {
  if (column_family_id != 0) {
    seen += "DeleteCF(" + ToString(column_family_id) + ", " +
            key.ToString() + ")";
    return Status::OK();
  }
  seen += "Delete(" + key.ToString() + ")";
  return Status::OK();
}
// Records the single delete in `seen`: "SingleDelete(key)" for column family
// 0, "SingleDeleteCF(id, key)" for any other column family. Always returns
// OK.
virtual Status SingleDeleteCF(uint32_t column_family_id,
                              const Slice& key) override {
  const bool is_default_cf = (column_family_id == 0);
  if (is_default_cf) {
    seen.append("SingleDelete(").append(key.ToString()).append(")");
  } else {
    seen.append("SingleDeleteCF(")
        .append(ToString(column_family_id))
        .append(", ")
        .append(key.ToString())
        .append(")");
  }
  return Status::OK();
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
if (column_family_id == 0) {
@ -177,47 +219,44 @@ namespace {
virtual void LogData(const Slice& blob) override {
seen += "LogData(" + blob.ToString() + ")";
}
// Appends a description of the delete to `seen`: "Delete(key)" when the
// column family id is 0, otherwise "DeleteCF(id, key)". Always returns OK.
virtual Status DeleteCF(uint32_t column_family_id,
                        const Slice& key) override {
  const std::string key_text = key.ToString();
  if (column_family_id == 0) {
    seen.append("Delete(").append(key_text).append(")");
  } else {
    seen.append("DeleteCF(")
        .append(ToString(column_family_id))
        .append(", ")
        .append(key_text)
        .append(")");
  }
  return Status::OK();
}
};
}
TEST_F(WriteBatchTest, MergeNotImplemented) {
TEST_F(WriteBatchTest, PutNotImplemented) {
WriteBatch batch;
batch.Merge(Slice("foo"), Slice("bar"));
batch.Put(Slice("k1"), Slice("v1"));
ASSERT_EQ(1, batch.Count());
ASSERT_EQ("Merge(foo, bar)@0",
PrintContents(&batch));
ASSERT_EQ("Put(k1, v1)@0", PrintContents(&batch));
WriteBatch::Handler handler;
ASSERT_OK(batch.Iterate(&handler));
}
TEST_F(WriteBatchTest, PutNotImplemented) {
TEST_F(WriteBatchTest, DeleteNotImplemented) {
WriteBatch batch;
batch.Put(Slice("k1"), Slice("v1"));
batch.Delete(Slice("k2"));
ASSERT_EQ(1, batch.Count());
ASSERT_EQ("Put(k1, v1)@0",
PrintContents(&batch));
ASSERT_EQ("Delete(k2)@0", PrintContents(&batch));
WriteBatch::Handler handler;
ASSERT_OK(batch.Iterate(&handler));
}
TEST_F(WriteBatchTest, DeleteNotImplemented) {
TEST_F(WriteBatchTest, SingleDeleteNotImplemented) {
WriteBatch batch;
batch.Delete(Slice("k2"));
batch.SingleDelete(Slice("k2"));
ASSERT_EQ(1, batch.Count());
ASSERT_EQ("Delete(k2)@0",
PrintContents(&batch));
ASSERT_EQ("SingleDelete(k2)@0", PrintContents(&batch));
WriteBatch::Handler handler;
ASSERT_OK(batch.Iterate(&handler));
}
TEST_F(WriteBatchTest, MergeNotImplemented) {
WriteBatch batch;
batch.Merge(Slice("foo"), Slice("bar"));
ASSERT_EQ(1, batch.Count());
ASSERT_EQ("Merge(foo, bar)@0", PrintContents(&batch));
WriteBatch::Handler handler;
ASSERT_OK(batch.Iterate(&handler));
@ -230,27 +269,31 @@ TEST_F(WriteBatchTest, Blob) {
batch.Put(Slice("k3"), Slice("v3"));
batch.PutLogData(Slice("blob1"));
batch.Delete(Slice("k2"));
batch.SingleDelete(Slice("k3"));
batch.PutLogData(Slice("blob2"));
batch.Merge(Slice("foo"), Slice("bar"));
ASSERT_EQ(5, batch.Count());
ASSERT_EQ("Merge(foo, bar)@4"
"Put(k1, v1)@0"
"Delete(k2)@3"
"Put(k2, v2)@1"
"Put(k3, v3)@2",
PrintContents(&batch));
ASSERT_EQ(6, batch.Count());
ASSERT_EQ(
"Merge(foo, bar)@5"
"Put(k1, v1)@0"
"Delete(k2)@3"
"Put(k2, v2)@1"
"SingleDelete(k3)@4"
"Put(k3, v3)@2",
PrintContents(&batch));
TestHandler handler;
batch.Iterate(&handler);
ASSERT_EQ(
"Put(k1, v1)"
"Put(k2, v2)"
"Put(k3, v3)"
"LogData(blob1)"
"Delete(k2)"
"LogData(blob2)"
"Merge(foo, bar)",
handler.seen);
"Put(k1, v1)"
"Put(k2, v2)"
"Put(k3, v3)"
"LogData(blob1)"
"Delete(k2)"
"SingleDelete(k3)"
"LogData(blob2)"
"Merge(foo, bar)",
handler.seen);
}
TEST_F(WriteBatchTest, Continue) {
@ -263,6 +306,16 @@ TEST_F(WriteBatchTest, Continue) {
++num_seen;
return TestHandler::PutCF(column_family_id, key, value);
}
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
++num_seen;
return TestHandler::DeleteCF(column_family_id, key);
}
virtual Status SingleDeleteCF(uint32_t column_family_id,
const Slice& key) override {
++num_seen;
return TestHandler::SingleDeleteCF(column_family_id, key);
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
++num_seen;
@ -272,27 +325,24 @@ TEST_F(WriteBatchTest, Continue) {
++num_seen;
TestHandler::LogData(blob);
}
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
++num_seen;
return TestHandler::DeleteCF(column_family_id, key);
}
virtual bool Continue() override {
return num_seen < 3;
}
virtual bool Continue() override { return num_seen < 5; }
} handler;
batch.Put(Slice("k1"), Slice("v1"));
batch.Put(Slice("k2"), Slice("v2"));
batch.PutLogData(Slice("blob1"));
batch.Delete(Slice("k1"));
batch.SingleDelete(Slice("k2"));
batch.PutLogData(Slice("blob2"));
batch.Merge(Slice("foo"), Slice("bar"));
batch.Iterate(&handler);
ASSERT_EQ(
"Put(k1, v1)"
"LogData(blob1)"
"Delete(k1)",
handler.seen);
"Put(k1, v1)"
"Put(k2, v2)"
"LogData(blob1)"
"Delete(k1)"
"SingleDelete(k2)",
handler.seen);
}
TEST_F(WriteBatchTest, PutGatherSlices) {
@ -345,6 +395,7 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) {
batch.Put(&two, Slice("twofoo"), Slice("bar2"));
batch.Put(&eight, Slice("eightfoo"), Slice("bar8"));
batch.Delete(&eight, Slice("eightfoo"));
batch.SingleDelete(&two, Slice("twofoo"));
batch.Merge(&three, Slice("threethree"), Slice("3three"));
batch.Put(&zero, Slice("foo"), Slice("bar"));
batch.Merge(Slice("omom"), Slice("nom"));
@ -356,6 +407,7 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchTest) {
"PutCF(2, twofoo, bar2)"
"PutCF(8, eightfoo, bar8)"
"DeleteCF(8, eightfoo)"
"SingleDeleteCF(2, twofoo)"
"MergeCF(3, threethree, 3three)"
"Put(foo, bar)"
"Merge(omom, nom)",
@ -370,6 +422,7 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
batch.Put(&two, Slice("twofoo"), Slice("bar2"));
batch.Put(&eight, Slice("eightfoo"), Slice("bar8"));
batch.Delete(&eight, Slice("eightfoo"));
batch.SingleDelete(&two, Slice("twofoo"));
batch.Merge(&three, Slice("threethree"), Slice("3three"));
batch.Put(&zero, Slice("foo"), Slice("bar"));
batch.Merge(Slice("omom"), Slice("nom"));
@ -394,6 +447,24 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter.reset(batch.NewIterator(&two));
iter->Seek("twofoo");
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(WriteType::kPutRecord, iter->Entry().type);
ASSERT_EQ("twofoo", iter->Entry().key.ToString());
ASSERT_EQ("bar2", iter->Entry().value.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(WriteType::kSingleDeleteRecord, iter->Entry().type);
ASSERT_EQ("twofoo", iter->Entry().key.ToString());
iter->Next();
ASSERT_OK(iter->status());
ASSERT_TRUE(!iter->Valid());
iter.reset(batch.NewIterator());
iter->Seek("gggg");
ASSERT_OK(iter->status());
@ -439,6 +510,7 @@ TEST_F(WriteBatchTest, ColumnFamiliesBatchWithIndexTest) {
"PutCF(2, twofoo, bar2)"
"PutCF(8, eightfoo, bar8)"
"DeleteCF(8, eightfoo)"
"SingleDeleteCF(2, twofoo)"
"MergeCF(3, threethree, 3three)"
"Put(foo, bar)"
"Merge(omom, nom)",

@ -184,6 +184,17 @@ class DB {
return Delete(options, DefaultColumnFamily(), key);
}
// Remove the database entry for "key". The key must not have been
// overwritten, i.e. there must be no Put() following another Put() of the
// same key; single-deleting an overwritten key results in undefined
// behavior. Single-deleting a key that does not exist has no effect and is
// not an error. SingleDelete() must not be combined with Delete() or Merge()
// on the same key, and consecutive SingleDelete() calls on the same key are
// not supported.
// Returns OK on success, and a non-OK status on error.
// Note: consider setting options.sync = true.
virtual Status SingleDelete(const WriteOptions& options,
ColumnFamilyHandle* column_family,
const Slice& key) = 0;
// Convenience overload that operates on DefaultColumnFamily().
virtual Status SingleDelete(const WriteOptions& options, const Slice& key) {
return SingleDelete(options, DefaultColumnFamily(), key);
}
// Merge the database entry for "key" with "value". Returns OK on success,
// and a non-OK status on error. The semantics of this operation is
// determined by the user provided merge_operator when opening DB.

@ -33,7 +33,7 @@ struct PerfContext {
// total number of internal keys skipped over during iteration (overwritten or
// deleted, to be more specific, hidden by a put or delete of the same key)
uint64_t internal_key_skipped_count;
// total number of deletes skipped over during iteration
// total number of deletes and single deletes skipped over during iteration
uint64_t internal_delete_skipped_count;
uint64_t get_snapshot_time; // total time spent on getting snapshot

@ -86,6 +86,7 @@ extern const std::string kPropertiesBlock;
enum EntryType {
kEntryPut,
kEntryDelete,
kEntrySingleDelete,
kEntryMerge,
kEntryOther,
};

@ -78,6 +78,13 @@ class StackableDB : public DB {
return db_->Delete(wopts, column_family, key);
}
// Keep the DB::SingleDelete overloads that are not overridden here visible.
using DB::SingleDelete;
// Forwards the single delete, with unchanged arguments, to the wrapped
// database db_ and returns its status.
virtual Status SingleDelete(const WriteOptions& wopts,
ColumnFamilyHandle* column_family,
const Slice& key) override {
return db_->SingleDelete(wopts, column_family, key);
}
using DB::Merge;
virtual Status Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,

@ -29,7 +29,13 @@ class DB;
struct ReadOptions;
struct DBOptions;
enum WriteType { kPutRecord, kMergeRecord, kDeleteRecord, kLogDataRecord };
// Kind of record held by a write batch entry.
enum WriteType {
kPutRecord,
kMergeRecord,
kDeleteRecord,
// Entry written by SingleDelete().
kSingleDeleteRecord,
kLogDataRecord
};
// An entry of a write batch: a Put, Merge, Delete or SingleDelete record.
// Used in WBWIIterator.
@ -101,6 +107,11 @@ class WriteBatchWithIndex : public WriteBatchBase {
void Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
void Delete(const Slice& key) override;
using WriteBatchBase::SingleDelete;
void SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) override;
void SingleDelete(const Slice& key) override;
using WriteBatchBase::PutLogData;
void PutLogData(const Slice& blob) override;

@ -60,6 +60,30 @@ class WriteBatch : public WriteBatchBase {
Put(nullptr, key, value);
}
using WriteBatchBase::Delete;
// If the database contains a mapping for "key", erase it. Else do nothing.
void Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
void Delete(const Slice& key) override { Delete(nullptr, key); }
// variant that takes SliceParts
void Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
void Delete(const SliceParts& key) override { Delete(nullptr, key); }
using WriteBatchBase::SingleDelete;
// If the database contains a mapping for "key", erase it; otherwise do
// nothing. Expects that the key was not overwritten.
void SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) override;
// Same as above with a null column family handle.
void SingleDelete(const Slice& key) override { SingleDelete(nullptr, key); }
// variant that takes SliceParts
void SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
// SliceParts variant with a null column family handle.
void SingleDelete(const SliceParts& key) override {
SingleDelete(nullptr, key);
}
using WriteBatchBase::Merge;
// Merge "value" with the existing value of "key" in the database.
// "key->merge(existing, value)"
@ -76,16 +100,6 @@ class WriteBatch : public WriteBatchBase {
Merge(nullptr, key, value);
}
using WriteBatchBase::Delete;
// If the database contains a mapping for "key", erase it. Else do nothing.
void Delete(ColumnFamilyHandle* column_family, const Slice& key) override;
void Delete(const Slice& key) override { Delete(nullptr, key); }
// variant that takes SliceParts
void Delete(ColumnFamilyHandle* column_family,
const SliceParts& key) override;
void Delete(const SliceParts& key) override { Delete(nullptr, key); }
using WriteBatchBase::PutLogData;
// Append a blob of arbitrary size to the records in this batch. The blob will
// be stored in the transaction log but not in any other file. In particular,
@ -135,6 +149,26 @@ class WriteBatch : public WriteBatchBase {
}
virtual void Put(const Slice& key, const Slice& value) {}
// Default handling of a delete record. For column family 0 the record is
// passed to the Delete(key) hook and OK is returned; for any other column
// family InvalidArgument is returned.
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
  if (column_family_id != 0) {
    return Status::InvalidArgument(
        "non-default column family and DeleteCF not implemented");
  }
  Delete(key);
  return Status::OK();
}
// Hook invoked by the default DeleteCF() for column family 0. The default
// implementation does nothing.
virtual void Delete(const Slice& key) {}
// Default handling of a single-delete record. For column family 0 the record
// is passed to the SingleDelete(key) hook and OK is returned; for any other
// column family InvalidArgument is returned.
virtual Status SingleDeleteCF(uint32_t column_family_id, const Slice& key) {
  if (column_family_id != 0) {
    return Status::InvalidArgument(
        "non-default column family and SingleDeleteCF not implemented");
  }
  SingleDelete(key);
  return Status::OK();
}
// Hook invoked by the default SingleDeleteCF() for column family 0. The
// default implementation does nothing.
virtual void SingleDelete(const Slice& key) {}
// Merge and LogData are not pure virtual. Otherwise, we would break
// existing clients of Handler on a source code level. The default
// implementation of Merge does nothing.
@ -151,15 +185,6 @@ class WriteBatch : public WriteBatchBase {
// The default implementation of LogData does nothing.
virtual void LogData(const Slice& blob);
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
if (column_family_id == 0) {
Delete(key);
return Status::OK();
}
return Status::InvalidArgument(
"non-default column family and DeleteCF not implemented");
}
virtual void Delete(const Slice& key) {}
// Continue is called by WriteBatch::Iterate. If it returns false,
// iteration is halted. Otherwise, it continues iterating. The default

@ -54,6 +54,17 @@ class WriteBatchBase {
virtual void Delete(ColumnFamilyHandle* column_family, const SliceParts& key);
virtual void Delete(const SliceParts& key);
// If the database contains a mapping for "key", erase it; otherwise do
// nothing. Expects that the key was not overwritten.
virtual void SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) = 0;
virtual void SingleDelete(const Slice& key) = 0;
// variant that takes SliceParts
virtual void SingleDelete(ColumnFamilyHandle* column_family,
const SliceParts& key);
virtual void SingleDelete(const SliceParts& key);
// Append a blob of arbitrary size to the records in this batch. The blob will
// be stored in the transaction log but not in any other file. In particular,
// it will not be persisted to the SST files. When iterating over this

@ -102,6 +102,9 @@ bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
return false;
case kTypeDeletion:
case kTypeSingleDeletion:
// TODO(noetzli): Verify correctness once merge of single-deletes
// is supported
assert(state_ == kNotFound || state_ == kMerge);
if (kNotFound == state_) {
state_ = kDeleted;

@ -86,6 +86,7 @@ DEFINE_int64(max_key, 1 * KB* KB,
DEFINE_int32(column_families, 10, "Number of column families");
// TODO(noetzli) Add support for single deletes
DEFINE_bool(test_batches_snapshots, false,
"If set, the test uses MultiGet(), MultiPut() and MultiDelete()"
" which read/write/delete multiple keys in a batch. In this mode,"
@ -318,6 +319,12 @@ DEFINE_int32(delpercent, 15,
static const bool FLAGS_delpercent_dummy __attribute__((unused)) =
RegisterFlagValidator(&FLAGS_delpercent, &ValidateInt32Percent);
// Percentage of keys that must never be overwritten. The value is checked by
// ValidateInt32Percent.
// Fix: the two help-string literals previously concatenated to
// "expressed as  a percentage" (double space).
DEFINE_int32(nooverwritepercent, 60,
             "Ratio of keys without overwrite to total workload (expressed as "
             "a percentage)");
static const bool FLAGS_nooverwritepercent_dummy __attribute__((__unused__)) =
    RegisterFlagValidator(&FLAGS_nooverwritepercent, &ValidateInt32Percent);
DEFINE_int32(iterpercent, 10, "Ratio of iterations to total workload"
" (expressed as a percentage)");
static const bool FLAGS_iterpercent_dummy __attribute__((unused)) =
@ -453,6 +460,7 @@ class Stats {
long prefixes_;
long writes_;
long deletes_;
size_t single_deletes_;
long iterator_size_sums_;
long founds_;
long iterations_;
@ -473,6 +481,7 @@ class Stats {
prefixes_ = 0;
writes_ = 0;
deletes_ = 0;
single_deletes_ = 0;
iterator_size_sums_ = 0;
founds_ = 0;
iterations_ = 0;
@ -491,6 +500,7 @@ class Stats {
prefixes_ += other.prefixes_;
writes_ += other.writes_;
deletes_ += other.deletes_;
single_deletes_ += other.single_deletes_;
iterator_size_sums_ += other.iterator_size_sums_;
founds_ += other.founds_;
iterations_ += other.iterations_;
@ -555,6 +565,8 @@ class Stats {
deletes_ += n;
}
// Adds `n` to the running count of single-delete operations.
void AddSingleDeletes(size_t n) { single_deletes_ += n; }
void AddErrors(int n) {
errors_ += n;
}
@ -578,6 +590,7 @@ class Stats {
"", bytes_mb, rate, (100*writes_)/done_, done_);
fprintf(stdout, "%-12s: Wrote %ld times\n", "", writes_);
fprintf(stdout, "%-12s: Deleted %ld times\n", "", deletes_);
// single_deletes_ is a size_t, so printing it with %ld is a format/argument
// mismatch; widen it explicitly and use the matching fixed-width format.
fprintf(stdout, "%-12s: Single deleted %" PRIu64 " times\n", "",
        static_cast<uint64_t>(single_deletes_));
fprintf(stdout, "%-12s: %ld read and %ld found the key\n", "",
gets_, founds_);
fprintf(stdout, "%-12s: Prefix scanned %ld times\n", "", prefixes_);
@ -613,7 +626,25 @@ class SharedState {
should_stop_bg_thread_(false),
bg_thread_finished_(false),
stress_test_(stress_test),
verification_failure_(false) {
verification_failure_(false),
no_overwrite_ids_(FLAGS_column_families) {
// Pick random keys in each column family that will not experience
// overwrite
printf("Choosing random keys with no overwrite\n");
Random rnd(seed_);
size_t num_no_overwrite_keys = (max_key_ * FLAGS_nooverwritepercent) / 100;
for (auto& cf_ids : no_overwrite_ids_) {
for (size_t i = 0; i < num_no_overwrite_keys; i++) {
size_t rand_key;
do {
rand_key = rnd.Next() % max_key_;
} while (cf_ids.find(rand_key) != cf_ids.end());
cf_ids.insert(rand_key);
}
assert(cf_ids.size() == num_no_overwrite_keys);
}
if (FLAGS_test_batches_snapshots) {
fprintf(stdout, "No lock creation because test_batches_snapshots set\n");
return;
@ -741,6 +772,14 @@ class SharedState {
void Delete(int cf, long key) { values_[cf][key] = SENTINEL; }
// Records a single delete of `key` in column family `cf` by storing SENTINEL
// as its tracked value.
void SingleDelete(int cf, size_t key) { values_[cf][key] = SENTINEL; }
// Returns true unless `key` is in the no-overwrite set of column family `cf`.
bool AllowsOverwrite(int cf, size_t key) {
  const std::set<size_t>& protected_keys = no_overwrite_ids_[cf];
  return protected_keys.count(key) == 0;
}
// Returns true unless the tracked value of `key` in column family `cf` is
// SENTINEL.
bool Exists(int cf, size_t key) { return values_[cf][key] != SENTINEL; }
uint32_t GetSeed() const { return seed_; }
void SetShouldStopBgThread() { should_stop_bg_thread_ = true; }
@ -769,6 +808,9 @@ class SharedState {
StressTest* stress_test_;
std::atomic<bool> verification_failure_;
// Keys that should not be overwritten
std::vector<std::set<size_t> > no_overwrite_ids_;
std::vector<std::vector<uint32_t>> values_;
// Has to make it owned by a smart ptr as port::Mutex is not copyable
// and storing it in the container may require copying depending on the impl.
@ -1445,6 +1487,7 @@ class StressTest {
void OperateDb(ThreadState* thread) {
ReadOptions read_opts(FLAGS_verify_checksum, true);
WriteOptions write_opts;
auto shared = thread->shared;
char value[100];
long max_key = thread->shared->GetMaxKey();
std::string from_db;
@ -1529,14 +1572,14 @@ class StressTest {
int rand_column_family = thread->rand.Next() % FLAGS_column_families;
std::string keystr = Key(rand_key);
Slice key = keystr;
int prob_op = thread->rand.Uniform(100);
std::unique_ptr<MutexLock> l;
if (!FLAGS_test_batches_snapshots) {
l.reset(new MutexLock(
thread->shared->GetMutexForKey(rand_column_family, rand_key)));
shared->GetMutexForKey(rand_column_family, rand_key)));
}
auto column_family = column_families_[rand_column_family];
int prob_op = thread->rand.Uniform(100);
if (prob_op >= 0 && prob_op < (int)FLAGS_readpercent) {
// OPERATION read
if (!FLAGS_test_batches_snapshots) {
@ -1585,16 +1628,31 @@ class StressTest {
size_t sz = GenerateValue(value_base, value, sizeof(value));
Slice v(value, sz);
if (!FLAGS_test_batches_snapshots) {
// If the chosen key does not allow overwrite and it already
// exists, choose another key.
while (!shared->AllowsOverwrite(rand_column_family, rand_key) &&
shared->Exists(rand_column_family, rand_key)) {
l.reset();
rand_key = thread->rand.Next() % max_key;
rand_column_family = thread->rand.Next() % FLAGS_column_families;
l.reset(new MutexLock(
shared->GetMutexForKey(rand_column_family, rand_key)));
}
keystr = Key(rand_key);
key = keystr;
column_family = column_families_[rand_column_family];
if (FLAGS_verify_before_write) {
std::string keystr2 = Key(rand_key);
Slice k = keystr2;
Status s = db_->Get(read_opts, column_family, k, &from_db);
if (VerifyValue(rand_column_family, rand_key, read_opts,
thread->shared, from_db, s, true) == false) {
if (!VerifyValue(rand_column_family, rand_key, read_opts,
thread->shared, from_db, s, true)) {
break;
}
}
thread->shared->Put(rand_column_family, rand_key, value_base);
shared->Put(rand_column_family, rand_key, value_base);
Status s;
if (FLAGS_use_merge) {
s = db_->Merge(write_opts, column_family, key, v);
@ -1614,12 +1672,40 @@ class StressTest {
} else if (writeBound <= prob_op && prob_op < delBound) {
// OPERATION delete
if (!FLAGS_test_batches_snapshots) {
thread->shared->Delete(rand_column_family, rand_key);
Status s = db_->Delete(write_opts, column_family, key);
thread->stats.AddDeletes(1);
if (!s.ok()) {
fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
std::terminate();
// If the chosen key does not allow overwrite and it does not exist,
// choose another key.
while (!shared->AllowsOverwrite(rand_column_family, rand_key) &&
!shared->Exists(rand_column_family, rand_key)) {
l.reset();
rand_key = thread->rand.Next() % max_key;
rand_column_family = thread->rand.Next() % FLAGS_column_families;
l.reset(new MutexLock(
shared->GetMutexForKey(rand_column_family, rand_key)));
}
keystr = Key(rand_key);
key = keystr;
column_family = column_families_[rand_column_family];
// Use delete if the key may be overwritten and a single deletion
// otherwise.
if (shared->AllowsOverwrite(rand_column_family, rand_key)) {
shared->Delete(rand_column_family, rand_key);
Status s = db_->Delete(write_opts, column_family, key);
thread->stats.AddDeletes(1);
if (!s.ok()) {
fprintf(stderr, "delete error: %s\n", s.ToString().c_str());
std::terminate();
}
} else {
shared->SingleDelete(rand_column_family, rand_key);
Status s = db_->SingleDelete(write_opts, column_family, key);
thread->stats.AddSingleDeletes(1);
if (!s.ok()) {
fprintf(stderr, "single delete error: %s\n",
s.ToString().c_str());
std::terminate();
}
}
} else {
MultiDelete(thread, write_opts, column_family, key);
@ -1778,50 +1864,47 @@ class StressTest {
}
void PrintEnv() const {
fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion,
fprintf(stdout, "RocksDB version : %d.%d\n", kMajorVersion,
kMinorVersion);
fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
if (!FLAGS_test_batches_snapshots) {
fprintf(stdout, "Clear CFs one in : %d\n",
fprintf(stdout, "Clear CFs one in : %d\n",
FLAGS_clear_column_family_one_in);
}
fprintf(stdout, "Number of threads : %d\n", FLAGS_threads);
fprintf(stdout,
"Ops per thread : %lu\n",
fprintf(stdout, "Number of threads : %d\n", FLAGS_threads);
fprintf(stdout, "Ops per thread : %lu\n",
(unsigned long)FLAGS_ops_per_thread);
std::string ttl_state("unused");
if (FLAGS_ttl > 0) {
ttl_state = NumberToString(FLAGS_ttl);
}
fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str());
fprintf(stdout, "Read percentage : %d%%\n", FLAGS_readpercent);
fprintf(stdout, "Prefix percentage : %d%%\n", FLAGS_prefixpercent);
fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent);
fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent);
fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent);
fprintf(stdout, "DB-write-buffer-size: %" PRIu64 "\n",
FLAGS_db_write_buffer_size);
fprintf(stdout, "Write-buffer-size : %d\n", FLAGS_write_buffer_size);
fprintf(stdout,
"Iterations : %lu\n",
fprintf(stdout, "Time to live(sec) : %s\n", ttl_state.c_str());
fprintf(stdout, "Read percentage : %d%%\n", FLAGS_readpercent);
fprintf(stdout, "Prefix percentage : %d%%\n", FLAGS_prefixpercent);
fprintf(stdout, "Write percentage : %d%%\n", FLAGS_writepercent);
fprintf(stdout, "Delete percentage : %d%%\n", FLAGS_delpercent);
fprintf(stdout, "No overwrite percentage : %d%%\n",
FLAGS_nooverwritepercent);
fprintf(stdout, "Iterate percentage : %d%%\n", FLAGS_iterpercent);
fprintf(stdout, "DB-write-buffer-size : %" PRIu64 "\n",
FLAGS_db_write_buffer_size);
fprintf(stdout, "Write-buffer-size : %d\n",
FLAGS_write_buffer_size);
fprintf(stdout, "Iterations : %lu\n",
(unsigned long)FLAGS_num_iterations);
fprintf(stdout,
"Max key : %lu\n",
fprintf(stdout, "Max key : %lu\n",
(unsigned long)FLAGS_max_key);
fprintf(stdout, "Ratio #ops/#keys : %f\n",
(1.0 * FLAGS_ops_per_thread * FLAGS_threads)/FLAGS_max_key);
fprintf(stdout, "Num times DB reopens: %d\n", FLAGS_reopen);
fprintf(stdout, "Batches/snapshots : %d\n",
fprintf(stdout, "Ratio #ops/#keys : %f\n",
(1.0 * FLAGS_ops_per_thread * FLAGS_threads) / FLAGS_max_key);
fprintf(stdout, "Num times DB reopens : %d\n", FLAGS_reopen);
fprintf(stdout, "Batches/snapshots : %d\n",
FLAGS_test_batches_snapshots);
fprintf(stdout, "Deletes use filter : %d\n",
FLAGS_filter_deletes);
fprintf(stdout, "Do update in place : %d\n",
FLAGS_in_place_update);
fprintf(stdout, "Num keys per lock : %d\n",
fprintf(stdout, "Deletes use filter : %d\n", FLAGS_filter_deletes);
fprintf(stdout, "Do update in place : %d\n", FLAGS_in_place_update);
fprintf(stdout, "Num keys per lock : %d\n",
1 << FLAGS_log2_keys_per_lock);
std::string compression = CompressionTypeToString(FLAGS_compression_type_e);
fprintf(stdout, "Compression : %s\n", compression.c_str());
fprintf(stdout, "Compression : %s\n", compression.c_str());
const char* memtablerep = "";
switch (FLAGS_rep_factory) {
@ -1836,7 +1919,7 @@ class StressTest {
break;
}
fprintf(stdout, "Memtablerep : %s\n", memtablerep);
fprintf(stdout, "Memtablerep : %s\n", memtablerep);
fprintf(stdout, "------------------------------------------------\n");
}

@ -462,6 +462,14 @@ Status DBTestBase::Delete(int cf, const std::string& k) {
return db_->Delete(WriteOptions(), handles_[cf], k);
}
// Single-deletes `k` through db_ using default-constructed WriteOptions and
// no explicit column family.
Status DBTestBase::SingleDelete(const std::string& k) {
  WriteOptions default_write_options;
  return db_->SingleDelete(default_write_options, k);
}
// Single-deletes `k` in the column family handles_[cf] through db_ using
// default-constructed WriteOptions.
Status DBTestBase::SingleDelete(int cf, const std::string& k) {
  WriteOptions default_write_options;
  return db_->SingleDelete(default_write_options, handles_[cf], k);
}
std::string DBTestBase::Get(const std::string& k, const Snapshot* snapshot) {
ReadOptions options;
options.verify_checksums = true;
@ -571,6 +579,9 @@ std::string DBTestBase::AllEntriesFor(const Slice& user_key, int cf) {
case kTypeDeletion:
result += "DEL";
break;
case kTypeSingleDeletion:
result += "SDEL";
break;
default:
assert(false);
break;

@ -554,6 +554,10 @@ class DBTestBase : public testing::Test {
Status Delete(int cf, const std::string& k);
Status SingleDelete(const std::string& k);
Status SingleDelete(int cf, const std::string& k);
std::string Get(const std::string& k, const Snapshot* snapshot = nullptr);
std::string Get(int cf, const std::string& k,

@ -129,5 +129,14 @@ void CorruptKeyType(InternalKey* ikey) {
ikey->DecodeFrom(Slice(keystr.data(), keystr.size()));
}
std::string KeyStr(const std::string& user_key, const SequenceNumber& seq,
const ValueType& t, bool corrupt) {
InternalKey k(user_key, seq, t);
if (corrupt) {
CorruptKeyType(&k);
}
return k.Encode().ToString();
}
} // namespace test
} // namespace rocksdb

@ -277,6 +277,10 @@ class NullLogger : public Logger {
// Corrupts key by changing the type
extern void CorruptKeyType(InternalKey* ikey);
extern std::string KeyStr(const std::string& user_key,
const SequenceNumber& seq, const ValueType& t,
bool corrupt = false);
class SleepingBackgroundTask {
public:
SleepingBackgroundTask()

@ -344,7 +344,7 @@ class WBWIIteratorImpl : public WBWIIterator {
&ret.key, &ret.value, &blob);
assert(s.ok());
assert(ret.type == kPutRecord || ret.type == kDeleteRecord ||
ret.type == kMergeRecord);
ret.type == kSingleDeleteRecord || ret.type == kMergeRecord);
return ret;
}
@ -580,36 +580,49 @@ void WriteBatchWithIndex::Put(const Slice& key, const Slice& value) {
rep->AddOrUpdateIndex(key);
}
void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
const Slice& key) {
rep->SetLastEntryOffset();
rep->write_batch.Merge(column_family, key, value);
rep->write_batch.Delete(column_family, key);
rep->AddOrUpdateIndex(column_family, key);
}
void WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
void WriteBatchWithIndex::Delete(const Slice& key) {
rep->SetLastEntryOffset();
rep->write_batch.Merge(key, value);
rep->write_batch.Delete(key);
rep->AddOrUpdateIndex(key);
}
void WriteBatchWithIndex::PutLogData(const Slice& blob) {
rep->write_batch.PutLogData(blob);
// Appends a SingleDelete record for `key` in `column_family` to the wrapped
// write batch and then updates the index for that key.
void WriteBatchWithIndex::SingleDelete(ColumnFamilyHandle* column_family,
const Slice& key) {
// NOTE(review): presumably remembers where the entry about to be written
// starts, so this has to run before the write below -- confirm against Rep.
rep->SetLastEntryOffset();
rep->write_batch.SingleDelete(column_family, key);
rep->AddOrUpdateIndex(column_family, key);
}
void WriteBatchWithIndex::Delete(ColumnFamilyHandle* column_family,
const Slice& key) {
void WriteBatchWithIndex::SingleDelete(const Slice& key) {
rep->SetLastEntryOffset();
rep->write_batch.Delete(column_family, key);
rep->write_batch.SingleDelete(key);
rep->AddOrUpdateIndex(key);
}
void WriteBatchWithIndex::Merge(ColumnFamilyHandle* column_family,
const Slice& key, const Slice& value) {
rep->SetLastEntryOffset();
rep->write_batch.Merge(column_family, key, value);
rep->AddOrUpdateIndex(column_family, key);
}
void WriteBatchWithIndex::Delete(const Slice& key) {
void WriteBatchWithIndex::Merge(const Slice& key, const Slice& value) {
rep->SetLastEntryOffset();
rep->write_batch.Delete(key);
rep->write_batch.Merge(key, value);
rep->AddOrUpdateIndex(key);
}
void WriteBatchWithIndex::PutLogData(const Slice& blob) {
rep->write_batch.PutLogData(blob);
}
void WriteBatchWithIndex::Clear() { rep->Clear(); }
Status WriteBatchWithIndex::GetFromBatch(ColumnFamilyHandle* column_family,

@ -5,6 +5,8 @@
#ifndef ROCKSDB_LITE
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
#include "db/column_family.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
@ -13,7 +15,6 @@
#include "rocksdb/utilities/write_batch_with_index.h"
#include "util/coding.h"
#include "util/string_util.h"
#include "utilities/write_batch_with_index/write_batch_with_index_internal.h"
namespace rocksdb {
@ -53,6 +54,10 @@ Status ReadableWriteBatch::GetEntryFromDataOffset(size_t data_offset,
case kTypeDeletion:
*type = kDeleteRecord;
break;
case kTypeColumnFamilySingleDeletion:
case kTypeSingleDeletion:
*type = kSingleDeleteRecord;
break;
case kTypeColumnFamilyMerge:
case kTypeMerge:
*type = kMergeRecord;

@ -30,7 +30,7 @@ struct WriteBatchIndexEntry {
// If this flag appears in the offset, it indicates a key that is smaller
// than any other entry for the same column family
static const size_t kFlagMin = UINT_MAX;
static const size_t kFlagMin = std::numeric_limits<size_t>::max();
size_t offset; // offset of an entry in write batch's string buffer.
uint32_t column_family; // column family of the entry

Loading…
Cancel
Save