diff --git a/HISTORY.md b/HISTORY.md index 0543e0276..ecff88201 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -3,6 +3,7 @@ ## Unreleased ### Public API Changes * CompactionFilter::Context includes information of Column Family ID +* The need-compaction hint given by TablePropertiesCollector::NeedCompact() will be persistent and recoverable after DB recovery. This introduces a breaking format change. If you use this experimental feature, including NewCompactOnDeletionCollectorFactory() in the new version, you may not be able to directly downgrade the DB back to version 4.0 or lower. ## 4.1.0 (10/8/2015) ### New Features diff --git a/db/db_test.cc b/db/db_test.cc index 4bfe4dbd2..35a0b6755 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -8306,8 +8306,8 @@ TEST_F(DBTest, TablePropertiesNeedCompactTest) { options.soft_rate_limit = 1.1; options.num_levels = 8; - std::shared_ptr collector_factory( - new CountingDeleteTabPropCollectorFactory); + std::shared_ptr collector_factory = + std::make_shared(); options.table_properties_collector_factories.resize(1); options.table_properties_collector_factories[0] = collector_factory; @@ -8366,6 +8366,61 @@ TEST_F(DBTest, TablePropertiesNeedCompactTest) { } } +TEST_F(DBTest, NeedCompactHintPersistentTest) { + Random rnd(301); + + Options options; + options.create_if_missing = true; + options.max_write_buffer_number = 8; + options.level0_file_num_compaction_trigger = 10; + options.level0_slowdown_writes_trigger = 10; + options.level0_stop_writes_trigger = 10; + options.disable_auto_compactions = true; + + std::shared_ptr collector_factory = + std::make_shared(); + options.table_properties_collector_factories.resize(1); + options.table_properties_collector_factories[0] = collector_factory; + + DestroyAndReopen(options); + + const int kMaxKey = 100; + for (int i = 0; i < kMaxKey; i++) { + ASSERT_OK(Put(Key(i), "")); + } + Flush(); + dbfull()->TEST_WaitForFlushMemTable(); + + for (int i = 1; i < kMaxKey - 1; i++) { + Delete(Key(i)); + } + Flush(); + dbfull()->TEST_WaitForFlushMemTable(); + ASSERT_EQ(NumTableFilesAtLevel(0), 2); + + // Restart the DB. Although number of files didn't reach + // options.level0_file_num_compaction_trigger, compaction should + // still be triggered because of the need-compaction hint. + options.disable_auto_compactions = false; + Reopen(options); + dbfull()->TEST_WaitForCompact(); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + { + SetPerfLevel(kEnableCount); + perf_context.Reset(); + int c = 0; + std::unique_ptr iter(db_->NewIterator(ReadOptions())); + for (iter->Seek(Key(0)); iter->Valid(); iter->Next()) { + c++; + } + ASSERT_EQ(c, 2); + ASSERT_EQ(perf_context.internal_delete_skipped_count, 0); + // We iterate every key twice. Is it a bug? + ASSERT_LE(perf_context.internal_key_skipped_count, 2); + SetPerfLevel(kDisable); + } +} + TEST_F(DBTest, SuggestCompactRangeTest) { class CompactionFilterFactoryGetContext : public CompactionFilterFactory { public: diff --git a/db/version_edit.cc b/db/version_edit.cc index 0c9efe419..23df641af 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -12,6 +12,7 @@ #include "db/version_set.h" #include "util/coding.h" #include "util/event_logger.h" +#include "util/sync_point.h" #include "rocksdb/slice.h" namespace rocksdb { @@ -32,12 +33,22 @@ enum Tag { // these are new formats divergent from open source leveldb kNewFile2 = 100, kNewFile3 = 102, + kNewFile4 = 103, // 4th (the latest) format version of adding files kColumnFamily = 200, // specify column family for version edit kColumnFamilyAdd = 201, kColumnFamilyDrop = 202, kMaxColumnFamily = 203, }; +enum CustomTag { + kTerminate = 1, // The end of customized fields + kNeedCompaction = 2, + kPathId = 65, +}; +// If this bit for the custom tag is set, opening DB should fail if +// we don't know this field. +uint32_t kCustomTagNonSafeIgnoreMask = 1 << 6; + uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) { assert(number <= kFileNumberMask); return number | (path_id * (kFileNumberMask + 1)); @@ -102,7 +113,11 @@ bool VersionEdit::EncodeTo(std::string* dst) const { if (!f.smallest.Valid() || !f.largest.Valid()) { return false; } - if (f.fd.GetPathId() == 0) { + bool has_customized_fields = false; + if (f.marked_for_compaction) { + PutVarint32(dst, kNewFile4); + has_customized_fields = true; + } else if (f.fd.GetPathId() == 0) { // Use older format to make sure user can roll back the build if they // don't config multiple DB paths. PutVarint32(dst, kNewFile2); @@ -111,7 +126,8 @@ bool VersionEdit::EncodeTo(std::string* dst) const { } PutVarint32(dst, new_files_[i].first); // level PutVarint64(dst, f.fd.GetNumber()); - if (f.fd.GetPathId() != 0) { + if (f.fd.GetPathId() != 0 && !has_customized_fields) { + // kNewFile3 PutVarint32(dst, f.fd.GetPathId()); } PutVarint64(dst, f.fd.GetFileSize()); @@ -119,6 +135,48 @@ bool VersionEdit::EncodeTo(std::string* dst) const { PutLengthPrefixedSlice(dst, f.largest.Encode()); PutVarint64(dst, f.smallest_seqno); PutVarint64(dst, f.largest_seqno); + if (has_customized_fields) { + // Customized fields' format: + // +-----------------------------+ + // | 1st field's tag (varint32) | + // +-----------------------------+ + // | 1st field's size (varint32) | + // +-----------------------------+ + // | bytes for 1st field | + // | (based on size decoded) | + // +-----------------------------+ + // | | + // | ...... | + // | | + // +-----------------------------+ + // | last field's size (varint32)| + // +-----------------------------+ + // | bytes for last field | + // | (based on size decoded) | + // +-----------------------------+ + // | terminating tag (varint32) | + // +-----------------------------+ + // + // Customized encoding for fields: + // tag kPathId: 1 byte as path_id + // tag kNeedCompaction: + // now only can take one char value 1 indicating need-compaction + // + if (f.fd.GetPathId() != 0) { + PutVarint32(dst, CustomTag::kPathId); + char p = static_cast(f.fd.GetPathId()); + PutLengthPrefixedSlice(dst, Slice(&p, 1)); + } + if (f.marked_for_compaction) { + PutVarint32(dst, CustomTag::kNeedCompaction); + char p = static_cast(1); + PutLengthPrefixedSlice(dst, Slice(&p, 1)); + } + TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields", + dst); + + PutVarint32(dst, CustomTag::kTerminate); + } } // 0 is default and does not need to be explicitly written @@ -161,6 +219,63 @@ bool VersionEdit::GetLevel(Slice* input, int* level, const char** msg) { } } +const char* VersionEdit::DecodeNewFile4From(Slice* input) { + const char* msg = nullptr; + int level; + FileMetaData f; + uint64_t number; + uint32_t path_id = 0; + uint64_t file_size; + if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) && + GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) && + GetInternalKey(input, &f.largest) && + GetVarint64(input, &f.smallest_seqno) && + GetVarint64(input, &f.largest_seqno)) { + // See comments in VersionEdit::EncodeTo() for format of customized fields + while (true) { + uint32_t custom_tag; + Slice field; + if (!GetVarint32(input, &custom_tag)) { + return "new-file4 custom field"; + } + if (custom_tag == kTerminate) { + break; + } + if (!GetLengthPrefixedSlice(input, &field)) { + return "new-file4 custom field lenth prefixed slice error"; + } + switch (custom_tag) { + case kPathId: + if (field.size() != 1) { + return "path_id field wrong size"; + } + path_id = field[0]; + if (path_id > 3) { + return "path_id wrong vaue"; + } + break; + case kNeedCompaction: + if (field.size() != 1) { + return "need_compaction field wrong size"; + } + f.marked_for_compaction = (field[0] == 1); + break; + default: + if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) { + // Should not proceed if cannot understand it + return "new-file4 custom field not supported"; + } + break; + } + } + } else { + return "new-file4 entry"; + } + f.fd = FileDescriptor(number, path_id, file_size); + new_files_.push_back(std::make_pair(level, f)); + return nullptr; +} + Status VersionEdit::DecodeFrom(const Slice& src) { Clear(); Slice input = src; @@ -304,6 +419,11 @@ Status VersionEdit::DecodeFrom(const Slice& src) { break; } + case kNewFile4: { + msg = DecodeNewFile4From(&input); + break; + } + case kColumnFamily: if (!GetVarint32(&input, &column_family_)) { if (!msg) { diff --git a/db/version_edit.h b/db/version_edit.h index 5c558409a..412e7257c 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -237,6 +237,8 @@ class VersionEdit { bool EncodeTo(std::string* dst) const; Status DecodeFrom(const Slice& src); + const char* DecodeNewFile4From(Slice* input); + typedef std::set> DeletedFileSet; const DeletedFileSet& GetDeletedFiles() { return deleted_files_; } diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 4186e08e6..629f904b0 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -8,6 +8,7 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/version_edit.h" +#include "util/sync_point.h" #include "util/testharness.h" namespace rocksdb { @@ -45,6 +46,121 @@ TEST_F(VersionEditTest, EncodeDecode) { TestEncodeDecode(edit); } +TEST_F(VersionEditTest, EncodeDecodeNewFile4) { + static const uint64_t kBig = 1ull << 50; + + VersionEdit edit; + edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue), + InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500, + kBig + 600, true); + edit.AddFile(4, 301, 3, 100, InternalKey("foo", kBig + 501, kTypeValue), + InternalKey("zoo", kBig + 601, kTypeDeletion), kBig + 501, + kBig + 601, false); + edit.AddFile(5, 302, 0, 100, InternalKey("foo", kBig + 502, kTypeValue), + InternalKey("zoo", kBig + 602, kTypeDeletion), kBig + 502, + kBig + 602, true); + + edit.DeleteFile(4, 700); + + edit.SetComparatorName("foo"); + edit.SetLogNumber(kBig + 100); + edit.SetNextFile(kBig + 200); + edit.SetLastSequence(kBig + 1000); + TestEncodeDecode(edit); + + std::string encoded, encoded2; + edit.EncodeTo(&encoded); + VersionEdit parsed; + Status s = parsed.DecodeFrom(encoded); + ASSERT_TRUE(s.ok()) << s.ToString(); + auto& new_files = parsed.GetNewFiles(); + ASSERT_TRUE(new_files[0].second.marked_for_compaction); + ASSERT_TRUE(!new_files[1].second.marked_for_compaction); + ASSERT_TRUE(new_files[2].second.marked_for_compaction); + ASSERT_EQ(3, new_files[0].second.fd.GetPathId()); + ASSERT_EQ(3, new_files[1].second.fd.GetPathId()); + ASSERT_EQ(0, new_files[2].second.fd.GetPathId()); +} + +TEST_F(VersionEditTest, ForwardCompatibleNewFile4) { + static const uint64_t kBig = 1ull << 50; + VersionEdit edit; + edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue), + InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500, + kBig + 600, true); + edit.AddFile(4, 301, 3, 100, InternalKey("foo", kBig + 501, kTypeValue), + InternalKey("zoo", kBig + 601, kTypeDeletion), kBig + 501, + kBig + 601, false); + edit.DeleteFile(4, 700); + + edit.SetComparatorName("foo"); + edit.SetLogNumber(kBig + 100); + edit.SetNextFile(kBig + 200); + edit.SetLastSequence(kBig + 1000); + + std::string encoded; + + // Call back function to add extra customized builds. + bool first = true; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "VersionEdit::EncodeTo:NewFile4:CustomizeFields", [&](void* arg) { + std::string* str = reinterpret_cast(arg); + PutVarint32(str, 33); + const std::string str1 = "random_string"; + PutLengthPrefixedSlice(str, str1); + if (first) { + first = false; + PutVarint32(str, 22); + const std::string str2 = "s"; + PutLengthPrefixedSlice(str, str2); + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + edit.EncodeTo(&encoded); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + + VersionEdit parsed; + Status s = parsed.DecodeFrom(encoded); + ASSERT_TRUE(s.ok()) << s.ToString(); + ASSERT_TRUE(!first); + auto& new_files = parsed.GetNewFiles(); + ASSERT_TRUE(new_files[0].second.marked_for_compaction); + ASSERT_TRUE(!new_files[1].second.marked_for_compaction); + ASSERT_EQ(3, new_files[0].second.fd.GetPathId()); + ASSERT_EQ(3, new_files[1].second.fd.GetPathId()); + ASSERT_EQ(1u, parsed.GetDeletedFiles().size()); +} + +TEST_F(VersionEditTest, NewFile4NotSupportedField) { + static const uint64_t kBig = 1ull << 50; + VersionEdit edit; + edit.AddFile(3, 300, 3, 100, InternalKey("foo", kBig + 500, kTypeValue), + InternalKey("zoo", kBig + 600, kTypeDeletion), kBig + 500, + kBig + 600, true); + + edit.SetComparatorName("foo"); + edit.SetLogNumber(kBig + 100); + edit.SetNextFile(kBig + 200); + edit.SetLastSequence(kBig + 1000); + + std::string encoded; + + // Call back function to add extra customized builds. + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "VersionEdit::EncodeTo:NewFile4:CustomizeFields", [&](void* arg) { + std::string* str = reinterpret_cast(arg); + const std::string str1 = "s"; + PutLengthPrefixedSlice(str, str1); + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + edit.EncodeTo(&encoded); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + + VersionEdit parsed; + Status s = parsed.DecodeFrom(encoded); + ASSERT_NOK(s); +} + TEST_F(VersionEditTest, EncodeEmptyFile) { VersionEdit edit; edit.AddFile(0, 0, 0, 0, InternalKey(), InternalKey(), 0, 0, false);