From f799c8be5fee3bf98b4558cc588ae1072d41bc34 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Sun, 29 Jun 2014 12:35:47 -0700 Subject: [PATCH 01/13] [Java] Correct the library loading for zlib in RocksJava. Summary: Correct the library loading for zlib in RocksJava: zlib should be loaded by loadLibrary("z") instead of loadLibrary("zlib"). Test Plan: make rocksdbjava cd java make db_bench ./jdb_bench.sh --compression_type=zlib Reviewers: sdong, ljin, ankgup87 Reviewed By: ankgup87 Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19341 --- java/org/rocksdb/RocksDB.java | 2 +- java/org/rocksdb/benchmark/DbBenchmark.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java index cec73ed49..a55266e25 100644 --- a/java/org/rocksdb/RocksDB.java +++ b/java/org/rocksdb/RocksDB.java @@ -21,7 +21,7 @@ import org.rocksdb.util.Environment; public class RocksDB extends RocksObject { public static final int NOT_FOUND = -1; private static final String[] compressionLibs_ = { - "snappy", "zlib", "bzip2", "lz4", "lz4hc"}; + "snappy", "z", "bzip2", "lz4", "lz4hc"}; /** * Loads the necessary library files. diff --git a/java/org/rocksdb/benchmark/DbBenchmark.java b/java/org/rocksdb/benchmark/DbBenchmark.java index 5fcbed7e2..d0b2eab21 100644 --- a/java/org/rocksdb/benchmark/DbBenchmark.java +++ b/java/org/rocksdb/benchmark/DbBenchmark.java @@ -463,7 +463,7 @@ public class DbBenchmark { if (compressionType_.equals("snappy")) { System.loadLibrary("snappy"); } else if (compressionType_.equals("zlib")) { - System.loadLibrary("zlib"); + System.loadLibrary("z"); } else if (compressionType_.equals("bzip2")) { System.loadLibrary("bzip2"); } else if (compressionType_.equals("lz4")) { From 56563674166e27ad0e1cc53f5e6c21ad9707ac7f Mon Sep 17 00:00:00 2001 From: Feng Zhu Date: Mon, 30 Jun 2014 15:54:31 -0700 Subject: [PATCH 02/13] use arena to allocate memtable's bloomfilter and hashskiplist's buckets_ Summary: Bloomfilter and hashskiplist's buckets_ allocated by memtable's arena DynamicBloom: pass arena via constructor, allocate space in SetTotalBits HashSkipListRep: allocate space of buckets_ using arena. do not delete it in deconstructor because arena would take care of it. Several test files are changed. Test Plan: make all check Reviewers: ljin, haobo, yhchiang, sdong Reviewed By: sdong Subscribers: igor, dhruba Differential Revision: https://reviews.facebook.net/D19335 --- db/c_test.c | 2 +- db/db_test.cc | 7 ++++--- db/memtable.cc | 1 + table/plain_table_reader.cc | 4 ++-- util/dynamic_bloom.cc | 16 ++++++++++------ util/dynamic_bloom.h | 8 ++++---- util/dynamic_bloom_test.cc | 18 +++++++++++------- util/hash_skiplist_rep.cc | 7 ++++--- 8 files changed, 37 insertions(+), 26 deletions(-) diff --git a/db/c_test.c b/db/c_test.c index 89380a08b..5220cd8a3 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -495,7 +495,7 @@ int main(int argc, char** argv) { rocksdb_filterpolicy_t* policy = rocksdb_filterpolicy_create_bloom(10); rocksdb_options_set_filter_policy(options, policy); rocksdb_options_set_prefix_extractor(options, rocksdb_slicetransform_create_fixed_prefix(3)); - rocksdb_options_set_hash_skip_list_rep(options, 50000, 4, 4); + rocksdb_options_set_hash_skip_list_rep(options, 5000, 4, 4); rocksdb_options_set_plain_table_factory(options, 4, 10, 0.75, 16); db = rocksdb_open(options, dbname, &err); diff --git a/db/db_test.cc b/db/db_test.cc index 6344722ed..701c72e11 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -436,7 +436,8 @@ class DBTest { switch (option_config_) { case kHashSkipList: options.prefix_extractor.reset(NewFixedPrefixTransform(1)); - options.memtable_factory.reset(NewHashSkipListRepFactory()); + options.memtable_factory.reset( + NewHashSkipListRepFactory(16)); break; case kPlainTableFirstBytePrefix: options.table_factory.reset(new PlainTableFactory()); @@ -6691,7 +6692,7 @@ TEST(DBTest, PrefixScan) { options.disable_auto_compactions = true; options.max_background_compactions = 2; options.create_if_missing = true; - options.memtable_factory.reset(NewHashSkipListRepFactory()); + options.memtable_factory.reset(NewHashSkipListRepFactory(16)); // 11 RAND I/Os DestroyAndReopen(&options); @@ -6848,7 +6849,7 @@ TEST(DBTest, TailingIteratorPrefixSeek) { options.create_if_missing = true; options.disable_auto_compactions = true; options.prefix_extractor.reset(NewFixedPrefixTransform(2)); - options.memtable_factory.reset(NewHashSkipListRepFactory()); + options.memtable_factory.reset(NewHashSkipListRepFactory(16)); DestroyAndReopen(&options); CreateAndReopenWithCF({"pikachu"}, &options); diff --git a/db/memtable.cc b/db/memtable.cc index f6d322d83..6023edde9 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -55,6 +55,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options) assert(!should_flush_); if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) { prefix_bloom_.reset(new DynamicBloom( + &arena_, options.memtable_prefix_bloom_bits, options.bloom_locality, options.memtable_prefix_bloom_probes, nullptr, options.memtable_prefix_bloom_huge_page_tlb_size, diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 469a61cf4..20cb87538 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -333,7 +333,7 @@ void PlainTableReader::AllocateIndexAndBloom(int num_prefixes, uint32_t bloom_total_bits = num_prefixes * bloom_bits_per_key; if (bloom_total_bits > 0) { enable_bloom_ = true; - bloom_.SetTotalBits(bloom_total_bits, options_.bloom_locality, + bloom_.SetTotalBits(&arena_, bloom_total_bits, options_.bloom_locality, huge_page_tlb_size, options_.info_log.get()); } } @@ -465,7 +465,7 @@ Status PlainTableReader::PopulateIndex(TableProperties* props, table_properties_->num_entries * bloom_bits_per_key; if (num_bloom_bits > 0) { enable_bloom_ = true; - bloom_.SetTotalBits(num_bloom_bits, options_.bloom_locality, + bloom_.SetTotalBits(&arena_, num_bloom_bits, options_.bloom_locality, huge_page_tlb_size, options_.info_log.get()); } } diff --git a/util/dynamic_bloom.cc b/util/dynamic_bloom.cc index b90f199ae..cbe895ace 100644 --- a/util/dynamic_bloom.cc +++ b/util/dynamic_bloom.cc @@ -32,12 +32,13 @@ uint32_t GetTotalBitsForLocality(uint32_t total_bits) { } } -DynamicBloom::DynamicBloom(uint32_t total_bits, uint32_t locality, +DynamicBloom::DynamicBloom(Arena* arena, uint32_t total_bits, uint32_t locality, uint32_t num_probes, uint32_t (*hash_func)(const Slice& key), - size_t huge_page_tlb_size, Logger* logger) + size_t huge_page_tlb_size, + Logger* logger) : DynamicBloom(num_probes, hash_func) { - SetTotalBits(total_bits, locality, huge_page_tlb_size, logger); + SetTotalBits(arena, total_bits, locality, huge_page_tlb_size, logger); } DynamicBloom::DynamicBloom(uint32_t num_probes, @@ -47,8 +48,10 @@ DynamicBloom::DynamicBloom(uint32_t num_probes, kNumProbes(num_probes), hash_func_(hash_func == nullptr ? &BloomHash : hash_func) {} -void DynamicBloom::SetTotalBits(uint32_t total_bits, uint32_t locality, - size_t huge_page_tlb_size, Logger* logger) { +void DynamicBloom::SetTotalBits(Arena* arena, + uint32_t total_bits, uint32_t locality, + size_t huge_page_tlb_size, + Logger* logger) { kTotalBits = (locality > 0) ? GetTotalBitsForLocality(total_bits) : (total_bits + 7) / 8 * 8; kNumBlocks = (locality > 0) ? (kTotalBits / (CACHE_LINE_SIZE * 8)) : 0; @@ -60,8 +63,9 @@ void DynamicBloom::SetTotalBits(uint32_t total_bits, uint32_t locality, if (kNumBlocks > 0) { sz += CACHE_LINE_SIZE - 1; } + assert(arena); raw_ = reinterpret_cast( - arena_.AllocateAligned(sz, huge_page_tlb_size, logger)); + arena->AllocateAligned(sz, huge_page_tlb_size, logger)); memset(raw_, 0, sz); if (kNumBlocks > 0 && (reinterpret_cast(raw_) % CACHE_LINE_SIZE)) { data_ = raw_ + CACHE_LINE_SIZE - diff --git a/util/dynamic_bloom.h b/util/dynamic_bloom.h index 4c4f7e1f9..ba0016ddb 100644 --- a/util/dynamic_bloom.h +++ b/util/dynamic_bloom.h @@ -18,6 +18,7 @@ class Logger; class DynamicBloom { public: + // arena: pass arena to bloom filter, hence trace the usage of memory // total_bits: fixed total bits for the bloom // num_probes: number of hash probes for a single key // locality: If positive, optimize for cache line locality, 0 otherwise. @@ -27,7 +28,8 @@ class DynamicBloom { // it to be allocated, like: // sysctl -w vm.nr_hugepages=20 // See linux doc Documentation/vm/hugetlbpage.txt - explicit DynamicBloom(uint32_t total_bits, uint32_t locality = 0, + explicit DynamicBloom(Arena* arena, + uint32_t total_bits, uint32_t locality = 0, uint32_t num_probes = 6, uint32_t (*hash_func)(const Slice& key) = nullptr, size_t huge_page_tlb_size = 0, @@ -36,7 +38,7 @@ class DynamicBloom { explicit DynamicBloom(uint32_t num_probes = 6, uint32_t (*hash_func)(const Slice& key) = nullptr); - void SetTotalBits(uint32_t total_bits, uint32_t locality, + void SetTotalBits(Arena* arena, uint32_t total_bits, uint32_t locality, size_t huge_page_tlb_size, Logger* logger); ~DynamicBloom() {} @@ -63,8 +65,6 @@ class DynamicBloom { uint32_t (*hash_func_)(const Slice& key); unsigned char* data_; unsigned char* raw_; - - Arena arena_; }; inline void DynamicBloom::Add(const Slice& key) { AddHash(hash_func_(key)); } diff --git a/util/dynamic_bloom_test.cc b/util/dynamic_bloom_test.cc index d345addba..3e55488f2 100644 --- a/util/dynamic_bloom_test.cc +++ b/util/dynamic_bloom_test.cc @@ -40,17 +40,19 @@ class DynamicBloomTest { }; TEST(DynamicBloomTest, EmptyFilter) { - DynamicBloom bloom1(100, 0, 2); + Arena arena; + DynamicBloom bloom1(&arena, 100, 0, 2); ASSERT_TRUE(!bloom1.MayContain("hello")); ASSERT_TRUE(!bloom1.MayContain("world")); - DynamicBloom bloom2(CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2); + DynamicBloom bloom2(&arena, CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2); ASSERT_TRUE(!bloom2.MayContain("hello")); ASSERT_TRUE(!bloom2.MayContain("world")); } TEST(DynamicBloomTest, Small) { - DynamicBloom bloom1(100, 0, 2); + Arena arena; + DynamicBloom bloom1(&arena, 100, 0, 2); bloom1.Add("hello"); bloom1.Add("world"); ASSERT_TRUE(bloom1.MayContain("hello")); @@ -58,7 +60,7 @@ TEST(DynamicBloomTest, Small) { ASSERT_TRUE(!bloom1.MayContain("x")); ASSERT_TRUE(!bloom1.MayContain("foo")); - DynamicBloom bloom2(CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2); + DynamicBloom bloom2(&arena, CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2); bloom2.Add("hello"); bloom2.Add("world"); ASSERT_TRUE(bloom2.MayContain("hello")); @@ -94,13 +96,14 @@ TEST(DynamicBloomTest, VaryingLengths) { for (uint32_t enable_locality = 0; enable_locality < 2; ++enable_locality) { for (uint32_t num = 1; num <= 10000; num = NextNum(num)) { uint32_t bloom_bits = 0; + Arena arena; if (enable_locality == 0) { bloom_bits = std::max(num * FLAGS_bits_per_key, 64U); } else { bloom_bits = std::max(num * FLAGS_bits_per_key, enable_locality * CACHE_LINE_SIZE * 8); } - DynamicBloom bloom(bloom_bits, enable_locality, num_probes); + DynamicBloom bloom(&arena, bloom_bits, enable_locality, num_probes); for (uint64_t i = 0; i < num; i++) { bloom.Add(Key(i, buffer)); ASSERT_TRUE(bloom.MayContain(Key(i, buffer))); @@ -148,10 +151,11 @@ TEST(DynamicBloomTest, perf) { } for (uint64_t m = 1; m <= 8; ++m) { + Arena arena; const uint64_t num_keys = m * 8 * 1024 * 1024; fprintf(stderr, "testing %" PRIu64 "M keys\n", m * 8); - DynamicBloom std_bloom(num_keys * 10, 0, num_probes); + DynamicBloom std_bloom(&arena, num_keys * 10, 0, num_probes); timer.Start(); for (uint64_t i = 1; i <= num_keys; ++i) { @@ -175,7 +179,7 @@ TEST(DynamicBloomTest, perf) { ASSERT_TRUE(count == num_keys); // Locality enabled version - DynamicBloom blocked_bloom(num_keys * 10, 1, num_probes); + DynamicBloom blocked_bloom(&arena, num_keys * 10, 1, num_probes); timer.Start(); for (uint64_t i = 1; i <= num_keys; ++i) { diff --git a/util/hash_skiplist_rep.cc b/util/hash_skiplist_rep.cc index 85d4e3356..1c7a459bd 100644 --- a/util/hash_skiplist_rep.cc +++ b/util/hash_skiplist_rep.cc @@ -229,7 +229,9 @@ HashSkipListRep::HashSkipListRep(const MemTableRep::KeyComparator& compare, transform_(transform), compare_(compare), arena_(arena) { - buckets_ = new port::AtomicPointer[bucket_size]; + auto mem = + arena->AllocateAligned(sizeof(port::AtomicPointer) * bucket_size); + buckets_ = new (mem) port::AtomicPointer[bucket_size]; for (size_t i = 0; i < bucket_size_; ++i) { buckets_[i].NoBarrier_Store(nullptr); @@ -237,7 +239,6 @@ HashSkipListRep::HashSkipListRep(const MemTableRep::KeyComparator& compare, } HashSkipListRep::~HashSkipListRep() { - delete[] buckets_; } HashSkipListRep::Bucket* HashSkipListRep::GetInitializedBucket( @@ -271,7 +272,7 @@ bool HashSkipListRep::Contains(const char* key) const { } size_t HashSkipListRep::ApproximateMemoryUsage() { - return sizeof(buckets_); + return 0; } void HashSkipListRep::Get(const LookupKey& k, void* callback_args, From a2e0d890ed6da025c0d34ab46b63389f4e6b3f2d Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 1 Jul 2014 08:55:04 +0200 Subject: [PATCH 03/13] No need for files_by_size_ in universal compaction Summary: files_by_size_ is sorted by time in case of universal compaction. However, Version::files_ is also sorted by time. So no need for files_by_size_ Test Plan: 1) make check with the change 2) make check with `assert(last_index == c->input_version_->files_[level].size() - 1);` in compaction picker Reviewers: dhruba, haobo, yhchiang, sdong, ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19125 --- db/compaction_picker.cc | 79 ++++++++++++++++------------------------- db/version_set.cc | 41 +++++---------------- db/version_set.h | 2 +- 3 files changed, 41 insertions(+), 81 deletions(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index f5551f774..92bd81d3f 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -585,15 +585,9 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version, newerfile = f; } - // The files are sorted from newest first to oldest last. - std::vector& file_by_time = c->input_version_->files_by_size_[level]; - // Is the earliest file part of this compaction? - int last_index = file_by_time[file_by_time.size()-1]; - FileMetaData* last_file = c->input_version_->files_[level][last_index]; - if (c->inputs_[0][c->inputs_[0].size()-1] == last_file) { - c->bottommost_level_ = true; - } + FileMetaData* last_file = c->input_version_->files_[level].back(); + c->bottommost_level_ = c->inputs_[0].back() == last_file; // update statistics MeasureTime(options_->statistics.get(), NUM_FILES_IN_SINGLE_COMPACTION, @@ -628,12 +622,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( options_->compaction_options_universal.max_merge_width; // The files are sorted from newest first to oldest last. - std::vector& file_by_time = version->files_by_size_[level]; + const auto& files = version->files_[level]; + FileMetaData* f = nullptr; bool done = false; int start_index = 0; unsigned int candidate_count = 0; - assert(file_by_time.size() == version->files_[level].size()); unsigned int max_files_to_compact = std::min(max_merge_width, max_number_of_files_to_compact); @@ -641,14 +635,13 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // Considers a candidate file only if it is smaller than the // total size accumulated so far. - for (unsigned int loop = 0; loop < file_by_time.size(); loop++) { + for (unsigned int loop = 0; loop < files.size(); loop++) { candidate_count = 0; // Skip files that are already being compacted - for (f = nullptr; loop < file_by_time.size(); loop++) { - int index = file_by_time[loop]; - f = version->files_[level][index]; + for (f = nullptr; loop < files.size(); loop++) { + f = files[loop]; if (!f->being_compacted) { candidate_count = 1; @@ -670,11 +663,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( } // Check if the suceeding files need compaction. - for (unsigned int i = loop+1; - candidate_count < max_files_to_compact && i < file_by_time.size(); - i++) { - int index = file_by_time[i]; - FileMetaData* f = version->files_[level][index]; + for (unsigned int i = loop + 1; + candidate_count < max_files_to_compact && i < files.size(); i++) { + FileMetaData* f = files[i]; if (f->being_compacted) { break; } @@ -713,14 +704,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( break; } else { for (unsigned int i = loop; - i < loop + candidate_count && i < file_by_time.size(); i++) { - int index = file_by_time[i]; - FileMetaData* f = version->files_[level][index]; - LogToBuffer(log_buffer, - "[%s] Universal: Skipping file %" PRIu64 "[%d] " - "with size %" PRIu64 " (compensated size %" PRIu64 ") %d\n", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), - i, f->fd.GetFileSize(), f->compensated_file_size, f->being_compacted); + i < loop + candidate_count && i < files.size(); i++) { + FileMetaData* f = files[i]; + LogToBuffer(log_buffer, "[%s] Universal: Skipping file %" PRIu64 + "[%d] with size %" PRIu64 + " (compensated size %" PRIu64 ") %d\n", + version->cfd_->GetName().c_str(), f->fd.GetNumber(), i, + f->fd.GetFileSize(), f->compensated_file_size, + f->being_compacted); } } } @@ -736,10 +727,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( if (ratio_to_compress >= 0) { uint64_t total_size = version->NumLevelBytes(level); uint64_t older_file_size = 0; - for (unsigned int i = file_by_time.size() - 1; i >= first_index_after; - i--) { - older_file_size += - version->files_[level][file_by_time[i]]->fd.GetFileSize(); + for (unsigned int i = files.size() - 1; + i >= first_index_after; i--) { + older_file_size += files[i]->fd.GetFileSize(); if (older_file_size * 100L >= total_size * (long) ratio_to_compress) { enable_compression = false; break; @@ -752,8 +742,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( c->score_ = score; for (unsigned int i = start_index; i < first_index_after; i++) { - int index = file_by_time[i]; - FileMetaData* f = c->input_version_->files_[level][index]; + FileMetaData* f = c->input_version_->files_[level][i]; c->inputs_[0].push_back(f); LogToBuffer(log_buffer, "[%s] Universal: Picking file %" PRIu64 "[%d] " @@ -780,8 +769,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( max_size_amplification_percent; // The files are sorted from newest first to oldest last. - std::vector& file_by_time = version->files_by_size_[level]; - assert(file_by_time.size() == version->files_[level].size()); + const auto& files = version->files_[level]; unsigned int candidate_count = 0; uint64_t candidate_size = 0; @@ -789,9 +777,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( FileMetaData* f = nullptr; // Skip files that are already being compacted - for (unsigned int loop = 0; loop < file_by_time.size() - 1; loop++) { - int index = file_by_time[loop]; - f = version->files_[level][index]; + for (unsigned int loop = 0; loop < files.size() - 1; loop++) { + f = files[loop]; if (!f->being_compacted) { start_index = loop; // Consider this as the first candidate. break; @@ -812,10 +799,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( " to reduce size amp.\n"); // keep adding up all the remaining files - for (unsigned int loop = start_index; loop < file_by_time.size() - 1; - loop++) { - int index = file_by_time[loop]; - f = version->files_[level][index]; + for (unsigned int loop = start_index; loop < files.size() - 1; loop++) { + f = files[loop]; if (f->being_compacted) { LogToBuffer( log_buffer, @@ -832,8 +817,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( } // size of earliest file - int index = file_by_time[file_by_time.size() - 1]; - uint64_t earliest_file_size = version->files_[level][index]->fd.GetFileSize(); + uint64_t earliest_file_size = files.back()->fd.GetFileSize(); // size amplification = percentage of additional size if (candidate_size * 100 < ratio * earliest_file_size) { @@ -850,7 +834,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( "earliest-file-size %" PRIu64, version->cfd_->GetName().c_str(), candidate_size, earliest_file_size); } - assert(start_index >= 0 && start_index < file_by_time.size() - 1); + assert(start_index >= 0 && start_index < files.size() - 1); // create a compaction request // We always compact all the files, so always compress. @@ -858,9 +842,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( new Compaction(version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, false, true); c->score_ = score; - for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) { - int index = file_by_time[loop]; - f = c->input_version_->files_[level][index]; + for (unsigned int loop = start_index; loop < files.size(); loop++) { + f = c->input_version_->files_[level][loop]; c->inputs_[0].push_back(f); LogToBuffer(log_buffer, "[%s] Universal: size amp picking file %" PRIu64 "[%d] " diff --git a/db/version_set.cc b/db/version_set.cc index 29611f0a0..c54f0b591 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -861,7 +861,6 @@ void Version::ComputeCompactionScore( } namespace { - // Compator that is used to sort files based on their size // In normal mode: descending size bool CompareCompensatedSizeDescending(const Version::Fsize& first, @@ -869,18 +868,6 @@ bool CompareCompensatedSizeDescending(const Version::Fsize& first, return (first.file->compensated_file_size > second.file->compensated_file_size); } -// A static compator used to sort files based on their seqno -// In universal style : descending seqno -bool CompareSeqnoDescending(const Version::Fsize& first, - const Version::Fsize& second) { - if (first.file->smallest_seqno > second.file->smallest_seqno) { - assert(first.file->largest_seqno > second.file->largest_seqno); - return true; - } - assert(first.file->largest_seqno <= second.file->largest_seqno); - return false; -} - } // anonymous namespace void Version::UpdateNumNonEmptyLevels() { @@ -895,19 +882,15 @@ void Version::UpdateNumNonEmptyLevels() { } void Version::UpdateFilesBySize() { - if (cfd_->options()->compaction_style == kCompactionStyleFIFO) { + if (cfd_->options()->compaction_style == kCompactionStyleFIFO || + cfd_->options()->compaction_style == kCompactionStyleUniversal) { // don't need this return; } // No need to sort the highest level because it is never compacted. - int max_level = - (cfd_->options()->compaction_style == kCompactionStyleUniversal) - ? NumberLevels() - : NumberLevels() - 1; - - for (int level = 0; level < max_level; level++) { + for (int level = 0; level < NumberLevels() - 1; level++) { const std::vector& files = files_[level]; - std::vector& files_by_size = files_by_size_[level]; + auto& files_by_size = files_by_size_[level]; assert(files_by_size.size() == 0); // populate a temp vector for sorting based on size @@ -918,18 +901,12 @@ void Version::UpdateFilesBySize() { } // sort the top number_of_files_to_sort_ based on file size - if (cfd_->options()->compaction_style == kCompactionStyleUniversal) { - int num = temp.size(); - std::partial_sort(temp.begin(), temp.begin() + num, temp.end(), - CompareSeqnoDescending); - } else { - int num = Version::number_of_files_to_sort_; - if (num > (int)temp.size()) { - num = temp.size(); - } - std::partial_sort(temp.begin(), temp.begin() + num, temp.end(), - CompareCompensatedSizeDescending); + size_t num = Version::number_of_files_to_sort_; + if (num > temp.size()) { + num = temp.size(); } + std::partial_sort(temp.begin(), temp.begin() + num, temp.end(), + CompareCompensatedSizeDescending); assert(temp.size() == files.size()); // initialize files_by_size_ diff --git a/db/version_set.h b/db/version_set.h index 542db7466..04f52a508 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -294,7 +294,7 @@ class Version { // that on a running system, we need to look at only the first // few largest files because a new version is created every few // seconds/minutes (because of concurrent compactions). - static const int number_of_files_to_sort_ = 50; + static const size_t number_of_files_to_sort_ = 50; // Level that should be compacted next and its compaction score. // Score < 1 means compaction is not strictly needed. These fields From f5d4df1c02b8fdf004dbefe775dffaecbfef2a1b Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 1 Jul 2014 10:55:03 +0200 Subject: [PATCH 04/13] Fix compile error --- db/compaction_picker.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 92bd81d3f..d24f6a48b 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -849,7 +849,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( "[%s] Universal: size amp picking file %" PRIu64 "[%d] " "with size %" PRIu64 " (compensated size %" PRIu64 ")", version->cfd_->GetName().c_str(), - f->fd.GetNumber(), index, + f->fd.GetNumber(), loop, f->fd.GetFileSize(), f->compensated_file_size); } return c; From c4018e771c7fd31fab6fa934a05cff4005bae9dc Mon Sep 17 00:00:00 2001 From: Feng Zhu Date: Tue, 1 Jul 2014 11:02:42 -0700 Subject: [PATCH 05/13] In tools/db_stress.cc, set proper value in NewHashSkipListRepFactory's bucket_size Summary: Now that the arena is used to allocate space for hashskiplist's bucket. The bucket size need to be set small enough to avoid "should_flush_" failure in memtable's assertion. Test Plan: make all check Reviewers: sdong Reviewed By: sdong Subscribers: igor Differential Revision: https://reviews.facebook.net/D19371 --- tools/db_stress.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 929efee3f..337199a24 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -1591,7 +1591,7 @@ class StressTest { } switch (FLAGS_rep_factory) { case kHashSkipList: - options_.memtable_factory.reset(NewHashSkipListRepFactory()); + options_.memtable_factory.reset(NewHashSkipListRepFactory(10000)); break; case kSkipList: // no need to do anything From 6634844dba962b9a150646382f4d6531d1f2440b Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 1 Jul 2014 16:27:00 -0700 Subject: [PATCH 06/13] Two small fixes in db_test Summary: Two fixes: (1) WalDir to pick a directory under TmpDir to allow two tests running in parallel without impacting each other (2) kBlockBasedTableWithWholeKeyHashIndex is disabled by mistake (I assume). Enable it. Test Plan: ./db_test Reviewers: yhchiang, ljin Reviewed By: ljin Subscribers: nkg-, igor, dhruba, haobo, leveldb Differential Revision: https://reviews.facebook.net/D19389 --- db/db_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index 701c72e11..e08462bf4 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -294,8 +294,8 @@ class DBTest { protected: // Sequence of option configurations to try enum OptionConfig { - kBlockBasedTableWithWholeKeyHashIndex, kDefault, + kBlockBasedTableWithWholeKeyHashIndex, kBlockBasedTableWithPrefixHashIndex, kPlainTableFirstBytePrefix, kPlainTableAllBytesPrefix, @@ -467,7 +467,7 @@ class DBTest { options.db_log_dir = test::TmpDir(); break; case kWalDir: - options.wal_dir = "/tmp/wal"; + options.wal_dir = test::TmpDir() + "/wal"; break; case kManifestFileSize: options.max_manifest_file_size = 50; // 50 bytes From 9c332aa11af615d947935638886a1b9eb07a88b6 Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 1 Jul 2014 11:05:05 -0700 Subject: [PATCH 07/13] HashLinkList memtable switches a bucket to a skip list to reduce performance outliers Summary: In this patch, we enhance HashLinkList memtable to reduce performance outliers when a bucket contains too many entries. We switch to skip list for this case to enable binary search. Add threshold_use_skiplist parameter to determine when a bucket needs to switch to skip list. The new data structure is documented in comments in the codes. Test Plan: make all check set threshold_use_skiplist in several tests Reviewers: yhchiang, haobo, ljin Reviewed By: yhchiang, ljin Subscribers: nkg-, xjin, dhruba, yhchiang, leveldb Differential Revision: https://reviews.facebook.net/D19299 --- HISTORY.md | 3 + db/db_test.cc | 3 +- db/dbformat.h | 7 + db/plain_table_db_test.cc | 2 +- db/prefix_test.cc | 5 + include/rocksdb/memtablerep.h | 12 +- util/hash_linklist_rep.cc | 437 +++++++++++++++++++++++++++------- util/hash_linklist_rep.h | 3 + 8 files changed, 386 insertions(+), 86 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index a042ff3b1..cb25e8987 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,9 @@ ## Unreleased +### New Features +* HashLinklist reduces performance outlier caused by skewed bucket by switching data in the bucket from linked list to skip list. Add parameter threshold_use_skiplist in NewHashLinkListRepFactory(). + ## 3.2.0 (06/20/2014) diff --git a/db/db_test.cc b/db/db_test.cc index e08462bf4..11495fd61 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -488,7 +488,8 @@ class DBTest { break; case kHashLinkList: options.prefix_extractor.reset(NewFixedPrefixTransform(1)); - options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0)); + options.memtable_factory.reset( + NewHashLinkListRepFactory(4, 0, 3, true, 4)); break; case kHashCuckoo: options.memtable_factory.reset( diff --git a/db/dbformat.h b/db/dbformat.h index c7b3ced94..e1248a59f 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -297,6 +297,13 @@ class IterKey { parsed_key_suffix.sequence, parsed_key_suffix.type); } + void EncodeLengthPrefixedKey(const Slice& key) { + auto size = key.size(); + EnlargeBufferIfNeeded(size + VarintLength(size)); + char* ptr = EncodeVarint32(key_, size); + memcpy(ptr, key.data(), size); + } + private: char* key_; size_t buf_size_; diff --git a/db/plain_table_db_test.cc b/db/plain_table_db_test.cc index b169b1724..bad834e49 100644 --- a/db/plain_table_db_test.cc +++ b/db/plain_table_db_test.cc @@ -62,7 +62,7 @@ class PlainTableDBTest { Options CurrentOptions() { Options options; options.table_factory.reset(NewPlainTableFactory(0, 2, 0.8, 3, 0, kPrefix)); - options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true)); + options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true, 3)); options.prefix_extractor.reset(NewFixedPrefixTransform(8)); options.allow_mmap_reads = true; return options; diff --git a/db/prefix_test.cc b/db/prefix_test.cc index 64a4d0617..a69dda2b4 100644 --- a/db/prefix_test.cc +++ b/db/prefix_test.cc @@ -189,6 +189,10 @@ class PrefixTest { options.memtable_factory.reset( NewHashLinkListRepFactory(bucket_count, 2 * 1024 * 1024)); return true; + case kHashLinkListTriggerSkipList: + options.memtable_factory.reset( + NewHashLinkListRepFactory(bucket_count, 0, 3)); + return true; default: return false; } @@ -208,6 +212,7 @@ class PrefixTest { kHashSkipList, kHashLinkList, kHashLinkListHugePageTlb, + kHashLinkListTriggerSkipList, kEnd }; int option_config_; diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index 4dc8d7680..b7fc39c81 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -227,9 +227,10 @@ extern MemTableRepFactory* NewHashSkipListRepFactory( int32_t skiplist_branching_factor = 4 ); -// The factory is to create memtables with a hashed linked list: -// it contains a fixed array of buckets, each pointing to a sorted single -// linked list (null if the bucket is empty). +// The factory is to create memtables based on a hash table: +// it contains a fixed array of buckets, each pointing to either a linked list +// or a skip list if number of entries inside the bucket exceeds +// threshold_use_skiplist. // @bucket_count: number of fixed array buckets // @huge_page_tlb_size: if <=0, allocate the hash table bytes from malloc. // Otherwise from huge page TLB. The user needs to reserve @@ -240,10 +241,13 @@ extern MemTableRepFactory* NewHashSkipListRepFactory( // exceeds this number, log about it. // @if_log_bucket_dist_when_flash: if true, log distribution of number of // entries when flushing. +// @threshold_use_skiplist: a bucket switches to skip list if number of +// entries exceed this parameter. extern MemTableRepFactory* NewHashLinkListRepFactory( size_t bucket_count = 50000, size_t huge_page_tlb_size = 0, int bucket_entries_logging_threshold = 4096, - bool if_log_bucket_dist_when_flash = true); + bool if_log_bucket_dist_when_flash = true, + uint32_t threshold_use_skiplist = 256); // This factory creates a cuckoo-hashing based mem-table representation. // Cuckoo-hash is a closed-hash strategy, in which all key/value pairs diff --git a/util/hash_linklist_rep.cc b/util/hash_linklist_rep.cc index 22bb7ffb1..8e3dc5826 100644 --- a/util/hash_linklist_rep.cc +++ b/util/hash_linklist_rep.cc @@ -7,6 +7,7 @@ #ifndef ROCKSDB_LITE #include "util/hash_linklist_rep.h" +#include #include "rocksdb/memtablerep.h" #include "util/arena.h" #include "rocksdb/slice.h" @@ -22,6 +23,31 @@ namespace rocksdb { namespace { typedef const char* Key; +typedef SkipList MemtableSkipList; +typedef port::AtomicPointer Pointer; + +// A data structure used as the header of a link list of a hash bucket. +struct BucketHeader { + Pointer next; + uint32_t num_entries; + + explicit BucketHeader(void* n, uint32_t count) + : next(n), num_entries(count) {} + + bool IsSkipListBucket() { return next.NoBarrier_Load() == this; } +}; + +// A data structure used as the header of a skip list of a hash bucket. +struct SkipListBucketHeader { + BucketHeader Counting_header; + MemtableSkipList skip_list; + + explicit SkipListBucketHeader(const MemTableRep::KeyComparator& cmp, + Arena* arena, uint32_t count) + : Counting_header(this, // Pointing to itself to indicate header type. + count), + skip_list(cmp, arena) {} +}; struct Node { // Accessors/mutators for links. Wrapped in methods so we can @@ -51,12 +77,75 @@ struct Node { char key[0]; }; +// Memory structure of the mem table: +// It is a hash table, each bucket points to one entry, a linked list or a +// skip list. In order to track total number of records in a bucket to determine +// whether should switch to skip list, a header is added just to indicate +// number of entries in the bucket. +// +// +// +-----> NULL Case 1. Empty bucket +// | +// | +// | +---> +-------+ +// | | | Next +--> NULL +// | | +-------+ +// +-----+ | | | | Case 2. One Entry in bucket. +// | +-+ | | Data | next pointer points to +// +-----+ | | | NULL. All other cases +// | | | | | next pointer is not NULL. +// +-----+ | +-------+ +// | +---+ +// +-----+ +-> +-------+ +> +-------+ +-> +-------+ +// | | | | Next +--+ | Next +--+ | Next +-->NULL +// +-----+ | +-------+ +-------+ +-------+ +// | +-----+ | Count | | | | | +// +-----+ +-------+ | Data | | Data | +// | | | | | | +// +-----+ Case 3. | | | | +// | | A header +-------+ +-------+ +// +-----+ points to +// | | a linked list. Count indicates total number +// +-----+ of rows in this bucket. +// | | +// +-----+ +-> +-------+ <--+ +// | | | | Next +----+ +// +-----+ | +-------+ Case 4. A header points to a skip +// | +----+ | Count | list and next pointer points to +// +-----+ +-------+ itself, to distinguish case 3 or 4. +// | | | | Count still is kept to indicates total +// +-----+ | Skip +--> of entries in the bucket for debugging +// | | | List | Data purpose. +// | | | +--> +// +-----+ | | +// | | +-------+ +// +-----+ +// +// We don't have data race when changing cases because: +// (1) When changing from case 2->3, we create a new bucket header, put the +// single node there first without changing the original node, and do a +// release store when changing the bucket pointer. In that case, a reader +// who sees a stale value of the bucket pointer will read this node, while +// a reader sees the correct value because of the release store. +// (2) When changing case 3->4, a new header is created with skip list points +// to the data, before doing an acquire store to change the bucket pointer. +// The old header and nodes are never changed, so any reader sees any +// of those existing pointers will guarantee to be able to iterate to the +// end of the linked list. +// (3) Header's next pointer in case 3 might change, but they are never equal +// to itself, so no matter a reader sees any stale or newer value, it will +// be able to correctly distinguish case 3 and 4. +// +// The reason that we use case 2 is we want to make the format to be efficient +// when the utilization of buckets is relatively low. If we use case 3 for +// single entry bucket, we will need to waste 12 bytes for every entry, +// which can be significant decrease of memory utilization. class HashLinkListRep : public MemTableRep { public: HashLinkListRep(const MemTableRep::KeyComparator& compare, Arena* arena, const SliceTransform* transform, size_t bucket_size, - size_t huge_page_tlb_size, Logger* logger, - int bucket_entries_logging_threshold, + uint32_t threshold_use_skiplist, size_t huge_page_tlb_size, + Logger* logger, int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash); virtual KeyHandle Allocate(const size_t len, char** buf) override; @@ -80,7 +169,6 @@ class HashLinkListRep : public MemTableRep { private: friend class DynamicIterator; - typedef SkipList FullList; size_t bucket_size_; @@ -88,6 +176,8 @@ class HashLinkListRep : public MemTableRep { // the same transform. port::AtomicPointer* buckets_; + const uint32_t threshold_use_skiplist_; + // The user-supplied transform whose domain is the user keys. const SliceTransform* transform_; @@ -97,7 +187,12 @@ class HashLinkListRep : public MemTableRep { int bucket_entries_logging_threshold_; bool if_log_bucket_dist_when_flash_; - bool BucketContains(Node* head, const Slice& key) const; + bool LinkListContains(Node* head, const Slice& key) const; + + SkipListBucketHeader* GetSkipListBucketHeader(Pointer* first_next_pointer) + const; + + Node* GetLinkListFirstNode(Pointer* first_next_pointer) const; Slice GetPrefix(const Slice& internal_key) const { return transform_->Transform(ExtractUserKey(internal_key)); @@ -107,11 +202,11 @@ class HashLinkListRep : public MemTableRep { return MurmurHash(slice.data(), slice.size(), 0) % bucket_size_; } - Node* GetBucket(size_t i) const { - return static_cast(buckets_[i].Acquire_Load()); + Pointer* GetBucket(size_t i) const { + return static_cast(buckets_[i].Acquire_Load()); } - Node* GetBucket(const Slice& slice) const { + Pointer* GetBucket(const Slice& slice) const { return GetBucket(GetHash(slice)); } @@ -119,7 +214,6 @@ class HashLinkListRep : public MemTableRep { return (compare_(b, a) == 0); } - bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); } bool KeyIsAfterNode(const Slice& internal_key, const Node* n) const { @@ -137,8 +231,8 @@ class HashLinkListRep : public MemTableRep { class FullListIterator : public MemTableRep::Iterator { public: - explicit FullListIterator(FullList* list, Arena* arena) - : iter_(list), full_list_(list), arena_(arena) {} + explicit FullListIterator(MemtableSkipList* list, Arena* arena) + : iter_(list), full_list_(list), arena_(arena) {} virtual ~FullListIterator() { } @@ -189,22 +283,22 @@ class HashLinkListRep : public MemTableRep { iter_.SeekToLast(); } private: - FullList::Iterator iter_; + MemtableSkipList::Iterator iter_; // To destruct with the iterator. - std::unique_ptr full_list_; + std::unique_ptr full_list_; std::unique_ptr arena_; std::string tmp_; // For passing to EncodeKey }; - class Iterator : public MemTableRep::Iterator { + class LinkListIterator : public MemTableRep::Iterator { public: - explicit Iterator(const HashLinkListRep* const hash_link_list_rep, - Node* head) : - hash_link_list_rep_(hash_link_list_rep), head_(head), node_(nullptr) { - } + explicit LinkListIterator(const HashLinkListRep* const hash_link_list_rep, + Node* head) + : hash_link_list_rep_(hash_link_list_rep), + head_(head), + node_(nullptr) {} - virtual ~Iterator() { - } + virtual ~LinkListIterator() {} // Returns true iff the iterator is positioned at a valid node. virtual bool Valid() const { @@ -271,22 +365,68 @@ class HashLinkListRep : public MemTableRep { } }; - class DynamicIterator : public HashLinkListRep::Iterator { + class DynamicIterator : public HashLinkListRep::LinkListIterator { public: explicit DynamicIterator(HashLinkListRep& memtable_rep) - : HashLinkListRep::Iterator(&memtable_rep, nullptr), - memtable_rep_(memtable_rep) {} + : HashLinkListRep::LinkListIterator(&memtable_rep, nullptr), + memtable_rep_(memtable_rep) {} // Advance to the first entry with a key >= target virtual void Seek(const Slice& k, const char* memtable_key) { auto transformed = memtable_rep_.GetPrefix(k); - Reset(memtable_rep_.GetBucket(transformed)); - HashLinkListRep::Iterator::Seek(k, memtable_key); + auto* bucket = memtable_rep_.GetBucket(transformed); + + SkipListBucketHeader* skip_list_header = + memtable_rep_.GetSkipListBucketHeader(bucket); + if (skip_list_header != nullptr) { + // The bucket is organized as a skip list + if (!skip_list_iter_) { + skip_list_iter_.reset( + new MemtableSkipList::Iterator(&skip_list_header->skip_list)); + } else { + skip_list_iter_->SetList(&skip_list_header->skip_list); + } + if (memtable_key != nullptr) { + skip_list_iter_->Seek(memtable_key); + } else { + IterKey encoded_key; + encoded_key.EncodeLengthPrefixedKey(k); + skip_list_iter_->Seek(encoded_key.GetKey().data()); + } + } else { + // The bucket is organized as a linked list + skip_list_iter_.reset(); + Reset(memtable_rep_.GetLinkListFirstNode(bucket)); + HashLinkListRep::LinkListIterator::Seek(k, memtable_key); + } + } + + virtual bool Valid() const { + if (skip_list_iter_) { + return skip_list_iter_->Valid(); + } + return HashLinkListRep::LinkListIterator::Valid(); + } + + virtual const char* key() const { + if (skip_list_iter_) { + return skip_list_iter_->key(); + } + return HashLinkListRep::LinkListIterator::key(); + } + + virtual void Next() { + if (skip_list_iter_) { + skip_list_iter_->Next(); + } else { + HashLinkListRep::LinkListIterator::Next(); + } } private: // the underlying memtable const HashLinkListRep& memtable_rep_; + std::unique_ptr skip_list_iter_; }; class EmptyIterator : public MemTableRep::Iterator { @@ -312,12 +452,16 @@ class HashLinkListRep : public MemTableRep { HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare, Arena* arena, const SliceTransform* transform, - size_t bucket_size, size_t huge_page_tlb_size, - Logger* logger, + size_t bucket_size, + uint32_t threshold_use_skiplist, + size_t huge_page_tlb_size, Logger* logger, int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash) : MemTableRep(arena), bucket_size_(bucket_size), + // Threshold to use skip list doesn't make sense if less than 3, so we + // force it to be minimum of 3 to simplify implementation. + threshold_use_skiplist_(std::max(threshold_use_skiplist, 3U)), transform_(transform), compare_(compare), logger_(logger), @@ -343,53 +487,161 @@ KeyHandle HashLinkListRep::Allocate(const size_t len, char** buf) { return static_cast(x); } +SkipListBucketHeader* HashLinkListRep::GetSkipListBucketHeader( + Pointer* first_next_pointer) const { + if (first_next_pointer == nullptr) { + return nullptr; + } + if (first_next_pointer->NoBarrier_Load() == nullptr) { + // Single entry bucket + return nullptr; + } + // Counting header + BucketHeader* header = reinterpret_cast(first_next_pointer); + if (header->IsSkipListBucket()) { + assert(header->num_entries > threshold_use_skiplist_); + auto* skip_list_bucket_header = + reinterpret_cast(header); + assert(skip_list_bucket_header->Counting_header.next.NoBarrier_Load() == + header); + return skip_list_bucket_header; + } + assert(header->num_entries <= threshold_use_skiplist_); + return nullptr; +} + +Node* HashLinkListRep::GetLinkListFirstNode(Pointer* first_next_pointer) const { + if (first_next_pointer == nullptr) { + return nullptr; + } + if (first_next_pointer->NoBarrier_Load() == nullptr) { + // Single entry bucket + return reinterpret_cast(first_next_pointer); + } + // Counting header + BucketHeader* header = reinterpret_cast(first_next_pointer); + if (!header->IsSkipListBucket()) { + assert(header->num_entries <= threshold_use_skiplist_); + return reinterpret_cast(header->next.NoBarrier_Load()); + } + assert(header->num_entries > threshold_use_skiplist_); + return nullptr; +} + void HashLinkListRep::Insert(KeyHandle handle) { Node* x = static_cast(handle); assert(!Contains(x->key)); Slice internal_key = GetLengthPrefixedSlice(x->key); auto transformed = GetPrefix(internal_key); auto& bucket = buckets_[GetHash(transformed)]; - Node* head = static_cast(bucket.Acquire_Load()); + Pointer* first_next_pointer = static_cast(bucket.NoBarrier_Load()); - if (!head) { + if (first_next_pointer == nullptr) { + // Case 1. empty bucket // NoBarrier_SetNext() suffices since we will add a barrier when // we publish a pointer to "x" in prev[i]. x->NoBarrier_SetNext(nullptr); - bucket.Release_Store(static_cast(x)); + bucket.Release_Store(x); return; } - Node* cur = head; - Node* prev = nullptr; - while (true) { - if (cur == nullptr) { - break; - } - Node* next = cur->Next(); - // Make sure the lists are sorted. - // If x points to head_ or next points nullptr, it is trivially satisfied. - assert((cur == head) || (next == nullptr) || - KeyIsAfterNode(next->key, cur)); - if (KeyIsAfterNode(internal_key, cur)) { - // Keep searching in this list - prev = cur; - cur = next; - } else { - break; + BucketHeader* header = nullptr; + if (first_next_pointer->NoBarrier_Load() == nullptr) { + // Case 2. only one entry in the bucket + // Need to convert to a Counting bucket and turn to case 4. + Node* first = reinterpret_cast(first_next_pointer); + // Need to add a bucket header. + // We have to first convert it to a bucket with header before inserting + // the new node. Otherwise, we might need to change next pointer of first. + // In that case, a reader might sees the next pointer is NULL and wrongly + // think the node is a bucket header. + auto* mem = arena_->AllocateAligned(sizeof(BucketHeader)); + header = new (mem) BucketHeader(first, 1); + bucket.Release_Store(header); + } else { + header = reinterpret_cast(first_next_pointer); + if (header->IsSkipListBucket()) { + // Case 4. Bucket is already a skip list + assert(header->num_entries > threshold_use_skiplist_); + auto* skip_list_bucket_header = + reinterpret_cast(header); + skip_list_bucket_header->Counting_header.num_entries++; + skip_list_bucket_header->skip_list.Insert(x->key); + return; } } - // Our data structure does not allow duplicate insertion - assert(cur == nullptr || !Equal(x->key, cur->key)); + if (bucket_entries_logging_threshold_ > 0 && + header->num_entries == + static_cast(bucket_entries_logging_threshold_)) { + Info(logger_, + "HashLinkedList bucket %zu has more than %d " + "entries. Key to insert: %s", + GetHash(transformed), header->num_entries, + GetLengthPrefixedSlice(x->key).ToString(true).c_str()); + } - // NoBarrier_SetNext() suffices since we will add a barrier when - // we publish a pointer to "x" in prev[i]. - x->NoBarrier_SetNext(cur); + if (header->num_entries == threshold_use_skiplist_) { + // Case 3. number of entries reaches the threshold so need to convert to + // skip list. + LinkListIterator bucket_iter( + this, reinterpret_cast(first_next_pointer->NoBarrier_Load())); + auto mem = arena_->AllocateAligned(sizeof(SkipListBucketHeader)); + SkipListBucketHeader* new_skip_list_header = new (mem) + SkipListBucketHeader(compare_, arena_, header->num_entries + 1); + auto& skip_list = new_skip_list_header->skip_list; + + // Add all current entries to the skip list + for (bucket_iter.SeekToHead(); bucket_iter.Valid(); bucket_iter.Next()) { + skip_list.Insert(bucket_iter.key()); + } - if (prev) { - prev->SetNext(x); + // insert the new entry + skip_list.Insert(x->key); + // Set the bucket + bucket.Release_Store(new_skip_list_header); } else { - bucket.Release_Store(static_cast(x)); + // Case 5. Need to insert to the sorted linked list without changing the + // header. + Node* first = reinterpret_cast(header->next.NoBarrier_Load()); + assert(first != nullptr); + // Advance counter unless the bucket needs to be advanced to skip list. + // In that case, we need to make sure the previous count never exceeds + // threshold_use_skiplist_ to avoid readers to cast to wrong format. + header->num_entries++; + + Node* cur = first; + Node* prev = nullptr; + while (true) { + if (cur == nullptr) { + break; + } + Node* next = cur->Next(); + // Make sure the lists are sorted. + // If x points to head_ or next points nullptr, it is trivially satisfied. + assert((cur == first) || (next == nullptr) || + KeyIsAfterNode(next->key, cur)); + if (KeyIsAfterNode(internal_key, cur)) { + // Keep searching in this list + prev = cur; + cur = next; + } else { + break; + } + } + + // Our data structure does not allow duplicate insertion + assert(cur == nullptr || !Equal(x->key, cur->key)); + + // NoBarrier_SetNext() suffices since we will add a barrier when + // we publish a pointer to "x" in prev[i]. + x->NoBarrier_SetNext(cur); + + if (prev) { + prev->SetNext(x); + } else { + header->next.Release_Store(static_cast(x)); + } } } @@ -401,7 +653,13 @@ bool HashLinkListRep::Contains(const char* key) const { if (bucket == nullptr) { return false; } - return BucketContains(bucket, internal_key); + + SkipListBucketHeader* skip_list_header = GetSkipListBucketHeader(bucket); + if (skip_list_header != nullptr) { + return skip_list_header->skip_list.Contains(key); + } else { + return LinkListContains(GetLinkListFirstNode(bucket), internal_key); + } } size_t HashLinkListRep::ApproximateMemoryUsage() { @@ -413,37 +671,53 @@ void HashLinkListRep::Get(const LookupKey& k, void* callback_args, bool (*callback_func)(void* arg, const char* entry)) { auto transformed = transform_->Transform(k.user_key()); auto bucket = GetBucket(transformed); - if (bucket != nullptr) { - Iterator iter(this, bucket); - for (iter.Seek(k.internal_key(), nullptr); + + auto* skip_list_header = GetSkipListBucketHeader(bucket); + if (skip_list_header != nullptr) { + // Is a skip list + MemtableSkipList::Iterator iter(&skip_list_header->skip_list); + for (iter.Seek(k.memtable_key().data()); iter.Valid() && callback_func(callback_args, iter.key()); iter.Next()) { } + } else { + auto* link_list_head = GetLinkListFirstNode(bucket); + if (link_list_head != nullptr) { + LinkListIterator iter(this, link_list_head); + for (iter.Seek(k.internal_key(), nullptr); + iter.Valid() && callback_func(callback_args, iter.key()); + iter.Next()) { + } + } } } MemTableRep::Iterator* HashLinkListRep::GetIterator(Arena* alloc_arena) { // allocate a new arena of similar size to the one currently in use Arena* new_arena = new Arena(arena_->BlockSize()); - auto list = new FullList(compare_, new_arena); + auto list = new MemtableSkipList(compare_, new_arena); HistogramImpl keys_per_bucket_hist; for (size_t i = 0; i < bucket_size_; ++i) { int count = 0; - bool num_entries_printed = false; - auto bucket = GetBucket(i); + auto* bucket = GetBucket(i); if (bucket != nullptr) { - Iterator itr(this, bucket); - for (itr.SeekToHead(); itr.Valid(); itr.Next()) { - list->Insert(itr.key()); - if (logger_ != nullptr && - ++count >= bucket_entries_logging_threshold_ && - !num_entries_printed) { - num_entries_printed = true; - Info(logger_, "HashLinkedList bucket %zu has more than %d " - "entries. %dth key: %s", - i, count, count, - GetLengthPrefixedSlice(itr.key()).ToString(true).c_str()); + auto* skip_list_header = GetSkipListBucketHeader(bucket); + if (skip_list_header != nullptr) { + // Is a skip list + MemtableSkipList::Iterator itr(&skip_list_header->skip_list); + for (itr.SeekToFirst(); itr.Valid(); itr.Next()) { + list->Insert(itr.key()); + count++; + } + } else { + auto* link_list_head = GetLinkListFirstNode(bucket); + if (link_list_head != nullptr) { + LinkListIterator itr(this, link_list_head); + for (itr.SeekToHead(); itr.Valid(); itr.Next()) { + list->Insert(itr.key()); + count++; + } } } } @@ -474,7 +748,8 @@ MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator( } } -bool HashLinkListRep::BucketContains(Node* head, const Slice& user_key) const { +bool HashLinkListRep::LinkListContains(Node* head, + const Slice& user_key) const { Node* x = FindGreaterOrEqualInBucket(head, user_key); return (x != nullptr && Equal(user_key, x->key)); } @@ -505,17 +780,19 @@ Node* HashLinkListRep::FindGreaterOrEqualInBucket(Node* head, MemTableRep* HashLinkListRepFactory::CreateMemTableRep( const MemTableRep::KeyComparator& compare, Arena* arena, const SliceTransform* transform, Logger* logger) { - return new HashLinkListRep( - compare, arena, transform, bucket_count_, huge_page_tlb_size_, logger, - bucket_entries_logging_threshold_, if_log_bucket_dist_when_flash_); + return new HashLinkListRep(compare, arena, transform, bucket_count_, + threshold_use_skiplist_, huge_page_tlb_size_, + logger, bucket_entries_logging_threshold_, + if_log_bucket_dist_when_flash_); } MemTableRepFactory* NewHashLinkListRepFactory( size_t bucket_count, size_t huge_page_tlb_size, - int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash) { - return new HashLinkListRepFactory(bucket_count, huge_page_tlb_size, - bucket_entries_logging_threshold, - if_log_bucket_dist_when_flash); + int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash, + uint32_t threshold_use_skiplist) { + return new HashLinkListRepFactory( + bucket_count, threshold_use_skiplist, huge_page_tlb_size, + bucket_entries_logging_threshold, if_log_bucket_dist_when_flash); } } // namespace rocksdb diff --git a/util/hash_linklist_rep.h b/util/hash_linklist_rep.h index bd42e699d..0df35b545 100644 --- a/util/hash_linklist_rep.h +++ b/util/hash_linklist_rep.h @@ -16,10 +16,12 @@ namespace rocksdb { class HashLinkListRepFactory : public MemTableRepFactory { public: explicit HashLinkListRepFactory(size_t bucket_count, + uint32_t threshold_use_skiplist, size_t huge_page_tlb_size, int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash) : bucket_count_(bucket_count), + threshold_use_skiplist_(threshold_use_skiplist), huge_page_tlb_size_(huge_page_tlb_size), bucket_entries_logging_threshold_(bucket_entries_logging_threshold), if_log_bucket_dist_when_flash_(if_log_bucket_dist_when_flash) {} @@ -36,6 +38,7 @@ class HashLinkListRepFactory : public MemTableRepFactory { private: const size_t bucket_count_; + const uint32_t threshold_use_skiplist_; const size_t huge_page_tlb_size_; int bucket_entries_logging_threshold_; bool if_log_bucket_dist_when_flash_; From 30b20604db38e1610cd0453f541406026a54b9f6 Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 1 Jul 2014 17:41:38 -0700 Subject: [PATCH 08/13] Revert "Two small fixes in db_test" This reverts commit 6634844dba962b9a150646382f4d6531d1f2440b. --- db/db_test.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index 11495fd61..0892969d8 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -294,8 +294,8 @@ class DBTest { protected: // Sequence of option configurations to try enum OptionConfig { - kDefault, kBlockBasedTableWithWholeKeyHashIndex, + kDefault, kBlockBasedTableWithPrefixHashIndex, kPlainTableFirstBytePrefix, kPlainTableAllBytesPrefix, @@ -467,7 +467,7 @@ class DBTest { options.db_log_dir = test::TmpDir(); break; case kWalDir: - options.wal_dir = test::TmpDir() + "/wal"; + options.wal_dir = "/tmp/wal"; break; case kManifestFileSize: options.max_manifest_file_size = 50; // 50 bytes From 1d05006740bf8307fddf85a4b5f2a159bf85b35e Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 1 Jul 2014 18:54:50 -0700 Subject: [PATCH 09/13] Re-commit the correct part (WalDir) of the revision: Commit 6634844dba962b9a150646382f4d6531d1f2440b by sdong Two small fixes in db_test Summary: Two fixes: (1) WalDir to pick a directory under TmpDir to allow two tests running in parallel without impacting each other (2) kBlockBasedTableWithWholeKeyHashIndex is disabled by mistake (I assume). Enable it. Test Plan: ./db_test Reviewers: yhchiang, ljin Reviewed By: ljin Subscribers: nkg-, igor, dhruba, haobo, leveldb Differential Revision: https://reviews.facebook.net/D19389 --- db/db_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/db_test.cc b/db/db_test.cc index 0892969d8..6bc272744 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -467,7 +467,7 @@ class DBTest { options.db_log_dir = test::TmpDir(); break; case kWalDir: - options.wal_dir = "/tmp/wal"; + options.wal_dir = test::TmpDir() + "/wal"; break; case kManifestFileSize: options.max_manifest_file_size = 50; // 50 bytes From d3f63f03adb664ae076253852015feca8e1f689b Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 2 Jul 2014 11:40:16 +0200 Subject: [PATCH 10/13] Fix 32-bit errors Summary: https://www.facebook.com/groups/rocksdb.dev/permalink/590438347721350/ Test Plan: compiles Reviewers: sdong, ljin, yhchiang Reviewed By: yhchiang Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19197 --- util/env_posix.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/env_posix.cc b/util/env_posix.cc index 267958606..3bfeb0ea0 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -821,7 +821,7 @@ class PosixWritableFile : public WritableFile { } } - virtual Status RangeSync(off64_t offset, off64_t nbytes) { + virtual Status RangeSync(off_t offset, off_t nbytes) { if (sync_file_range(fd_, offset, nbytes, SYNC_FILE_RANGE_WRITE) == 0) { return Status::OK(); } else { From f146cab26182a026819b4c7890400f558ae98d20 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Wed, 2 Jul 2014 20:40:57 +0200 Subject: [PATCH 11/13] Centralize compression decision to compaction picker Summary: Before this diff, we're deciding enable_compression in CompactionPicker and then we're deciding final compression type in DBImpl. This is kind of confusing. After the diff, the final compression type will be decided in CompactionPicker. The reason for this is that I want CompactFiles() to specify output compression type, so that people can mix and match compression styles in their compaction algorithms. This diff makes it much easier to do that. Test Plan: make check Reviewers: dhruba, haobo, sdong, yhchiang, ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19137 --- db/compaction.cc | 4 ++-- db/compaction.h | 8 ++++---- db/compaction_picker.cc | 41 ++++++++++++++++++++++++++++++++++------- db/db_impl.cc | 38 ++++++++------------------------------ db/db_impl.h | 11 ----------- 5 files changed, 48 insertions(+), 54 deletions(-) diff --git a/db/compaction.cc b/db/compaction.cc index 5d22d4484..d0c54cc0c 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -29,7 +29,7 @@ static uint64_t TotalFileSize(const std::vector& files) { Compaction::Compaction(Version* input_version, int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, - bool seek_compaction, bool enable_compression, + CompressionType output_compression, bool seek_compaction, bool deletion_compaction) : level_(level), out_level_(out_level), @@ -38,8 +38,8 @@ Compaction::Compaction(Version* input_version, int level, int out_level, input_version_(input_version), number_levels_(input_version_->NumberLevels()), cfd_(input_version_->cfd_), + output_compression_(output_compression), seek_compaction_(seek_compaction), - enable_compression_(enable_compression), deletion_compaction_(deletion_compaction), grandparent_index_(0), seen_key_(false), diff --git a/db/compaction.h b/db/compaction.h index d6f6f80b4..44d51ef77 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -47,8 +47,8 @@ class Compaction { // Maximum size of files to build during this compaction. uint64_t MaxOutputFileSize() const { return max_output_file_size_; } - // Whether compression will be enabled for compaction outputs - bool enable_compression() const { return enable_compression_; } + // What compression for output + CompressionType OutputCompressionType() const { return output_compression_; } // Is this a trivial compaction that can be implemented by just // moving a single input file to the next level (no merging or splitting) @@ -104,7 +104,7 @@ class Compaction { Compaction(Version* input_version, int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, - bool seek_compaction = false, bool enable_compression = true, + CompressionType output_compression, bool seek_compaction = false, bool deletion_compaction = false); int level_; @@ -116,8 +116,8 @@ class Compaction { int number_levels_; ColumnFamilyData* cfd_; + CompressionType output_compression_; bool seek_compaction_; - bool enable_compression_; // if true, just delete files in inputs_[0] bool deletion_compaction_; diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index d24f6a48b..758210f63 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -18,6 +18,31 @@ namespace rocksdb { namespace { +// Determine compression type, based on user options, level of the output +// file and whether compression is disabled. +// If enable_compression is false, then compression is always disabled no +// matter what the values of the other two parameters are. +// Otherwise, the compression type is determined based on options and level. +CompressionType GetCompressionType(const Options& options, int level, + const bool enable_compression = true) { + if (!enable_compression) { + // disable compression + return kNoCompression; + } + // If the use has specified a different compression level for each level, + // then pick the compresison for that level. + if (!options.compression_per_level.empty()) { + const int n = options.compression_per_level.size() - 1; + // It is possible for level_ to be -1; in that case, we use level + // 0's compression. This occurs mostly in backwards compatibility + // situations when the builder doesn't know what level the file + // belongs to. Likewise, if level_ is beyond the end of the + // specified compression levels, use the last value. + return options.compression_per_level[std::max(0, std::min(level, n))]; + } else { + return options.compression; + } +} uint64_t TotalCompensatedFileSize(const std::vector& files) { uint64_t sum = 0; @@ -345,7 +370,8 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, } Compaction* c = new Compaction(version, input_level, output_level, MaxFileSizeForLevel(output_level), - MaxGrandParentOverlapBytes(input_level)); + MaxGrandParentOverlapBytes(input_level), + GetCompressionType(*options_, output_level)); c->inputs_[0] = inputs; if (ExpandWhileOverlapping(c) == false) { @@ -465,7 +491,8 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, assert(level >= 0); assert(level + 1 < NumberLevels()); c = new Compaction(version, level, level + 1, MaxFileSizeForLevel(level + 1), - MaxGrandParentOverlapBytes(level)); + MaxGrandParentOverlapBytes(level), + GetCompressionType(*options_, level + 1)); c->score_ = score; // Pick the largest file in this level that is not already @@ -736,9 +763,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( } } } - Compaction* c = - new Compaction(version, level, level, MaxFileSizeForLevel(level), - LLONG_MAX, false, enable_compression); + Compaction* c = new Compaction( + version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, + GetCompressionType(*options_, level, enable_compression)); c->score_ = score; for (unsigned int i = start_index; i < first_index_after; i++) { @@ -840,7 +867,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( // We always compact all the files, so always compress. Compaction* c = new Compaction(version, level, level, MaxFileSizeForLevel(level), - LLONG_MAX, false, true); + LLONG_MAX, GetCompressionType(*options_, level)); c->score_ = score; for (unsigned int loop = start_index; loop < files.size(); loop++) { f = c->input_version_->files_[level][loop]; @@ -882,7 +909,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version, return nullptr; } - Compaction* c = new Compaction(version, 0, 0, 0, 0, false, false, + Compaction* c = new Compaction(version, 0, 0, 0, 0, kNoCompression, false, true /* is deletion compaction */); // delete old files (FIFO) for (auto ritr = version->files_[0].rbegin(); diff --git a/db/db_impl.cc b/db/db_impl.cc index a6a622849..b93be78bc 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -297,27 +297,7 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { return result; } -CompressionType GetCompressionType(const Options& options, int level, - const bool enable_compression) { - if (!enable_compression) { - // disable compression - return kNoCompression; - } - // If the use has specified a different compression level for each level, - // then pick the compresison for that level. - if (!options.compression_per_level.empty()) { - const int n = options.compression_per_level.size() - 1; - // It is possible for level_ to be -1; in that case, we use level - // 0's compression. This occurs mostly in backwards compatibility - // situations when the builder doesn't know what level the file - // belongs to. Likewise, if level_ is beyond the end of the - // specified compression levels, use the last value. - return options.compression_per_level[std::max(0, std::min(level, n))]; - } else { - return options.compression; - } -} - +namespace { CompressionType GetCompressionFlush(const Options& options) { // Compressing memtable flushes might not help unless the sequential load // optimization is used for leveled compaction. Otherwise the CPU and @@ -325,12 +305,13 @@ CompressionType GetCompressionFlush(const Options& options) { bool can_compress; - if (options.compaction_style == kCompactionStyleUniversal) { + if (options.compaction_style == kCompactionStyleUniversal) { can_compress = (options.compaction_options_universal.compression_size_percent < 0); } else { // For leveled compress when min_level_to_compress == 0. - can_compress = (GetCompressionType(options, 0, true) != kNoCompression); + can_compress = options.compression_per_level.empty() || + options.compression_per_level[0] != kNoCompression; } if (can_compress) { @@ -339,6 +320,7 @@ CompressionType GetCompressionFlush(const Options& options) { return kNoCompression; } } +} // namespace DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) : env_(options.env), @@ -2343,13 +2325,9 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { compact->compaction->OutputFilePreallocationSize()); ColumnFamilyData* cfd = compact->compaction->column_family_data(); - CompressionType compression_type = - GetCompressionType(*cfd->options(), compact->compaction->output_level(), - compact->compaction->enable_compression()); - - compact->builder.reset( - NewTableBuilder(*cfd->options(), cfd->internal_comparator(), - compact->outfile.get(), compression_type)); + compact->builder.reset(NewTableBuilder( + *cfd->options(), cfd->internal_comparator(), compact->outfile.get(), + compact->compaction->OutputCompressionType())); } LogFlush(options_.info_log); return s; diff --git a/db/db_impl.h b/db/db_impl.h index 797cb0484..5d171b57d 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -625,15 +625,4 @@ extern Options SanitizeOptions(const std::string& db, const Options& src); extern DBOptions SanitizeOptions(const std::string& db, const DBOptions& src); -// Determine compression type, based on user options, level of the output -// file and whether compression is disabled. -// If enable_compression is false, then compression is always disabled no -// matter what the values of the other two parameters are. -// Otherwise, the compression type is determined based on options and level. -CompressionType GetCompressionType(const Options& options, int level, - const bool enable_compression); - -// Determine compression type for L0 file written by memtable flush. -CompressionType GetCompressionFlush(const Options& options); - } // namespace rocksdb From 2459f7ec4e62c3fff2c121e14a3d70f4c01379a7 Mon Sep 17 00:00:00 2001 From: sdong Date: Wed, 2 Jul 2014 09:54:20 -0700 Subject: [PATCH 12/13] Support Multiple DB paths (without having an interface to expose to users) Summary: In this patch, we allow RocksDB to support multiple DB paths internally. No user interface is supported yet so this patch is silent to users. Test Plan: make all check Reviewers: igor, haobo, ljin, yhchiang Reviewed By: yhchiang Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D18921 --- db/builder.cc | 3 +- db/column_family.cc | 3 +- db/compaction.cc | 2 + db/compaction.h | 8 +- db/compaction_picker.cc | 46 ++++--- db/db_filesnapshot.cc | 4 +- db/db_impl.cc | 178 +++++++++++++++++-------- db/db_impl.h | 18 ++- db/db_test.cc | 47 ++++++- db/filename.cc | 24 +++- db/filename.h | 12 +- db/filename_test.cc | 4 +- db/log_and_apply_bench.cc | 4 +- db/memtable_list.cc | 8 +- db/memtable_list.h | 16 +-- db/repair.cc | 91 +++++++------ db/table_cache.cc | 10 +- db/table_cache.h | 7 +- db/version_edit.cc | 68 +++++++--- db/version_edit.h | 41 ++++-- db/version_edit_test.cc | 5 +- db/version_set.cc | 50 ++++--- db/version_set.h | 6 +- db/version_set_test.cc | 2 +- include/rocksdb/db.h | 1 + include/rocksdb/options.h | 7 + include/rocksdb/universal_compaction.h | 17 +-- util/ldb_cmd.cc | 4 + util/options.cc | 1 + 29 files changed, 469 insertions(+), 218 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 61890b5b6..3be61bd10 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -54,7 +54,8 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, purge = false; } - std::string fname = TableFileName(dbname, meta->fd.GetNumber()); + std::string fname = TableFileName(options.db_paths, meta->fd.GetNumber(), + meta->fd.GetPathId()); if (iter->Valid()) { unique_ptr file; s = env->NewWritableFile(fname, &file, soptions); diff --git a/db/column_family.cc b/db/column_family.cc index 2d7ac23ae..ec90872b8 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -224,8 +224,7 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, if (dummy_versions != nullptr) { internal_stats_.reset(new InternalStats( options_.num_levels, db_options->env, db_options->statistics.get())); - table_cache_.reset( - new TableCache(dbname, &options_, storage_options, table_cache)); + table_cache_.reset(new TableCache(&options_, storage_options, table_cache)); if (options_.compaction_style == kCompactionStyleUniversal) { compaction_picker_.reset( new UniversalCompactionPicker(&options_, &internal_comparator_)); diff --git a/db/compaction.cc b/db/compaction.cc index d0c54cc0c..4ed5374ac 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -29,6 +29,7 @@ static uint64_t TotalFileSize(const std::vector& files) { Compaction::Compaction(Version* input_version, int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, + uint32_t output_path_id, CompressionType output_compression, bool seek_compaction, bool deletion_compaction) : level_(level), @@ -38,6 +39,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level, input_version_(input_version), number_levels_(input_version_->NumberLevels()), cfd_(input_version_->cfd_), + output_path_id_(output_path_id), output_compression_(output_compression), seek_compaction_(seek_compaction), deletion_compaction_(deletion_compaction), diff --git a/db/compaction.h b/db/compaction.h index 44d51ef77..caf44d466 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -50,6 +50,9 @@ class Compaction { // What compression for output CompressionType OutputCompressionType() const { return output_compression_; } + // Whether need to write output file to second DB path. + uint32_t GetOutputPathId() const { return output_path_id_; } + // Is this a trivial compaction that can be implemented by just // moving a single input file to the next level (no merging or splitting) bool IsTrivialMove() const; @@ -104,8 +107,8 @@ class Compaction { Compaction(Version* input_version, int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, - CompressionType output_compression, bool seek_compaction = false, - bool deletion_compaction = false); + uint32_t output_path_id, CompressionType output_compression, + bool seek_compaction = false, bool deletion_compaction = false); int level_; int out_level_; // levels to which output files are stored @@ -116,6 +119,7 @@ class Compaction { int number_levels_; ColumnFamilyData* cfd_; + uint32_t output_path_id_; CompressionType output_compression_; bool seek_compaction_; // if true, just delete files in inputs_[0] diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 758210f63..0c752184a 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -12,6 +12,7 @@ #define __STDC_FORMAT_MACROS #include #include +#include "db/filename.h" #include "util/log_buffer.h" #include "util/statistics.h" @@ -370,7 +371,7 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, } Compaction* c = new Compaction(version, input_level, output_level, MaxFileSizeForLevel(output_level), - MaxGrandParentOverlapBytes(input_level), + MaxGrandParentOverlapBytes(input_level), 0, GetCompressionType(*options_, output_level)); c->inputs_[0] = inputs; @@ -491,7 +492,7 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, assert(level >= 0); assert(level + 1 < NumberLevels()); c = new Compaction(version, level, level + 1, MaxFileSizeForLevel(level + 1), - MaxGrandParentOverlapBytes(level), + MaxGrandParentOverlapBytes(level), 0, GetCompressionType(*options_, level + 1)); c->score_ = score; @@ -684,9 +685,10 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // first candidate to be compacted. uint64_t candidate_size = f != nullptr? f->compensated_file_size : 0; if (f != nullptr) { - LogToBuffer(log_buffer, - "[%s] Universal: Possible candidate file %" PRIu64 "[%d].", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop); + LogToBuffer( + log_buffer, "[%s] Universal: Possible candidate file %s[%d].", + version->cfd_->GetName().c_str(), + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), loop); } // Check if the suceeding files need compaction. @@ -764,7 +766,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( } } Compaction* c = new Compaction( - version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, + version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, 0, GetCompressionType(*options_, level, enable_compression)); c->score_ = score; @@ -772,11 +774,11 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( FileMetaData* f = c->input_version_->files_[level][i]; c->inputs_[0].push_back(f); LogToBuffer(log_buffer, - "[%s] Universal: Picking file %" PRIu64 "[%d] " + "[%s] Universal: Picking file %s[%d] " "with size %" PRIu64 " (compensated size %" PRIu64 ")\n", version->cfd_->GetName().c_str(), - f->fd.GetNumber(), i, - f->fd.GetFileSize(), f->compensated_file_size); + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), + i, f->fd.GetFileSize(), f->compensated_file_size); } return c; } @@ -810,29 +812,29 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( start_index = loop; // Consider this as the first candidate. break; } - LogToBuffer(log_buffer, - "[%s] Universal: skipping file %" PRIu64 "[%d] compacted %s", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop, - " cannot be a candidate to reduce size amp.\n"); + LogToBuffer(log_buffer, "[%s] Universal: skipping file %s[%d] compacted %s", + version->cfd_->GetName().c_str(), + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), + loop, " cannot be a candidate to reduce size amp.\n"); f = nullptr; } if (f == nullptr) { return nullptr; // no candidate files } - LogToBuffer(log_buffer, - "[%s] Universal: First candidate file %" PRIu64 "[%d] %s", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), start_index, - " to reduce size amp.\n"); + LogToBuffer(log_buffer, "[%s] Universal: First candidate file %s[%d] %s", + version->cfd_->GetName().c_str(), + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), + start_index, " to reduce size amp.\n"); // keep adding up all the remaining files for (unsigned int loop = start_index; loop < files.size() - 1; loop++) { f = files[loop]; if (f->being_compacted) { LogToBuffer( - log_buffer, - "[%s] Universal: Possible candidate file %" PRIu64 "[%d] %s.", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop, + log_buffer, "[%s] Universal: Possible candidate file %s[%d] %s.", + version->cfd_->GetName().c_str(), + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), loop, " is already being compacted. No size amp reduction possible.\n"); return nullptr; } @@ -867,7 +869,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( // We always compact all the files, so always compress. Compaction* c = new Compaction(version, level, level, MaxFileSizeForLevel(level), - LLONG_MAX, GetCompressionType(*options_, level)); + LLONG_MAX, 0, GetCompressionType(*options_, level)); c->score_ = score; for (unsigned int loop = start_index; loop < files.size(); loop++) { f = c->input_version_->files_[level][loop]; @@ -909,7 +911,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version, return nullptr; } - Compaction* c = new Compaction(version, 0, 0, 0, 0, kNoCompression, false, + Compaction* c = new Compaction(version, 0, 0, 0, 0, 0, kNoCompression, false, true /* is deletion compaction */); // delete old files (FIFO) for (auto ritr = version->files_[0].rbegin(); diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 582355ccd..5286ca782 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -98,7 +98,7 @@ Status DBImpl::GetLiveFiles(std::vector& ret, } // Make a set of all of the live *.sst files - std::set live; + std::vector live; for (auto cfd : *versions_->GetColumnFamilySet()) { cfd->current()->AddLiveFiles(&live); } @@ -109,7 +109,7 @@ Status DBImpl::GetLiveFiles(std::vector& ret, // create names of the live files. The names are not absolute // paths, instead they are relative to dbname_; for (auto live_file : live) { - ret.push_back(TableFileName("", live_file)); + ret.push_back(MakeTableFileName("", live_file.GetNumber())); } ret.push_back(CurrentFileName("")); diff --git a/db/db_impl.cc b/db/db_impl.cc index b93be78bc..ce1cf78ff 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -98,6 +98,7 @@ struct DBImpl::CompactionState { // Files produced by compaction struct Output { uint64_t number; + uint32_t path_id; uint64_t file_size; InternalKey smallest, largest; SequenceNumber smallest_seqno, largest_seqno; @@ -294,6 +295,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1); } + if (result.db_paths.size() == 0) { + result.db_paths.push_back(dbname); + } + return result; } @@ -573,30 +578,48 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, } // don't delete live files - deletion_state.sst_live.assign(pending_outputs_.begin(), - pending_outputs_.end()); + for (auto pair : pending_outputs_) { + deletion_state.sst_live.emplace_back(pair.first, pair.second, 0); + } + /* deletion_state.sst_live.insert(pending_outputs_.begin(), + pending_outputs_.end());*/ versions_->AddLiveFiles(&deletion_state.sst_live); if (doing_the_full_scan) { - // set of all files in the directory. We'll exclude files that are still - // alive in the subsequent processings. - env_->GetChildren( - dbname_, &deletion_state.candidate_files - ); // Ignore errors + for (uint32_t path_id = 0; path_id < options_.db_paths.size(); path_id++) { + // set of all files in the directory. We'll exclude files that are still + // alive in the subsequent processings. + std::vector files; + env_->GetChildren(dbname_, &files); // Ignore errors + for (std::string file : files) { + deletion_state.candidate_files.emplace_back(file, path_id); + } + } //Add log files in wal_dir if (options_.wal_dir != dbname_) { std::vector log_files; env_->GetChildren(options_.wal_dir, &log_files); // Ignore errors - deletion_state.candidate_files.insert( - deletion_state.candidate_files.end(), - log_files.begin(), - log_files.end() - ); + for (std::string log_file : log_files) { + deletion_state.candidate_files.emplace_back(log_file, 0); + } } } } +namespace { +bool CompareCandidateFile(const rocksdb::DBImpl::CandidateFileInfo& first, + const rocksdb::DBImpl::CandidateFileInfo& second) { + if (first.file_name > second.file_name) { + return true; + } else if (first.file_name < second.file_name) { + return false; + } else { + return (first.path_id > first.path_id); + } +} +}; // namespace + // Diffs the files listed in filenames and those that do not // belong to live files are posibly removed. Also, removes all the // files in sst_delete_files and log_delete_files. @@ -612,10 +635,12 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { return; } - // Now, convert live list to an unordered set, WITHOUT mutex held; + // Now, convert live list to an unordered map, WITHOUT mutex held; // set is slow. - std::unordered_set sst_live(state.sst_live.begin(), - state.sst_live.end()); + std::unordered_map sst_live_map; + for (FileDescriptor& fd : state.sst_live) { + sst_live_map[fd.GetNumber()] = &fd; + } auto& candidate_files = state.candidate_files; candidate_files.reserve( @@ -625,26 +650,30 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { // We may ignore the dbname when generating the file names. const char* kDumbDbName = ""; for (auto file : state.sst_delete_files) { - candidate_files.push_back( - TableFileName(kDumbDbName, file->fd.GetNumber()).substr(1)); + candidate_files.emplace_back( + MakeTableFileName(kDumbDbName, file->fd.GetNumber()), + file->fd.GetPathId()); delete file; } for (auto file_num : state.log_delete_files) { if (file_num > 0) { - candidate_files.push_back(LogFileName(kDumbDbName, file_num).substr(1)); + candidate_files.emplace_back(LogFileName(kDumbDbName, file_num).substr(1), + 0); } } // dedup state.candidate_files so we don't try to delete the same // file twice - sort(candidate_files.begin(), candidate_files.end()); + sort(candidate_files.begin(), candidate_files.end(), CompareCandidateFile); candidate_files.erase(unique(candidate_files.begin(), candidate_files.end()), candidate_files.end()); std::vector old_info_log_files; - for (const auto& to_delete : candidate_files) { + for (const auto& candidate_file : candidate_files) { + std::string to_delete = candidate_file.file_name; + uint32_t path_id = candidate_file.path_id; uint64_t number; FileType type; // Ignore file if we cannot recognize it. @@ -664,7 +693,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { keep = (number >= state.manifest_file_number); break; case kTableFile: - keep = (sst_live.find(number) != sst_live.end()); + keep = (sst_live_map.find(number) != sst_live_map.end()); break; case kTempFile: // Any temp files that are currently being written to must @@ -672,7 +701,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { // Also, SetCurrentFile creates a temp file when writing out new // manifest, which is equal to state.pending_manifest_file_number. We // should not delete that file - keep = (sst_live.find(number) != sst_live.end()) || + keep = (sst_live_map.find(number) != sst_live_map.end()) || (number == state.pending_manifest_file_number); break; case kInfoLogFile: @@ -693,13 +722,16 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { continue; } + std::string fname; if (type == kTableFile) { // evict from cache TableCache::Evict(table_cache_.get(), number); + fname = TableFileName(options_.db_paths, number, path_id); + } else { + fname = + ((type == kLogFile) ? options_.wal_dir : dbname_) + "/" + to_delete; } - std::string fname = ((type == kLogFile) ? options_.wal_dir : dbname_) + - "/" + to_delete; if (type == kLogFile && (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) { auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number); @@ -1084,6 +1116,13 @@ Status DBImpl::Recover( return s; } + for (auto db_path : options_.db_paths) { + s = env_->CreateDirIfMissing(db_path); + if (!s.ok()) { + return s; + } + } + s = env_->NewDirectory(dbname_, &db_directory_); if (!s.ok()) { return s; @@ -1349,8 +1388,8 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; - meta.fd.number = versions_->NewFileNumber(); - pending_outputs_.insert(meta.fd.GetNumber()); + meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); + pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file. Iterator* iter = mem->NewIterator(ReadOptions(), true); const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber earliest_seqno_in_memtable = @@ -1381,9 +1420,9 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, // should not be added to the manifest. int level = 0; if (s.ok() && meta.fd.GetFileSize() > 0) { - edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetFileSize(), - meta.smallest, meta.largest, meta.smallest_seqno, - meta.largest_seqno); + edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), + meta.fd.GetFileSize(), meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } InternalStats::CompactionStats stats; @@ -1402,9 +1441,10 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; - meta.fd.number = versions_->NewFileNumber(); + + meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); *filenumber = meta.fd.GetNumber(); - pending_outputs_.insert(meta.fd.GetNumber()); + pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file. const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber earliest_seqno_in_memtable = @@ -1471,9 +1511,9 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, cfd->options()->compaction_style == kCompactionStyleLevel) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } - edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetFileSize(), - meta.smallest, meta.largest, meta.smallest_seqno, - meta.largest_seqno); + edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), + meta.fd.GetFileSize(), meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } InternalStats::CompactionStats stats; @@ -1529,7 +1569,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, // Replace immutable memtable with the generated Table s = cfd->imm()->InstallMemtableFlushResults( cfd, mems, versions_.get(), &mutex_, options_.info_log.get(), - file_number, pending_outputs_, &deletion_state.memtables_to_free, + file_number, &pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get(), log_buffer); } @@ -1673,9 +1713,9 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { edit.SetColumnFamily(cfd->GetID()); for (const auto& f : cfd->current()->files_[level]) { edit.DeleteFile(level, f->fd.GetNumber()); - edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetFileSize(), - f->smallest, f->largest, f->smallest_seqno, - f->largest_seqno); + edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(), + f->fd.GetFileSize(), f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); } Log(options_.info_log, "[%s] Apply version edit:\n%s", cfd->GetName().c_str(), edit.DebugString().data()); @@ -2172,9 +2212,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, assert(c->num_input_files(0) == 1); FileMetaData* f = c->input(0, 0); c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); - c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetFileSize(), - f->smallest, f->largest, f->smallest_seqno, - f->largest_seqno); + c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(), + f->fd.GetFileSize(), f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_, db_directory_.get()); InstallSuperVersion(c->column_family_data(), deletion_state); @@ -2280,7 +2320,7 @@ void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) { int filesNeeded = compact->compaction->num_input_files(1); for (int i = 0; i < std::max(filesNeeded, 1); i++) { uint64_t file_number = versions_->NewFileNumber(); - pending_outputs_.insert(file_number); + pending_outputs_[file_number] = compact->compaction->GetOutputPathId(); compact->allocated_file_numbers.push_back(file_number); } } @@ -2306,18 +2346,20 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { } else { mutex_.Lock(); file_number = versions_->NewFileNumber(); - pending_outputs_.insert(file_number); + pending_outputs_[file_number] = compact->compaction->GetOutputPathId(); mutex_.Unlock(); } CompactionState::Output out; out.number = file_number; + out.path_id = compact->compaction->GetOutputPathId(); out.smallest.Clear(); out.largest.Clear(); out.smallest_seqno = out.largest_seqno = 0; compact->outputs.push_back(out); // Make the output file - std::string fname = TableFileName(dbname_, file_number); + std::string fname = TableFileName(options_.db_paths, file_number, + compact->compaction->GetOutputPathId()); Status s = env_->NewWritableFile(fname, &compact->outfile, storage_options_); if (s.ok()) { @@ -2340,6 +2382,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, assert(compact->builder != nullptr); const uint64_t output_number = compact->current_output()->number; + const uint32_t output_path_id = compact->current_output()->path_id; assert(output_number != 0); // Check for iterator errors @@ -2375,9 +2418,9 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, if (s.ok() && current_entries > 0) { // Verify that the table is usable ColumnFamilyData* cfd = compact->compaction->column_family_data(); - FileDescriptor meta(output_number, current_bytes); + FileDescriptor fd(output_number, output_path_id, current_bytes); Iterator* iter = cfd->table_cache()->NewIterator( - ReadOptions(), storage_options_, cfd->internal_comparator(), meta); + ReadOptions(), storage_options_, cfd->internal_comparator(), fd); s = iter->status(); delete iter; if (s.ok()) { @@ -2420,9 +2463,10 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact, compact->compaction->AddInputDeletions(compact->compaction->edit()); for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; - compact->compaction->edit()->AddFile( - compact->compaction->output_level(), out.number, out.file_size, - out.smallest, out.largest, out.smallest_seqno, out.largest_seqno); + compact->compaction->edit()->AddFile(compact->compaction->output_level(), + out.number, out.path_id, out.file_size, + out.smallest, out.largest, + out.smallest_seqno, out.largest_seqno); } return versions_->LogAndApply(compact->compaction->column_family_data(), compact->compaction->edit(), &mutex_, @@ -4118,7 +4162,7 @@ Status DBImpl::MakeRoomForWrite( // how do we fail if we're not creating new log? assert(creating_new_log); // Avoid chewing through file number space in a tight loop. - versions_->ReuseFileNumber(new_log_number); + versions_->ReuseLogFileNumber(new_log_number); assert(!new_mem); assert(!new_log); break; @@ -4361,14 +4405,15 @@ Status DBImpl::CheckConsistency() { std::string corruption_messages; for (const auto& md : metadata) { - std::string file_path = dbname_ + md.name; + std::string file_path = md.db_path + "/" + md.name; + uint64_t fsize = 0; Status s = env_->GetFileSize(file_path, &fsize); if (!s.ok()) { corruption_messages += "Can't access " + md.name + ": " + s.ToString() + "\n"; } else if (fsize != md.size) { - corruption_messages += "Sst file size mismatch: " + md.name + + corruption_messages += "Sst file size mismatch: " + file_path + ". Size recorded in manifest " + std::to_string(md.size) + ", actual size " + std::to_string(fsize) + "\n"; @@ -4466,6 +4511,11 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { Status DB::Open(const DBOptions& db_options, const std::string& dbname, const std::vector& column_families, std::vector* handles, DB** dbptr) { + if (db_options.db_paths.size() > 1) { + return Status::NotSupported( + "More than one DB paths are not supported yet. "); + } + *dbptr = nullptr; handles->clear(); @@ -4481,6 +4531,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, DBImpl* impl = new DBImpl(db_options, dbname); Status s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir); + if (s.ok()) { + for (auto path : impl->options_.db_paths) { + s = impl->env_->CreateDirIfMissing(path); + if (!s.ok()) { + break; + } + } + } + if (!s.ok()) { delete impl; return s; @@ -4643,6 +4702,21 @@ Status DestroyDB(const std::string& dbname, const Options& options) { } } + for (auto db_path : options.db_paths) { + env->GetChildren(db_path, &filenames); + uint64_t number; + FileType type; + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type) && + type == kTableFile) { // Lock file will be deleted at end + Status del = env->DeleteFile(db_path + "/" + filenames[i]); + if (result.ok() && !del.ok()) { + result = del; + } + } + } + } + env->GetChildren(archivedir, &archiveFiles); // Delete archival files. for (size_t i = 0; i < archiveFiles.size(); ++i) { diff --git a/db/db_impl.h b/db/db_impl.h index 5d171b57d..fb0bdb4af 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -198,6 +198,17 @@ class DBImpl : public DB { Status TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence); #endif // NDEBUG + // Structure to store information for candidate files to delete. + struct CandidateFileInfo { + std::string file_name; + uint32_t path_id; + CandidateFileInfo(std::string name, uint32_t path) + : file_name(name), path_id(path) {} + bool operator==(const CandidateFileInfo& other) const { + return file_name == other.file_name && path_id == other.path_id; + } + }; + // needed for CleanupIteratorState struct DeletionState { inline bool HaveSomethingToDelete() const { @@ -209,10 +220,10 @@ class DBImpl : public DB { // a list of all files that we'll consider deleting // (every once in a while this is filled up with all files // in the DB directory) - std::vector candidate_files; + std::vector candidate_files; // the list of all live sst files that cannot be deleted - std::vector sst_live; + std::vector sst_live; // a list of sst files that we need to delete std::vector sst_delete_files; @@ -501,7 +512,8 @@ class DBImpl : public DB { // Set of table files to protect from deletion because they are // part of ongoing compactions. - std::set pending_outputs_; + // map from pending file number ID to their path IDs. + FileNumToPathIdMap pending_outputs_; // At least one compaction or flush job is pending but not yet scheduled // because of the max background thread limit. diff --git a/db/db_test.cc b/db/db_test.cc index 6bc272744..8010eaa81 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -98,7 +98,12 @@ class AtomicCounter { count_ = 0; } }; +} // namespace anon +static std::string Key(int i) { + char buf[100]; + snprintf(buf, sizeof(buf), "key%06d", i); + return std::string(buf); } // Special Env used to delay background operations @@ -355,7 +360,10 @@ class DBTest { ~DBTest() { Close(); - ASSERT_OK(DestroyDB(dbname_, Options())); + Options options; + options.db_paths.push_back(dbname_); + options.db_paths.push_back(dbname_ + "_2"); + ASSERT_OK(DestroyDB(dbname_, options)); delete env_; delete filter_policy_; } @@ -897,6 +905,30 @@ class DBTest { return property; } + int GetSstFileCount(std::string path) { + std::vector files; + env_->GetChildren(path, &files); + + int sst_count = 0; + uint64_t number; + FileType type; + for (size_t i = 0; i < files.size(); i++) { + if (ParseFileName(files[i], &number, &type) && type == kTableFile) { + sst_count++; + } + } + return sst_count; + } + + void GenerateNewFile(Random* rnd, int* key_idx) { + for (int i = 0; i < 11; i++) { + ASSERT_OK(Put(Key(*key_idx), RandomString(rnd, (i == 10) ? 1 : 10000))); + (*key_idx)++; + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + std::string IterStatus(Iterator* iter) { std::string result; if (iter->Valid()) { @@ -1037,12 +1069,6 @@ class DBTest { }; -static std::string Key(int i) { - char buf[100]; - snprintf(buf, sizeof(buf), "key%06d", i); - return std::string(buf); -} - static long TestGetTickerCount(const Options& options, Tickers ticker_type) { return options.statistics->getTickerCount(ticker_type); } @@ -3434,6 +3460,13 @@ TEST(DBTest, UniversalCompactionCompressRatio2) { ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 120000 * 12 * 0.8 + 120000 * 2); } + +TEST(DBTest, FailMoreDbPaths) { + Options options; + options.db_paths.push_back(dbname_); + options.db_paths.push_back(dbname_ + "_2"); + ASSERT_TRUE(TryReopen(&options).IsNotSupported()); +} #endif TEST(DBTest, ConvertCompactionStyle) { diff --git a/db/filename.cc b/db/filename.cc index d19f0fd53..1c2be8ffb 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -11,6 +11,7 @@ #include #include +#include #include "db/dbformat.h" #include "rocksdb/env.h" #include "util/logging.h" @@ -66,9 +67,28 @@ std::string ArchivedLogFileName(const std::string& name, uint64_t number) { return MakeFileName(name + "/" + ARCHIVAL_DIR, number, "log"); } -std::string TableFileName(const std::string& name, uint64_t number) { +std::string MakeTableFileName(const std::string& path, uint64_t number) { + return MakeFileName(path, number, "sst"); +} + +std::string TableFileName(const std::vector db_paths, + uint64_t number, uint32_t path_id) { assert(number > 0); - return MakeFileName(name, number, "sst"); + std::string path; + if (path_id >= db_paths.size()) { + path = db_paths.back(); + } else { + path = db_paths[path_id]; + } + return MakeTableFileName(path, number); +} + +std::string FormatFileNumber(uint64_t number, uint32_t path_id) { + if (path_id == 0) { + return std::to_string(number); + } else { + return std::to_string(number) + "(path " + std::to_string(path_id) + ")"; + } } std::string DescriptorFileName(const std::string& dbname, uint64_t number) { diff --git a/db/filename.h b/db/filename.h index c4c306946..5db434e02 100644 --- a/db/filename.h +++ b/db/filename.h @@ -11,7 +11,9 @@ #pragma once #include +#include #include +#include #include "rocksdb/slice.h" #include "rocksdb/status.h" #include "rocksdb/transaction_log.h" @@ -34,6 +36,9 @@ enum FileType { kIdentityFile }; +// map from file number to path ID. +typedef std::unordered_map FileNumToPathIdMap; + // Return the name of the log file with the specified number // in the db named by "dbname". The result will be prefixed with // "dbname". @@ -48,10 +53,15 @@ extern std::string ArchivalDirectory(const std::string& dbname); extern std::string ArchivedLogFileName(const std::string& dbname, uint64_t num); +extern std::string MakeTableFileName(const std::string& name, uint64_t number); + // Return the name of the sstable with the specified number // in the db named by "dbname". The result will be prefixed with // "dbname". -extern std::string TableFileName(const std::string& dbname, uint64_t number); +extern std::string TableFileName(const std::vector db_paths, + uint64_t number, uint32_t path_id); + +extern std::string FormatFileNumber(uint64_t number, uint32_t path_id); // Return the name of the descriptor file for the db named by // "dbname" and the specified incarnation number. The result will be diff --git a/db/filename_test.cc b/db/filename_test.cc index 0baa7fdae..c86d16f34 100644 --- a/db/filename_test.cc +++ b/db/filename_test.cc @@ -108,7 +108,9 @@ TEST(FileNameTest, Construction) { ASSERT_EQ(192U, number); ASSERT_EQ(kLogFile, type); - fname = TableFileName("bar", 200); + fname = TableFileName({"bar"}, 200, 0); + std::string fname1 = TableFileName({"foo", "bar"}, 200, 1); + ASSERT_EQ(fname, fname1); ASSERT_EQ("bar/", std::string(fname.data(), 4)); ASSERT_TRUE(ParseFileName(fname.c_str() + 4, &number, &type)); ASSERT_EQ(200U, number); diff --git a/db/log_and_apply_bench.cc b/db/log_and_apply_bench.cc index ab9716deb..dbb459c30 100644 --- a/db/log_and_apply_bench.cc +++ b/db/log_and_apply_bench.cc @@ -51,7 +51,7 @@ void BM_LogAndApply(int iters, int num_base_files) { for (int i = 0; i < num_base_files; i++) { InternalKey start(MakeKey(2 * fnum), 1, kTypeValue); InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion); - vbase.AddFile(2, ++fnum, 1 /* file size */, start, limit, 1, 1); + vbase.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1); } ASSERT_OK(vset->LogAndApply(default_cfd, &vbase, &mu)); } @@ -61,7 +61,7 @@ void BM_LogAndApply(int iters, int num_base_files) { vedit.DeleteFile(2, fnum); InternalKey start(MakeKey(2 * fnum), 1, kTypeValue); InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion); - vedit.AddFile(2, ++fnum, 1 /* file size */, start, limit, 1, 1); + vedit.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1); vset->LogAndApply(default_cfd, &vedit, &mu); } } diff --git a/db/memtable_list.cc b/db/memtable_list.cc index de1a18eee..d3fc1356b 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -140,7 +140,7 @@ void MemTableList::PickMemtablesToFlush(autovector* ret) { void MemTableList::RollbackMemtableFlush(const autovector& mems, uint64_t file_number, - std::set* pending_outputs) { + FileNumToPathIdMap* pending_outputs) { assert(!mems.empty()); // If the flush was not successful, then just reset state. @@ -162,7 +162,7 @@ void MemTableList::RollbackMemtableFlush(const autovector& mems, Status MemTableList::InstallMemtableFlushResults( ColumnFamilyData* cfd, const autovector& mems, VersionSet* vset, port::Mutex* mu, Logger* info_log, uint64_t file_number, - std::set& pending_outputs, autovector* to_delete, + FileNumToPathIdMap* pending_outputs, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer) { mu->AssertHeld(); @@ -219,7 +219,7 @@ Status MemTableList::InstallMemtableFlushResults( // has been written to a committed version so that other concurrently // executing compaction threads do not mistakenly assume that this // file is not live. - pending_outputs.erase(m->file_number_); + pending_outputs->erase(m->file_number_); if (m->Unref() != nullptr) { to_delete->push_back(m); } @@ -233,7 +233,7 @@ Status MemTableList::InstallMemtableFlushResults( m->flush_in_progress_ = false; m->edit_.Clear(); num_flush_not_started_++; - pending_outputs.erase(m->file_number_); + pending_outputs->erase(m->file_number_); m->file_number_ = 0; imm_flush_needed.Release_Store((void *)1); } diff --git a/db/memtable_list.h b/db/memtable_list.h index e56710fc9..f4923e831 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -15,6 +15,7 @@ #include "rocksdb/iterator.h" #include "db/dbformat.h" +#include "db/filename.h" #include "db/skiplist.h" #include "db/memtable.h" #include "rocksdb/db.h" @@ -108,17 +109,14 @@ class MemTableList { // they can get picked up again on the next round of flush. void RollbackMemtableFlush(const autovector& mems, uint64_t file_number, - std::set* pending_outputs); + FileNumToPathIdMap* pending_outputs); // Commit a successful flush in the manifest file - Status InstallMemtableFlushResults(ColumnFamilyData* cfd, - const autovector& m, - VersionSet* vset, port::Mutex* mu, - Logger* info_log, uint64_t file_number, - std::set& pending_outputs, - autovector* to_delete, - Directory* db_directory, - LogBuffer* log_buffer); + Status InstallMemtableFlushResults( + ColumnFamilyData* cfd, const autovector& m, VersionSet* vset, + port::Mutex* mu, Logger* info_log, uint64_t file_number, + FileNumToPathIdMap* pending_outputs, autovector* to_delete, + Directory* db_directory, LogBuffer* log_buffer); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). diff --git a/db/repair.cc b/db/repair.cc index 13959a920..12c275c3e 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -65,8 +65,8 @@ class Repairer { NewLRUCache(10, options_.table_cache_numshardbits, options_.table_cache_remove_scan_count_limit)), next_file_number_(1) { - table_cache_ = new TableCache(dbname_, &options_, storage_options_, - raw_table_cache_.get()); + table_cache_ = + new TableCache(&options_, storage_options_, raw_table_cache_.get()); edit_ = new VersionEdit(); } @@ -116,7 +116,7 @@ class Repairer { VersionEdit* edit_; std::vector manifests_; - std::vector table_numbers_; + std::vector table_fds_; std::vector logs_; std::vector tables_; uint64_t next_file_number_; @@ -124,35 +124,43 @@ class Repairer { Status FindFiles() { std::vector filenames; - Status status = env_->GetChildren(dbname_, &filenames); - if (!status.ok()) { - return status; - } - if (filenames.empty()) { - return Status::Corruption(dbname_, "repair found no files"); - } + bool found_file = false; + for (uint32_t path_id = 0; path_id < options_.db_paths.size(); path_id++) { + Status status = env_->GetChildren(options_.db_paths[path_id], &filenames); + if (!status.ok()) { + return status; + } + if (!filenames.empty()) { + found_file = true; + } - uint64_t number; - FileType type; - for (size_t i = 0; i < filenames.size(); i++) { - if (ParseFileName(filenames[i], &number, &type)) { - if (type == kDescriptorFile) { - manifests_.push_back(filenames[i]); - } else { - if (number + 1 > next_file_number_) { - next_file_number_ = number + 1; - } - if (type == kLogFile) { - logs_.push_back(number); - } else if (type == kTableFile) { - table_numbers_.push_back(number); + uint64_t number; + FileType type; + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type)) { + if (type == kDescriptorFile) { + assert(path_id == 0); + manifests_.push_back(filenames[i]); } else { - // Ignore other files + if (number + 1 > next_file_number_) { + next_file_number_ = number + 1; + } + if (type == kLogFile) { + assert(path_id == 0); + logs_.push_back(number); + } else if (type == kTableFile) { + table_fds_.emplace_back(number, path_id, 0); + } else { + // Ignore other files + } } } } } - return status; + if (!found_file) { + return Status::Corruption(dbname_, "repair found no files"); + } + return Status::OK(); } void ConvertLogFilesToTables() { @@ -228,7 +236,7 @@ class Repairer { // Do not record a version edit for this conversion to a Table // since ExtractMetaData() will also generate edits. FileMetaData meta; - meta.fd.number = next_file_number_++; + meta.fd = FileDescriptor(next_file_number_++, 0, 0); ReadOptions ro; Iterator* iter = mem->NewIterator(ro, true /* enforce_total_order */); status = BuildTable(dbname_, env_, options_, storage_options_, table_cache_, @@ -239,7 +247,7 @@ class Repairer { mem = nullptr; if (status.ok()) { if (meta.fd.GetFileSize() > 0) { - table_numbers_.push_back(meta.fd.GetNumber()); + table_fds_.push_back(meta.fd); } } Log(options_.info_log, @@ -249,14 +257,17 @@ class Repairer { } void ExtractMetaData() { - for (size_t i = 0; i < table_numbers_.size(); i++) { + for (size_t i = 0; i < table_fds_.size(); i++) { TableInfo t; - t.meta.fd.number = table_numbers_[i]; + t.meta.fd = table_fds_[i]; Status status = ScanTable(&t); if (!status.ok()) { - std::string fname = TableFileName(dbname_, table_numbers_[i]); - Log(options_.info_log, "Table #%" PRIu64 ": ignoring %s", - table_numbers_[i], status.ToString().c_str()); + std::string fname = TableFileName( + options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId()); + Log(options_.info_log, "Table #%s: ignoring %s", + FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId()) + .c_str(), + status.ToString().c_str()); ArchiveFile(fname); } else { tables_.push_back(t); @@ -265,9 +276,13 @@ class Repairer { } Status ScanTable(TableInfo* t) { - std::string fname = TableFileName(dbname_, t->meta.fd.GetNumber()); + std::string fname = TableFileName(options_.db_paths, t->meta.fd.GetNumber(), + t->meta.fd.GetPathId()); int counter = 0; - Status status = env_->GetFileSize(fname, &t->meta.fd.file_size); + uint64_t file_size; + Status status = env_->GetFileSize(fname, &file_size); + t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(), + file_size); if (status.ok()) { Iterator* iter = table_cache_->NewIterator( ReadOptions(), storage_options_, icmp_, t->meta.fd); @@ -330,9 +345,9 @@ class Repairer { for (size_t i = 0; i < tables_.size(); i++) { // TODO(opt): separate out into multiple levels const TableInfo& t = tables_[i]; - edit_->AddFile(0, t.meta.fd.GetNumber(), t.meta.fd.GetFileSize(), - t.meta.smallest, t.meta.largest, t.min_sequence, - t.max_sequence); + edit_->AddFile(0, t.meta.fd.GetNumber(), t.meta.fd.GetPathId(), + t.meta.fd.GetFileSize(), t.meta.smallest, t.meta.largest, + t.min_sequence, t.max_sequence); } //fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str()); diff --git a/db/table_cache.cc b/db/table_cache.cc index 7a7513026..bd359f96d 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -36,10 +36,10 @@ static Slice GetSliceForFileNumber(const uint64_t* file_number) { sizeof(*file_number)); } -TableCache::TableCache(const std::string& dbname, const Options* options, +TableCache::TableCache(const Options* options, const EnvOptions& storage_options, Cache* const cache) : env_(options->env), - dbname_(dbname), + db_paths_(options->db_paths), options_(options), storage_options_(storage_options), cache_(cache) {} @@ -60,13 +60,15 @@ Status TableCache::FindTable(const EnvOptions& toptions, const FileDescriptor& fd, Cache::Handle** handle, const bool no_io) { Status s; - Slice key = GetSliceForFileNumber(&fd.number); + uint64_t number = fd.GetNumber(); + Slice key = GetSliceForFileNumber(&number); *handle = cache_->Lookup(key); if (*handle == nullptr) { if (no_io) { // Dont do IO and return a not-found status return Status::Incomplete("Table not found in table_cache, no_io is set"); } - std::string fname = TableFileName(dbname_, fd.GetNumber()); + std::string fname = + TableFileName(db_paths_, fd.GetNumber(), fd.GetPathId()); unique_ptr file; unique_ptr table_reader; s = env_->NewRandomAccessFile(fname, &file, toptions); diff --git a/db/table_cache.h b/db/table_cache.h index e912addc1..eaadc07da 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -11,6 +11,7 @@ #pragma once #include +#include #include #include "db/dbformat.h" @@ -28,8 +29,8 @@ struct FileDescriptor; class TableCache { public: - TableCache(const std::string& dbname, const Options* options, - const EnvOptions& storage_options, Cache* cache); + TableCache(const Options* options, const EnvOptions& storage_options, + Cache* cache); ~TableCache(); // Return an iterator for the specified file number (the corresponding @@ -84,7 +85,7 @@ class TableCache { private: Env* const env_; - const std::string dbname_; + const std::vector db_paths_; const Options* options_; const EnvOptions& storage_options_; Cache* const cache_; diff --git a/db/version_edit.cc b/db/version_edit.cc index c2b4928e0..4e2cf8f5b 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -18,25 +18,30 @@ namespace rocksdb { // Tag numbers for serialized VersionEdit. These numbers are written to // disk and should not be changed. enum Tag { - kComparator = 1, - kLogNumber = 2, - kNextFileNumber = 3, - kLastSequence = 4, - kCompactPointer = 5, - kDeletedFile = 6, - kNewFile = 7, + kComparator = 1, + kLogNumber = 2, + kNextFileNumber = 3, + kLastSequence = 4, + kCompactPointer = 5, + kDeletedFile = 6, + kNewFile = 7, // 8 was used for large value refs - kPrevLogNumber = 9, + kPrevLogNumber = 9, // these are new formats divergent from open source leveldb - kNewFile2 = 100, // store smallest & largest seqno - - kColumnFamily = 200, // specify column family for version edit - kColumnFamilyAdd = 201, - kColumnFamilyDrop = 202, - kMaxColumnFamily = 203, + kNewFile2 = 100, + kNewFile3 = 102, + kColumnFamily = 200, // specify column family for version edit + kColumnFamilyAdd = 201, + kColumnFamilyDrop = 202, + kMaxColumnFamily = 203, }; +uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) { + assert(number <= kFileNumberMask); + return number | (path_id * (kFileNumberMask + 1)); +} + void VersionEdit::Clear() { comparator_.clear(); max_level_ = 0; @@ -93,9 +98,18 @@ void VersionEdit::EncodeTo(std::string* dst) const { for (size_t i = 0; i < new_files_.size(); i++) { const FileMetaData& f = new_files_[i].second; - PutVarint32(dst, kNewFile2); + if (f.fd.GetPathId() == 0) { + // Use older format to make sure user can roll back the build if they + // don't config multiple DB paths. + PutVarint32(dst, kNewFile2); + } else { + PutVarint32(dst, kNewFile3); + } PutVarint32(dst, new_files_[i].first); // level PutVarint64(dst, f.fd.GetNumber()); + if (f.fd.GetPathId() != 0) { + PutVarint32(dst, f.fd.GetPathId()); + } PutVarint64(dst, f.fd.GetFileSize()); PutLengthPrefixedSlice(dst, f.smallest.Encode()); PutLengthPrefixedSlice(dst, f.largest.Encode()); @@ -237,7 +251,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) { GetVarint64(&input, &file_size) && GetInternalKey(&input, &f.smallest) && GetInternalKey(&input, &f.largest)) { - f.fd = FileDescriptor(number, file_size); + f.fd = FileDescriptor(number, 0, file_size); new_files_.push_back(std::make_pair(level, f)); } else { if (!msg) { @@ -255,7 +269,27 @@ Status VersionEdit::DecodeFrom(const Slice& src) { GetInternalKey(&input, &f.largest) && GetVarint64(&input, &f.smallest_seqno) && GetVarint64(&input, &f.largest_seqno)) { - f.fd = FileDescriptor(number, file_size); + f.fd = FileDescriptor(number, 0, file_size); + new_files_.push_back(std::make_pair(level, f)); + } else { + if (!msg) { + msg = "new-file2 entry"; + } + } + break; + } + + case kNewFile3: { + uint64_t number; + uint32_t path_id; + uint64_t file_size; + if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) && + GetVarint32(&input, &path_id) && GetVarint64(&input, &file_size) && + GetInternalKey(&input, &f.smallest) && + GetInternalKey(&input, &f.largest) && + GetVarint64(&input, &f.smallest_seqno) && + GetVarint64(&input, &f.largest_seqno)) { + f.fd = FileDescriptor(number, path_id, file_size); new_files_.push_back(std::make_pair(level, f)); } else { if (!msg) { diff --git a/db/version_edit.h b/db/version_edit.h index d6e62fc8c..50a24ea2d 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -19,21 +19,41 @@ namespace rocksdb { class VersionSet; +const uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF; + +extern uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id); + // A copyable structure contains information needed to read data from an SST // file. It can contains a pointer to a table reader opened for the file, or // file number and size, which can be used to create a new table reader for it. // The behavior is undefined when a copied of the structure is used when the // file is not in any live version any more. struct FileDescriptor { - uint64_t number; - uint64_t file_size; // File size in bytes // Table reader in table_reader_handle TableReader* table_reader; + uint64_t packed_number_and_path_id; + uint64_t file_size; // File size in bytes + + FileDescriptor() : FileDescriptor(0, 0, 0) {} - FileDescriptor(uint64_t number, uint64_t file_size) - : number(number), file_size(file_size), table_reader(nullptr) {} + FileDescriptor(uint64_t number, uint32_t path_id, uint64_t file_size) + : table_reader(nullptr), + packed_number_and_path_id(PackFileNumberAndPathId(number, path_id)), + file_size(file_size) {} - uint64_t GetNumber() const { return number; } + FileDescriptor& operator=(const FileDescriptor& fd) { + table_reader = fd.table_reader; + packed_number_and_path_id = fd.packed_number_and_path_id; + file_size = fd.file_size; + return *this; + } + + uint64_t GetNumber() const { + return packed_number_and_path_id & kFileNumberMask; + } + uint32_t GetPathId() const { + return packed_number_and_path_id / (kFileNumberMask + 1); + } uint64_t GetFileSize() const { return file_size; } }; @@ -58,7 +78,6 @@ struct FileMetaData { FileMetaData() : refs(0), - fd(0, 0), being_compacted(false), table_reader_handle(nullptr), compensated_file_size(0), @@ -103,15 +122,13 @@ class VersionEdit { // Add the specified file at the specified number. // REQUIRES: This version has not been saved (see VersionSet::SaveTo) // REQUIRES: "smallest" and "largest" are smallest and largest keys in file - void AddFile(int level, uint64_t file, - uint64_t file_size, - const InternalKey& smallest, - const InternalKey& largest, - const SequenceNumber& smallest_seqno, + void AddFile(int level, uint64_t file, uint64_t file_size, + uint64_t file_path_id, const InternalKey& smallest, + const InternalKey& largest, const SequenceNumber& smallest_seqno, const SequenceNumber& largest_seqno) { assert(smallest_seqno <= largest_seqno); FileMetaData f; - f.fd = FileDescriptor(file, file_size); + f.fd = FileDescriptor(file, file_size, file_path_id); f.smallest = smallest; f.largest = largest; f.smallest_seqno = smallest_seqno; diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 7842b3263..850f242c1 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -30,11 +30,10 @@ TEST(VersionEditTest, EncodeDecode) { VersionEdit edit; for (int i = 0; i < 4; i++) { TestEncodeDecode(edit); - edit.AddFile(3, kBig + 300 + i, kBig + 400 + i, + edit.AddFile(3, kBig + 300 + i, kBig + 400 + i, 0, InternalKey("foo", kBig + 500 + i, kTypeValue), InternalKey("zoo", kBig + 600 + i, kTypeDeletion), - kBig + 500 + i, - kBig + 600 + i); + kBig + 500 + i, kBig + 600 + i); edit.DeleteFile(4, kBig + 700 + i); } diff --git a/db/version_set.cc b/db/version_set.cc index c54f0b591..b68923bc0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -16,6 +16,7 @@ #include #include #include +#include #include #include "db/filename.h" @@ -171,7 +172,7 @@ class Version::LevelFileNumIterator : public Iterator { : icmp_(icmp), flist_(flist), index_(flist->size()), - current_value_(0, 0) { // Marks as invalid + current_value_(0, 0, 0) { // Marks as invalid } virtual bool Valid() const { return index_ < flist_->size(); @@ -276,7 +277,8 @@ Status Version::GetTableProperties(std::shared_ptr* tp, *fname, &file, vset_->storage_options_); } else { s = options->env->NewRandomAccessFile( - TableFileName(vset_->dbname_, file_meta->fd.GetNumber()), + TableFileName(vset_->options_->db_paths, file_meta->fd.GetNumber(), + file_meta->fd.GetPathId()), &file, vset_->storage_options_); } if (!s.ok()) { @@ -303,7 +305,9 @@ Status Version::GetTableProperties(std::shared_ptr* tp, Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { for (int level = 0; level < num_levels_; level++) { for (const auto& file_meta : files_[level]) { - auto fname = TableFileName(vset_->dbname_, file_meta->fd.GetNumber()); + auto fname = + TableFileName(vset_->options_->db_paths, file_meta->fd.GetNumber(), + file_meta->fd.GetPathId()); // 1. If the table is already present in table cache, load table // properties from there. std::shared_ptr table_properties; @@ -1268,11 +1272,11 @@ int64_t Version::MaxNextLevelOverlappingBytes() { return result; } -void Version::AddLiveFiles(std::set* live) { +void Version::AddLiveFiles(std::vector* live) { for (int level = 0; level < NumberLevels(); level++) { const std::vector& files = files_[level]; for (const auto& file : files) { - live->insert(file->fd.GetNumber()); + live->push_back(file->fd); } } } @@ -1425,7 +1429,7 @@ class VersionSet::Builder { #endif } - void CheckConsistencyForDeletes(VersionEdit* edit, unsigned int number, + void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number, int level) { #ifndef NDEBUG // a file to be deleted better exist in the previous version @@ -1467,6 +1471,9 @@ class VersionSet::Builder { } } } + if (!found) { + fprintf(stderr, "not found %ld\n", number); + } assert(found); #endif } @@ -2160,17 +2167,15 @@ Status VersionSet::Recover( last_sequence_ = last_sequence; prev_log_number_ = prev_log_number; - Log(options_->info_log, "Recovered from manifest file:%s succeeded," + Log(options_->info_log, + "Recovered from manifest file:%s succeeded," "manifest_file_number is %lu, next_file_number is %lu, " "last_sequence is %lu, log_number is %lu," "prev_log_number is %lu," "max_column_family is %u\n", - manifest_filename.c_str(), - (unsigned long)manifest_file_number_, - (unsigned long)next_file_number_, - (unsigned long)last_sequence_, - (unsigned long)log_number, - (unsigned long)prev_log_number_, + manifest_filename.c_str(), (unsigned long)manifest_file_number_, + (unsigned long)next_file_number_, (unsigned long)last_sequence_, + (unsigned long)log_number, (unsigned long)prev_log_number_, column_family_set_->GetMaxColumnFamily()); for (auto cfd : *column_family_set_) { @@ -2557,9 +2562,9 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { for (int level = 0; level < cfd->NumberLevels(); level++) { for (const auto& f : cfd->current()->files_[level]) { - edit.AddFile(level, f->fd.GetNumber(), f->fd.GetFileSize(), - f->smallest, f->largest, f->smallest_seqno, - f->largest_seqno); + edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(), + f->fd.GetFileSize(), f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); } } edit.SetLogNumber(cfd->GetLogNumber()); @@ -2641,7 +2646,7 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { return result; } -void VersionSet::AddLiveFiles(std::vector* live_list) { +void VersionSet::AddLiveFiles(std::vector* live_list) { // pre-calculate space requirement int64_t total_files = 0; for (auto cfd : *column_family_set_) { @@ -2663,7 +2668,7 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { v = v->next_) { for (int level = 0; level < v->NumberLevels(); level++) { for (const auto& f : v->files_[level]) { - live_list->push_back(f->fd.GetNumber()); + live_list->push_back(f->fd); } } } @@ -2786,7 +2791,14 @@ void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { for (const auto& file : cfd->current()->files_[level]) { LiveFileMetaData filemetadata; filemetadata.column_family_name = cfd->GetName(); - filemetadata.name = TableFileName("", file->fd.GetNumber()); + uint32_t path_id = file->fd.GetPathId(); + if (path_id < options_->db_paths.size()) { + filemetadata.db_path = options_->db_paths[path_id]; + } else { + assert(!options_->db_paths.empty()); + filemetadata.db_path = options_->db_paths.back(); + } + filemetadata.name = MakeTableFileName("", file->fd.GetNumber()); filemetadata.level = level; filemetadata.size = file->fd.GetFileSize(); filemetadata.smallestkey = file->smallest.user_key().ToString(); diff --git a/db/version_set.h b/db/version_set.h index 04f52a508..60e9383f8 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -188,7 +188,7 @@ class Version { int64_t MaxNextLevelOverlappingBytes(); // Add all files listed in the current version to *live. - void AddLiveFiles(std::set* live); + void AddLiveFiles(std::vector* live); // Return a human readable string that describes this version's contents. std::string DebugString(bool hex = false) const; @@ -399,7 +399,7 @@ class VersionSet { // Arrange to reuse "file_number" unless a newer file number has // already been allocated. // REQUIRES: "file_number" was returned by a call to NewFileNumber(). - void ReuseFileNumber(uint64_t file_number) { + void ReuseLogFileNumber(uint64_t file_number) { if (next_file_number_ == file_number + 1) { next_file_number_ = file_number; } @@ -440,7 +440,7 @@ class VersionSet { Iterator* MakeInputIterator(Compaction* c); // Add all files listed in any live version to *live. - void AddLiveFiles(std::vector* live_list); + void AddLiveFiles(std::vector* live_list); // Return the approximate offset in the database of the data for // "key" as of version "v". diff --git a/db/version_set_test.cc b/db/version_set_test.cc index ef48bf927..0c548e342 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -31,7 +31,7 @@ class FindFileTest { SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100) { FileMetaData* f = new FileMetaData; - f->fd = FileDescriptor(files_.size() + 1, 0); + f->fd = FileDescriptor(files_.size() + 1, 0, 0); f->smallest = InternalKey(smallest, smallest_seq, kTypeValue); f->largest = InternalKey(largest, largest_seq, kTypeValue); files_.push_back(f); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 33b443f40..b1cbbc25b 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -55,6 +55,7 @@ class Env; // Metadata associated with each SST file. struct LiveFileMetaData { std::string column_family_name; // Name of the column family + std::string db_path; std::string name; // Name of the file int level; // Level at which this file resides. size_t size; // File size in bytes. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 658be3afc..df7383d25 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -675,6 +675,13 @@ struct DBOptions { // Default value is 1800 (half an hour). int db_stats_log_interval; + // A list paths where SST files can be put into. A compaction style can + // determine which of those paths it will put the file to. + // If left empty, only one path will be used, which is db_name passed when + // opening the DB. + // Default: empty + std::vector db_paths; + // This specifies the info LOG dir. // If it is empty, the log files will be in the same dir as data. // If it is non empty, the log files will be in the specified dir, diff --git a/include/rocksdb/universal_compaction.h b/include/rocksdb/universal_compaction.h index eaf47e5c7..229e50b25 100644 --- a/include/rocksdb/universal_compaction.h +++ b/include/rocksdb/universal_compaction.h @@ -8,6 +8,7 @@ #include #include +#include namespace rocksdb { @@ -61,6 +62,7 @@ class CompactionOptionsUniversal { // well as the total size of C1...Ct as total_C, the compaction output file // will be compressed iff // total_C / total_size < this percentage + // Default: -1 int compression_size_percent; // The algorithm used to stop picking files into a single compaction run @@ -68,14 +70,13 @@ class CompactionOptionsUniversal { CompactionStopStyle stop_style; // Default set of parameters - CompactionOptionsUniversal() : - size_ratio(1), - min_merge_width(2), - max_merge_width(UINT_MAX), - max_size_amplification_percent(200), - compression_size_percent(-1), - stop_style(kCompactionStopStyleTotalSize) { - } + CompactionOptionsUniversal() + : size_ratio(1), + min_merge_width(2), + max_merge_width(UINT_MAX), + max_size_amplification_percent(200), + compression_size_percent(-1), + stop_style(kCompactionStopStyleTotalSize) {} }; } // namespace rocksdb diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index e623e5278..41d8d6f47 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -286,6 +286,10 @@ Options LDBCommand::PrepareOptionsForOpenDB() { } } + if (opt.db_paths.size() == 0) { + opt.db_paths.push_back(db_path_); + } + return opt; } diff --git a/util/options.cc b/util/options.cc index 17dad0f25..88d26fa01 100644 --- a/util/options.cc +++ b/util/options.cc @@ -214,6 +214,7 @@ DBOptions::DBOptions(const Options& options) disableDataSync(options.disableDataSync), use_fsync(options.use_fsync), db_stats_log_interval(options.db_stats_log_interval), + db_paths(options.db_paths), db_log_dir(options.db_log_dir), wal_dir(options.wal_dir), delete_obsolete_files_period_micros( From 658068526081d0b97967ad015629986d81c7b6a1 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Thu, 3 Jul 2014 10:22:08 -0700 Subject: [PATCH 13/13] Add TimedWait() API to CondVar. Summary: Add TimedWait() API to CondVar, which will be used in the future to support TimedOut Write API and Rate limiter. Test Plan: make db_test -j32 Reviewers: sdong, ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19431 --- port/port_posix.cc | 25 ++++++++++++++++++++++++- port/port_posix.h | 2 ++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/port/port_posix.cc b/port/port_posix.cc index 2ad10f58f..90dde3227 100644 --- a/port/port_posix.cc +++ b/port/port_posix.cc @@ -9,10 +9,12 @@ #include "port/port_posix.h" -#include #include #include +#include +#include #include +#include #include "util/logging.h" namespace rocksdb { @@ -83,6 +85,27 @@ void CondVar::Wait() { #endif } +bool CondVar::TimedWait(uint64_t abs_time_us) { + struct timespec ts; + ts.tv_sec = abs_time_us / 1000000; + ts.tv_nsec = (abs_time_us % 1000000) * 1000; + +#ifndef NDEBUG + mu_->locked_ = false; +#endif + int err = pthread_cond_timedwait(&cv_, &mu_->mu_, &ts); +#ifndef NDEBUG + mu_->locked_ = true; +#endif + if (err == ETIMEDOUT) { + return true; + } + if (err != 0) { + PthreadCall("timedwait", err); + } + return false; +} + void CondVar::Signal() { PthreadCall("signal", pthread_cond_signal(&cv_)); } diff --git a/port/port_posix.h b/port/port_posix.h index c2070c7cb..2e3c868b3 100644 --- a/port/port_posix.h +++ b/port/port_posix.h @@ -137,6 +137,8 @@ class CondVar { explicit CondVar(Mutex* mu); ~CondVar(); void Wait(); + // Timed condition wait. Returns true if timeout occurred. + bool TimedWait(uint64_t abs_time_us); void Signal(); void SignalAll(); private: