Introduce options.check_flush_compaction_key_order (#7467)

Summary:
Introduce an new option options.check_flush_compaction_key_order, by default set to true, which checks key order of flush and compaction, and fail the operation if the order is violated.
Also did minor refactor hash checking code, which consolidates the hashing logic to a vlidation class, where the key ordering logic is added.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7467

Test Plan: Add unit tests to validate the check can catch reordering in flush and compaction, and can be properly disabled.

Reviewed By: riversand963

Differential Revision: D24010683

fbshipit-source-id: 8dd6292d2cda8006054e9ded7cfa4bf405f0527c
main
sdong 4 years ago committed by Facebook GitHub Bot
parent 3e745053b7
commit 7508175558
  1. 1
      CMakeLists.txt
  2. 1
      HISTORY.md
  3. 1
      TARGETS
  4. 56
      db/builder.cc
  5. 65
      db/compaction/compaction_job.cc
  6. 22
      db/compaction/compaction_job_test.cc
  7. 81
      db/corruption_test.cc
  8. 7
      db/db_test.cc
  9. 57
      db/dbformat.h
  10. 11
      db/flush_job_test.cc
  11. 1
      db/memtable.cc
  12. 30
      db/output_validator.cc
  13. 45
      db/output_validator.h
  14. 8
      include/rocksdb/advanced_options.h
  15. 6
      options/cf_options.cc
  16. 4
      options/cf_options.h
  17. 2
      options/options_helper.cc
  18. 1
      options/options_settable_test.cc
  19. 1
      src.mk
  20. 70
      table/mock_table.cc
  21. 15
      table/mock_table.h

@ -620,6 +620,7 @@ set(SOURCES
db/memtable_list.cc
db/merge_helper.cc
db/merge_operator.cc
db/output_validator.cc
db/range_del_aggregator.cc
db/range_tombstone_fragmenter.cc
db/repair.cc

@ -14,6 +14,7 @@
### New Features
* Methods to configure serialize, and compare -- such as TableFactory -- are exposed directly through the Configurable base class (from which these objects inherit). This change will allow for better and more thorough configuration management and retrieval in the future. The options for a Configurable object can be set via the ConfigureFromMap, ConfigureFromString, or ConfigureOption method. The serialized version of the options of an object can be retrieved via the GetOptionString, ToString, or GetOption methods. The list of options supported by an object can be obtained via the GetOptionNames method. The "raw" object (such as the BlockBasedTableOption) for an option may be retrieved via the GetOptions method. Configurable options can be compared via the AreEquivalent method. The settings within a Configurable object may be validated via the ValidateOptions method. The object may be intialized (at which point only mutable options may be updated) via the PrepareOptions method.
* Introduce options.check_flush_compaction_key_order with default value to be true. With this option, during flush and compaction, key order will be checked when writing to each SST file. If the order is violated, the flush or compaction will fail.
## 6.13 (09/12/2020)
### Bug fixes

@ -184,6 +184,7 @@ cpp_library(
"db/memtable_list.cc",
"db/merge_helper.cc",
"db/merge_operator.cc",
"db/output_validator.cc",
"db/range_del_aggregator.cc",
"db/range_tombstone_fragmenter.cc",
"db/repair.cc",

@ -19,6 +19,7 @@
#include "db/event_helpers.h"
#include "db/internal_stats.h"
#include "db/merge_helper.h"
#include "db/output_validator.h"
#include "db/range_del_aggregator.h"
#include "db/table_cache.h"
#include "db/version_edit.h"
@ -96,7 +97,11 @@ Status BuildTable(
column_family_name.empty());
// Reports the IOStats for flush for every following bytes.
const size_t kReportFlushIOStatsEvery = 1048576;
uint64_t paranoid_hash = 0;
OutputValidator output_validator(
internal_comparator,
/*enable_order_check=*/
mutable_cf_options.check_flush_compaction_key_order,
/*enable_hash=*/paranoid_file_checks);
Status s;
meta->fd.file_size = 0;
iter->SeekToFirst();
@ -187,10 +192,10 @@ Status BuildTable(
const Slice& key = c_iter.key();
const Slice& value = c_iter.value();
const ParsedInternalKey& ikey = c_iter.ikey();
if (paranoid_file_checks) {
// Generate a rolling 64-bit hash of the key and values
paranoid_hash = Hash64(key.data(), key.size(), paranoid_hash);
paranoid_hash = Hash64(value.data(), value.size(), paranoid_hash);
// Generate a rolling 64-bit hash of the key and values
s = output_validator.Add(key, value);
if (!s.ok()) {
break;
}
builder->Add(key, value);
meta->UpdateBoundaries(key, value, ikey.sequence, ikey.type);
@ -202,23 +207,24 @@ Status BuildTable(
ThreadStatus::FLUSH_BYTES_WRITTEN, IOSTATS(bytes_written));
}
}
if (s.ok()) {
auto range_del_it = range_del_agg->NewIterator();
for (range_del_it->SeekToFirst(); range_del_it->Valid();
range_del_it->Next()) {
auto tombstone = range_del_it->Tombstone();
auto kv = tombstone.Serialize();
builder->Add(kv.first.Encode(), kv.second);
meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(),
tombstone.seq_, internal_comparator);
}
auto range_del_it = range_del_agg->NewIterator();
for (range_del_it->SeekToFirst(); range_del_it->Valid();
range_del_it->Next()) {
auto tombstone = range_del_it->Tombstone();
auto kv = tombstone.Serialize();
builder->Add(kv.first.Encode(), kv.second);
meta->UpdateBoundariesForRange(kv.first, tombstone.SerializeEndKey(),
tombstone.seq_, internal_comparator);
}
// Finish and check for builder errors
s = c_iter.status();
// Finish and check for builder errors
s = c_iter.status();
if (blob_file_builder) {
if (s.ok()) {
s = blob_file_builder->Finish();
if (blob_file_builder) {
if (s.ok()) {
s = blob_file_builder->Finish();
}
}
}
@ -291,15 +297,15 @@ Status BuildTable(
/*allow_unprepared_value*/ false));
s = it->status();
if (s.ok() && paranoid_file_checks) {
uint64_t check_hash = 0;
OutputValidator file_validator(internal_comparator,
/*enable_order_check=*/true,
/*enable_hash=*/true);
for (it->SeekToFirst(); it->Valid(); it->Next()) {
// Generate a rolling 64-bit hash of the key and values
check_hash = Hash64(it->key().data(), it->key().size(), check_hash);
check_hash =
Hash64(it->value().data(), it->value().size(), check_hash);
file_validator.Add(it->key(), it->value()).PermitUncheckedError();
}
s = it->status();
if (s.ok() && check_hash != paranoid_hash) {
if (s.ok() && !output_validator.CompareValidator(file_validator)) {
s = Status::Corruption("Paranoid checksums do not match");
}
}

@ -32,6 +32,7 @@
#include "db/memtable_list.h"
#include "db/merge_context.h"
#include "db/merge_helper.h"
#include "db/output_validator.h"
#include "db/range_del_aggregator.h"
#include "db/version_set.h"
#include "file/filename.h"
@ -124,9 +125,14 @@ struct CompactionJob::SubcompactionState {
// Files produced by this subcompaction
struct Output {
Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
bool _enable_order_check, bool _enable_hash)
: meta(std::move(_meta)),
validator(_icmp, _enable_order_check, _enable_hash),
finished(false) {}
FileMetaData meta;
OutputValidator validator;
bool finished;
uint64_t paranoid_hash;
std::shared_ptr<const TableProperties> table_properties;
};
@ -170,17 +176,16 @@ struct CompactionJob::SubcompactionState {
// Adds the key and value to the builder
// If paranoid is true, adds the key-value to the paranoid hash
void AddToBuilder(const Slice& key, const Slice& value, bool paranoid) {
Status AddToBuilder(const Slice& key, const Slice& value) {
auto curr = current_output();
assert(builder != nullptr);
assert(curr != nullptr);
if (paranoid) {
// Generate a rolling 64-bit hash of the key and values
curr->paranoid_hash = Hash64(key.data(), key.size(), curr->paranoid_hash);
curr->paranoid_hash =
Hash64(value.data(), value.size(), curr->paranoid_hash);
Status s = curr->validator.Add(key, value);
if (!s.ok()) {
return s;
}
builder->Add(key, value);
return Status::OK();
}
// Returns true iff we should stop building the current output
@ -662,14 +667,20 @@ Status CompactionJob::Run() {
auto s = iter->status();
if (s.ok() && paranoid_file_checks_) {
uint64_t hash = 0;
OutputValidator validator(cfd->internal_comparator(),
/*_enable_order_check=*/true,
/*_enable_hash=*/true);
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
// Generate a rolling 64-bit hash of the key and values, using the
hash = Hash64(iter->key().data(), iter->key().size(), hash);
hash = Hash64(iter->value().data(), iter->value().size(), hash);
s = validator.Add(iter->key(), iter->value());
if (!s.ok()) {
break;
}
}
s = iter->status();
if (s.ok() && hash != files_output[file_idx]->paranoid_hash) {
if (s.ok()) {
s = iter->status();
}
if (s.ok() &&
!validator.CompareValidator(files_output[file_idx]->validator)) {
s = Status::Corruption("Paranoid checksums do not match");
}
}
@ -961,7 +972,10 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
break;
}
}
sub_compact->AddToBuilder(key, value, paranoid_file_checks_);
status = sub_compact->AddToBuilder(key, value);
if (!status.ok()) {
break;
}
sub_compact->current_output_file_size =
sub_compact->builder->EstimatedFileSize();
@ -1276,8 +1290,8 @@ Status CompactionJob::FinishCompactionOutputFile(
auto kv = tombstone.Serialize();
assert(lower_bound == nullptr ||
ucmp->Compare(*lower_bound, kv.second) < 0);
sub_compact->AddToBuilder(kv.first.Encode(), kv.second,
paranoid_file_checks_);
// Range tombstone is not supported by output validator yet.
sub_compact->builder->Add(kv.first.Encode(), kv.second);
InternalKey smallest_candidate = std::move(kv.first);
if (lower_bound != nullptr &&
ucmp->Compare(smallest_candidate.user_key(), *lower_bound) <= 0) {
@ -1594,14 +1608,17 @@ Status CompactionJob::OpenCompactionOutputFile(
// Initialize a SubcompactionState::Output and add it to sub_compact->outputs
{
SubcompactionState::Output out;
out.meta.fd = FileDescriptor(file_number,
sub_compact->compaction->output_path_id(), 0);
out.meta.oldest_ancester_time = oldest_ancester_time;
out.meta.file_creation_time = current_time;
out.finished = false;
out.paranoid_hash = 0;
sub_compact->outputs.push_back(out);
FileMetaData meta;
meta.fd = FileDescriptor(file_number,
sub_compact->compaction->output_path_id(), 0);
meta.oldest_ancester_time = oldest_ancester_time;
meta.file_creation_time = current_time;
sub_compact->outputs.emplace_back(
std::move(meta), cfd->internal_comparator(),
/*enable_order_check=*/
sub_compact->compaction->mutable_cf_options()
->check_flush_compaction_key_order,
/*enable_hash=*/paranoid_file_checks_);
}
writable_file->SetIOPriority(Env::IOPriority::IO_LOW);

@ -130,7 +130,7 @@ class CompactionJobTest : public testing::Test {
return blob_index;
}
void AddMockFile(const stl_wrappers::KVMap& contents, int level = 0) {
void AddMockFile(const mock::KVVector& contents, int level = 0) {
assert(contents.size() > 0);
bool first_key = true;
@ -205,8 +205,8 @@ class CompactionJobTest : public testing::Test {
}
// returns expected result after compaction
stl_wrappers::KVMap CreateTwoFiles(bool gen_corrupted_keys) {
auto expected_results = mock::MakeMockFile();
mock::KVVector CreateTwoFiles(bool gen_corrupted_keys) {
stl_wrappers::KVMap expected_results;
const int kKeysPerFile = 10000;
const int kCorruptKeysPerFile = 200;
const int kMatchingKeys = kKeysPerFile / 2;
@ -232,19 +232,25 @@ class CompactionJobTest : public testing::Test {
test::CorruptKeyType(&internal_key);
test::CorruptKeyType(&bottommost_internal_key);
}
contents.insert({ internal_key.Encode().ToString(), value });
contents.push_back({internal_key.Encode().ToString(), value});
if (i == 1 || k < kMatchingKeys || corrupt_id(k - kMatchingKeys)) {
expected_results.insert(
{ bottommost_internal_key.Encode().ToString(), value });
{bottommost_internal_key.Encode().ToString(), value});
}
}
mock::SortKVVector(&contents);
AddMockFile(contents);
}
SetLastSequence(sequence_number);
return expected_results;
mock::KVVector expected_results_kvvector;
for (auto& kv : expected_results) {
expected_results_kvvector.push_back({kv.first, kv.second});
}
return expected_results_kvvector;
}
void NewDB() {
@ -299,7 +305,7 @@ class CompactionJobTest : public testing::Test {
void RunCompaction(
const std::vector<std::vector<FileMetaData*>>& input_files,
const stl_wrappers::KVMap& expected_results,
const mock::KVVector& expected_results,
const std::vector<SequenceNumber>& snapshots = {},
SequenceNumber earliest_write_conflict_snapshot = kMaxSequenceNumber,
int output_level = 1, bool verify = true,
@ -644,7 +650,7 @@ TEST_F(CompactionJobTest, FilterAllMergeOperands) {
SetLastSequence(11U);
auto files = cfd_->current()->storage_info()->LevelFiles(0);
stl_wrappers::KVMap empty_map;
mock::KVVector empty_map;
RunCompaction({files}, empty_map);
}

@ -562,12 +562,14 @@ TEST_F(CorruptionTest, FileSystemStateCorrupted) {
}
}
static const auto& corruption_modes = {mock::MockTableFactory::kCorruptNone,
mock::MockTableFactory::kCorruptKey,
mock::MockTableFactory::kCorruptValue};
static const auto& corruption_modes = {
mock::MockTableFactory::kCorruptNone, mock::MockTableFactory::kCorruptKey,
mock::MockTableFactory::kCorruptValue,
mock::MockTableFactory::kCorruptReorderKey};
TEST_F(CorruptionTest, ParanoidFileChecksOnFlush) {
Options options;
options.check_flush_compaction_key_order = false;
options.paranoid_file_checks = true;
options.create_if_missing = true;
Status s;
@ -595,6 +597,7 @@ TEST_F(CorruptionTest, ParanoidFileChecksOnCompact) {
Options options;
options.paranoid_file_checks = true;
options.create_if_missing = true;
options.check_flush_compaction_key_order = false;
Status s;
for (const auto& mode : corruption_modes) {
delete db_;
@ -644,6 +647,78 @@ TEST_F(CorruptionTest, LogCorruptionErrorsInCompactionIterator) {
ASSERT_TRUE(s.IsCorruption());
}
TEST_F(CorruptionTest, CompactionKeyOrderCheck) {
Options options;
options.paranoid_file_checks = false;
options.create_if_missing = true;
options.check_flush_compaction_key_order = false;
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, options));
std::shared_ptr<mock::MockTableFactory> mock =
std::make_shared<mock::MockTableFactory>();
options.table_factory = mock;
ASSERT_OK(DB::Open(options, dbname_, &db_));
assert(db_ != nullptr);
mock->SetCorruptionMode(mock::MockTableFactory::kCorruptReorderKey);
Build(100, 2);
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
ASSERT_OK(dbi->TEST_FlushMemTable());
mock->SetCorruptionMode(mock::MockTableFactory::kCorruptNone);
ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "true"}}));
ASSERT_NOK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
}
TEST_F(CorruptionTest, FlushKeyOrderCheck) {
Options options;
options.paranoid_file_checks = false;
options.create_if_missing = true;
ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "true"}}));
ASSERT_OK(db_->Put(WriteOptions(), "foo1", "v1"));
ASSERT_OK(db_->Put(WriteOptions(), "foo2", "v1"));
ASSERT_OK(db_->Put(WriteOptions(), "foo3", "v1"));
ASSERT_OK(db_->Put(WriteOptions(), "foo4", "v1"));
int cnt = 0;
// Generate some out of order keys from the memtable
SyncPoint::GetInstance()->SetCallBack(
"MemTableIterator::Next:0", [&](void* arg) {
MemTableRep::Iterator* mem_iter =
static_cast<MemTableRep::Iterator*>(arg);
if (++cnt == 3) {
mem_iter->Prev();
mem_iter->Prev();
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Status s = static_cast_with_check<DBImpl>(db_)->TEST_FlushMemTable();
ASSERT_NOK(s);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
TEST_F(CorruptionTest, DisableKeyOrderCheck) {
Options options;
ASSERT_OK(db_->SetOptions({{"check_flush_compaction_key_order", "false"}}));
DBImpl* dbi = static_cast_with_check<DBImpl>(db_);
SyncPoint::GetInstance()->SetCallBack(
"OutputValidator::Add:order_check",
[&](void* /*arg*/) { ASSERT_TRUE(false); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ASSERT_OK(db_->Put(WriteOptions(), "foo1", "v1"));
ASSERT_OK(db_->Put(WriteOptions(), "foo3", "v1"));
ASSERT_OK(dbi->TEST_FlushMemTable());
ASSERT_OK(db_->Put(WriteOptions(), "foo2", "v1"));
ASSERT_OK(db_->Put(WriteOptions(), "foo4", "v1"));
ASSERT_OK(dbi->TEST_FlushMemTable());
ASSERT_OK(dbi->TEST_CompactRange(0, nullptr, nullptr, nullptr, true));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}
} // namespace ROCKSDB_NAMESPACE
int main(int argc, char** argv) {

@ -5291,6 +5291,13 @@ TEST_F(DBTest, DynamicMiscOptions) {
ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
&mutable_cf_options));
ASSERT_TRUE(mutable_cf_options.report_bg_io_stats);
ASSERT_TRUE(mutable_cf_options.check_flush_compaction_key_order);
ASSERT_OK(dbfull()->SetOptions(
handles_[1], {{"check_flush_compaction_key_order", "false"}}));
ASSERT_OK(dbfull()->TEST_GetLatestMutableCFOptions(handles_[1],
&mutable_cf_options));
ASSERT_FALSE(mutable_cf_options.check_flush_compaction_key_order);
}
#endif // ROCKSDB_LITE

@ -96,6 +96,8 @@ static const SequenceNumber kMaxSequenceNumber = ((0x1ull << 56) - 1);
static const SequenceNumber kDisableGlobalSequenceNumber = port::kMaxUint64;
constexpr uint64_t kNumInternalBytes = 8;
// The data structure that represents an internal key in the way that user_key,
// sequence number and type are stored in separated forms.
struct ParsedInternalKey {
@ -121,7 +123,7 @@ struct ParsedInternalKey {
// Return the length of the encoding of "key".
inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key) {
return key.user_key.size() + 8;
return key.user_key.size() + kNumInternalBytes;
}
// Pack a sequence number and a ValueType into a uint64_t
@ -168,14 +170,15 @@ extern Status ParseInternalKey(const Slice& internal_key,
// Returns the user key portion of an internal key.
inline Slice ExtractUserKey(const Slice& internal_key) {
assert(internal_key.size() >= 8);
return Slice(internal_key.data(), internal_key.size() - 8);
assert(internal_key.size() >= kNumInternalBytes);
return Slice(internal_key.data(), internal_key.size() - kNumInternalBytes);
}
inline Slice ExtractUserKeyAndStripTimestamp(const Slice& internal_key,
size_t ts_sz) {
assert(internal_key.size() >= 8 + ts_sz);
return Slice(internal_key.data(), internal_key.size() - 8 - ts_sz);
assert(internal_key.size() >= kNumInternalBytes + ts_sz);
return Slice(internal_key.data(),
internal_key.size() - kNumInternalBytes - ts_sz);
}
inline Slice StripTimestampFromUserKey(const Slice& user_key, size_t ts_sz) {
@ -189,9 +192,9 @@ inline Slice ExtractTimestampFromUserKey(const Slice& user_key, size_t ts_sz) {
}
inline uint64_t ExtractInternalKeyFooter(const Slice& internal_key) {
assert(internal_key.size() >= 8);
assert(internal_key.size() >= kNumInternalBytes);
const size_t n = internal_key.size();
return DecodeFixed64(internal_key.data() + n - 8);
return DecodeFixed64(internal_key.data() + n - kNumInternalBytes);
}
inline ValueType ExtractValueType(const Slice& internal_key) {
@ -327,13 +330,15 @@ inline int InternalKeyComparator::Compare(const InternalKey& a,
inline Status ParseInternalKey(const Slice& internal_key,
ParsedInternalKey* result) {
const size_t n = internal_key.size();
if (n < 8) return Status::Corruption("Internal Key too small");
uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
if (n < kNumInternalBytes) {
return Status::Corruption("Internal Key too small");
}
uint64_t num = DecodeFixed64(internal_key.data() + n - kNumInternalBytes);
unsigned char c = num & 0xff;
result->sequence = num >> 8;
result->type = static_cast<ValueType>(c);
assert(result->type <= ValueType::kMaxValue);
result->user_key = Slice(internal_key.data(), n - 8);
result->user_key = Slice(internal_key.data(), n - kNumInternalBytes);
return IsExtendedValueType(result->type)
? Status::OK()
: Status::Corruption("Invalid Key Type");
@ -343,19 +348,19 @@ inline Status ParseInternalKey(const Slice& internal_key,
// Guarantees not to invalidate ikey.data().
inline void UpdateInternalKey(std::string* ikey, uint64_t seq, ValueType t) {
size_t ikey_sz = ikey->size();
assert(ikey_sz >= 8);
assert(ikey_sz >= kNumInternalBytes);
uint64_t newval = (seq << 8) | t;
// Note: Since C++11, strings are guaranteed to be stored contiguously and
// string::operator[]() is guaranteed not to change ikey.data().
EncodeFixed64(&(*ikey)[ikey_sz - 8], newval);
EncodeFixed64(&(*ikey)[ikey_sz - kNumInternalBytes], newval);
}
// Get the sequence number from the internal key
inline uint64_t GetInternalKeySeqno(const Slice& internal_key) {
const size_t n = internal_key.size();
assert(n >= 8);
uint64_t num = DecodeFixed64(internal_key.data() + n - 8);
assert(n >= kNumInternalBytes);
uint64_t num = DecodeFixed64(internal_key.data() + n - kNumInternalBytes);
return num >> 8;
}
@ -394,8 +399,8 @@ class IterKey {
if (IsUserKey()) {
return Slice(key_, key_size_);
} else {
assert(key_size_ >= 8);
return Slice(key_, key_size_ - 8);
assert(key_size_ >= kNumInternalBytes);
return Slice(key_, key_size_ - kNumInternalBytes);
}
}
@ -453,9 +458,9 @@ class IterKey {
// and returns a Slice referencing the new copy.
Slice SetInternalKey(const Slice& key, ParsedInternalKey* ikey) {
size_t key_n = key.size();
assert(key_n >= 8);
assert(key_n >= kNumInternalBytes);
SetInternalKey(key);
ikey->user_key = Slice(key_, key_n - 8);
ikey->user_key = Slice(key_, key_n - kNumInternalBytes);
return Slice(key_, key_n);
}
@ -472,9 +477,9 @@ class IterKey {
// invalidate slices to the key (and the user key).
void UpdateInternalKey(uint64_t seq, ValueType t) {
assert(!IsKeyPinned());
assert(key_size_ >= 8);
assert(key_size_ >= kNumInternalBytes);
uint64_t newval = (seq << 8) | t;
EncodeFixed64(&buf_[key_size_ - 8], newval);
EncodeFixed64(&buf_[key_size_ - kNumInternalBytes], newval);
}
bool IsKeyPinned() const { return (key_ != buf_); }
@ -679,8 +684,10 @@ inline int InternalKeyComparator::Compare(const Slice& akey,
// decreasing type (though sequence# should be enough to disambiguate)
int r = user_comparator_.Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) {
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8);
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8);
const uint64_t anum =
DecodeFixed64(akey.data() + akey.size() - kNumInternalBytes);
const uint64_t bnum =
DecodeFixed64(bkey.data() + bkey.size() - kNumInternalBytes);
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {
@ -698,8 +705,10 @@ inline int InternalKeyComparator::CompareKeySeq(const Slice& akey,
int r = user_comparator_.Compare(ExtractUserKey(akey), ExtractUserKey(bkey));
if (r == 0) {
// Shift the number to exclude the last byte which contains the value type
const uint64_t anum = DecodeFixed64(akey.data() + akey.size() - 8) >> 8;
const uint64_t bnum = DecodeFixed64(bkey.data() + bkey.size() - 8) >> 8;
const uint64_t anum =
DecodeFixed64(akey.data() + akey.size() - kNumInternalBytes) >> 8;
const uint64_t bnum =
DecodeFixed64(bkey.data() + bkey.size() - kNumInternalBytes) >> 8;
if (anum > bnum) {
r = -1;
} else if (anum < bnum) {

@ -170,14 +170,14 @@ TEST_F(FlushJobTest, NonEmpty) {
new_mem->Add(SequenceNumber(i), kTypeValue, key, value);
if ((i + 1000) % 10000 < 9995) {
InternalKey internal_key(key, SequenceNumber(i), kTypeValue);
inserted_keys.insert({internal_key.Encode().ToString(), value});
inserted_keys.push_back({internal_key.Encode().ToString(), value});
}
}
{
new_mem->Add(SequenceNumber(10000), kTypeRangeDeletion, "9995", "9999a");
InternalKey internal_key("9995", SequenceNumber(10000), kTypeRangeDeletion);
inserted_keys.insert({internal_key.Encode().ToString(), "9999a"});
inserted_keys.push_back({internal_key.Encode().ToString(), "9999a"});
}
// Note: the first two blob references will not be considered when resolving
@ -205,9 +205,9 @@ TEST_F(FlushJobTest, NonEmpty) {
new_mem->Add(seq, kTypeBlobIndex, key, blob_index);
InternalKey internal_key(key, seq, kTypeBlobIndex);
inserted_keys.emplace_hint(inserted_keys.end(),
internal_key.Encode().ToString(), blob_index);
inserted_keys.push_back({internal_key.Encode().ToString(), blob_index});
}
mock::SortKVVector(&inserted_keys);
autovector<MemTable*> to_delete;
cfd->imm()->Add(new_mem, &to_delete);
@ -454,10 +454,11 @@ TEST_F(FlushJobTest, Snapshots) {
(snapshots_set.find(seqno) != snapshots_set.end());
if (visible) {
InternalKey internal_key(key, seqno, kTypeValue);
inserted_keys.insert({internal_key.Encode().ToString(), value});
inserted_keys.push_back({internal_key.Encode().ToString(), value});
}
}
}
mock::SortKVVector(&inserted_keys);
autovector<MemTable*> to_delete;
cfd->imm()->Add(new_mem, &to_delete);

@ -376,6 +376,7 @@ class MemTableIterator : public InternalIterator {
PERF_COUNTER_ADD(next_on_memtable_count, 1);
assert(Valid());
iter_->Next();
TEST_SYNC_POINT_CALLBACK("MemTableIterator::Next:0", iter_);
valid_ = iter_->Valid();
}
bool NextAndGetResult(IterateResult* result) override {

@ -0,0 +1,30 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include "db/output_validator.h"
namespace ROCKSDB_NAMESPACE {
Status OutputValidator::Add(const Slice& key, const Slice& value) {
if (enable_hash_) {
// Generate a rolling 64-bit hash of the key and values
paranoid_hash_ = Hash64(key.data(), key.size(), paranoid_hash_);
paranoid_hash_ = Hash64(value.data(), value.size(), paranoid_hash_);
}
if (enable_order_check_) {
TEST_SYNC_POINT_CALLBACK("OutputValidator::Add:order_check",
/*arg=*/nullptr);
if (key.size() < kNumInternalBytes) {
return Status::Corruption(
"Compaction tries to write a key without internal bytes.");
}
// prev_key_ starts with empty.
if (!prev_key_.empty() && icmp_.Compare(key, prev_key_) < 0) {
return Status::Corruption("Compaction sees out-of-order keys.");
}
prev_key_.assign(key.data(), key.size());
}
return Status::OK();
}
} // namespace ROCKSDB_NAMESPACE

@ -0,0 +1,45 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#pragma once
#include "db/dbformat.h"
#include "rocksdb/status.h"
#include "test_util/sync_point.h"
#include "util/hash.h"
namespace ROCKSDB_NAMESPACE {
// A class that validates key/value that is inserted to an SST file.
// Pass every key/value of the file using OutputValidator::Add()
// and the class validates key order and optionally calculate a hash
// of all the key and value.
class OutputValidator {
public:
explicit OutputValidator(const InternalKeyComparator& icmp,
bool enable_order_check, bool enable_hash)
: icmp_(icmp),
enable_order_check_(enable_order_check),
enable_hash_(enable_hash) {}
// Add a key to the KV sequence, and return whether the key follows
// criteria, e.g. key is ordered.
Status Add(const Slice& key, const Slice& value);
// Compare result of two key orders are the same. It can be used
// to compare the keys inserted into a file, and what is read back.
// Return true if the validation passes.
bool CompareValidator(const OutputValidator& other_validator) {
return GetHash() == other_validator.GetHash();
}
private:
uint64_t GetHash() const { return paranoid_hash_; }
const InternalKeyComparator& icmp_;
std::string prev_key_;
uint64_t paranoid_hash_ = 0;
bool enable_order_check_;
bool enable_hash_;
};
} // namespace ROCKSDB_NAMESPACE

@ -642,6 +642,14 @@ struct AdvancedColumnFamilyOptions {
// Default: false
bool optimize_filters_for_hits = false;
// During flush or compaction, check whether keys inserted to output files
// are in order.
//
// Default: true
//
// Dynamically changeable through SetOptions() API
bool check_flush_compaction_key_order = true;
// After writing every SST file, reopen it and read all the keys.
// Checks the hash of all of the keys and values written versus the
// keys in the file and signals a corruption if they do not match

@ -223,6 +223,10 @@ static std::unordered_map<std::string, OptionTypeInfo>
{"filter_deletes",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated,
OptionTypeFlags::kMutable}},
{"check_flush_compaction_key_order",
{offsetof(struct MutableCFOptions, check_flush_compaction_key_order),
OptionType::kBoolean, OptionVerificationType::kNormal,
OptionTypeFlags::kMutable}},
{"paranoid_file_checks",
{offsetof(struct MutableCFOptions, paranoid_file_checks),
OptionType::kBoolean, OptionVerificationType::kNormal,
@ -974,6 +978,8 @@ void MutableCFOptions::Dump(Logger* log) const {
result.c_str());
ROCKS_LOG_INFO(log, " max_sequential_skip_in_iterations: %" PRIu64,
max_sequential_skip_in_iterations);
ROCKS_LOG_INFO(log, " check_flush_compaction_key_order: %d",
check_flush_compaction_key_order);
ROCKS_LOG_INFO(log, " paranoid_file_checks: %d",
paranoid_file_checks);
ROCKS_LOG_INFO(log, " report_bg_io_stats: %d",

@ -165,6 +165,8 @@ struct MutableCFOptions {
blob_compression_type(options.blob_compression_type),
max_sequential_skip_in_iterations(
options.max_sequential_skip_in_iterations),
check_flush_compaction_key_order(
options.check_flush_compaction_key_order),
paranoid_file_checks(options.paranoid_file_checks),
report_bg_io_stats(options.report_bg_io_stats),
compression(options.compression),
@ -205,6 +207,7 @@ struct MutableCFOptions {
blob_file_size(0),
blob_compression_type(kNoCompression),
max_sequential_skip_in_iterations(0),
check_flush_compaction_key_order(true),
paranoid_file_checks(false),
report_bg_io_stats(false),
compression(Snappy_Supported() ? kSnappyCompression : kNoCompression),
@ -267,6 +270,7 @@ struct MutableCFOptions {
// Misc options
uint64_t max_sequential_skip_in_iterations;
bool check_flush_compaction_key_order;
bool paranoid_file_checks;
bool report_bg_io_stats;
CompressionType compression;

@ -230,6 +230,8 @@ ColumnFamilyOptions BuildColumnFamilyOptions(
// Misc options
cf_opts.max_sequential_skip_in_iterations =
mutable_cf_options.max_sequential_skip_in_iterations;
cf_opts.check_flush_compaction_key_order =
mutable_cf_options.check_flush_compaction_key_order;
cf_opts.paranoid_file_checks = mutable_cf_options.paranoid_file_checks;
cf_opts.report_bg_io_stats = mutable_cf_options.report_bg_io_stats;
cf_opts.compression = mutable_cf_options.compression;

@ -479,6 +479,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
"memtable_prefix_bloom_size_ratio=0.4642;"
"memtable_whole_key_filtering=true;"
"memtable_insert_with_hint_prefix_extractor=rocksdb.CappedPrefix.13;"
"check_flush_compaction_key_order=false;"
"paranoid_file_checks=true;"
"force_consistency_checks=true;"
"inplace_update_num_locks=7429;"

@ -56,6 +56,7 @@ LIB_SOURCES = \
db/memtable_list.cc \
db/merge_helper.cc \
db/merge_operator.cc \
db/output_validator.cc \
db/range_del_aggregator.cc \
db/range_tombstone_fragmenter.cc \
db/repair.cc \

@ -16,20 +16,19 @@
namespace ROCKSDB_NAMESPACE {
namespace mock {
namespace {
const InternalKeyComparator icmp_(BytewiseComparator());
} // namespace
stl_wrappers::KVMap MakeMockFile(
std::initializer_list<std::pair<const std::string, std::string>> l) {
return stl_wrappers::KVMap(l, stl_wrappers::LessOfComparator(&icmp_));
KVVector MakeMockFile(std::initializer_list<KVPair> l) { return KVVector(l); }
void SortKVVector(KVVector* kv_vector) {
InternalKeyComparator icmp(BytewiseComparator());
std::sort(kv_vector->begin(), kv_vector->end(),
[icmp](KVPair a, KVPair b) -> bool {
return icmp.Compare(a.first, b.first) < 0;
});
}
class MockTableReader : public TableReader {
public:
explicit MockTableReader(const stl_wrappers::KVMap& table) : table_(table) {}
explicit MockTableReader(const KVVector& table) : table_(table) {}
InternalIterator* NewIterator(const ReadOptions&,
const SliceTransform* prefix_extractor,
@ -61,12 +60,12 @@ class MockTableReader : public TableReader {
~MockTableReader() {}
private:
const stl_wrappers::KVMap& table_;
const KVVector& table_;
};
class MockTableIterator : public InternalIterator {
public:
explicit MockTableIterator(const stl_wrappers::KVMap& table) : table_(table) {
explicit MockTableIterator(const KVVector& table) : table_(table) {
itr_ = table_.end();
}
@ -80,13 +79,21 @@ class MockTableIterator : public InternalIterator {
}
void Seek(const Slice& target) override {
std::string str_target(target.data(), target.size());
itr_ = table_.lower_bound(str_target);
KVPair target_pair(target.ToString(), "");
InternalKeyComparator icmp(BytewiseComparator());
itr_ = std::lower_bound(table_.begin(), table_.end(), target_pair,
[icmp](KVPair a, KVPair b) -> bool {
return icmp.Compare(a.first, b.first) < 0;
});
}
void SeekForPrev(const Slice& target) override {
std::string str_target(target.data(), target.size());
itr_ = table_.upper_bound(str_target);
KVPair target_pair(target.ToString(), "");
InternalKeyComparator icmp(BytewiseComparator());
itr_ = std::upper_bound(table_.begin(), table_.end(), target_pair,
[icmp](KVPair a, KVPair b) -> bool {
return icmp.Compare(a.first, b.first) < 0;
});
Prev();
}
@ -107,8 +114,8 @@ class MockTableIterator : public InternalIterator {
Status status() const override { return Status::OK(); }
private:
const stl_wrappers::KVMap& table_;
stl_wrappers::KVMap::const_iterator itr_;
const KVVector& table_;
KVVector::const_iterator itr_;
};
class MockTableBuilder : public TableBuilder {
@ -129,13 +136,22 @@ class MockTableBuilder : public TableBuilder {
void Add(const Slice& key, const Slice& value) override {
if (corrupt_mode_ == MockTableFactory::kCorruptValue) {
// Corrupt the value
table_.insert({key.ToString(), value.ToString() + " "});
table_.push_back({key.ToString(), value.ToString() + " "});
corrupt_mode_ = MockTableFactory::kCorruptNone;
} else if (corrupt_mode_ == MockTableFactory::kCorruptKey) {
table_.insert({key.ToString() + " ", value.ToString()});
table_.push_back({key.ToString() + " ", value.ToString()});
corrupt_mode_ = MockTableFactory::kCorruptNone;
} else if (corrupt_mode_ == MockTableFactory::kCorruptReorderKey) {
if (prev_key_.empty()) {
prev_key_ = key.ToString();
prev_value_ = value.ToString();
} else {
table_.push_back({key.ToString(), value.ToString()});
table_.push_back({prev_key_, prev_value_});
corrupt_mode_ = MockTableFactory::kCorruptNone;
}
} else {
table_.insert({key.ToString(), value.ToString()});
table_.push_back({key.ToString(), value.ToString()});
}
}
@ -170,9 +186,11 @@ class MockTableBuilder : public TableBuilder {
private:
uint32_t id_;
std::string prev_key_;
std::string prev_value_;
MockTableFileSystem* file_system_;
int corrupt_mode_;
stl_wrappers::KVMap table_;
KVVector table_;
};
InternalIterator* MockTableReader::NewIterator(
@ -238,7 +256,7 @@ TableBuilder* MockTableFactory::NewTableBuilder(
}
Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname,
stl_wrappers::KVMap file_contents) {
KVVector file_contents) {
std::unique_ptr<WritableFile> file;
auto s = env->NewWritableFile(fname, &file, EnvOptions());
if (!s.ok()) {
@ -269,14 +287,12 @@ uint32_t MockTableFactory::GetIDFromFile(RandomAccessFileReader* file) const {
return DecodeFixed32(buf);
}
void MockTableFactory::AssertSingleFile(
const stl_wrappers::KVMap& file_contents) {
void MockTableFactory::AssertSingleFile(const KVVector& file_contents) {
ASSERT_EQ(file_system_.files.size(), 1U);
ASSERT_EQ(file_contents, file_system_.files.begin()->second);
}
void MockTableFactory::AssertLatestFile(
const stl_wrappers::KVMap& file_contents) {
void MockTableFactory::AssertLatestFile(const KVVector& file_contents) {
ASSERT_GE(file_system_.files.size(), 1U);
auto latest = file_system_.files.end();
--latest;

@ -27,13 +27,15 @@
namespace ROCKSDB_NAMESPACE {
namespace mock {
using KVPair = std::pair<std::string, std::string>;
using KVVector = std::vector<KVPair>;
stl_wrappers::KVMap MakeMockFile(
std::initializer_list<std::pair<const std::string, std::string>> l = {});
KVVector MakeMockFile(std::initializer_list<KVPair> l = {});
void SortKVVector(KVVector* kv_vector);
struct MockTableFileSystem {
port::Mutex mutex;
std::map<uint32_t, stl_wrappers::KVMap> files;
std::map<uint32_t, KVVector> files;
};
class MockTableFactory : public TableFactory {
@ -42,6 +44,7 @@ class MockTableFactory : public TableFactory {
kCorruptNone,
kCorruptKey,
kCorruptValue,
kCorruptReorderKey,
};
MockTableFactory();
@ -60,7 +63,7 @@ class MockTableFactory : public TableFactory {
// MockTableBuilder. file_contents has to have a format of <internal_key,
// value>. Those key-value pairs will then be inserted into the mock table.
Status CreateMockTable(Env* env, const std::string& fname,
stl_wrappers::KVMap file_contents);
KVVector file_contents);
virtual std::string GetPrintableOptions() const override {
return std::string();
@ -69,8 +72,8 @@ class MockTableFactory : public TableFactory {
void SetCorruptionMode(MockCorruptionMode mode) { corrupt_mode_ = mode; }
// This function will assert that only a single file exists and that the
// contents are equal to file_contents
void AssertSingleFile(const stl_wrappers::KVMap& file_contents);
void AssertLatestFile(const stl_wrappers::KVMap& file_contents);
void AssertSingleFile(const KVVector& file_contents);
void AssertLatestFile(const KVVector& file_contents);
private:
uint32_t GetAndWriteNextID(WritableFileWriter* file) const;

Loading…
Cancel
Save