Added the safe-to-ignore tag to version_edit (#6530)

Summary:
Each time RocksDB switches to a new MANIFEST file from old one, it calls WriteCurrentStateToManifest() which writes a 'snapshot' of the current in-memory state of versions to the beginning of the new manifest as a bunch of version edits. We can distinguish these version edits from other version edits written during normal operations with a custom, safe-to-ignore tag.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6530

Test Plan: added test to version_edit_test, pass make asan_check

Reviewed By: riversand963

Differential Revision: D20524516

Pulled By: zhichao-cao

fbshipit-source-id: f1de102f5499bfa88dae3caa2f32c7f42cf904db
main
Zhichao Cao 5 years ago committed by Facebook GitHub Bot
parent 442404558a
commit e10553f2a6
  1. 20
      db/version_edit.cc
  2. 20
      db/version_edit.h
  3. 35
      db/version_edit_test.cc
  4. 14
      db/version_set.cc

@ -59,6 +59,8 @@ enum Tag : uint32_t {
kDbId,
kBlobFileAddition,
kBlobFileGarbage,
kStateUponManifestSwitch,
kManifestSwitched,
};
enum NewFileCustomTag : uint32_t {
@ -160,6 +162,8 @@ void VersionEdit::Clear() {
column_family_name_.clear();
is_in_atomic_group_ = false;
remaining_entries_ = 0;
state_upon_manifest_switch_ = false;
manifest_switched_ = false;
}
bool VersionEdit::EncodeTo(std::string* dst) const {
@ -293,6 +297,14 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
PutVarint32Varint32(dst, kColumnFamily, column_family_);
}
if (state_upon_manifest_switch_) {
PutVarint32(dst, kStateUponManifestSwitch);
}
if (manifest_switched_) {
PutVarint32(dst, kManifestSwitched);
}
if (is_column_family_add_) {
PutVarint32(dst, kColumnFamilyAdd);
PutLengthPrefixedSlice(dst, Slice(column_family_name_));
@ -635,6 +647,14 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
is_column_family_drop_ = true;
break;
case kStateUponManifestSwitch:
state_upon_manifest_switch_ = true;
break;
case kManifestSwitched:
manifest_switched_ = true;
break;
case kInAtomicGroup:
is_in_atomic_group_ = true;
if (!GetVarint32(&input, &remaining_entries_)) {

@ -423,6 +423,19 @@ class VersionEdit {
std::string DebugString(bool hex_key = false) const;
std::string DebugJSON(int edit_num, bool hex_key = false) const;
void SetStateUponManifestSwitch(bool tag) {
state_upon_manifest_switch_ = tag;
}
bool GetStateUponManifestSwitch() const {
return state_upon_manifest_switch_;
}
void SetManifestSwitched(bool tag) {
manifest_switched_ = tag;
}
bool GetManifestSwitched() const {
return manifest_switched_;
}
private:
friend class ReactiveVersionSet;
friend class VersionSet;
@ -470,6 +483,13 @@ class VersionEdit {
bool is_in_atomic_group_ = false;
uint32_t remaining_entries_ = 0;
// To distinguish the version edit written by WriteCurrentStateToManifest
// and other regular writes. Default is false.
bool state_upon_manifest_switch_ = false;
// To indicate when WriteCurrentStateToManifest is successful. When both
// state_upon_manifest_switch and manifest_switch_finished are true, it can
// ensure the manifest switch is finished.
bool manifest_switched_ = false;
};
} // namespace ROCKSDB_NAMESPACE

@ -46,6 +46,7 @@ TEST_F(VersionEditTest, EncodeDecode) {
edit.SetLogNumber(kBig + 100);
edit.SetNextFile(kBig + 200);
edit.SetLastSequence(kBig + 1000);
edit.SetStateUponManifestSwitch(true);
TestEncodeDecode(edit);
}
@ -80,6 +81,7 @@ TEST_F(VersionEditTest, EncodeDecodeNewFile4) {
edit.SetLogNumber(kBig + 100);
edit.SetNextFile(kBig + 200);
edit.SetLastSequence(kBig + 1000);
edit.SetStateUponManifestSwitch(true);
TestEncodeDecode(edit);
std::string encoded, encoded2;
@ -103,6 +105,7 @@ TEST_F(VersionEditTest, EncodeDecodeNewFile4) {
ASSERT_EQ(kInvalidBlobFileNumber,
new_files[2].second.oldest_blob_file_number);
ASSERT_EQ(1001, new_files[3].second.oldest_blob_file_number);
ASSERT_TRUE(parsed.GetStateUponManifestSwitch());
}
TEST_F(VersionEditTest, ForwardCompatibleNewFile4) {
@ -279,6 +282,38 @@ TEST_F(VersionEditTest, DbId) {
TestEncodeDecode(edit);
}
TEST_F(VersionEditTest, ManifestSwitchTag) {
VersionEdit edit1, decode1;
edit1.SetStateUponManifestSwitch(true);
TestEncodeDecode(edit1);
std::string encoded1;
edit1.EncodeTo(&encoded1);
ASSERT_OK(decode1.DecodeFrom(encoded1));
ASSERT_TRUE(decode1.GetStateUponManifestSwitch());
ASSERT_TRUE(!decode1.GetManifestSwitched());
VersionEdit edit2, decode2;
edit2.SetManifestSwitched(true);
TestEncodeDecode(edit2);
std::string encoded2;
edit2.EncodeTo(&encoded2);
ASSERT_OK(decode2.DecodeFrom(encoded2));
ASSERT_TRUE(!decode2.GetStateUponManifestSwitch());
ASSERT_TRUE(decode2.GetManifestSwitched());
VersionEdit edit3, decode3;
edit3.SetStateUponManifestSwitch(true);
edit3.SetManifestSwitched(true);
TestEncodeDecode(edit3);
std::string encoded3;
edit3.EncodeTo(&encoded3);
ASSERT_OK(decode3.DecodeFrom(encoded3));
ASSERT_TRUE(decode3.GetStateUponManifestSwitch());
ASSERT_TRUE(decode3.GetManifestSwitched());
}
TEST_F(VersionEditTest, BlobFileAdditionAndGarbage) {
VersionEdit edit;

@ -5003,6 +5003,7 @@ Status VersionSet::WriteCurrentStateToManifest(
VersionEdit edit_for_db_id;
assert(!db_id_.empty());
edit_for_db_id.SetDBId(db_id_);
edit_for_db_id.SetStateUponManifestSwitch(true);
std::string db_id_record;
if (!edit_for_db_id.EncodeTo(&db_id_record)) {
return Status::Corruption("Unable to Encode VersionEdit:" +
@ -5028,6 +5029,7 @@ Status VersionSet::WriteCurrentStateToManifest(
edit.AddColumnFamily(cfd->GetName());
edit.SetColumnFamily(cfd->GetID());
}
edit.SetStateUponManifestSwitch(true);
edit.SetComparatorName(
cfd->internal_comparator().user_comparator()->Name());
std::string record;
@ -5045,6 +5047,7 @@ Status VersionSet::WriteCurrentStateToManifest(
// Save files
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
edit.SetStateUponManifestSwitch(true);
for (int level = 0; level < cfd->NumberLevels(); level++) {
for (const auto& f :
@ -5072,7 +5075,16 @@ Status VersionSet::WriteCurrentStateToManifest(
}
}
}
return Status::OK();
VersionEdit end_flag;
end_flag.SetStateUponManifestSwitch(true);
end_flag.SetManifestSwitched(true);
std::string end_record;
if (!end_flag.EncodeTo(&end_record)) {
return Status::Corruption("Unable to Encode VersionEdit:" +
end_flag.DebugString(true));
}
Status s_end_record = log->AddRecord(end_record);
return s_end_record;
}
// TODO(aekmekji): in CompactionJob::GenSubcompactionBoundaries(), this

Loading…
Cancel
Save