diff --git a/HISTORY.md b/HISTORY.md index a042ff3b1..cb25e8987 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -2,6 +2,9 @@ ## Unreleased +### New Features +* HashLinkList memtable: reduce the performance outliers caused by skewed buckets by switching a bucket's data from a linked list to a skip list once the bucket grows large. Added parameter threshold_use_skiplist to NewHashLinkListRepFactory(). + ## 3.2.0 (06/20/2014) diff --git a/db/builder.cc b/db/builder.cc index 61890b5b6..3be61bd10 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -54,7 +54,8 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, purge = false; } - std::string fname = TableFileName(dbname, meta->fd.GetNumber()); + std::string fname = TableFileName(options.db_paths, meta->fd.GetNumber(), + meta->fd.GetPathId()); if (iter->Valid()) { unique_ptr<WritableFile> file; s = env->NewWritableFile(fname, &file, soptions); diff --git a/db/c_test.c b/db/c_test.c index 89380a08b..5220cd8a3 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -495,7 +495,7 @@ int main(int argc, char** argv) { rocksdb_filterpolicy_t* policy = rocksdb_filterpolicy_create_bloom(10); rocksdb_options_set_filter_policy(options, policy); rocksdb_options_set_prefix_extractor(options, rocksdb_slicetransform_create_fixed_prefix(3)); - rocksdb_options_set_hash_skip_list_rep(options, 50000, 4, 4); + rocksdb_options_set_hash_skip_list_rep(options, 5000, 4, 4); rocksdb_options_set_plain_table_factory(options, 4, 10, 0.75, 16); db = rocksdb_open(options, dbname, &err); diff --git a/db/column_family.cc b/db/column_family.cc index 2d7ac23ae..ec90872b8 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -224,8 +224,7 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, if (dummy_versions != nullptr) { internal_stats_.reset(new InternalStats( options_.num_levels, db_options->env, db_options->statistics.get())); - table_cache_.reset( - new TableCache(dbname, &options_, storage_options, table_cache)); + table_cache_.reset(new TableCache(&options_, storage_options, table_cache)); if (options_.compaction_style == kCompactionStyleUniversal) { compaction_picker_.reset( new UniversalCompactionPicker(&options_, &internal_comparator_)); diff --git a/db/compaction.cc b/db/compaction.cc index 5d22d4484..4ed5374ac 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -29,7 +29,8 @@ static uint64_t TotalFileSize(const std::vector<FileMetaData*>& files) { Compaction::Compaction(Version* input_version, int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, - bool seek_compaction, bool enable_compression, + uint32_t output_path_id, + CompressionType output_compression, bool seek_compaction, bool deletion_compaction) : level_(level), out_level_(out_level), @@ -38,8 +39,9 @@ Compaction::Compaction(Version* input_version, int level, int out_level, input_version_(input_version), number_levels_(input_version_->NumberLevels()), cfd_(input_version_->cfd_), + output_path_id_(output_path_id), + output_compression_(output_compression), seek_compaction_(seek_compaction), - enable_compression_(enable_compression), deletion_compaction_(deletion_compaction), grandparent_index_(0), seen_key_(false), diff --git a/db/compaction.h b/db/compaction.h index d6f6f80b4..caf44d466 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -47,8 +47,11 @@ class Compaction { // Maximum size of files to build during this compaction. 
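For context on the HISTORY entry above, a minimal usage sketch of the new parameter. The five-argument order is taken from this diff's own call sites, such as NewHashLinkListRepFactory(4, 0, 3, true, 4) in the tests below; the concrete values here are illustrative only:

```cpp
#include "rocksdb/memtablerep.h"
#include "rocksdb/options.h"
#include "rocksdb/slice_transform.h"

// Sketch: once a hash bucket holds more entries than threshold_use_skiplist,
// HashLinkList converts that bucket from a linked list to a skip list,
// bounding lookup cost inside skewed buckets.
void ConfigureHashLinkList(rocksdb::Options* options) {
  options->prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(1));
  options->memtable_factory.reset(rocksdb::NewHashLinkListRepFactory(
      50000 /* bucket_count */, 0 /* huge_page_tlb_size */,
      4096 /* bucket_entries_logging_threshold */,
      true /* if_log_bucket_dist_when_flash */,
      256 /* threshold_use_skiplist, illustrative */));
}
```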
uint64_t MaxOutputFileSize() const { return max_output_file_size_; } - // Whether compression will be enabled for compaction outputs - bool enable_compression() const { return enable_compression_; } + // Compression type to be used for compaction outputs + CompressionType OutputCompressionType() const { return output_compression_; } + + // ID of the DB path (index into db_paths) where output files are written. + uint32_t GetOutputPathId() const { return output_path_id_; } // Is this a trivial compaction that can be implemented by just // moving a single input file to the next level (no merging or splitting) @@ -104,8 +107,8 @@ class Compaction { Compaction(Version* input_version, int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, - bool seek_compaction = false, bool enable_compression = true, - bool deletion_compaction = false); + uint32_t output_path_id, CompressionType output_compression, + bool seek_compaction = false, bool deletion_compaction = false); int level_; int out_level_; // levels to which output files are stored @@ -116,8 +119,9 @@ class Compaction { int number_levels_; ColumnFamilyData* cfd_; + uint32_t output_path_id_; + CompressionType output_compression_; bool seek_compaction_; - bool enable_compression_; // if true, just delete files in inputs_[0] bool deletion_compaction_; diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index f5551f774..0c752184a 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -12,12 +12,38 @@ #define __STDC_FORMAT_MACROS #include <inttypes.h> #include <limits> +#include "db/filename.h" #include "util/log_buffer.h" #include "util/statistics.h" namespace rocksdb { namespace { +// Determine the compression type based on user options, the level of the +// output file, and whether compression is disabled. +// If enable_compression is false, then compression is always disabled no +// matter what the values of the other two parameters are. +// Otherwise, the compression type is determined based on options and level. +CompressionType GetCompressionType(const Options& options, int level, + const bool enable_compression = true) { + if (!enable_compression) { + // disable compression + return kNoCompression; + } + // If the user has specified a different compression level for each level, + // then pick the compression for that level. + if (!options.compression_per_level.empty()) { + const int n = options.compression_per_level.size() - 1; + // It is possible for level to be -1; in that case, we use level + // 0's compression. This occurs mostly in backwards compatibility + // situations when the builder doesn't know what level the file + // belongs to. Likewise, if level is beyond the end of the + // specified compression levels, use the last value. 
+ return options.compression_per_level[std::max(0, std::min(level, n))]; + } else { + return options.compression; + } +} uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) { uint64_t sum = 0; @@ -345,7 +371,8 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, } Compaction* c = new Compaction(version, input_level, output_level, MaxFileSizeForLevel(output_level), - MaxGrandParentOverlapBytes(input_level)); + MaxGrandParentOverlapBytes(input_level), 0, + GetCompressionType(*options_, output_level)); c->inputs_[0] = inputs; if (ExpandWhileOverlapping(c) == false) { @@ -465,7 +492,8 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, assert(level >= 0); assert(level + 1 < NumberLevels()); c = new Compaction(version, level, level + 1, MaxFileSizeForLevel(level + 1), - MaxGrandParentOverlapBytes(level)); + MaxGrandParentOverlapBytes(level), 0, + GetCompressionType(*options_, level + 1)); c->score_ = score; // Pick the largest file in this level that is not already @@ -585,15 +613,9 @@ Compaction* UniversalCompactionPicker::PickCompaction(Version* version, newerfile = f; } - // The files are sorted from newest first to oldest last. - std::vector<int>& file_by_time = c->input_version_->files_by_size_[level]; - // Is the earliest file part of this compaction? - int last_index = file_by_time[file_by_time.size()-1]; - FileMetaData* last_file = c->input_version_->files_[level][last_index]; - if (c->inputs_[0][c->inputs_[0].size()-1] == last_file) { - c->bottommost_level_ = true; - } + FileMetaData* last_file = c->input_version_->files_[level].back(); + c->bottommost_level_ = c->inputs_[0].back() == last_file; // update statistics MeasureTime(options_->statistics.get(), NUM_FILES_IN_SINGLE_COMPACTION, @@ -628,12 +650,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( options_->compaction_options_universal.max_merge_width; // The files are sorted from newest first to oldest last. - std::vector<int>& file_by_time = version->files_by_size_[level]; + const auto& files = version->files_[level]; + FileMetaData* f = nullptr; bool done = false; int start_index = 0; unsigned int candidate_count = 0; - assert(file_by_time.size() == version->files_[level].size()); unsigned int max_files_to_compact = std::min(max_merge_width, max_number_of_files_to_compact); @@ -641,14 +663,13 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // Considers a candidate file only if it is smaller than the // total size accumulated so far. - for (unsigned int loop = 0; loop < file_by_time.size(); loop++) { + for (unsigned int loop = 0; loop < files.size(); loop++) { candidate_count = 0; // Skip files that are already being compacted - for (f = nullptr; loop < file_by_time.size(); loop++) { - int index = file_by_time[loop]; - f = version->files_[level][index]; + for (f = nullptr; loop < files.size(); loop++) { + f = files[loop]; if (!f->being_compacted) { candidate_count = 1; @@ -664,17 +685,16 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // first candidate to be compacted. uint64_t candidate_size = f != nullptr? 
f->compensated_file_size : 0; if (f != nullptr) { - LogToBuffer(log_buffer, - "[%s] Universal: Possible candidate file %" PRIu64 "[%d].", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop); + LogToBuffer( + log_buffer, "[%s] Universal: Possible candidate file %s[%d].", + version->cfd_->GetName().c_str(), + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), loop); } // Check if the succeeding files need compaction. - for (unsigned int i = loop+1; - candidate_count < max_files_to_compact && i < file_by_time.size(); - i++) { - int index = file_by_time[i]; - FileMetaData* f = version->files_[level][index]; + for (unsigned int i = loop + 1; + candidate_count < max_files_to_compact && i < files.size(); i++) { + FileMetaData* f = files[i]; if (f->being_compacted) { break; } @@ -713,14 +733,14 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( break; } else { for (unsigned int i = loop; - i < loop + candidate_count && i < file_by_time.size(); i++) { - int index = file_by_time[i]; - FileMetaData* f = version->files_[level][index]; - LogToBuffer(log_buffer, - "[%s] Universal: Skipping file %" PRIu64 "[%d] " - "with size %" PRIu64 " (compensated size %" PRIu64 ") %d\n", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), - i, f->fd.GetFileSize(), f->compensated_file_size, f->being_compacted); + i < loop + candidate_count && i < files.size(); i++) { + FileMetaData* f = files[i]; + LogToBuffer(log_buffer, "[%s] Universal: Skipping file %" PRIu64 + "[%d] with size %" PRIu64 + " (compensated size %" PRIu64 ") %d\n", + version->cfd_->GetName().c_str(), f->fd.GetNumber(), i, + f->fd.GetFileSize(), f->compensated_file_size, + f->being_compacted); } } } @@ -736,31 +756,29 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( if (ratio_to_compress >= 0) { uint64_t total_size = version->NumLevelBytes(level); uint64_t older_file_size = 0; - for (unsigned int i = file_by_time.size() - 1; i >= first_index_after; - i--) { - older_file_size += - version->files_[level][file_by_time[i]]->fd.GetFileSize(); + for (unsigned int i = files.size() - 1; + i >= first_index_after; i--) { + older_file_size += files[i]->fd.GetFileSize(); if (older_file_size * 100L >= total_size * (long) ratio_to_compress) { enable_compression = false; break; } } } - Compaction* c = - new Compaction(version, level, level, MaxFileSizeForLevel(level), - LLONG_MAX, false, enable_compression); + Compaction* c = new Compaction( + version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, 0, + GetCompressionType(*options_, level, enable_compression)); c->score_ = score; for (unsigned int i = start_index; i < first_index_after; i++) { - int index = file_by_time[i]; - FileMetaData* f = c->input_version_->files_[level][index]; + FileMetaData* f = c->input_version_->files_[level][i]; c->inputs_[0].push_back(f); LogToBuffer(log_buffer, - "[%s] Universal: Picking file %" PRIu64 "[%d] " + "[%s] Universal: Picking file %s[%d] " "with size %" PRIu64 " (compensated size %" PRIu64 ")\n", version->cfd_->GetName().c_str(), - f->fd.GetNumber(), i, - f->fd.GetFileSize(), f->compensated_file_size); + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), + i, f->fd.GetFileSize(), f->compensated_file_size); } return c; } @@ -780,8 +798,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( max_size_amplification_percent; // The files are sorted from newest first to oldest last. 
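A reduced sketch of the iteration pattern the hunks above switch to: version->files_[level] is already sorted newest to oldest, so the picker walks it directly instead of going through the old files_by_size_ index vector. Types are simplified here:

```cpp
#include <cstddef>
#include <vector>

struct FileInfo {  // stand-in for FileMetaData in this sketch
  bool being_compacted;
};

// Return the index of the first file eligible to seed a compaction, or
// files.size() if every file is already being compacted. The real picker
// additionally accumulates up to max_files_to_compact candidates and
// applies size-ratio checks.
size_t FirstCandidate(const std::vector<FileInfo*>& files) {
  size_t loop = 0;
  while (loop < files.size() && files[loop]->being_compacted) {
    ++loop;  // skip files that are already being compacted
  }
  return loop;
}
```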
- std::vector& file_by_time = version->files_by_size_[level]; - assert(file_by_time.size() == version->files_[level].size()); + const auto& files = version->files_[level]; unsigned int candidate_count = 0; uint64_t candidate_size = 0; @@ -789,38 +806,35 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( FileMetaData* f = nullptr; // Skip files that are already being compacted - for (unsigned int loop = 0; loop < file_by_time.size() - 1; loop++) { - int index = file_by_time[loop]; - f = version->files_[level][index]; + for (unsigned int loop = 0; loop < files.size() - 1; loop++) { + f = files[loop]; if (!f->being_compacted) { start_index = loop; // Consider this as the first candidate. break; } - LogToBuffer(log_buffer, - "[%s] Universal: skipping file %" PRIu64 "[%d] compacted %s", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop, - " cannot be a candidate to reduce size amp.\n"); + LogToBuffer(log_buffer, "[%s] Universal: skipping file %s[%d] compacted %s", + version->cfd_->GetName().c_str(), + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), + loop, " cannot be a candidate to reduce size amp.\n"); f = nullptr; } if (f == nullptr) { return nullptr; // no candidate files } - LogToBuffer(log_buffer, - "[%s] Universal: First candidate file %" PRIu64 "[%d] %s", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), start_index, - " to reduce size amp.\n"); + LogToBuffer(log_buffer, "[%s] Universal: First candidate file %s[%d] %s", + version->cfd_->GetName().c_str(), + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), + start_index, " to reduce size amp.\n"); // keep adding up all the remaining files - for (unsigned int loop = start_index; loop < file_by_time.size() - 1; - loop++) { - int index = file_by_time[loop]; - f = version->files_[level][index]; + for (unsigned int loop = start_index; loop < files.size() - 1; loop++) { + f = files[loop]; if (f->being_compacted) { LogToBuffer( - log_buffer, - "[%s] Universal: Possible candidate file %" PRIu64 "[%d] %s.", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop, + log_buffer, "[%s] Universal: Possible candidate file %s[%d] %s.", + version->cfd_->GetName().c_str(), + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), loop, " is already being compacted. No size amp reduction possible.\n"); return nullptr; } @@ -832,8 +846,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( } // size of earliest file - int index = file_by_time[file_by_time.size() - 1]; - uint64_t earliest_file_size = version->files_[level][index]->fd.GetFileSize(); + uint64_t earliest_file_size = files.back()->fd.GetFileSize(); // size amplification = percentage of additional size if (candidate_size * 100 < ratio * earliest_file_size) { @@ -850,23 +863,22 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( "earliest-file-size %" PRIu64, version->cfd_->GetName().c_str(), candidate_size, earliest_file_size); } - assert(start_index >= 0 && start_index < file_by_time.size() - 1); + assert(start_index >= 0 && start_index < files.size() - 1); // create a compaction request // We always compact all the files, so always compress. 
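The comment above predates this change: the constructor below now asks GetCompressionType() (added near the top of this file) for the output compression instead of unconditionally enabling it. A worked example of that helper's clamping behavior, assuming it is called exactly as defined above:

```cpp
#include "rocksdb/options.h"

void CompressionPerLevelExample() {
  rocksdb::Options options;
  options.compression_per_level = {rocksdb::kNoCompression,
                                   rocksdb::kNoCompression,
                                   rocksdb::kSnappyCompression};
  // The level index is clamped to [0, n], where n indexes the last entry:
  //   GetCompressionType(options, -1) -> kNoCompression     (clamped to 0)
  //   GetCompressionType(options, 2)  -> kSnappyCompression
  //   GetCompressionType(options, 6)  -> kSnappyCompression (clamped to 2)
  //   GetCompressionType(options, 2, false) -> kNoCompression (disabled)
}
```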
Compaction* c = new Compaction(version, level, level, MaxFileSizeForLevel(level), - LLONG_MAX, false, true); + LLONG_MAX, 0, GetCompressionType(*options_, level)); c->score_ = score; - for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) { - int index = file_by_time[loop]; - f = c->input_version_->files_[level][index]; + for (unsigned int loop = start_index; loop < files.size(); loop++) { + f = c->input_version_->files_[level][loop]; c->inputs_[0].push_back(f); LogToBuffer(log_buffer, "[%s] Universal: size amp picking file %" PRIu64 "[%d] " "with size %" PRIu64 " (compensated size %" PRIu64 ")", version->cfd_->GetName().c_str(), - f->fd.GetNumber(), index, + f->fd.GetNumber(), loop, f->fd.GetFileSize(), f->compensated_file_size); } return c; } @@ -899,7 +911,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version, return nullptr; } - Compaction* c = new Compaction(version, 0, 0, 0, 0, false, false, + Compaction* c = new Compaction(version, 0, 0, 0, 0, 0, kNoCompression, false, true /* is deletion compaction */); // delete old files (FIFO) for (auto ritr = version->files_[0].rbegin(); diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 582355ccd..5286ca782 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -98,7 +98,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret, } // Make a set of all of the live *.sst files - std::set<uint64_t> live; + std::vector<FileDescriptor> live; for (auto cfd : *versions_->GetColumnFamilySet()) { cfd->current()->AddLiveFiles(&live); } @@ -109,7 +109,7 @@ // create names of the live files. The names are not absolute // paths, instead they are relative to dbname_; for (auto live_file : live) { - ret.push_back(TableFileName("", live_file)); + ret.push_back(MakeTableFileName("", live_file.GetNumber())); } ret.push_back(CurrentFileName("")); diff --git a/db/db_impl.cc b/db/db_impl.cc index a6a622849..ce1cf78ff 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -98,6 +98,7 @@ struct DBImpl::CompactionState { // Files produced by compaction struct Output { uint64_t number; + uint32_t path_id; uint64_t file_size; InternalKey smallest, largest; SequenceNumber smallest_seqno, largest_seqno; @@ -294,30 +295,14 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1); } - return result; -} - -CompressionType GetCompressionType(const Options& options, int level, - const bool enable_compression) { - if (!enable_compression) { - // disable compression - return kNoCompression; - } - // If the use has specified a different compression level for each level, - // then pick the compresison for that level. - if (!options.compression_per_level.empty()) { - const int n = options.compression_per_level.size() - 1; - // It is possible for level_ to be -1; in that case, we use level - // 0's compression. This occurs mostly in backwards compatibility - // situations when the builder doesn't know what level the file - // belongs to. Likewise, if level_ is beyond the end of the - // specified compression levels, use the last value. 
- return options.compression_per_level[std::max(0, std::min(level, n))]; - } else { - return options.compression; + if (result.db_paths.size() == 0) { + result.db_paths.push_back(dbname); } + + return result; } +namespace { CompressionType GetCompressionFlush(const Options& options) { // Compressing memtable flushes might not help unless the sequential load // optimization is used for leveled compaction. Otherwise the CPU and @@ -325,12 +310,13 @@ CompressionType GetCompressionFlush(const Options& options) { bool can_compress; - if (options.compaction_style == kCompactionStyleUniversal) { + if (options.compaction_style == kCompactionStyleUniversal) { can_compress = (options.compaction_options_universal.compression_size_percent < 0); } else { // For leveled compress when min_level_to_compress == 0. - can_compress = (GetCompressionType(options, 0, true) != kNoCompression); + can_compress = options.compression_per_level.empty() || + options.compression_per_level[0] != kNoCompression; } if (can_compress) { @@ -339,6 +325,7 @@ CompressionType GetCompressionFlush(const Options& options) { return kNoCompression; } } +} // namespace DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) : env_(options.env), @@ -591,30 +578,48 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, } // don't delete live files - deletion_state.sst_live.assign(pending_outputs_.begin(), - pending_outputs_.end()); + for (auto pair : pending_outputs_) { + deletion_state.sst_live.emplace_back(pair.first, pair.second, 0); + } versions_->AddLiveFiles(&deletion_state.sst_live); if (doing_the_full_scan) { - // set of all files in the directory. We'll exclude files that are still - // alive in the subsequent processings. - env_->GetChildren( - dbname_, &deletion_state.candidate_files - ); // Ignore errors + for (uint32_t path_id = 0; path_id < options_.db_paths.size(); path_id++) { + // set of all files in the directory. We'll exclude files that are still + // alive in the subsequent processings. + std::vector<std::string> files; + env_->GetChildren(options_.db_paths[path_id], &files); // Ignore errors + for (std::string file : files) { + deletion_state.candidate_files.emplace_back(file, path_id); + } + } //Add log files in wal_dir if (options_.wal_dir != dbname_) { std::vector<std::string> log_files; env_->GetChildren(options_.wal_dir, &log_files); // Ignore errors - deletion_state.candidate_files.insert( - deletion_state.candidate_files.end(), - log_files.begin(), - log_files.end() - ); + for (std::string log_file : log_files) { + deletion_state.candidate_files.emplace_back(log_file, 0); + } } } } +namespace { +bool CompareCandidateFile(const rocksdb::DBImpl::CandidateFileInfo& first, + const rocksdb::DBImpl::CandidateFileInfo& second) { + if (first.file_name > second.file_name) { + return true; + } else if (first.file_name < second.file_name) { + return false; + } else { + return (first.path_id > second.path_id); + } +} +} // namespace + // Diffs the files listed in filenames and those that do not // belong to live files are possibly removed. Also, removes all the // files in sst_delete_files and log_delete_files. @@ -630,10 +635,12 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { return; } - // Now, convert live list to an unordered set, WITHOUT mutex held; + // Now, convert live list to an unordered map, WITHOUT mutex held; // set is slow. 
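The dedup a few lines below (sort plus unique over candidate_files) relies on this comparator grouping equal (file_name, path_id) pairs adjacently. A self-contained sketch of the pattern, with the struct reduced to its two key fields:

```cpp
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

struct Candidate {  // reduced form of DBImpl::CandidateFileInfo
  std::string file_name;
  uint32_t path_id;
  bool operator==(const Candidate& o) const {
    return file_name == o.file_name && path_id == o.path_id;
  }
};

// Same ordering as CompareCandidateFile: descending by name, then by path ID,
// so duplicates become adjacent and std::unique can drop them.
bool CompareCandidate(const Candidate& a, const Candidate& b) {
  if (a.file_name != b.file_name) {
    return a.file_name > b.file_name;
  }
  return a.path_id > b.path_id;
}

void Dedup(std::vector<Candidate>* files) {
  std::sort(files->begin(), files->end(), CompareCandidate);
  files->erase(std::unique(files->begin(), files->end()), files->end());
}
```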
- std::unordered_set<uint64_t> sst_live(state.sst_live.begin(), - state.sst_live.end()); + std::unordered_map<uint64_t, FileDescriptor*> sst_live_map; + for (FileDescriptor& fd : state.sst_live) { + sst_live_map[fd.GetNumber()] = &fd; + } auto& candidate_files = state.candidate_files; candidate_files.reserve( @@ -643,26 +650,30 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { // We may ignore the dbname when generating the file names. const char* kDumbDbName = ""; for (auto file : state.sst_delete_files) { - candidate_files.push_back( - TableFileName(kDumbDbName, file->fd.GetNumber()).substr(1)); + candidate_files.emplace_back( + MakeTableFileName(kDumbDbName, file->fd.GetNumber()), + file->fd.GetPathId()); delete file; } for (auto file_num : state.log_delete_files) { if (file_num > 0) { - candidate_files.push_back(LogFileName(kDumbDbName, file_num).substr(1)); + candidate_files.emplace_back(LogFileName(kDumbDbName, file_num).substr(1), + 0); } } // dedup state.candidate_files so we don't try to delete the same // file twice - sort(candidate_files.begin(), candidate_files.end()); + sort(candidate_files.begin(), candidate_files.end(), CompareCandidateFile); candidate_files.erase(unique(candidate_files.begin(), candidate_files.end()), candidate_files.end()); std::vector<std::string> old_info_log_files; - for (const auto& to_delete : candidate_files) { + for (const auto& candidate_file : candidate_files) { + std::string to_delete = candidate_file.file_name; + uint32_t path_id = candidate_file.path_id; uint64_t number; FileType type; // Ignore file if we cannot recognize it. @@ -682,7 +693,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { keep = (number >= state.manifest_file_number); break; case kTableFile: - keep = (sst_live.find(number) != sst_live.end()); + keep = (sst_live_map.find(number) != sst_live_map.end()); break; case kTempFile: // Any temp files that are currently being written to must // Also, SetCurrentFile creates a temp file when writing out new // manifest, which is equal to state.pending_manifest_file_number. We // should not delete that file - keep = (sst_live.find(number) != sst_live.end()) || + keep = (sst_live_map.find(number) != sst_live_map.end()) || (number == state.pending_manifest_file_number); break; case kInfoLogFile: @@ -711,13 +722,16 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { continue; } + std::string fname; if (type == kTableFile) { // evict from cache TableCache::Evict(table_cache_.get(), number); + fname = TableFileName(options_.db_paths, number, path_id); + } else { + fname = + ((type == kLogFile) ? options_.wal_dir : dbname_) + "/" + to_delete; } - std::string fname = ((type == kLogFile) ? 
options_.wal_dir : dbname_) + - "/" + to_delete; if (type == kLogFile && (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) { auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number); @@ -1102,6 +1116,13 @@ Status DBImpl::Recover( return s; } + for (auto db_path : options_.db_paths) { + s = env_->CreateDirIfMissing(db_path); + if (!s.ok()) { + return s; + } + } + s = env_->NewDirectory(dbname_, &db_directory_); if (!s.ok()) { return s; @@ -1367,8 +1388,8 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; - meta.fd.number = versions_->NewFileNumber(); - pending_outputs_.insert(meta.fd.GetNumber()); + meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); + pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file. Iterator* iter = mem->NewIterator(ReadOptions(), true); const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber earliest_seqno_in_memtable = @@ -1399,9 +1420,9 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, // should not be added to the manifest. int level = 0; if (s.ok() && meta.fd.GetFileSize() > 0) { - edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetFileSize(), - meta.smallest, meta.largest, meta.smallest_seqno, - meta.largest_seqno); + edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), + meta.fd.GetFileSize(), meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } InternalStats::CompactionStats stats; @@ -1420,9 +1441,10 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; - meta.fd.number = versions_->NewFileNumber(); + + meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); *filenumber = meta.fd.GetNumber(); - pending_outputs_.insert(meta.fd.GetNumber()); + pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file. 
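pending_outputs_ is now a map from file number to path ID rather than a plain set of numbers, so a protected output can later be located under the right directory. A minimal sketch of that resolution, mirroring the fallback logic of the TableFileName() added in db/filename.cc (the real code zero-pads the number via MakeFileName(); this sketch does not):

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

typedef std::unordered_map<uint64_t, uint32_t> FileNumToPathIdMap;

// Mirrors TableFileName(): path_id indexes db_paths and falls back to the
// last configured path when it is out of range.
std::string ResolveTableFile(const std::vector<std::string>& db_paths,
                             uint64_t number, uint32_t path_id) {
  const std::string& dir =
      path_id < db_paths.size() ? db_paths[path_id] : db_paths.back();
  return dir + "/" + std::to_string(number) + ".sst";
}

void PendingOutputsExample() {
  FileNumToPathIdMap pending_outputs;
  pending_outputs[42] = 0;  // protect file 42, to be written under path 0
  std::vector<std::string> db_paths = {"/db"};
  std::string fname = ResolveTableFile(db_paths, 42, pending_outputs[42]);
  // fname == "/db/42.sst" in this sketch; RocksDB produces "/db/000042.sst"
}
```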
const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber earliest_seqno_in_memtable = @@ -1489,9 +1511,9 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, cfd->options()->compaction_style == kCompactionStyleLevel) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } - edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetFileSize(), - meta.smallest, meta.largest, meta.smallest_seqno, - meta.largest_seqno); + edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), + meta.fd.GetFileSize(), meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } InternalStats::CompactionStats stats; @@ -1547,7 +1569,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, // Replace immutable memtable with the generated Table s = cfd->imm()->InstallMemtableFlushResults( cfd, mems, versions_.get(), &mutex_, options_.info_log.get(), - file_number, pending_outputs_, &deletion_state.memtables_to_free, + file_number, &pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get(), log_buffer); } @@ -1691,9 +1713,9 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { edit.SetColumnFamily(cfd->GetID()); for (const auto& f : cfd->current()->files_[level]) { edit.DeleteFile(level, f->fd.GetNumber()); - edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetFileSize(), - f->smallest, f->largest, f->smallest_seqno, - f->largest_seqno); + edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(), + f->fd.GetFileSize(), f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); } Log(options_.info_log, "[%s] Apply version edit:\n%s", cfd->GetName().c_str(), edit.DebugString().data()); @@ -2190,9 +2212,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, assert(c->num_input_files(0) == 1); FileMetaData* f = c->input(0, 0); c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); - c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetFileSize(), - f->smallest, f->largest, f->smallest_seqno, - f->largest_seqno); + c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(), + f->fd.GetFileSize(), f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_, db_directory_.get()); InstallSuperVersion(c->column_family_data(), deletion_state); @@ -2298,7 +2320,7 @@ void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) { int filesNeeded = compact->compaction->num_input_files(1); for (int i = 0; i < std::max(filesNeeded, 1); i++) { uint64_t file_number = versions_->NewFileNumber(); - pending_outputs_.insert(file_number); + pending_outputs_[file_number] = compact->compaction->GetOutputPathId(); compact->allocated_file_numbers.push_back(file_number); } } @@ -2324,18 +2346,20 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { } else { mutex_.Lock(); file_number = versions_->NewFileNumber(); - pending_outputs_.insert(file_number); + pending_outputs_[file_number] = compact->compaction->GetOutputPathId(); mutex_.Unlock(); } CompactionState::Output out; out.number = file_number; + out.path_id = compact->compaction->GetOutputPathId(); out.smallest.Clear(); out.largest.Clear(); out.smallest_seqno = out.largest_seqno = 0; compact->outputs.push_back(out); // Make the output file - std::string fname = TableFileName(dbname_, file_number); + std::string fname = TableFileName(options_.db_paths, file_number, + compact->compaction->GetOutputPathId()); Status s = 
env_->NewWritableFile(fname, &compact->outfile, storage_options_); if (s.ok()) { @@ -2343,13 +2367,9 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { compact->compaction->OutputFilePreallocationSize()); ColumnFamilyData* cfd = compact->compaction->column_family_data(); - CompressionType compression_type = - GetCompressionType(*cfd->options(), compact->compaction->output_level(), - compact->compaction->enable_compression()); - - compact->builder.reset( - NewTableBuilder(*cfd->options(), cfd->internal_comparator(), - compact->outfile.get(), compression_type)); + compact->builder.reset(NewTableBuilder( + *cfd->options(), cfd->internal_comparator(), compact->outfile.get(), + compact->compaction->OutputCompressionType())); } LogFlush(options_.info_log); return s; @@ -2362,6 +2382,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, assert(compact->builder != nullptr); const uint64_t output_number = compact->current_output()->number; + const uint32_t output_path_id = compact->current_output()->path_id; assert(output_number != 0); // Check for iterator errors @@ -2397,9 +2418,9 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, if (s.ok() && current_entries > 0) { // Verify that the table is usable ColumnFamilyData* cfd = compact->compaction->column_family_data(); - FileDescriptor meta(output_number, current_bytes); + FileDescriptor fd(output_number, output_path_id, current_bytes); Iterator* iter = cfd->table_cache()->NewIterator( - ReadOptions(), storage_options_, cfd->internal_comparator(), meta); + ReadOptions(), storage_options_, cfd->internal_comparator(), fd); s = iter->status(); delete iter; if (s.ok()) { @@ -2442,9 +2463,10 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact, compact->compaction->AddInputDeletions(compact->compaction->edit()); for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; - compact->compaction->edit()->AddFile( - compact->compaction->output_level(), out.number, out.file_size, - out.smallest, out.largest, out.smallest_seqno, out.largest_seqno); + compact->compaction->edit()->AddFile(compact->compaction->output_level(), + out.number, out.path_id, out.file_size, + out.smallest, out.largest, + out.smallest_seqno, out.largest_seqno); } return versions_->LogAndApply(compact->compaction->column_family_data(), compact->compaction->edit(), &mutex_, @@ -4140,7 +4162,7 @@ Status DBImpl::MakeRoomForWrite( // how do we fail if we're not creating new log? assert(creating_new_log); // Avoid chewing through file number space in a tight loop. - versions_->ReuseFileNumber(new_log_number); + versions_->ReuseLogFileNumber(new_log_number); assert(!new_mem); assert(!new_log); break; @@ -4383,14 +4405,15 @@ Status DBImpl::CheckConsistency() { std::string corruption_messages; for (const auto& md : metadata) { - std::string file_path = dbname_ + md.name; + std::string file_path = md.db_path + "/" + md.name; + uint64_t fsize = 0; Status s = env_->GetFileSize(file_path, &fsize); if (!s.ok()) { corruption_messages += "Can't access " + md.name + ": " + s.ToString() + "\n"; } else if (fsize != md.size) { - corruption_messages += "Sst file size mismatch: " + md.name + + corruption_messages += "Sst file size mismatch: " + file_path + ". 
Size recorded in manifest " + std::to_string(md.size) + ", actual size " + std::to_string(fsize) + "\n"; @@ -4488,6 +4511,11 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { Status DB::Open(const DBOptions& db_options, const std::string& dbname, const std::vector<ColumnFamilyDescriptor>& column_families, std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) { + if (db_options.db_paths.size() > 1) { + return Status::NotSupported( + "More than one DB path is not supported yet."); + } + *dbptr = nullptr; handles->clear(); @@ -4503,6 +4531,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, DBImpl* impl = new DBImpl(db_options, dbname); Status s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir); + if (s.ok()) { + for (auto path : impl->options_.db_paths) { + s = impl->env_->CreateDirIfMissing(path); + if (!s.ok()) { + break; + } + } + } + if (!s.ok()) { delete impl; return s; } @@ -4665,6 +4702,21 @@ Status DestroyDB(const std::string& dbname, const Options& options) { } } + for (auto db_path : options.db_paths) { + env->GetChildren(db_path, &filenames); + uint64_t number; + FileType type; + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type) && + type == kTableFile) { // Lock file will be deleted at end + Status del = env->DeleteFile(db_path + "/" + filenames[i]); + if (result.ok() && !del.ok()) { + result = del; + } + } + } + } + env->GetChildren(archivedir, &archiveFiles); // Delete archival files. for (size_t i = 0; i < archiveFiles.size(); ++i) { diff --git a/db/db_impl.h b/db/db_impl.h index 797cb0484..fb0bdb4af 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -198,6 +198,17 @@ class DBImpl : public DB { Status TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence); #endif // NDEBUG + // Structure to store information for candidate files to delete. + struct CandidateFileInfo { + std::string file_name; + uint32_t path_id; + CandidateFileInfo(std::string name, uint32_t path) + : file_name(name), path_id(path) {} + bool operator==(const CandidateFileInfo& other) const { + return file_name == other.file_name && path_id == other.path_id; + } + }; + // needed for CleanupIteratorState struct DeletionState { inline bool HaveSomethingToDelete() const { @@ -209,10 +220,10 @@ class DBImpl : public DB { // a list of all files that we'll consider deleting // (every once in a while this is filled up with all files // in the DB directory) - std::vector<std::string> candidate_files; + std::vector<CandidateFileInfo> candidate_files; // the list of all live sst files that cannot be deleted - std::vector<uint64_t> sst_live; + std::vector<FileDescriptor> sst_live; // a list of sst files that we need to delete std::vector<FileMetaData*> sst_delete_files; @@ -501,7 +512,8 @@ class DBImpl : public DB { // Set of table files to protect from deletion because they are // part of ongoing compactions. - std::set<uint64_t> pending_outputs_; + // map from pending file number to its path ID. + FileNumToPathIdMap pending_outputs_; // At least one compaction or flush job is pending but not yet scheduled // because of the max background thread limit. @@ -625,15 +637,4 @@ extern Options SanitizeOptions(const std::string& db, const Options& src); extern DBOptions SanitizeOptions(const std::string& db, const DBOptions& src); -// Determine compression type, based on user options, level of the output -// file and whether compression is disabled. -// If enable_compression is false, then compression is always disabled no -// matter what the values of the other two parameters are. 
-// Otherwise, the compression type is determined based on options and level. -CompressionType GetCompressionType(const Options& options, int level, - const bool enable_compression); - -// Determine compression type for L0 file written by memtable flush. -CompressionType GetCompressionFlush(const Options& options); - } // namespace rocksdb diff --git a/db/db_test.cc b/db/db_test.cc index 6344722ed..8010eaa81 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -98,7 +98,12 @@ class AtomicCounter { count_ = 0; } }; +} // namespace anon +static std::string Key(int i) { + char buf[100]; + snprintf(buf, sizeof(buf), "key%06d", i); + return std::string(buf); } // Special Env used to delay background operations @@ -355,7 +360,10 @@ class DBTest { ~DBTest() { Close(); - ASSERT_OK(DestroyDB(dbname_, Options())); + Options options; + options.db_paths.push_back(dbname_); + options.db_paths.push_back(dbname_ + "_2"); + ASSERT_OK(DestroyDB(dbname_, options)); delete env_; delete filter_policy_; } @@ -436,7 +444,8 @@ class DBTest { switch (option_config_) { case kHashSkipList: options.prefix_extractor.reset(NewFixedPrefixTransform(1)); - options.memtable_factory.reset(NewHashSkipListRepFactory()); + options.memtable_factory.reset( + NewHashSkipListRepFactory(16)); break; case kPlainTableFirstBytePrefix: options.table_factory.reset(new PlainTableFactory()); @@ -466,7 +475,7 @@ class DBTest { options.db_log_dir = test::TmpDir(); break; case kWalDir: - options.wal_dir = "/tmp/wal"; + options.wal_dir = test::TmpDir() + "/wal"; break; case kManifestFileSize: options.max_manifest_file_size = 50; // 50 bytes @@ -487,7 +496,8 @@ class DBTest { break; case kHashLinkList: options.prefix_extractor.reset(NewFixedPrefixTransform(1)); - options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0)); + options.memtable_factory.reset( + NewHashLinkListRepFactory(4, 0, 3, true, 4)); break; case kHashCuckoo: options.memtable_factory.reset( @@ -895,6 +905,30 @@ class DBTest { return property; } + int GetSstFileCount(std::string path) { + std::vector files; + env_->GetChildren(path, &files); + + int sst_count = 0; + uint64_t number; + FileType type; + for (size_t i = 0; i < files.size(); i++) { + if (ParseFileName(files[i], &number, &type) && type == kTableFile) { + sst_count++; + } + } + return sst_count; + } + + void GenerateNewFile(Random* rnd, int* key_idx) { + for (int i = 0; i < 11; i++) { + ASSERT_OK(Put(Key(*key_idx), RandomString(rnd, (i == 10) ? 
1 : 10000))); + (*key_idx)++; + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + std::string IterStatus(Iterator* iter) { std::string result; if (iter->Valid()) { @@ -1035,12 +1069,6 @@ class DBTest { }; -static std::string Key(int i) { - char buf[100]; - snprintf(buf, sizeof(buf), "key%06d", i); - return std::string(buf); -} - static long TestGetTickerCount(const Options& options, Tickers ticker_type) { return options.statistics->getTickerCount(ticker_type); } @@ -3432,6 +3460,13 @@ TEST(DBTest, UniversalCompactionCompressRatio2) { ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 120000 * 12 * 0.8 + 120000 * 2); } + +TEST(DBTest, FailMoreDbPaths) { + Options options; + options.db_paths.push_back(dbname_); + options.db_paths.push_back(dbname_ + "_2"); + ASSERT_TRUE(TryReopen(&options).IsNotSupported()); +} #endif TEST(DBTest, ConvertCompactionStyle) { @@ -6691,7 +6726,7 @@ TEST(DBTest, PrefixScan) { options.disable_auto_compactions = true; options.max_background_compactions = 2; options.create_if_missing = true; - options.memtable_factory.reset(NewHashSkipListRepFactory()); + options.memtable_factory.reset(NewHashSkipListRepFactory(16)); // 11 RAND I/Os DestroyAndReopen(&options); @@ -6848,7 +6883,7 @@ TEST(DBTest, TailingIteratorPrefixSeek) { options.create_if_missing = true; options.disable_auto_compactions = true; options.prefix_extractor.reset(NewFixedPrefixTransform(2)); - options.memtable_factory.reset(NewHashSkipListRepFactory()); + options.memtable_factory.reset(NewHashSkipListRepFactory(16)); DestroyAndReopen(&options); CreateAndReopenWithCF({"pikachu"}, &options); diff --git a/db/dbformat.h b/db/dbformat.h index c7b3ced94..e1248a59f 100644 --- a/db/dbformat.h +++ b/db/dbformat.h @@ -297,6 +297,13 @@ class IterKey { parsed_key_suffix.sequence, parsed_key_suffix.type); } + void EncodeLengthPrefixedKey(const Slice& key) { + auto size = key.size(); + EnlargeBufferIfNeeded(size + VarintLength(size)); + char* ptr = EncodeVarint32(key_, size); + memcpy(ptr, key.data(), size); + } + private: char* key_; size_t buf_size_; diff --git a/db/filename.cc b/db/filename.cc index d19f0fd53..1c2be8ffb 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -11,6 +11,7 @@ #include <ctype.h> #include <stdio.h> +#include <vector> #include "db/dbformat.h" #include "rocksdb/env.h" #include "util/logging.h" @@ -66,9 +67,28 @@ std::string ArchivedLogFileName(const std::string& name, uint64_t number) { return MakeFileName(name + "/" + ARCHIVAL_DIR, number, "log"); } -std::string TableFileName(const std::string& name, uint64_t number) { +std::string MakeTableFileName(const std::string& path, uint64_t number) { + return MakeFileName(path, number, "sst"); +} + +std::string TableFileName(const std::vector<std::string> db_paths, + uint64_t number, uint32_t path_id) { assert(number > 0); - return MakeFileName(name, number, "sst"); + std::string path; + if (path_id >= db_paths.size()) { + path = db_paths.back(); + } else { + path = db_paths[path_id]; + } + return MakeTableFileName(path, number); +} + +std::string FormatFileNumber(uint64_t number, uint32_t path_id) { + if (path_id == 0) { + return std::to_string(number); + } else { + return std::to_string(number) + "(path " + std::to_string(path_id) + ")"; + } } std::string DescriptorFileName(const std::string& dbname, uint64_t number) { diff --git a/db/filename.h b/db/filename.h index c4c306946..5db434e02 100644 --- a/db/filename.h +++ b/db/filename.h @@ -11,7 +11,9 @@ #pragma once #include <stdint.h> +#include <unordered_map> #include <string> +#include <vector> #include "rocksdb/slice.h" #include "rocksdb/status.h" #include "rocksdb/transaction_log.h" @@ -34,6 +36,9 @@ enum FileType { kIdentityFile }; +// map from file number to path ID. +typedef std::unordered_map<uint64_t, uint32_t> FileNumToPathIdMap; + // Return the name of the log file with the specified number // in the db named by "dbname". The result will be prefixed with // "dbname". @@ -48,10 +53,15 @@ extern std::string ArchivalDirectory(const std::string& dbname); extern std::string ArchivedLogFileName(const std::string& dbname, uint64_t num); +extern std::string MakeTableFileName(const std::string& name, uint64_t number); + // Return the name of the sstable with the specified number // in the db named by "dbname". The result will be prefixed with // "dbname". -extern std::string TableFileName(const std::string& dbname, uint64_t number); +extern std::string TableFileName(const std::vector<std::string> db_paths, + uint64_t number, uint32_t path_id); + +extern std::string FormatFileNumber(uint64_t number, uint32_t path_id); // Return the name of the descriptor file for the db named by // "dbname" and the specified incarnation number. The result will be diff --git a/db/filename_test.cc b/db/filename_test.cc index 0baa7fdae..c86d16f34 100644 --- a/db/filename_test.cc +++ b/db/filename_test.cc @@ -108,7 +108,9 @@ TEST(FileNameTest, Construction) { ASSERT_EQ(192U, number); ASSERT_EQ(kLogFile, type); - fname = TableFileName("bar", 200); + fname = TableFileName({"bar"}, 200, 0); + std::string fname1 = TableFileName({"foo", "bar"}, 200, 1); + ASSERT_EQ(fname, fname1); ASSERT_EQ("bar/", std::string(fname.data(), 4)); ASSERT_TRUE(ParseFileName(fname.c_str() + 4, &number, &type)); ASSERT_EQ(200U, number); diff --git a/db/log_and_apply_bench.cc b/db/log_and_apply_bench.cc index ab9716deb..dbb459c30 100644 --- a/db/log_and_apply_bench.cc +++ b/db/log_and_apply_bench.cc @@ -51,7 +51,7 @@ void BM_LogAndApply(int iters, int num_base_files) { for (int i = 0; i < num_base_files; i++) { InternalKey start(MakeKey(2 * fnum), 1, kTypeValue); InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion); - vbase.AddFile(2, ++fnum, 1 /* file size */, start, limit, 1, 1); + vbase.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1); } ASSERT_OK(vset->LogAndApply(default_cfd, &vbase, &mu)); } @@ -61,7 +61,7 @@ void BM_LogAndApply(int iters, int num_base_files) { vedit.DeleteFile(2, fnum); InternalKey start(MakeKey(2 * fnum), 1, kTypeValue); InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion); - vedit.AddFile(2, ++fnum, 1 /* file size */, start, limit, 1, 1); + vedit.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1); vset->LogAndApply(default_cfd, &vedit, &mu); } } diff --git a/db/memtable.cc b/db/memtable.cc index f6d322d83..6023edde9 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -55,6 +55,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp, const Options& options) assert(!should_flush_); if (prefix_extractor_ && options.memtable_prefix_bloom_bits > 0) { prefix_bloom_.reset(new DynamicBloom( + &arena_, options.memtable_prefix_bloom_bits, options.bloom_locality, options.memtable_prefix_bloom_probes, nullptr, options.memtable_prefix_bloom_huge_page_tlb_size, diff --git a/db/memtable_list.cc b/db/memtable_list.cc index de1a18eee..d3fc1356b 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -140,7 +140,7 @@ void MemTableList::PickMemtablesToFlush(autovector<MemTable*>* ret) { void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems, uint64_t file_number, - std::set<uint64_t>* pending_outputs) { + FileNumToPathIdMap* pending_outputs) { assert(!mems.empty()); // If the flush was not successful, then just reset state. @@ -162,7 +162,7 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems, Status MemTableList::InstallMemtableFlushResults( ColumnFamilyData* cfd, const autovector<MemTable*>& mems, VersionSet* vset, port::Mutex* mu, Logger* info_log, uint64_t file_number, - std::set<uint64_t>& pending_outputs, autovector<MemTable*>* to_delete, + FileNumToPathIdMap* pending_outputs, autovector<MemTable*>* to_delete, Directory* db_directory, LogBuffer* log_buffer) { mu->AssertHeld(); @@ -219,7 +219,7 @@ Status MemTableList::InstallMemtableFlushResults( // has been written to a committed version so that other concurrently // executing compaction threads do not mistakenly assume that this // file is not live. - pending_outputs.erase(m->file_number_); + pending_outputs->erase(m->file_number_); if (m->Unref() != nullptr) { to_delete->push_back(m); } @@ -233,7 +233,7 @@ Status MemTableList::InstallMemtableFlushResults( m->flush_in_progress_ = false; m->edit_.Clear(); num_flush_not_started_++; - pending_outputs.erase(m->file_number_); + pending_outputs->erase(m->file_number_); m->file_number_ = 0; imm_flush_needed.Release_Store((void *)1); } diff --git a/db/memtable_list.h b/db/memtable_list.h index e56710fc9..f4923e831 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -15,6 +15,7 @@ #include "rocksdb/iterator.h" #include "db/dbformat.h" +#include "db/filename.h" #include "db/skiplist.h" #include "db/memtable.h" #include "rocksdb/db.h" @@ -108,17 +109,14 @@ class MemTableList { // they can get picked up again on the next round of flush. void RollbackMemtableFlush(const autovector<MemTable*>& mems, uint64_t file_number, - std::set<uint64_t>* pending_outputs); + FileNumToPathIdMap* pending_outputs); // Commit a successful flush in the manifest file - Status InstallMemtableFlushResults(ColumnFamilyData* cfd, - const autovector<MemTable*>& m, - VersionSet* vset, port::Mutex* mu, - Logger* info_log, uint64_t file_number, - std::set<uint64_t>& pending_outputs, - autovector<MemTable*>* to_delete, - Directory* db_directory, - LogBuffer* log_buffer); + Status InstallMemtableFlushResults( + ColumnFamilyData* cfd, const autovector<MemTable*>& m, VersionSet* vset, + port::Mutex* mu, Logger* info_log, uint64_t file_number, + FileNumToPathIdMap* pending_outputs, autovector<MemTable*>* to_delete, + Directory* db_directory, LogBuffer* log_buffer); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). 
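A compact sketch of the bookkeeping contract shared by the two functions above: the flush path registers each output's file number with its path ID, and both the success path (InstallMemtableFlushResults) and the failure path (RollbackMemtableFlush) erase the entry so the number stops being protected from deletion. Assumes the FileNumToPathIdMap typedef from db/filename.h:

```cpp
#include <cstdint>
#include <unordered_map>

typedef std::unordered_map<uint64_t, uint32_t> FileNumToPathIdMap;

void FlushBookkeepingSketch(FileNumToPathIdMap* pending_outputs) {
  const uint64_t file_number = 123;
  (*pending_outputs)[file_number] = 0;  // flush outputs use path 0 in this diff
  // ... the level-0 table is written; on success it is recorded in a
  // committed version, on failure the memtable is re-queued for flush ...
  pending_outputs->erase(file_number);  // either way, stop protecting it
}
```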
diff --git a/db/plain_table_db_test.cc b/db/plain_table_db_test.cc index b169b1724..bad834e49 100644 --- a/db/plain_table_db_test.cc +++ b/db/plain_table_db_test.cc @@ -62,7 +62,7 @@ class PlainTableDBTest { Options CurrentOptions() { Options options; options.table_factory.reset(NewPlainTableFactory(0, 2, 0.8, 3, 0, kPrefix)); - options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true)); + options.memtable_factory.reset(NewHashLinkListRepFactory(4, 0, 3, true, 3)); options.prefix_extractor.reset(NewFixedPrefixTransform(8)); options.allow_mmap_reads = true; return options; diff --git a/db/prefix_test.cc b/db/prefix_test.cc index 64a4d0617..a69dda2b4 100644 --- a/db/prefix_test.cc +++ b/db/prefix_test.cc @@ -189,6 +189,10 @@ class PrefixTest { options.memtable_factory.reset( NewHashLinkListRepFactory(bucket_count, 2 * 1024 * 1024)); return true; + case kHashLinkListTriggerSkipList: + options.memtable_factory.reset( + NewHashLinkListRepFactory(bucket_count, 0, 3)); + return true; default: return false; } @@ -208,6 +212,7 @@ class PrefixTest { kHashSkipList, kHashLinkList, kHashLinkListHugePageTlb, + kHashLinkListTriggerSkipList, kEnd }; int option_config_; diff --git a/db/repair.cc b/db/repair.cc index 13959a920..12c275c3e 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -65,8 +65,8 @@ class Repairer { NewLRUCache(10, options_.table_cache_numshardbits, options_.table_cache_remove_scan_count_limit)), next_file_number_(1) { - table_cache_ = new TableCache(dbname_, &options_, storage_options_, - raw_table_cache_.get()); + table_cache_ = + new TableCache(&options_, storage_options_, raw_table_cache_.get()); edit_ = new VersionEdit(); } @@ -116,7 +116,7 @@ class Repairer { VersionEdit* edit_; std::vector manifests_; - std::vector table_numbers_; + std::vector table_fds_; std::vector logs_; std::vector tables_; uint64_t next_file_number_; @@ -124,35 +124,43 @@ class Repairer { Status FindFiles() { std::vector filenames; - Status status = env_->GetChildren(dbname_, &filenames); - if (!status.ok()) { - return status; - } - if (filenames.empty()) { - return Status::Corruption(dbname_, "repair found no files"); - } + bool found_file = false; + for (uint32_t path_id = 0; path_id < options_.db_paths.size(); path_id++) { + Status status = env_->GetChildren(options_.db_paths[path_id], &filenames); + if (!status.ok()) { + return status; + } + if (!filenames.empty()) { + found_file = true; + } - uint64_t number; - FileType type; - for (size_t i = 0; i < filenames.size(); i++) { - if (ParseFileName(filenames[i], &number, &type)) { - if (type == kDescriptorFile) { - manifests_.push_back(filenames[i]); - } else { - if (number + 1 > next_file_number_) { - next_file_number_ = number + 1; - } - if (type == kLogFile) { - logs_.push_back(number); - } else if (type == kTableFile) { - table_numbers_.push_back(number); + uint64_t number; + FileType type; + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type)) { + if (type == kDescriptorFile) { + assert(path_id == 0); + manifests_.push_back(filenames[i]); } else { - // Ignore other files + if (number + 1 > next_file_number_) { + next_file_number_ = number + 1; + } + if (type == kLogFile) { + assert(path_id == 0); + logs_.push_back(number); + } else if (type == kTableFile) { + table_fds_.emplace_back(number, path_id, 0); + } else { + // Ignore other files + } } } } } - return status; + if (!found_file) { + return Status::Corruption(dbname_, "repair found no files"); + } + return Status::OK(); } void 
ConvertLogFilesToTables() { @@ -228,7 +236,7 @@ class Repairer { // Do not record a version edit for this conversion to a Table // since ExtractMetaData() will also generate edits. FileMetaData meta; - meta.fd.number = next_file_number_++; + meta.fd = FileDescriptor(next_file_number_++, 0, 0); ReadOptions ro; Iterator* iter = mem->NewIterator(ro, true /* enforce_total_order */); status = BuildTable(dbname_, env_, options_, storage_options_, table_cache_, @@ -239,7 +247,7 @@ class Repairer { mem = nullptr; if (status.ok()) { if (meta.fd.GetFileSize() > 0) { - table_numbers_.push_back(meta.fd.GetNumber()); + table_fds_.push_back(meta.fd); } } Log(options_.info_log, @@ -249,14 +257,17 @@ class Repairer { } void ExtractMetaData() { - for (size_t i = 0; i < table_numbers_.size(); i++) { + for (size_t i = 0; i < table_fds_.size(); i++) { TableInfo t; - t.meta.fd.number = table_numbers_[i]; + t.meta.fd = table_fds_[i]; Status status = ScanTable(&t); if (!status.ok()) { - std::string fname = TableFileName(dbname_, table_numbers_[i]); - Log(options_.info_log, "Table #%" PRIu64 ": ignoring %s", - table_numbers_[i], status.ToString().c_str()); + std::string fname = TableFileName( + options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId()); + Log(options_.info_log, "Table #%s: ignoring %s", + FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId()) + .c_str(), + status.ToString().c_str()); ArchiveFile(fname); } else { tables_.push_back(t); @@ -265,9 +276,13 @@ class Repairer { } Status ScanTable(TableInfo* t) { - std::string fname = TableFileName(dbname_, t->meta.fd.GetNumber()); + std::string fname = TableFileName(options_.db_paths, t->meta.fd.GetNumber(), + t->meta.fd.GetPathId()); int counter = 0; - Status status = env_->GetFileSize(fname, &t->meta.fd.file_size); + uint64_t file_size; + Status status = env_->GetFileSize(fname, &file_size); + t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(), + file_size); if (status.ok()) { Iterator* iter = table_cache_->NewIterator( ReadOptions(), storage_options_, icmp_, t->meta.fd); @@ -330,9 +345,9 @@ class Repairer { for (size_t i = 0; i < tables_.size(); i++) { // TODO(opt): separate out into multiple levels const TableInfo& t = tables_[i]; - edit_->AddFile(0, t.meta.fd.GetNumber(), t.meta.fd.GetFileSize(), - t.meta.smallest, t.meta.largest, t.min_sequence, - t.max_sequence); + edit_->AddFile(0, t.meta.fd.GetNumber(), t.meta.fd.GetPathId(), + t.meta.fd.GetFileSize(), t.meta.smallest, t.meta.largest, + t.min_sequence, t.max_sequence); } //fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str()); diff --git a/db/table_cache.cc b/db/table_cache.cc index 7a7513026..bd359f96d 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -36,10 +36,10 @@ static Slice GetSliceForFileNumber(const uint64_t* file_number) { sizeof(*file_number)); } -TableCache::TableCache(const std::string& dbname, const Options* options, +TableCache::TableCache(const Options* options, const EnvOptions& storage_options, Cache* const cache) : env_(options->env), - dbname_(dbname), + db_paths_(options->db_paths), options_(options), storage_options_(storage_options), cache_(cache) {} @@ -60,13 +60,15 @@ Status TableCache::FindTable(const EnvOptions& toptions, const FileDescriptor& fd, Cache::Handle** handle, const bool no_io) { Status s; - Slice key = GetSliceForFileNumber(&fd.number); + uint64_t number = fd.GetNumber(); + Slice key = GetSliceForFileNumber(&number); *handle = cache_->Lookup(key); if (*handle == nullptr) { if (no_io) { // Dont 
do IO and return a not-found status return Status::Incomplete("Table not found in table_cache, no_io is set"); } - std::string fname = TableFileName(dbname_, fd.GetNumber()); + std::string fname = + TableFileName(db_paths_, fd.GetNumber(), fd.GetPathId()); unique_ptr file; unique_ptr table_reader; s = env_->NewRandomAccessFile(fname, &file, toptions); diff --git a/db/table_cache.h b/db/table_cache.h index e912addc1..eaadc07da 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -11,6 +11,7 @@ #pragma once #include +#include #include #include "db/dbformat.h" @@ -28,8 +29,8 @@ struct FileDescriptor; class TableCache { public: - TableCache(const std::string& dbname, const Options* options, - const EnvOptions& storage_options, Cache* cache); + TableCache(const Options* options, const EnvOptions& storage_options, + Cache* cache); ~TableCache(); // Return an iterator for the specified file number (the corresponding @@ -84,7 +85,7 @@ class TableCache { private: Env* const env_; - const std::string dbname_; + const std::vector db_paths_; const Options* options_; const EnvOptions& storage_options_; Cache* const cache_; diff --git a/db/version_edit.cc b/db/version_edit.cc index c2b4928e0..4e2cf8f5b 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -18,25 +18,30 @@ namespace rocksdb { // Tag numbers for serialized VersionEdit. These numbers are written to // disk and should not be changed. enum Tag { - kComparator = 1, - kLogNumber = 2, - kNextFileNumber = 3, - kLastSequence = 4, - kCompactPointer = 5, - kDeletedFile = 6, - kNewFile = 7, + kComparator = 1, + kLogNumber = 2, + kNextFileNumber = 3, + kLastSequence = 4, + kCompactPointer = 5, + kDeletedFile = 6, + kNewFile = 7, // 8 was used for large value refs - kPrevLogNumber = 9, + kPrevLogNumber = 9, // these are new formats divergent from open source leveldb - kNewFile2 = 100, // store smallest & largest seqno - - kColumnFamily = 200, // specify column family for version edit - kColumnFamilyAdd = 201, - kColumnFamilyDrop = 202, - kMaxColumnFamily = 203, + kNewFile2 = 100, + kNewFile3 = 102, + kColumnFamily = 200, // specify column family for version edit + kColumnFamilyAdd = 201, + kColumnFamilyDrop = 202, + kMaxColumnFamily = 203, }; +uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) { + assert(number <= kFileNumberMask); + return number | (path_id * (kFileNumberMask + 1)); +} + void VersionEdit::Clear() { comparator_.clear(); max_level_ = 0; @@ -93,9 +98,18 @@ void VersionEdit::EncodeTo(std::string* dst) const { for (size_t i = 0; i < new_files_.size(); i++) { const FileMetaData& f = new_files_[i].second; - PutVarint32(dst, kNewFile2); + if (f.fd.GetPathId() == 0) { + // Use older format to make sure user can roll back the build if they + // don't config multiple DB paths. 
+      PutVarint32(dst, kNewFile2);
+    } else {
+      PutVarint32(dst, kNewFile3);
+    }
     PutVarint32(dst, new_files_[i].first);  // level
     PutVarint64(dst, f.fd.GetNumber());
+    if (f.fd.GetPathId() != 0) {
+      PutVarint32(dst, f.fd.GetPathId());
+    }
     PutVarint64(dst, f.fd.GetFileSize());
     PutLengthPrefixedSlice(dst, f.smallest.Encode());
     PutLengthPrefixedSlice(dst, f.largest.Encode());
@@ -237,7 +251,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
             GetVarint64(&input, &file_size) &&
             GetInternalKey(&input, &f.smallest) &&
             GetInternalKey(&input, &f.largest)) {
-          f.fd = FileDescriptor(number, file_size);
+          f.fd = FileDescriptor(number, 0, file_size);
           new_files_.push_back(std::make_pair(level, f));
         } else {
           if (!msg) {
@@ -255,7 +269,27 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
             GetInternalKey(&input, &f.largest) &&
             GetVarint64(&input, &f.smallest_seqno) &&
             GetVarint64(&input, &f.largest_seqno)) {
-          f.fd = FileDescriptor(number, file_size);
+          f.fd = FileDescriptor(number, 0, file_size);
+          new_files_.push_back(std::make_pair(level, f));
+        } else {
+          if (!msg) {
+            msg = "new-file2 entry";
+          }
+        }
+        break;
+      }
+
+      case kNewFile3: {
+        uint64_t number;
+        uint32_t path_id;
+        uint64_t file_size;
+        if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) &&
+            GetVarint32(&input, &path_id) && GetVarint64(&input, &file_size) &&
+            GetInternalKey(&input, &f.smallest) &&
+            GetInternalKey(&input, &f.largest) &&
+            GetVarint64(&input, &f.smallest_seqno) &&
+            GetVarint64(&input, &f.largest_seqno)) {
+          f.fd = FileDescriptor(number, path_id, file_size);
           new_files_.push_back(std::make_pair(level, f));
         } else {
           if (!msg) {
diff --git a/db/version_edit.h b/db/version_edit.h
index d6e62fc8c..50a24ea2d 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -19,21 +19,41 @@ namespace rocksdb {

 class VersionSet;

+const uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF;
+
+extern uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id);
+
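The packing scheme above reserves the low 62 bits of `packed_number_and_path_id` for the file number and the top 2 bits for the path id, so at most four `db_paths` entries are addressable per file. The following standalone sketch (not part of the patch; sample values made up) mirrors `PackFileNumberAndPathId()` and the `GetNumber()`/`GetPathId()` accessors that follow:

```cpp
#include <cassert>
#include <cstdint>

// Same constant as db/version_edit.h: low 62 bits = file number,
// high 2 bits = path id (up to 4 entries in options.db_paths).
const uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF;

uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) {
  assert(number <= kFileNumberMask);
  return number | (path_id * (kFileNumberMask + 1));
}

int main() {
  uint64_t packed = PackFileNumberAndPathId(/*number=*/123456, /*path_id=*/2);
  assert((packed & kFileNumberMask) == 123456);  // FileDescriptor::GetNumber()
  assert(packed / (kFileNumberMask + 1) == 2);   // FileDescriptor::GetPathId()
  return 0;
}
```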
 // A copyable structure contains information needed to read data from an SST
 // file. It can contain a pointer to a table reader opened for the file, or
 // file number and size, which can be used to create a new table reader for it.
 // The behavior is undefined when a copy of the structure is used when the
 // file is not in any live version any more.
 struct FileDescriptor {
-  uint64_t number;
-  uint64_t file_size;  // File size in bytes
   // Table reader in table_reader_handle
   TableReader* table_reader;
+  uint64_t packed_number_and_path_id;
+  uint64_t file_size;  // File size in bytes
+
+  FileDescriptor() : FileDescriptor(0, 0, 0) {}

-  FileDescriptor(uint64_t number, uint64_t file_size)
-      : number(number), file_size(file_size), table_reader(nullptr) {}
+  FileDescriptor(uint64_t number, uint32_t path_id, uint64_t file_size)
+      : table_reader(nullptr),
+        packed_number_and_path_id(PackFileNumberAndPathId(number, path_id)),
+        file_size(file_size) {}

-  uint64_t GetNumber() const { return number; }
+  FileDescriptor& operator=(const FileDescriptor& fd) {
+    table_reader = fd.table_reader;
+    packed_number_and_path_id = fd.packed_number_and_path_id;
+    file_size = fd.file_size;
+    return *this;
+  }
+
+  uint64_t GetNumber() const {
+    return packed_number_and_path_id & kFileNumberMask;
+  }
+  uint32_t GetPathId() const {
+    return packed_number_and_path_id / (kFileNumberMask + 1);
+  }
   uint64_t GetFileSize() const { return file_size; }
 };

@@ -58,7 +78,6 @@ struct FileMetaData {

   FileMetaData()
       : refs(0),
-        fd(0, 0),
         being_compacted(false),
         table_reader_handle(nullptr),
         compensated_file_size(0),
@@ -103,15 +122,13 @@ class VersionEdit {
   // Add the specified file at the specified number.
   // REQUIRES: This version has not been saved (see VersionSet::SaveTo)
   // REQUIRES: "smallest" and "largest" are smallest and largest keys in file
-  void AddFile(int level, uint64_t file,
-               uint64_t file_size,
-               const InternalKey& smallest,
-               const InternalKey& largest,
-               const SequenceNumber& smallest_seqno,
+  void AddFile(int level, uint64_t file, uint32_t file_path_id,
+               uint64_t file_size, const InternalKey& smallest,
+               const InternalKey& largest, const SequenceNumber& smallest_seqno,
                const SequenceNumber& largest_seqno) {
     assert(smallest_seqno <= largest_seqno);
     FileMetaData f;
-    f.fd = FileDescriptor(file, file_size);
+    f.fd = FileDescriptor(file, file_path_id, file_size);
     f.smallest = smallest;
     f.largest = largest;
     f.smallest_seqno = smallest_seqno;
diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc
index 7842b3263..850f242c1 100644
--- a/db/version_edit_test.cc
+++ b/db/version_edit_test.cc
@@ -30,11 +30,10 @@ TEST(VersionEditTest, EncodeDecode) {
   VersionEdit edit;
   for (int i = 0; i < 4; i++) {
     TestEncodeDecode(edit);
-    edit.AddFile(3, kBig + 300 + i, kBig + 400 + i,
+    edit.AddFile(3, kBig + 300 + i, 0, kBig + 400 + i,
                  InternalKey("foo", kBig + 500 + i, kTypeValue),
                  InternalKey("zoo", kBig + 600 + i, kTypeDeletion),
-                 kBig + 500 + i,
-                 kBig + 600 + i);
+                 kBig + 500 + i, kBig + 600 + i);
     edit.DeleteFile(4, kBig + 700 + i);
   }
diff --git a/db/version_set.cc b/db/version_set.cc
index 29611f0a0..b68923bc0 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -16,6 +16,7 @@
 #include
 #include
 #include
+#include
 #include

 #include "db/filename.h"
@@ -171,7 +172,7 @@ class Version::LevelFileNumIterator : public Iterator {
       : icmp_(icmp),
         flist_(flist),
         index_(flist->size()),
-        current_value_(0, 0) {  // Marks as invalid
+        current_value_(0, 0, 0) {  // Marks as invalid
   }
   virtual bool Valid() const {
     return index_ < flist_->size();
@@ -276,7 +277,8 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
                                    *fname, &file, vset_->storage_options_);
   } else {
     s = options->env->NewRandomAccessFile(
-        TableFileName(vset_->dbname_, file_meta->fd.GetNumber()),
+        TableFileName(vset_->options_->db_paths, file_meta->fd.GetNumber(),
+                      file_meta->fd.GetPathId()),
         &file,
vset_->storage_options_); } if (!s.ok()) { @@ -303,7 +305,9 @@ Status Version::GetTableProperties(std::shared_ptr* tp, Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { for (int level = 0; level < num_levels_; level++) { for (const auto& file_meta : files_[level]) { - auto fname = TableFileName(vset_->dbname_, file_meta->fd.GetNumber()); + auto fname = + TableFileName(vset_->options_->db_paths, file_meta->fd.GetNumber(), + file_meta->fd.GetPathId()); // 1. If the table is already present in table cache, load table // properties from there. std::shared_ptr table_properties; @@ -861,7 +865,6 @@ void Version::ComputeCompactionScore( } namespace { - // Compator that is used to sort files based on their size // In normal mode: descending size bool CompareCompensatedSizeDescending(const Version::Fsize& first, @@ -869,18 +872,6 @@ bool CompareCompensatedSizeDescending(const Version::Fsize& first, return (first.file->compensated_file_size > second.file->compensated_file_size); } -// A static compator used to sort files based on their seqno -// In universal style : descending seqno -bool CompareSeqnoDescending(const Version::Fsize& first, - const Version::Fsize& second) { - if (first.file->smallest_seqno > second.file->smallest_seqno) { - assert(first.file->largest_seqno > second.file->largest_seqno); - return true; - } - assert(first.file->largest_seqno <= second.file->largest_seqno); - return false; -} - } // anonymous namespace void Version::UpdateNumNonEmptyLevels() { @@ -895,19 +886,15 @@ void Version::UpdateNumNonEmptyLevels() { } void Version::UpdateFilesBySize() { - if (cfd_->options()->compaction_style == kCompactionStyleFIFO) { + if (cfd_->options()->compaction_style == kCompactionStyleFIFO || + cfd_->options()->compaction_style == kCompactionStyleUniversal) { // don't need this return; } // No need to sort the highest level because it is never compacted. - int max_level = - (cfd_->options()->compaction_style == kCompactionStyleUniversal) - ? 
NumberLevels()
-                      : NumberLevels() - 1;
-
-  for (int level = 0; level < max_level; level++) {
+  for (int level = 0; level < NumberLevels() - 1; level++) {
     const std::vector<FileMetaData*>& files = files_[level];
-    std::vector<int>& files_by_size = files_by_size_[level];
+    auto& files_by_size = files_by_size_[level];
     assert(files_by_size.size() == 0);

     // populate a temp vector for sorting based on size
@@ -918,18 +905,12 @@ void Version::UpdateFilesBySize() {
     }

     // sort the top number_of_files_to_sort_ based on file size
-    if (cfd_->options()->compaction_style == kCompactionStyleUniversal) {
-      int num = temp.size();
-      std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
-                        CompareSeqnoDescending);
-    } else {
-      int num = Version::number_of_files_to_sort_;
-      if (num > (int)temp.size()) {
-        num = temp.size();
-      }
-      std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
-                        CompareCompensatedSizeDescending);
+    size_t num = Version::number_of_files_to_sort_;
+    if (num > temp.size()) {
+      num = temp.size();
     }
+    std::partial_sort(temp.begin(), temp.begin() + num, temp.end(),
+                      CompareCompensatedSizeDescending);

     assert(temp.size() == files.size());

     // initialize files_by_size_
@@ -1291,11 +1272,11 @@ int64_t Version::MaxNextLevelOverlappingBytes() {
   return result;
 }

-void Version::AddLiveFiles(std::set<uint64_t>* live) {
+void Version::AddLiveFiles(std::vector<FileDescriptor>* live) {
   for (int level = 0; level < NumberLevels(); level++) {
     const std::vector<FileMetaData*>& files = files_[level];
     for (const auto& file : files) {
-      live->insert(file->fd.GetNumber());
+      live->push_back(file->fd);
     }
   }
 }
@@ -1448,7 +1429,7 @@ class VersionSet::Builder {
 #endif
   }

-  void CheckConsistencyForDeletes(VersionEdit* edit, unsigned int number,
+  void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number,
                                   int level) {
 #ifndef NDEBUG
       // a file to be deleted better exist in the previous version
@@ -1490,6 +1471,9 @@ class VersionSet::Builder {
         }
       }
     }
+    if (!found) {
+      fprintf(stderr, "not found %lu\n", (unsigned long)number);
+    }
     assert(found);
 #endif
   }
@@ -2183,17 +2167,15 @@ Status VersionSet::Recover(
   last_sequence_ = last_sequence;
   prev_log_number_ = prev_log_number;

-  Log(options_->info_log, "Recovered from manifest file:%s succeeded,"
+  Log(options_->info_log,
+      "Recovered from manifest file:%s succeeded,"
       "manifest_file_number is %lu, next_file_number is %lu, "
       "last_sequence is %lu, log_number is %lu,"
       "prev_log_number is %lu,"
       "max_column_family is %u\n",
-      manifest_filename.c_str(),
-      (unsigned long)manifest_file_number_,
-      (unsigned long)next_file_number_,
-      (unsigned long)last_sequence_,
-      (unsigned long)log_number,
-      (unsigned long)prev_log_number_,
+      manifest_filename.c_str(), (unsigned long)manifest_file_number_,
+      (unsigned long)next_file_number_, (unsigned long)last_sequence_,
+      (unsigned long)log_number, (unsigned long)prev_log_number_,
       column_family_set_->GetMaxColumnFamily());

   for (auto cfd : *column_family_set_) {
@@ -2580,9 +2562,9 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {

       for (int level = 0; level < cfd->NumberLevels(); level++) {
         for (const auto& f : cfd->current()->files_[level]) {
-          edit.AddFile(level, f->fd.GetNumber(), f->fd.GetFileSize(),
-                       f->smallest, f->largest, f->smallest_seqno,
-                       f->largest_seqno);
+          edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(),
+                       f->fd.GetFileSize(), f->smallest, f->largest,
+                       f->smallest_seqno, f->largest_seqno);
         }
       }
       edit.SetLogNumber(cfd->GetLogNumber());
@@ -2664,7 +2646,7 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
   return result;
 }

-void VersionSet::AddLiveFiles(std::vector<uint64_t>* live_list) {
+void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
   // pre-calculate space requirement
   int64_t total_files = 0;
   for (auto cfd : *column_family_set_) {
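Because `AddLiveFiles()` now hands back whole `FileDescriptor`s instead of bare file numbers, callers such as obsolete-file cleanup can reconstruct the full on-disk path, including which `db_paths` directory the file lives in. A hedged sketch of that reconstruction; `MakeFileName` is a simplified stand-in for the helpers in `db/filename.h`, and the directories are invented:

```cpp
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <string>
#include <vector>

// Simplified stand-in for MakeTableFileName() in db/filename.h.
std::string MakeFileName(const std::string& path, uint64_t number) {
  char buf[32];
  std::snprintf(buf, sizeof(buf), "/%06" PRIu64 ".sst", number);
  return path + buf;
}

int main() {
  std::vector<std::string> db_paths = {"/flash/mydb", "/disk/mydb"};
  // A live file: number 7, stored in the second configured path (path_id 1).
  uint64_t number = 7;
  uint32_t path_id = 1;
  std::printf("%s\n", MakeFileName(db_paths[path_id], number).c_str());
  // Prints: /disk/mydb/000007.sst
  return 0;
}
```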
@@ -2686,7 +2668,7 @@ void VersionSet::AddLiveFiles(std::vector<FileDescriptor>* live_list) {
        v = v->next_) {
     for (int level = 0; level < v->NumberLevels(); level++) {
       for (const auto& f : v->files_[level]) {
-        live_list->push_back(f->fd.GetNumber());
+        live_list->push_back(f->fd);
       }
     }
   }
@@ -2809,7 +2791,14 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
       for (const auto& file : cfd->current()->files_[level]) {
         LiveFileMetaData filemetadata;
         filemetadata.column_family_name = cfd->GetName();
-        filemetadata.name = TableFileName("", file->fd.GetNumber());
+        uint32_t path_id = file->fd.GetPathId();
+        if (path_id < options_->db_paths.size()) {
+          filemetadata.db_path = options_->db_paths[path_id];
+        } else {
+          assert(!options_->db_paths.empty());
+          filemetadata.db_path = options_->db_paths.back();
+        }
+        filemetadata.name = MakeTableFileName("", file->fd.GetNumber());
         filemetadata.level = level;
         filemetadata.size = file->fd.GetFileSize();
         filemetadata.smallestkey = file->smallest.user_key().ToString();
diff --git a/db/version_set.h b/db/version_set.h
index 542db7466..60e9383f8 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -188,7 +188,7 @@ class Version {
   int64_t MaxNextLevelOverlappingBytes();

   // Add all files listed in the current version to *live.
-  void AddLiveFiles(std::set<uint64_t>* live);
+  void AddLiveFiles(std::vector<FileDescriptor>* live);

   // Return a human readable string that describes this version's contents.
   std::string DebugString(bool hex = false) const;
@@ -294,7 +294,7 @@ class Version {
   // that on a running system, we need to look at only the first
   // few largest files because a new version is created every few
   // seconds/minutes (because of concurrent compactions).
-  static const int number_of_files_to_sort_ = 50;
+  static const size_t number_of_files_to_sort_ = 50;

   // Level that should be compacted next and its compaction score.
   // Score < 1 means compaction is not strictly needed.  These fields
@@ -399,7 +399,7 @@ class VersionSet {
   // Arrange to reuse "file_number" unless a newer file number has
   // already been allocated.
   // REQUIRES: "file_number" was returned by a call to NewFileNumber().
-  void ReuseFileNumber(uint64_t file_number) {
+  void ReuseLogFileNumber(uint64_t file_number) {
     if (next_file_number_ == file_number + 1) {
       next_file_number_ = file_number;
     }
@@ -440,7 +440,7 @@ class VersionSet {
   Iterator* MakeInputIterator(Compaction* c);

   // Add all files listed in any live version to *live.
-  void AddLiveFiles(std::vector<uint64_t>* live_list);
+  void AddLiveFiles(std::vector<FileDescriptor>* live_list);

   // Return the approximate offset in the database of the data for
   // "key" as of version "v".
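The new `db_path` field in `LiveFileMetaData`, populated by `GetLiveFilesMetaData()` above (with a fallback to the last configured path for out-of-range ids), can be consumed like this. A sketch only: the directories are invented, and passing the first path as the `db_name` argument to `Open()` follows the `db_paths` option comment rather than anything this patch mandates:

```cpp
#include <cstdio>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.db_paths = {"/flash/mydb", "/disk/mydb"};  // path_id 0 and 1

  rocksdb::DB* db = nullptr;
  rocksdb::Status s = rocksdb::DB::Open(options, "/flash/mydb", &db);
  if (!s.ok()) return 1;

  std::vector<rocksdb::LiveFileMetaData> files;
  db->GetLiveFilesMetaData(&files);
  for (const auto& f : files) {
    // db_path holds the directory; name holds the "/<number>.sst" suffix.
    std::printf("%s%s level=%d size=%zu\n", f.db_path.c_str(), f.name.c_str(),
                f.level, f.size);
  }
  delete db;
  return 0;
}
```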
diff --git a/db/version_set_test.cc b/db/version_set_test.cc
index ef48bf927..0c548e342 100644
--- a/db/version_set_test.cc
+++ b/db/version_set_test.cc
@@ -31,7 +31,7 @@ class FindFileTest {
                SequenceNumber smallest_seq = 100,
                SequenceNumber largest_seq = 100) {
     FileMetaData* f = new FileMetaData;
-    f->fd = FileDescriptor(files_.size() + 1, 0);
+    f->fd = FileDescriptor(files_.size() + 1, 0, 0);
     f->smallest = InternalKey(smallest, smallest_seq, kTypeValue);
     f->largest = InternalKey(largest, largest_seq, kTypeValue);
     files_.push_back(f);
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index 33b443f40..b1cbbc25b 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -55,6 +55,7 @@ class Env;
 // Metadata associated with each SST file.
 struct LiveFileMetaData {
   std::string column_family_name;  // Name of the column family
+  std::string db_path;
   std::string name;                // Name of the file
   int level;                       // Level at which this file resides.
   size_t size;                     // File size in bytes.
diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h
index 4dc8d7680..b7fc39c81 100644
--- a/include/rocksdb/memtablerep.h
+++ b/include/rocksdb/memtablerep.h
@@ -227,9 +227,10 @@ extern MemTableRepFactory* NewHashSkipListRepFactory(
     int32_t skiplist_branching_factor = 4
 );

-// The factory is to create memtables with a hashed linked list:
-// it contains a fixed array of buckets, each pointing to a sorted single
-// linked list (null if the bucket is empty).
+// The factory is to create memtables based on a hash table:
+// it contains a fixed array of buckets, each pointing to either a linked list
+// or a skip list if the number of entries inside the bucket exceeds
+// threshold_use_skiplist.
 // @bucket_count: number of fixed array buckets
 // @huge_page_tlb_size: if <=0, allocate the hash table bytes from malloc.
 //                      Otherwise from huge page TLB. The user needs to reserve
@@ -240,10 +241,13 @@ extern MemTableRepFactory* NewHashSkipListRepFactory(
 //   exceeds this number, log about it.
 // @if_log_bucket_dist_when_flash: if true, log distribution of number of
 //   entries when flushing.
+// @threshold_use_skiplist: a bucket switches to a skip list if the number of
+//   entries exceeds this parameter.
 extern MemTableRepFactory* NewHashLinkListRepFactory(
     size_t bucket_count = 50000, size_t huge_page_tlb_size = 0,
     int bucket_entries_logging_threshold = 4096,
-    bool if_log_bucket_dist_when_flash = true);
+    bool if_log_bucket_dist_when_flash = true,
+    uint32_t threshold_use_skiplist = 256);

 // This factory creates a cuckoo-hashing based mem-table representation.
 // Cuckoo-hash is a closed-hash strategy, in which all key/value pairs
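A usage sketch for the extended factory signature, with the new trailing `threshold_use_skiplist` parameter spelled out. It assumes the `shared_ptr`-based `prefix_extractor` and `memtable_factory` options of this era; hash-based memtables require a prefix extractor, and the prefix length of 4 here is arbitrary:

```cpp
#include "rocksdb/memtablerep.h"
#include "rocksdb/options.h"
#include "rocksdb/slice_transform.h"

int main() {
  rocksdb::Options options;
  // Hash-based memtables key their buckets off a key prefix.
  options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(4));
  options.memtable_factory.reset(rocksdb::NewHashLinkListRepFactory(
      /*bucket_count=*/50000,
      /*huge_page_tlb_size=*/0,
      /*bucket_entries_logging_threshold=*/4096,
      /*if_log_bucket_dist_when_flash=*/true,
      /*threshold_use_skiplist=*/256));  // bucket becomes a skip list past 256
  return 0;
}
```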
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 658be3afc..df7383d25 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -675,6 +675,13 @@ struct DBOptions {
   // Default value is 1800 (half an hour).
   int db_stats_log_interval;

+  // A list of paths where SST files can be put into. A compaction style can
+  // determine which of those paths it will put the file to.
+  // If left empty, only one path will be used, which is db_name passed when
+  // opening the DB.
+  // Default: empty
+  std::vector<std::string> db_paths;
+
   // This specifies the info LOG dir.
   // If it is empty, the log files will be in the same dir as data.
   // If it is non-empty, the log files will be in the specified dir,
diff --git a/include/rocksdb/universal_compaction.h b/include/rocksdb/universal_compaction.h
index eaf47e5c7..229e50b25 100644
--- a/include/rocksdb/universal_compaction.h
+++ b/include/rocksdb/universal_compaction.h
@@ -8,6 +8,7 @@

 #include
 #include
+#include

 namespace rocksdb {

@@ -61,6 +62,7 @@ class CompactionOptionsUniversal {
   // well as the total size of C1...Ct as total_C, the compaction output file
   // will be compressed iff
   //   total_C / total_size < this percentage
+  // Default: -1
   int compression_size_percent;

   // The algorithm used to stop picking files into a single compaction run
@@ -68,14 +70,13 @@ class CompactionOptionsUniversal {
   CompactionStopStyle stop_style;

   // Default set of parameters
-  CompactionOptionsUniversal() :
-    size_ratio(1),
-    min_merge_width(2),
-    max_merge_width(UINT_MAX),
-    max_size_amplification_percent(200),
-    compression_size_percent(-1),
-    stop_style(kCompactionStopStyleTotalSize) {
-  }
+  CompactionOptionsUniversal()
+      : size_ratio(1),
+        min_merge_width(2),
+        max_merge_width(UINT_MAX),
+        max_size_amplification_percent(200),
+        compression_size_percent(-1),
+        stop_style(kCompactionStopStyleTotalSize) {}
 };

 }  // namespace rocksdb
diff --git a/java/org/rocksdb/RocksDB.java b/java/org/rocksdb/RocksDB.java
index cec73ed49..a55266e25 100644
--- a/java/org/rocksdb/RocksDB.java
+++ b/java/org/rocksdb/RocksDB.java
@@ -21,7 +21,7 @@ import org.rocksdb.util.Environment;
 public class RocksDB extends RocksObject {
   public static final int NOT_FOUND = -1;
   private static final String[] compressionLibs_ = {
-      "snappy", "zlib", "bzip2", "lz4", "lz4hc"};
+      "snappy", "z", "bzip2", "lz4", "lz4hc"};

   /**
    * Loads the necessary library files.
diff --git a/java/org/rocksdb/benchmark/DbBenchmark.java b/java/org/rocksdb/benchmark/DbBenchmark.java
index 5fcbed7e2..d0b2eab21 100644
--- a/java/org/rocksdb/benchmark/DbBenchmark.java
+++ b/java/org/rocksdb/benchmark/DbBenchmark.java
@@ -463,7 +463,7 @@ public class DbBenchmark {
     if (compressionType_.equals("snappy")) {
       System.loadLibrary("snappy");
     } else if (compressionType_.equals("zlib")) {
-      System.loadLibrary("zlib");
+      System.loadLibrary("z");
     } else if (compressionType_.equals("bzip2")) {
       System.loadLibrary("bzip2");
     } else if (compressionType_.equals("lz4")) {
diff --git a/port/port_posix.cc b/port/port_posix.cc
index 2ad10f58f..90dde3227 100644
--- a/port/port_posix.cc
+++ b/port/port_posix.cc
@@ -9,10 +9,12 @@

 #include "port/port_posix.h"

-#include
 #include
 #include
+#include
+#include
 #include
+#include

 #include "util/logging.h"

 namespace rocksdb {

@@ -83,6 +85,27 @@ void CondVar::Wait() {
 #endif
 }

+bool CondVar::TimedWait(uint64_t abs_time_us) {
+  struct timespec ts;
+  ts.tv_sec = abs_time_us / 1000000;
+  ts.tv_nsec = (abs_time_us % 1000000) * 1000;
+
+#ifndef NDEBUG
+  mu_->locked_ = false;
+#endif
+  int err = pthread_cond_timedwait(&cv_, &mu_->mu_, &ts);
+#ifndef NDEBUG
+  mu_->locked_ = true;
+#endif
+  if (err == ETIMEDOUT) {
+    return true;
+  }
+  if (err != 0) {
+    PthreadCall("timedwait", err);
+  }
+  return false;
+}
+
 void CondVar::Signal() {
   PthreadCall("signal", pthread_cond_signal(&cv_));
 }
diff --git a/port/port_posix.h b/port/port_posix.h
index c2070c7cb..2e3c868b3 100644
--- a/port/port_posix.h
+++ b/port/port_posix.h
@@ -137,6 +137,8 @@ class CondVar {
   explicit CondVar(Mutex* mu);
   ~CondVar();
   void Wait();
+  // Timed condition wait. Returns true if timeout occurred.
+ bool TimedWait(uint64_t abs_time_us); void Signal(); void SignalAll(); private: diff --git a/table/plain_table_reader.cc b/table/plain_table_reader.cc index 469a61cf4..20cb87538 100644 --- a/table/plain_table_reader.cc +++ b/table/plain_table_reader.cc @@ -333,7 +333,7 @@ void PlainTableReader::AllocateIndexAndBloom(int num_prefixes, uint32_t bloom_total_bits = num_prefixes * bloom_bits_per_key; if (bloom_total_bits > 0) { enable_bloom_ = true; - bloom_.SetTotalBits(bloom_total_bits, options_.bloom_locality, + bloom_.SetTotalBits(&arena_, bloom_total_bits, options_.bloom_locality, huge_page_tlb_size, options_.info_log.get()); } } @@ -465,7 +465,7 @@ Status PlainTableReader::PopulateIndex(TableProperties* props, table_properties_->num_entries * bloom_bits_per_key; if (num_bloom_bits > 0) { enable_bloom_ = true; - bloom_.SetTotalBits(num_bloom_bits, options_.bloom_locality, + bloom_.SetTotalBits(&arena_, num_bloom_bits, options_.bloom_locality, huge_page_tlb_size, options_.info_log.get()); } } diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 929efee3f..337199a24 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -1591,7 +1591,7 @@ class StressTest { } switch (FLAGS_rep_factory) { case kHashSkipList: - options_.memtable_factory.reset(NewHashSkipListRepFactory()); + options_.memtable_factory.reset(NewHashSkipListRepFactory(10000)); break; case kSkipList: // no need to do anything diff --git a/util/dynamic_bloom.cc b/util/dynamic_bloom.cc index b90f199ae..cbe895ace 100644 --- a/util/dynamic_bloom.cc +++ b/util/dynamic_bloom.cc @@ -32,12 +32,13 @@ uint32_t GetTotalBitsForLocality(uint32_t total_bits) { } } -DynamicBloom::DynamicBloom(uint32_t total_bits, uint32_t locality, +DynamicBloom::DynamicBloom(Arena* arena, uint32_t total_bits, uint32_t locality, uint32_t num_probes, uint32_t (*hash_func)(const Slice& key), - size_t huge_page_tlb_size, Logger* logger) + size_t huge_page_tlb_size, + Logger* logger) : DynamicBloom(num_probes, hash_func) { - SetTotalBits(total_bits, locality, huge_page_tlb_size, logger); + SetTotalBits(arena, total_bits, locality, huge_page_tlb_size, logger); } DynamicBloom::DynamicBloom(uint32_t num_probes, @@ -47,8 +48,10 @@ DynamicBloom::DynamicBloom(uint32_t num_probes, kNumProbes(num_probes), hash_func_(hash_func == nullptr ? &BloomHash : hash_func) {} -void DynamicBloom::SetTotalBits(uint32_t total_bits, uint32_t locality, - size_t huge_page_tlb_size, Logger* logger) { +void DynamicBloom::SetTotalBits(Arena* arena, + uint32_t total_bits, uint32_t locality, + size_t huge_page_tlb_size, + Logger* logger) { kTotalBits = (locality > 0) ? GetTotalBitsForLocality(total_bits) : (total_bits + 7) / 8 * 8; kNumBlocks = (locality > 0) ? 
(kTotalBits / (CACHE_LINE_SIZE * 8)) : 0; @@ -60,8 +63,9 @@ void DynamicBloom::SetTotalBits(uint32_t total_bits, uint32_t locality, if (kNumBlocks > 0) { sz += CACHE_LINE_SIZE - 1; } + assert(arena); raw_ = reinterpret_cast( - arena_.AllocateAligned(sz, huge_page_tlb_size, logger)); + arena->AllocateAligned(sz, huge_page_tlb_size, logger)); memset(raw_, 0, sz); if (kNumBlocks > 0 && (reinterpret_cast(raw_) % CACHE_LINE_SIZE)) { data_ = raw_ + CACHE_LINE_SIZE - diff --git a/util/dynamic_bloom.h b/util/dynamic_bloom.h index 4c4f7e1f9..ba0016ddb 100644 --- a/util/dynamic_bloom.h +++ b/util/dynamic_bloom.h @@ -18,6 +18,7 @@ class Logger; class DynamicBloom { public: + // arena: pass arena to bloom filter, hence trace the usage of memory // total_bits: fixed total bits for the bloom // num_probes: number of hash probes for a single key // locality: If positive, optimize for cache line locality, 0 otherwise. @@ -27,7 +28,8 @@ class DynamicBloom { // it to be allocated, like: // sysctl -w vm.nr_hugepages=20 // See linux doc Documentation/vm/hugetlbpage.txt - explicit DynamicBloom(uint32_t total_bits, uint32_t locality = 0, + explicit DynamicBloom(Arena* arena, + uint32_t total_bits, uint32_t locality = 0, uint32_t num_probes = 6, uint32_t (*hash_func)(const Slice& key) = nullptr, size_t huge_page_tlb_size = 0, @@ -36,7 +38,7 @@ class DynamicBloom { explicit DynamicBloom(uint32_t num_probes = 6, uint32_t (*hash_func)(const Slice& key) = nullptr); - void SetTotalBits(uint32_t total_bits, uint32_t locality, + void SetTotalBits(Arena* arena, uint32_t total_bits, uint32_t locality, size_t huge_page_tlb_size, Logger* logger); ~DynamicBloom() {} @@ -63,8 +65,6 @@ class DynamicBloom { uint32_t (*hash_func_)(const Slice& key); unsigned char* data_; unsigned char* raw_; - - Arena arena_; }; inline void DynamicBloom::Add(const Slice& key) { AddHash(hash_func_(key)); } diff --git a/util/dynamic_bloom_test.cc b/util/dynamic_bloom_test.cc index d345addba..3e55488f2 100644 --- a/util/dynamic_bloom_test.cc +++ b/util/dynamic_bloom_test.cc @@ -40,17 +40,19 @@ class DynamicBloomTest { }; TEST(DynamicBloomTest, EmptyFilter) { - DynamicBloom bloom1(100, 0, 2); + Arena arena; + DynamicBloom bloom1(&arena, 100, 0, 2); ASSERT_TRUE(!bloom1.MayContain("hello")); ASSERT_TRUE(!bloom1.MayContain("world")); - DynamicBloom bloom2(CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2); + DynamicBloom bloom2(&arena, CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2); ASSERT_TRUE(!bloom2.MayContain("hello")); ASSERT_TRUE(!bloom2.MayContain("world")); } TEST(DynamicBloomTest, Small) { - DynamicBloom bloom1(100, 0, 2); + Arena arena; + DynamicBloom bloom1(&arena, 100, 0, 2); bloom1.Add("hello"); bloom1.Add("world"); ASSERT_TRUE(bloom1.MayContain("hello")); @@ -58,7 +60,7 @@ TEST(DynamicBloomTest, Small) { ASSERT_TRUE(!bloom1.MayContain("x")); ASSERT_TRUE(!bloom1.MayContain("foo")); - DynamicBloom bloom2(CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2); + DynamicBloom bloom2(&arena, CACHE_LINE_SIZE * 8 * 2 - 1, 1, 2); bloom2.Add("hello"); bloom2.Add("world"); ASSERT_TRUE(bloom2.MayContain("hello")); @@ -94,13 +96,14 @@ TEST(DynamicBloomTest, VaryingLengths) { for (uint32_t enable_locality = 0; enable_locality < 2; ++enable_locality) { for (uint32_t num = 1; num <= 10000; num = NextNum(num)) { uint32_t bloom_bits = 0; + Arena arena; if (enable_locality == 0) { bloom_bits = std::max(num * FLAGS_bits_per_key, 64U); } else { bloom_bits = std::max(num * FLAGS_bits_per_key, enable_locality * CACHE_LINE_SIZE * 8); } - DynamicBloom bloom(bloom_bits, enable_locality, num_probes); + 
DynamicBloom bloom(&arena, bloom_bits, enable_locality, num_probes); for (uint64_t i = 0; i < num; i++) { bloom.Add(Key(i, buffer)); ASSERT_TRUE(bloom.MayContain(Key(i, buffer))); @@ -148,10 +151,11 @@ TEST(DynamicBloomTest, perf) { } for (uint64_t m = 1; m <= 8; ++m) { + Arena arena; const uint64_t num_keys = m * 8 * 1024 * 1024; fprintf(stderr, "testing %" PRIu64 "M keys\n", m * 8); - DynamicBloom std_bloom(num_keys * 10, 0, num_probes); + DynamicBloom std_bloom(&arena, num_keys * 10, 0, num_probes); timer.Start(); for (uint64_t i = 1; i <= num_keys; ++i) { @@ -175,7 +179,7 @@ TEST(DynamicBloomTest, perf) { ASSERT_TRUE(count == num_keys); // Locality enabled version - DynamicBloom blocked_bloom(num_keys * 10, 1, num_probes); + DynamicBloom blocked_bloom(&arena, num_keys * 10, 1, num_probes); timer.Start(); for (uint64_t i = 1; i <= num_keys; ++i) { diff --git a/util/env_posix.cc b/util/env_posix.cc index 267958606..3bfeb0ea0 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -821,7 +821,7 @@ class PosixWritableFile : public WritableFile { } } - virtual Status RangeSync(off64_t offset, off64_t nbytes) { + virtual Status RangeSync(off_t offset, off_t nbytes) { if (sync_file_range(fd_, offset, nbytes, SYNC_FILE_RANGE_WRITE) == 0) { return Status::OK(); } else { diff --git a/util/hash_linklist_rep.cc b/util/hash_linklist_rep.cc index 22bb7ffb1..8e3dc5826 100644 --- a/util/hash_linklist_rep.cc +++ b/util/hash_linklist_rep.cc @@ -7,6 +7,7 @@ #ifndef ROCKSDB_LITE #include "util/hash_linklist_rep.h" +#include #include "rocksdb/memtablerep.h" #include "util/arena.h" #include "rocksdb/slice.h" @@ -22,6 +23,31 @@ namespace rocksdb { namespace { typedef const char* Key; +typedef SkipList MemtableSkipList; +typedef port::AtomicPointer Pointer; + +// A data structure used as the header of a link list of a hash bucket. +struct BucketHeader { + Pointer next; + uint32_t num_entries; + + explicit BucketHeader(void* n, uint32_t count) + : next(n), num_entries(count) {} + + bool IsSkipListBucket() { return next.NoBarrier_Load() == this; } +}; + +// A data structure used as the header of a skip list of a hash bucket. +struct SkipListBucketHeader { + BucketHeader Counting_header; + MemtableSkipList skip_list; + + explicit SkipListBucketHeader(const MemTableRep::KeyComparator& cmp, + Arena* arena, uint32_t count) + : Counting_header(this, // Pointing to itself to indicate header type. + count), + skip_list(cmp, arena) {} +}; struct Node { // Accessors/mutators for links. Wrapped in methods so we can @@ -51,12 +77,75 @@ struct Node { char key[0]; }; +// Memory structure of the mem table: +// It is a hash table, each bucket points to one entry, a linked list or a +// skip list. In order to track total number of records in a bucket to determine +// whether should switch to skip list, a header is added just to indicate +// number of entries in the bucket. +// +// +// +-----> NULL Case 1. Empty bucket +// | +// | +// | +---> +-------+ +// | | | Next +--> NULL +// | | +-------+ +// +-----+ | | | | Case 2. One Entry in bucket. +// | +-+ | | Data | next pointer points to +// +-----+ | | | NULL. All other cases +// | | | | | next pointer is not NULL. +// +-----+ | +-------+ +// | +---+ +// +-----+ +-> +-------+ +> +-------+ +-> +-------+ +// | | | | Next +--+ | Next +--+ | Next +-->NULL +// +-----+ | +-------+ +-------+ +-------+ +// | +-----+ | Count | | | | | +// +-----+ +-------+ | Data | | Data | +// | | | | | | +// +-----+ Case 3. 
| | | | +// | | A header +-------+ +-------+ +// +-----+ points to +// | | a linked list. Count indicates total number +// +-----+ of rows in this bucket. +// | | +// +-----+ +-> +-------+ <--+ +// | | | | Next +----+ +// +-----+ | +-------+ Case 4. A header points to a skip +// | +----+ | Count | list and next pointer points to +// +-----+ +-------+ itself, to distinguish case 3 or 4. +// | | | | Count still is kept to indicates total +// +-----+ | Skip +--> of entries in the bucket for debugging +// | | | List | Data purpose. +// | | | +--> +// +-----+ | | +// | | +-------+ +// +-----+ +// +// We don't have data race when changing cases because: +// (1) When changing from case 2->3, we create a new bucket header, put the +// single node there first without changing the original node, and do a +// release store when changing the bucket pointer. In that case, a reader +// who sees a stale value of the bucket pointer will read this node, while +// a reader sees the correct value because of the release store. +// (2) When changing case 3->4, a new header is created with skip list points +// to the data, before doing an acquire store to change the bucket pointer. +// The old header and nodes are never changed, so any reader sees any +// of those existing pointers will guarantee to be able to iterate to the +// end of the linked list. +// (3) Header's next pointer in case 3 might change, but they are never equal +// to itself, so no matter a reader sees any stale or newer value, it will +// be able to correctly distinguish case 3 and 4. +// +// The reason that we use case 2 is we want to make the format to be efficient +// when the utilization of buckets is relatively low. If we use case 3 for +// single entry bucket, we will need to waste 12 bytes for every entry, +// which can be significant decrease of memory utilization. class HashLinkListRep : public MemTableRep { public: HashLinkListRep(const MemTableRep::KeyComparator& compare, Arena* arena, const SliceTransform* transform, size_t bucket_size, - size_t huge_page_tlb_size, Logger* logger, - int bucket_entries_logging_threshold, + uint32_t threshold_use_skiplist, size_t huge_page_tlb_size, + Logger* logger, int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash); virtual KeyHandle Allocate(const size_t len, char** buf) override; @@ -80,7 +169,6 @@ class HashLinkListRep : public MemTableRep { private: friend class DynamicIterator; - typedef SkipList FullList; size_t bucket_size_; @@ -88,6 +176,8 @@ class HashLinkListRep : public MemTableRep { // the same transform. port::AtomicPointer* buckets_; + const uint32_t threshold_use_skiplist_; + // The user-supplied transform whose domain is the user keys. 
const SliceTransform* transform_; @@ -97,7 +187,12 @@ class HashLinkListRep : public MemTableRep { int bucket_entries_logging_threshold_; bool if_log_bucket_dist_when_flash_; - bool BucketContains(Node* head, const Slice& key) const; + bool LinkListContains(Node* head, const Slice& key) const; + + SkipListBucketHeader* GetSkipListBucketHeader(Pointer* first_next_pointer) + const; + + Node* GetLinkListFirstNode(Pointer* first_next_pointer) const; Slice GetPrefix(const Slice& internal_key) const { return transform_->Transform(ExtractUserKey(internal_key)); @@ -107,11 +202,11 @@ class HashLinkListRep : public MemTableRep { return MurmurHash(slice.data(), slice.size(), 0) % bucket_size_; } - Node* GetBucket(size_t i) const { - return static_cast(buckets_[i].Acquire_Load()); + Pointer* GetBucket(size_t i) const { + return static_cast(buckets_[i].Acquire_Load()); } - Node* GetBucket(const Slice& slice) const { + Pointer* GetBucket(const Slice& slice) const { return GetBucket(GetHash(slice)); } @@ -119,7 +214,6 @@ class HashLinkListRep : public MemTableRep { return (compare_(b, a) == 0); } - bool Equal(const Key& a, const Key& b) const { return (compare_(a, b) == 0); } bool KeyIsAfterNode(const Slice& internal_key, const Node* n) const { @@ -137,8 +231,8 @@ class HashLinkListRep : public MemTableRep { class FullListIterator : public MemTableRep::Iterator { public: - explicit FullListIterator(FullList* list, Arena* arena) - : iter_(list), full_list_(list), arena_(arena) {} + explicit FullListIterator(MemtableSkipList* list, Arena* arena) + : iter_(list), full_list_(list), arena_(arena) {} virtual ~FullListIterator() { } @@ -189,22 +283,22 @@ class HashLinkListRep : public MemTableRep { iter_.SeekToLast(); } private: - FullList::Iterator iter_; + MemtableSkipList::Iterator iter_; // To destruct with the iterator. - std::unique_ptr full_list_; + std::unique_ptr full_list_; std::unique_ptr arena_; std::string tmp_; // For passing to EncodeKey }; - class Iterator : public MemTableRep::Iterator { + class LinkListIterator : public MemTableRep::Iterator { public: - explicit Iterator(const HashLinkListRep* const hash_link_list_rep, - Node* head) : - hash_link_list_rep_(hash_link_list_rep), head_(head), node_(nullptr) { - } + explicit LinkListIterator(const HashLinkListRep* const hash_link_list_rep, + Node* head) + : hash_link_list_rep_(hash_link_list_rep), + head_(head), + node_(nullptr) {} - virtual ~Iterator() { - } + virtual ~LinkListIterator() {} // Returns true iff the iterator is positioned at a valid node. 
virtual bool Valid() const { @@ -271,22 +365,68 @@ class HashLinkListRep : public MemTableRep { } }; - class DynamicIterator : public HashLinkListRep::Iterator { + class DynamicIterator : public HashLinkListRep::LinkListIterator { public: explicit DynamicIterator(HashLinkListRep& memtable_rep) - : HashLinkListRep::Iterator(&memtable_rep, nullptr), - memtable_rep_(memtable_rep) {} + : HashLinkListRep::LinkListIterator(&memtable_rep, nullptr), + memtable_rep_(memtable_rep) {} // Advance to the first entry with a key >= target virtual void Seek(const Slice& k, const char* memtable_key) { auto transformed = memtable_rep_.GetPrefix(k); - Reset(memtable_rep_.GetBucket(transformed)); - HashLinkListRep::Iterator::Seek(k, memtable_key); + auto* bucket = memtable_rep_.GetBucket(transformed); + + SkipListBucketHeader* skip_list_header = + memtable_rep_.GetSkipListBucketHeader(bucket); + if (skip_list_header != nullptr) { + // The bucket is organized as a skip list + if (!skip_list_iter_) { + skip_list_iter_.reset( + new MemtableSkipList::Iterator(&skip_list_header->skip_list)); + } else { + skip_list_iter_->SetList(&skip_list_header->skip_list); + } + if (memtable_key != nullptr) { + skip_list_iter_->Seek(memtable_key); + } else { + IterKey encoded_key; + encoded_key.EncodeLengthPrefixedKey(k); + skip_list_iter_->Seek(encoded_key.GetKey().data()); + } + } else { + // The bucket is organized as a linked list + skip_list_iter_.reset(); + Reset(memtable_rep_.GetLinkListFirstNode(bucket)); + HashLinkListRep::LinkListIterator::Seek(k, memtable_key); + } + } + + virtual bool Valid() const { + if (skip_list_iter_) { + return skip_list_iter_->Valid(); + } + return HashLinkListRep::LinkListIterator::Valid(); + } + + virtual const char* key() const { + if (skip_list_iter_) { + return skip_list_iter_->key(); + } + return HashLinkListRep::LinkListIterator::key(); + } + + virtual void Next() { + if (skip_list_iter_) { + skip_list_iter_->Next(); + } else { + HashLinkListRep::LinkListIterator::Next(); + } } private: // the underlying memtable const HashLinkListRep& memtable_rep_; + std::unique_ptr skip_list_iter_; }; class EmptyIterator : public MemTableRep::Iterator { @@ -312,12 +452,16 @@ class HashLinkListRep : public MemTableRep { HashLinkListRep::HashLinkListRep(const MemTableRep::KeyComparator& compare, Arena* arena, const SliceTransform* transform, - size_t bucket_size, size_t huge_page_tlb_size, - Logger* logger, + size_t bucket_size, + uint32_t threshold_use_skiplist, + size_t huge_page_tlb_size, Logger* logger, int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash) : MemTableRep(arena), bucket_size_(bucket_size), + // Threshold to use skip list doesn't make sense if less than 3, so we + // force it to be minimum of 3 to simplify implementation. 
+ threshold_use_skiplist_(std::max(threshold_use_skiplist, 3U)), transform_(transform), compare_(compare), logger_(logger), @@ -343,53 +487,161 @@ KeyHandle HashLinkListRep::Allocate(const size_t len, char** buf) { return static_cast(x); } +SkipListBucketHeader* HashLinkListRep::GetSkipListBucketHeader( + Pointer* first_next_pointer) const { + if (first_next_pointer == nullptr) { + return nullptr; + } + if (first_next_pointer->NoBarrier_Load() == nullptr) { + // Single entry bucket + return nullptr; + } + // Counting header + BucketHeader* header = reinterpret_cast(first_next_pointer); + if (header->IsSkipListBucket()) { + assert(header->num_entries > threshold_use_skiplist_); + auto* skip_list_bucket_header = + reinterpret_cast(header); + assert(skip_list_bucket_header->Counting_header.next.NoBarrier_Load() == + header); + return skip_list_bucket_header; + } + assert(header->num_entries <= threshold_use_skiplist_); + return nullptr; +} + +Node* HashLinkListRep::GetLinkListFirstNode(Pointer* first_next_pointer) const { + if (first_next_pointer == nullptr) { + return nullptr; + } + if (first_next_pointer->NoBarrier_Load() == nullptr) { + // Single entry bucket + return reinterpret_cast(first_next_pointer); + } + // Counting header + BucketHeader* header = reinterpret_cast(first_next_pointer); + if (!header->IsSkipListBucket()) { + assert(header->num_entries <= threshold_use_skiplist_); + return reinterpret_cast(header->next.NoBarrier_Load()); + } + assert(header->num_entries > threshold_use_skiplist_); + return nullptr; +} + void HashLinkListRep::Insert(KeyHandle handle) { Node* x = static_cast(handle); assert(!Contains(x->key)); Slice internal_key = GetLengthPrefixedSlice(x->key); auto transformed = GetPrefix(internal_key); auto& bucket = buckets_[GetHash(transformed)]; - Node* head = static_cast(bucket.Acquire_Load()); + Pointer* first_next_pointer = static_cast(bucket.NoBarrier_Load()); - if (!head) { + if (first_next_pointer == nullptr) { + // Case 1. empty bucket // NoBarrier_SetNext() suffices since we will add a barrier when // we publish a pointer to "x" in prev[i]. x->NoBarrier_SetNext(nullptr); - bucket.Release_Store(static_cast(x)); + bucket.Release_Store(x); return; } - Node* cur = head; - Node* prev = nullptr; - while (true) { - if (cur == nullptr) { - break; - } - Node* next = cur->Next(); - // Make sure the lists are sorted. - // If x points to head_ or next points nullptr, it is trivially satisfied. - assert((cur == head) || (next == nullptr) || - KeyIsAfterNode(next->key, cur)); - if (KeyIsAfterNode(internal_key, cur)) { - // Keep searching in this list - prev = cur; - cur = next; - } else { - break; + BucketHeader* header = nullptr; + if (first_next_pointer->NoBarrier_Load() == nullptr) { + // Case 2. only one entry in the bucket + // Need to convert to a Counting bucket and turn to case 4. + Node* first = reinterpret_cast(first_next_pointer); + // Need to add a bucket header. + // We have to first convert it to a bucket with header before inserting + // the new node. Otherwise, we might need to change next pointer of first. + // In that case, a reader might sees the next pointer is NULL and wrongly + // think the node is a bucket header. + auto* mem = arena_->AllocateAligned(sizeof(BucketHeader)); + header = new (mem) BucketHeader(first, 1); + bucket.Release_Store(header); + } else { + header = reinterpret_cast(first_next_pointer); + if (header->IsSkipListBucket()) { + // Case 4. 
Bucket is already a skip list + assert(header->num_entries > threshold_use_skiplist_); + auto* skip_list_bucket_header = + reinterpret_cast(header); + skip_list_bucket_header->Counting_header.num_entries++; + skip_list_bucket_header->skip_list.Insert(x->key); + return; } } - // Our data structure does not allow duplicate insertion - assert(cur == nullptr || !Equal(x->key, cur->key)); + if (bucket_entries_logging_threshold_ > 0 && + header->num_entries == + static_cast(bucket_entries_logging_threshold_)) { + Info(logger_, + "HashLinkedList bucket %zu has more than %d " + "entries. Key to insert: %s", + GetHash(transformed), header->num_entries, + GetLengthPrefixedSlice(x->key).ToString(true).c_str()); + } - // NoBarrier_SetNext() suffices since we will add a barrier when - // we publish a pointer to "x" in prev[i]. - x->NoBarrier_SetNext(cur); + if (header->num_entries == threshold_use_skiplist_) { + // Case 3. number of entries reaches the threshold so need to convert to + // skip list. + LinkListIterator bucket_iter( + this, reinterpret_cast(first_next_pointer->NoBarrier_Load())); + auto mem = arena_->AllocateAligned(sizeof(SkipListBucketHeader)); + SkipListBucketHeader* new_skip_list_header = new (mem) + SkipListBucketHeader(compare_, arena_, header->num_entries + 1); + auto& skip_list = new_skip_list_header->skip_list; + + // Add all current entries to the skip list + for (bucket_iter.SeekToHead(); bucket_iter.Valid(); bucket_iter.Next()) { + skip_list.Insert(bucket_iter.key()); + } - if (prev) { - prev->SetNext(x); + // insert the new entry + skip_list.Insert(x->key); + // Set the bucket + bucket.Release_Store(new_skip_list_header); } else { - bucket.Release_Store(static_cast(x)); + // Case 5. Need to insert to the sorted linked list without changing the + // header. + Node* first = reinterpret_cast(header->next.NoBarrier_Load()); + assert(first != nullptr); + // Advance counter unless the bucket needs to be advanced to skip list. + // In that case, we need to make sure the previous count never exceeds + // threshold_use_skiplist_ to avoid readers to cast to wrong format. + header->num_entries++; + + Node* cur = first; + Node* prev = nullptr; + while (true) { + if (cur == nullptr) { + break; + } + Node* next = cur->Next(); + // Make sure the lists are sorted. + // If x points to head_ or next points nullptr, it is trivially satisfied. + assert((cur == first) || (next == nullptr) || + KeyIsAfterNode(next->key, cur)); + if (KeyIsAfterNode(internal_key, cur)) { + // Keep searching in this list + prev = cur; + cur = next; + } else { + break; + } + } + + // Our data structure does not allow duplicate insertion + assert(cur == nullptr || !Equal(x->key, cur->key)); + + // NoBarrier_SetNext() suffices since we will add a barrier when + // we publish a pointer to "x" in prev[i]. 
+ x->NoBarrier_SetNext(cur); + + if (prev) { + prev->SetNext(x); + } else { + header->next.Release_Store(static_cast(x)); + } } } @@ -401,7 +653,13 @@ bool HashLinkListRep::Contains(const char* key) const { if (bucket == nullptr) { return false; } - return BucketContains(bucket, internal_key); + + SkipListBucketHeader* skip_list_header = GetSkipListBucketHeader(bucket); + if (skip_list_header != nullptr) { + return skip_list_header->skip_list.Contains(key); + } else { + return LinkListContains(GetLinkListFirstNode(bucket), internal_key); + } } size_t HashLinkListRep::ApproximateMemoryUsage() { @@ -413,37 +671,53 @@ void HashLinkListRep::Get(const LookupKey& k, void* callback_args, bool (*callback_func)(void* arg, const char* entry)) { auto transformed = transform_->Transform(k.user_key()); auto bucket = GetBucket(transformed); - if (bucket != nullptr) { - Iterator iter(this, bucket); - for (iter.Seek(k.internal_key(), nullptr); + + auto* skip_list_header = GetSkipListBucketHeader(bucket); + if (skip_list_header != nullptr) { + // Is a skip list + MemtableSkipList::Iterator iter(&skip_list_header->skip_list); + for (iter.Seek(k.memtable_key().data()); iter.Valid() && callback_func(callback_args, iter.key()); iter.Next()) { } + } else { + auto* link_list_head = GetLinkListFirstNode(bucket); + if (link_list_head != nullptr) { + LinkListIterator iter(this, link_list_head); + for (iter.Seek(k.internal_key(), nullptr); + iter.Valid() && callback_func(callback_args, iter.key()); + iter.Next()) { + } + } } } MemTableRep::Iterator* HashLinkListRep::GetIterator(Arena* alloc_arena) { // allocate a new arena of similar size to the one currently in use Arena* new_arena = new Arena(arena_->BlockSize()); - auto list = new FullList(compare_, new_arena); + auto list = new MemtableSkipList(compare_, new_arena); HistogramImpl keys_per_bucket_hist; for (size_t i = 0; i < bucket_size_; ++i) { int count = 0; - bool num_entries_printed = false; - auto bucket = GetBucket(i); + auto* bucket = GetBucket(i); if (bucket != nullptr) { - Iterator itr(this, bucket); - for (itr.SeekToHead(); itr.Valid(); itr.Next()) { - list->Insert(itr.key()); - if (logger_ != nullptr && - ++count >= bucket_entries_logging_threshold_ && - !num_entries_printed) { - num_entries_printed = true; - Info(logger_, "HashLinkedList bucket %zu has more than %d " - "entries. 
%dth key: %s", - i, count, count, - GetLengthPrefixedSlice(itr.key()).ToString(true).c_str()); + auto* skip_list_header = GetSkipListBucketHeader(bucket); + if (skip_list_header != nullptr) { + // Is a skip list + MemtableSkipList::Iterator itr(&skip_list_header->skip_list); + for (itr.SeekToFirst(); itr.Valid(); itr.Next()) { + list->Insert(itr.key()); + count++; + } + } else { + auto* link_list_head = GetLinkListFirstNode(bucket); + if (link_list_head != nullptr) { + LinkListIterator itr(this, link_list_head); + for (itr.SeekToHead(); itr.Valid(); itr.Next()) { + list->Insert(itr.key()); + count++; + } } } } @@ -474,7 +748,8 @@ MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator( } } -bool HashLinkListRep::BucketContains(Node* head, const Slice& user_key) const { +bool HashLinkListRep::LinkListContains(Node* head, + const Slice& user_key) const { Node* x = FindGreaterOrEqualInBucket(head, user_key); return (x != nullptr && Equal(user_key, x->key)); } @@ -505,17 +780,19 @@ Node* HashLinkListRep::FindGreaterOrEqualInBucket(Node* head, MemTableRep* HashLinkListRepFactory::CreateMemTableRep( const MemTableRep::KeyComparator& compare, Arena* arena, const SliceTransform* transform, Logger* logger) { - return new HashLinkListRep( - compare, arena, transform, bucket_count_, huge_page_tlb_size_, logger, - bucket_entries_logging_threshold_, if_log_bucket_dist_when_flash_); + return new HashLinkListRep(compare, arena, transform, bucket_count_, + threshold_use_skiplist_, huge_page_tlb_size_, + logger, bucket_entries_logging_threshold_, + if_log_bucket_dist_when_flash_); } MemTableRepFactory* NewHashLinkListRepFactory( size_t bucket_count, size_t huge_page_tlb_size, - int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash) { - return new HashLinkListRepFactory(bucket_count, huge_page_tlb_size, - bucket_entries_logging_threshold, - if_log_bucket_dist_when_flash); + int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash, + uint32_t threshold_use_skiplist) { + return new HashLinkListRepFactory( + bucket_count, threshold_use_skiplist, huge_page_tlb_size, + bucket_entries_logging_threshold, if_log_bucket_dist_when_flash); } } // namespace rocksdb diff --git a/util/hash_linklist_rep.h b/util/hash_linklist_rep.h index bd42e699d..0df35b545 100644 --- a/util/hash_linklist_rep.h +++ b/util/hash_linklist_rep.h @@ -16,10 +16,12 @@ namespace rocksdb { class HashLinkListRepFactory : public MemTableRepFactory { public: explicit HashLinkListRepFactory(size_t bucket_count, + uint32_t threshold_use_skiplist, size_t huge_page_tlb_size, int bucket_entries_logging_threshold, bool if_log_bucket_dist_when_flash) : bucket_count_(bucket_count), + threshold_use_skiplist_(threshold_use_skiplist), huge_page_tlb_size_(huge_page_tlb_size), bucket_entries_logging_threshold_(bucket_entries_logging_threshold), if_log_bucket_dist_when_flash_(if_log_bucket_dist_when_flash) {} @@ -36,6 +38,7 @@ class HashLinkListRepFactory : public MemTableRepFactory { private: const size_t bucket_count_; + const uint32_t threshold_use_skiplist_; const size_t huge_page_tlb_size_; int bucket_entries_logging_threshold_; bool if_log_bucket_dist_when_flash_; diff --git a/util/hash_skiplist_rep.cc b/util/hash_skiplist_rep.cc index 85d4e3356..1c7a459bd 100644 --- a/util/hash_skiplist_rep.cc +++ b/util/hash_skiplist_rep.cc @@ -229,7 +229,9 @@ HashSkipListRep::HashSkipListRep(const MemTableRep::KeyComparator& compare, transform_(transform), compare_(compare), arena_(arena) { - buckets_ = new 
port::AtomicPointer[bucket_size]; + auto mem = + arena->AllocateAligned(sizeof(port::AtomicPointer) * bucket_size); + buckets_ = new (mem) port::AtomicPointer[bucket_size]; for (size_t i = 0; i < bucket_size_; ++i) { buckets_[i].NoBarrier_Store(nullptr); @@ -237,7 +239,6 @@ HashSkipListRep::HashSkipListRep(const MemTableRep::KeyComparator& compare, } HashSkipListRep::~HashSkipListRep() { - delete[] buckets_; } HashSkipListRep::Bucket* HashSkipListRep::GetInitializedBucket( @@ -271,7 +272,7 @@ bool HashSkipListRep::Contains(const char* key) const { } size_t HashSkipListRep::ApproximateMemoryUsage() { - return sizeof(buckets_); + return 0; } void HashSkipListRep::Get(const LookupKey& k, void* callback_args, diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index e623e5278..41d8d6f47 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -286,6 +286,10 @@ Options LDBCommand::PrepareOptionsForOpenDB() { } } + if (opt.db_paths.size() == 0) { + opt.db_paths.push_back(db_path_); + } + return opt; } diff --git a/util/options.cc b/util/options.cc index 17dad0f25..88d26fa01 100644 --- a/util/options.cc +++ b/util/options.cc @@ -214,6 +214,7 @@ DBOptions::DBOptions(const Options& options) disableDataSync(options.disableDataSync), use_fsync(options.use_fsync), db_stats_log_interval(options.db_stats_log_interval), + db_paths(options.db_paths), db_log_dir(options.db_log_dir), wal_dir(options.wal_dir), delete_obsolete_files_period_micros(