From 2459f7ec4e62c3fff2c121e14a3d70f4c01379a7 Mon Sep 17 00:00:00 2001 From: sdong Date: Wed, 2 Jul 2014 09:54:20 -0700 Subject: [PATCH] Support Multiple DB paths (without having an interface to expose to users) Summary: In this patch, we allow RocksDB to support multiple DB paths internally. No user interface is supported yet so this patch is silent to users. Test Plan: make all check Reviewers: igor, haobo, ljin, yhchiang Reviewed By: yhchiang Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D18921 --- db/builder.cc | 3 +- db/column_family.cc | 3 +- db/compaction.cc | 2 + db/compaction.h | 8 +- db/compaction_picker.cc | 46 ++++--- db/db_filesnapshot.cc | 4 +- db/db_impl.cc | 178 +++++++++++++++++-------- db/db_impl.h | 18 ++- db/db_test.cc | 47 ++++++- db/filename.cc | 24 +++- db/filename.h | 12 +- db/filename_test.cc | 4 +- db/log_and_apply_bench.cc | 4 +- db/memtable_list.cc | 8 +- db/memtable_list.h | 16 +-- db/repair.cc | 91 +++++++------ db/table_cache.cc | 10 +- db/table_cache.h | 7 +- db/version_edit.cc | 68 +++++++--- db/version_edit.h | 41 ++++-- db/version_edit_test.cc | 5 +- db/version_set.cc | 50 ++++--- db/version_set.h | 6 +- db/version_set_test.cc | 2 +- include/rocksdb/db.h | 1 + include/rocksdb/options.h | 7 + include/rocksdb/universal_compaction.h | 17 +-- util/ldb_cmd.cc | 4 + util/options.cc | 1 + 29 files changed, 469 insertions(+), 218 deletions(-) diff --git a/db/builder.cc b/db/builder.cc index 61890b5b6..3be61bd10 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -54,7 +54,8 @@ Status BuildTable(const std::string& dbname, Env* env, const Options& options, purge = false; } - std::string fname = TableFileName(dbname, meta->fd.GetNumber()); + std::string fname = TableFileName(options.db_paths, meta->fd.GetNumber(), + meta->fd.GetPathId()); if (iter->Valid()) { unique_ptr file; s = env->NewWritableFile(fname, &file, soptions); diff --git a/db/column_family.cc b/db/column_family.cc index 2d7ac23ae..ec90872b8 
100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -224,8 +224,7 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, if (dummy_versions != nullptr) { internal_stats_.reset(new InternalStats( options_.num_levels, db_options->env, db_options->statistics.get())); - table_cache_.reset( - new TableCache(dbname, &options_, storage_options, table_cache)); + table_cache_.reset(new TableCache(&options_, storage_options, table_cache)); if (options_.compaction_style == kCompactionStyleUniversal) { compaction_picker_.reset( new UniversalCompactionPicker(&options_, &internal_comparator_)); diff --git a/db/compaction.cc b/db/compaction.cc index d0c54cc0c..4ed5374ac 100644 --- a/db/compaction.cc +++ b/db/compaction.cc @@ -29,6 +29,7 @@ static uint64_t TotalFileSize(const std::vector& files) { Compaction::Compaction(Version* input_version, int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, + uint32_t output_path_id, CompressionType output_compression, bool seek_compaction, bool deletion_compaction) : level_(level), @@ -38,6 +39,7 @@ Compaction::Compaction(Version* input_version, int level, int out_level, input_version_(input_version), number_levels_(input_version_->NumberLevels()), cfd_(input_version_->cfd_), + output_path_id_(output_path_id), output_compression_(output_compression), seek_compaction_(seek_compaction), deletion_compaction_(deletion_compaction), diff --git a/db/compaction.h b/db/compaction.h index 44d51ef77..caf44d466 100644 --- a/db/compaction.h +++ b/db/compaction.h @@ -50,6 +50,9 @@ class Compaction { // What compression for output CompressionType OutputCompressionType() const { return output_compression_; } + // Whether need to write output file to second DB path. 
+ uint32_t GetOutputPathId() const { return output_path_id_; } + // Is this a trivial compaction that can be implemented by just // moving a single input file to the next level (no merging or splitting) bool IsTrivialMove() const; @@ -104,8 +107,8 @@ class Compaction { Compaction(Version* input_version, int level, int out_level, uint64_t target_file_size, uint64_t max_grandparent_overlap_bytes, - CompressionType output_compression, bool seek_compaction = false, - bool deletion_compaction = false); + uint32_t output_path_id, CompressionType output_compression, + bool seek_compaction = false, bool deletion_compaction = false); int level_; int out_level_; // levels to which output files are stored @@ -116,6 +119,7 @@ class Compaction { int number_levels_; ColumnFamilyData* cfd_; + uint32_t output_path_id_; CompressionType output_compression_; bool seek_compaction_; // if true, just delete files in inputs_[0] diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 758210f63..0c752184a 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -12,6 +12,7 @@ #define __STDC_FORMAT_MACROS #include #include +#include "db/filename.h" #include "util/log_buffer.h" #include "util/statistics.h" @@ -370,7 +371,7 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, } Compaction* c = new Compaction(version, input_level, output_level, MaxFileSizeForLevel(output_level), - MaxGrandParentOverlapBytes(input_level), + MaxGrandParentOverlapBytes(input_level), 0, GetCompressionType(*options_, output_level)); c->inputs_[0] = inputs; @@ -491,7 +492,7 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, assert(level >= 0); assert(level + 1 < NumberLevels()); c = new Compaction(version, level, level + 1, MaxFileSizeForLevel(level + 1), - MaxGrandParentOverlapBytes(level), + MaxGrandParentOverlapBytes(level), 0, GetCompressionType(*options_, level + 1)); c->score_ = score; @@ -684,9 +685,10 @@ Compaction* 
UniversalCompactionPicker::PickCompactionUniversalReadAmp( // first candidate to be compacted. uint64_t candidate_size = f != nullptr? f->compensated_file_size : 0; if (f != nullptr) { - LogToBuffer(log_buffer, - "[%s] Universal: Possible candidate file %" PRIu64 "[%d].", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop); + LogToBuffer( + log_buffer, "[%s] Universal: Possible candidate file %s[%d].", + version->cfd_->GetName().c_str(), + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), loop); } // Check if the suceeding files need compaction. @@ -764,7 +766,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( } } Compaction* c = new Compaction( - version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, + version, level, level, MaxFileSizeForLevel(level), LLONG_MAX, 0, GetCompressionType(*options_, level, enable_compression)); c->score_ = score; @@ -772,11 +774,11 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( FileMetaData* f = c->input_version_->files_[level][i]; c->inputs_[0].push_back(f); LogToBuffer(log_buffer, - "[%s] Universal: Picking file %" PRIu64 "[%d] " + "[%s] Universal: Picking file %s[%d] " "with size %" PRIu64 " (compensated size %" PRIu64 ")\n", version->cfd_->GetName().c_str(), - f->fd.GetNumber(), i, - f->fd.GetFileSize(), f->compensated_file_size); + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), + i, f->fd.GetFileSize(), f->compensated_file_size); } return c; } @@ -810,29 +812,29 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( start_index = loop; // Consider this as the first candidate. 
break; } - LogToBuffer(log_buffer, - "[%s] Universal: skipping file %" PRIu64 "[%d] compacted %s", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop, - " cannot be a candidate to reduce size amp.\n"); + LogToBuffer(log_buffer, "[%s] Universal: skipping file %s[%d] compacted %s", + version->cfd_->GetName().c_str(), + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), + loop, " cannot be a candidate to reduce size amp.\n"); f = nullptr; } if (f == nullptr) { return nullptr; // no candidate files } - LogToBuffer(log_buffer, - "[%s] Universal: First candidate file %" PRIu64 "[%d] %s", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), start_index, - " to reduce size amp.\n"); + LogToBuffer(log_buffer, "[%s] Universal: First candidate file %s[%d] %s", + version->cfd_->GetName().c_str(), + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), + start_index, " to reduce size amp.\n"); // keep adding up all the remaining files for (unsigned int loop = start_index; loop < files.size() - 1; loop++) { f = files[loop]; if (f->being_compacted) { LogToBuffer( - log_buffer, - "[%s] Universal: Possible candidate file %" PRIu64 "[%d] %s.", - version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop, + log_buffer, "[%s] Universal: Possible candidate file %s[%d] %s.", + version->cfd_->GetName().c_str(), + FormatFileNumber(f->fd.GetNumber(), f->fd.GetPathId()).c_str(), loop, " is already being compacted. No size amp reduction possible.\n"); return nullptr; } @@ -867,7 +869,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( // We always compact all the files, so always compress. 
Compaction* c = new Compaction(version, level, level, MaxFileSizeForLevel(level), - LLONG_MAX, GetCompressionType(*options_, level)); + LLONG_MAX, 0, GetCompressionType(*options_, level)); c->score_ = score; for (unsigned int loop = start_index; loop < files.size(); loop++) { f = c->input_version_->files_[level][loop]; @@ -909,7 +911,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version, return nullptr; } - Compaction* c = new Compaction(version, 0, 0, 0, 0, kNoCompression, false, + Compaction* c = new Compaction(version, 0, 0, 0, 0, 0, kNoCompression, false, true /* is deletion compaction */); // delete old files (FIFO) for (auto ritr = version->files_[0].rbegin(); diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 582355ccd..5286ca782 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -98,7 +98,7 @@ Status DBImpl::GetLiveFiles(std::vector& ret, } // Make a set of all of the live *.sst files - std::set live; + std::vector live; for (auto cfd : *versions_->GetColumnFamilySet()) { cfd->current()->AddLiveFiles(&live); } @@ -109,7 +109,7 @@ Status DBImpl::GetLiveFiles(std::vector& ret, // create names of the live files. 
The names are not absolute // paths, instead they are relative to dbname_; for (auto live_file : live) { - ret.push_back(TableFileName("", live_file)); + ret.push_back(MakeTableFileName("", live_file.GetNumber())); } ret.push_back(CurrentFileName("")); diff --git a/db/db_impl.cc b/db/db_impl.cc index b93be78bc..ce1cf78ff 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -98,6 +98,7 @@ struct DBImpl::CompactionState { // Files produced by compaction struct Output { uint64_t number; + uint32_t path_id; uint64_t file_size; InternalKey smallest, largest; SequenceNumber smallest_seqno, largest_seqno; @@ -294,6 +295,10 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1); } + if (result.db_paths.size() == 0) { + result.db_paths.push_back(dbname); + } + return result; } @@ -573,30 +578,48 @@ void DBImpl::FindObsoleteFiles(DeletionState& deletion_state, } // don't delete live files - deletion_state.sst_live.assign(pending_outputs_.begin(), - pending_outputs_.end()); + for (auto pair : pending_outputs_) { + deletion_state.sst_live.emplace_back(pair.first, pair.second, 0); + } + /* deletion_state.sst_live.insert(pending_outputs_.begin(), + pending_outputs_.end());*/ versions_->AddLiveFiles(&deletion_state.sst_live); if (doing_the_full_scan) { - // set of all files in the directory. We'll exclude files that are still - // alive in the subsequent processings. - env_->GetChildren( - dbname_, &deletion_state.candidate_files - ); // Ignore errors + for (uint32_t path_id = 0; path_id < options_.db_paths.size(); path_id++) { + // set of all files in the directory. We'll exclude files that are still + // alive in the subsequent processings. 
+ std::vector<std::string> files; + env_->GetChildren(options_.db_paths[path_id], &files); // Ignore errors + for (std::string file : files) { + deletion_state.candidate_files.emplace_back(file, path_id); + } + } //Add log files in wal_dir if (options_.wal_dir != dbname_) { std::vector<std::string> log_files; env_->GetChildren(options_.wal_dir, &log_files); // Ignore errors - deletion_state.candidate_files.insert( - deletion_state.candidate_files.end(), - log_files.begin(), - log_files.end() - ); + for (std::string log_file : log_files) { + deletion_state.candidate_files.emplace_back(log_file, 0); + } } } } +namespace { +bool CompareCandidateFile(const rocksdb::DBImpl::CandidateFileInfo& first, + const rocksdb::DBImpl::CandidateFileInfo& second) { + if (first.file_name > second.file_name) { + return true; + } else if (first.file_name < second.file_name) { + return false; + } else { + return (first.path_id > second.path_id); + } +} +} // namespace + // Diffs the files listed in filenames and those that do not // belong to live files are posibly removed. Also, removes all the // files in sst_delete_files and log_delete_files. @@ -612,10 +635,12 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { return; } - // Now, convert live list to an unordered map, WITHOUT mutex held; + // set is slow. - std::unordered_set<uint64_t> sst_live(state.sst_live.begin(), - state.sst_live.end()); + std::unordered_map<uint64_t, FileDescriptor*> sst_live_map; + for (FileDescriptor& fd : state.sst_live) { + sst_live_map[fd.GetNumber()] = &fd; + } auto& candidate_files = state.candidate_files; candidate_files.reserve( @@ -625,26 +650,30 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { // We may ignore the dbname when generating the file names.
const char* kDumbDbName = ""; for (auto file : state.sst_delete_files) { - candidate_files.push_back( - TableFileName(kDumbDbName, file->fd.GetNumber()).substr(1)); + candidate_files.emplace_back( + MakeTableFileName(kDumbDbName, file->fd.GetNumber()), + file->fd.GetPathId()); delete file; } for (auto file_num : state.log_delete_files) { if (file_num > 0) { - candidate_files.push_back(LogFileName(kDumbDbName, file_num).substr(1)); + candidate_files.emplace_back(LogFileName(kDumbDbName, file_num).substr(1), + 0); } } // dedup state.candidate_files so we don't try to delete the same // file twice - sort(candidate_files.begin(), candidate_files.end()); + sort(candidate_files.begin(), candidate_files.end(), CompareCandidateFile); candidate_files.erase(unique(candidate_files.begin(), candidate_files.end()), candidate_files.end()); std::vector old_info_log_files; - for (const auto& to_delete : candidate_files) { + for (const auto& candidate_file : candidate_files) { + std::string to_delete = candidate_file.file_name; + uint32_t path_id = candidate_file.path_id; uint64_t number; FileType type; // Ignore file if we cannot recognize it. @@ -664,7 +693,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { keep = (number >= state.manifest_file_number); break; case kTableFile: - keep = (sst_live.find(number) != sst_live.end()); + keep = (sst_live_map.find(number) != sst_live_map.end()); break; case kTempFile: // Any temp files that are currently being written to must @@ -672,7 +701,7 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { // Also, SetCurrentFile creates a temp file when writing out new // manifest, which is equal to state.pending_manifest_file_number. 
We // should not delete that file - keep = (sst_live.find(number) != sst_live.end()) || + keep = (sst_live_map.find(number) != sst_live_map.end()) || (number == state.pending_manifest_file_number); break; case kInfoLogFile: @@ -693,13 +722,16 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { continue; } + std::string fname; if (type == kTableFile) { // evict from cache TableCache::Evict(table_cache_.get(), number); + fname = TableFileName(options_.db_paths, number, path_id); + } else { + fname = + ((type == kLogFile) ? options_.wal_dir : dbname_) + "/" + to_delete; } - std::string fname = ((type == kLogFile) ? options_.wal_dir : dbname_) + - "/" + to_delete; if (type == kLogFile && (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) { auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number); @@ -1084,6 +1116,13 @@ Status DBImpl::Recover( return s; } + for (auto db_path : options_.db_paths) { + s = env_->CreateDirIfMissing(db_path); + if (!s.ok()) { + return s; + } + } + s = env_->NewDirectory(dbname_, &db_directory_); if (!s.ok()) { return s; @@ -1349,8 +1388,8 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; - meta.fd.number = versions_->NewFileNumber(); - pending_outputs_.insert(meta.fd.GetNumber()); + meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); + pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file. Iterator* iter = mem->NewIterator(ReadOptions(), true); const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber earliest_seqno_in_memtable = @@ -1381,9 +1420,9 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, // should not be added to the manifest. 
int level = 0; if (s.ok() && meta.fd.GetFileSize() > 0) { - edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetFileSize(), - meta.smallest, meta.largest, meta.smallest_seqno, - meta.largest_seqno); + edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), + meta.fd.GetFileSize(), meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } InternalStats::CompactionStats stats; @@ -1402,9 +1441,10 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; - meta.fd.number = versions_->NewFileNumber(); + + meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); *filenumber = meta.fd.GetNumber(); - pending_outputs_.insert(meta.fd.GetNumber()); + pending_outputs_[meta.fd.GetNumber()] = 0; // path 0 for level 0 file. const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber earliest_seqno_in_memtable = @@ -1471,9 +1511,9 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, cfd->options()->compaction_style == kCompactionStyleLevel) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } - edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetFileSize(), - meta.smallest, meta.largest, meta.smallest_seqno, - meta.largest_seqno); + edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(), + meta.fd.GetFileSize(), meta.smallest, meta.largest, + meta.smallest_seqno, meta.largest_seqno); } InternalStats::CompactionStats stats; @@ -1529,7 +1569,7 @@ Status DBImpl::FlushMemTableToOutputFile(ColumnFamilyData* cfd, // Replace immutable memtable with the generated Table s = cfd->imm()->InstallMemtableFlushResults( cfd, mems, versions_.get(), &mutex_, options_.info_log.get(), - file_number, pending_outputs_, &deletion_state.memtables_to_free, + file_number, &pending_outputs_, &deletion_state.memtables_to_free, db_directory_.get(), log_buffer); } @@ -1673,9 +1713,9 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, 
int level, int target_level) { edit.SetColumnFamily(cfd->GetID()); for (const auto& f : cfd->current()->files_[level]) { edit.DeleteFile(level, f->fd.GetNumber()); - edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetFileSize(), - f->smallest, f->largest, f->smallest_seqno, - f->largest_seqno); + edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(), + f->fd.GetFileSize(), f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); } Log(options_.info_log, "[%s] Apply version edit:\n%s", cfd->GetName().c_str(), edit.DebugString().data()); @@ -2172,9 +2212,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, assert(c->num_input_files(0) == 1); FileMetaData* f = c->input(0, 0); c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); - c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetFileSize(), - f->smallest, f->largest, f->smallest_seqno, - f->largest_seqno); + c->edit()->AddFile(c->level() + 1, f->fd.GetNumber(), f->fd.GetPathId(), + f->fd.GetFileSize(), f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); status = versions_->LogAndApply(c->column_family_data(), c->edit(), &mutex_, db_directory_.get()); InstallSuperVersion(c->column_family_data(), deletion_state); @@ -2280,7 +2320,7 @@ void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) { int filesNeeded = compact->compaction->num_input_files(1); for (int i = 0; i < std::max(filesNeeded, 1); i++) { uint64_t file_number = versions_->NewFileNumber(); - pending_outputs_.insert(file_number); + pending_outputs_[file_number] = compact->compaction->GetOutputPathId(); compact->allocated_file_numbers.push_back(file_number); } } @@ -2306,18 +2346,20 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { } else { mutex_.Lock(); file_number = versions_->NewFileNumber(); - pending_outputs_.insert(file_number); + pending_outputs_[file_number] = compact->compaction->GetOutputPathId(); mutex_.Unlock(); } CompactionState::Output out; out.number = 
file_number; + out.path_id = compact->compaction->GetOutputPathId(); out.smallest.Clear(); out.largest.Clear(); out.smallest_seqno = out.largest_seqno = 0; compact->outputs.push_back(out); // Make the output file - std::string fname = TableFileName(dbname_, file_number); + std::string fname = TableFileName(options_.db_paths, file_number, + compact->compaction->GetOutputPathId()); Status s = env_->NewWritableFile(fname, &compact->outfile, storage_options_); if (s.ok()) { @@ -2340,6 +2382,7 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, assert(compact->builder != nullptr); const uint64_t output_number = compact->current_output()->number; + const uint32_t output_path_id = compact->current_output()->path_id; assert(output_number != 0); // Check for iterator errors @@ -2375,9 +2418,9 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, if (s.ok() && current_entries > 0) { // Verify that the table is usable ColumnFamilyData* cfd = compact->compaction->column_family_data(); - FileDescriptor meta(output_number, current_bytes); + FileDescriptor fd(output_number, output_path_id, current_bytes); Iterator* iter = cfd->table_cache()->NewIterator( - ReadOptions(), storage_options_, cfd->internal_comparator(), meta); + ReadOptions(), storage_options_, cfd->internal_comparator(), fd); s = iter->status(); delete iter; if (s.ok()) { @@ -2420,9 +2463,10 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact, compact->compaction->AddInputDeletions(compact->compaction->edit()); for (size_t i = 0; i < compact->outputs.size(); i++) { const CompactionState::Output& out = compact->outputs[i]; - compact->compaction->edit()->AddFile( - compact->compaction->output_level(), out.number, out.file_size, - out.smallest, out.largest, out.smallest_seqno, out.largest_seqno); + compact->compaction->edit()->AddFile(compact->compaction->output_level(), + out.number, out.path_id, out.file_size, + out.smallest, out.largest, + out.smallest_seqno, 
out.largest_seqno); } return versions_->LogAndApply(compact->compaction->column_family_data(), compact->compaction->edit(), &mutex_, @@ -4118,7 +4162,7 @@ Status DBImpl::MakeRoomForWrite( // how do we fail if we're not creating new log? assert(creating_new_log); // Avoid chewing through file number space in a tight loop. - versions_->ReuseFileNumber(new_log_number); + versions_->ReuseLogFileNumber(new_log_number); assert(!new_mem); assert(!new_log); break; @@ -4361,14 +4405,15 @@ Status DBImpl::CheckConsistency() { std::string corruption_messages; for (const auto& md : metadata) { - std::string file_path = dbname_ + md.name; + std::string file_path = md.db_path + "/" + md.name; + uint64_t fsize = 0; Status s = env_->GetFileSize(file_path, &fsize); if (!s.ok()) { corruption_messages += "Can't access " + md.name + ": " + s.ToString() + "\n"; } else if (fsize != md.size) { - corruption_messages += "Sst file size mismatch: " + md.name + + corruption_messages += "Sst file size mismatch: " + file_path + ". Size recorded in manifest " + std::to_string(md.size) + ", actual size " + std::to_string(fsize) + "\n"; @@ -4466,6 +4511,11 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { Status DB::Open(const DBOptions& db_options, const std::string& dbname, const std::vector& column_families, std::vector* handles, DB** dbptr) { + if (db_options.db_paths.size() > 1) { + return Status::NotSupported( + "More than one DB paths are not supported yet. 
"); + } + *dbptr = nullptr; handles->clear(); @@ -4481,6 +4531,15 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, DBImpl* impl = new DBImpl(db_options, dbname); Status s = impl->env_->CreateDirIfMissing(impl->options_.wal_dir); + if (s.ok()) { + for (auto path : impl->options_.db_paths) { + s = impl->env_->CreateDirIfMissing(path); + if (!s.ok()) { + break; + } + } + } + if (!s.ok()) { delete impl; return s; @@ -4643,6 +4702,21 @@ Status DestroyDB(const std::string& dbname, const Options& options) { } } + for (auto db_path : options.db_paths) { + env->GetChildren(db_path, &filenames); + uint64_t number; + FileType type; + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type) && + type == kTableFile) { // Lock file will be deleted at end + Status del = env->DeleteFile(db_path + "/" + filenames[i]); + if (result.ok() && !del.ok()) { + result = del; + } + } + } + } + env->GetChildren(archivedir, &archiveFiles); // Delete archival files. for (size_t i = 0; i < archiveFiles.size(); ++i) { diff --git a/db/db_impl.h b/db/db_impl.h index 5d171b57d..fb0bdb4af 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -198,6 +198,17 @@ class DBImpl : public DB { Status TEST_ReadFirstLine(const std::string& fname, SequenceNumber* sequence); #endif // NDEBUG + // Structure to store information for candidate files to delete. 
+ struct CandidateFileInfo { + std::string file_name; + uint32_t path_id; + CandidateFileInfo(std::string name, uint32_t path) + : file_name(name), path_id(path) {} + bool operator==(const CandidateFileInfo& other) const { + return file_name == other.file_name && path_id == other.path_id; + } + }; + // needed for CleanupIteratorState struct DeletionState { inline bool HaveSomethingToDelete() const { @@ -209,10 +220,10 @@ class DBImpl : public DB { // a list of all files that we'll consider deleting // (every once in a while this is filled up with all files // in the DB directory) - std::vector candidate_files; + std::vector candidate_files; // the list of all live sst files that cannot be deleted - std::vector sst_live; + std::vector sst_live; // a list of sst files that we need to delete std::vector sst_delete_files; @@ -501,7 +512,8 @@ class DBImpl : public DB { // Set of table files to protect from deletion because they are // part of ongoing compactions. - std::set pending_outputs_; + // map from pending file number ID to their path IDs. + FileNumToPathIdMap pending_outputs_; // At least one compaction or flush job is pending but not yet scheduled // because of the max background thread limit. 
diff --git a/db/db_test.cc b/db/db_test.cc index 6bc272744..8010eaa81 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -98,7 +98,12 @@ class AtomicCounter { count_ = 0; } }; +} // namespace anon +static std::string Key(int i) { + char buf[100]; + snprintf(buf, sizeof(buf), "key%06d", i); + return std::string(buf); } // Special Env used to delay background operations @@ -355,7 +360,10 @@ class DBTest { ~DBTest() { Close(); - ASSERT_OK(DestroyDB(dbname_, Options())); + Options options; + options.db_paths.push_back(dbname_); + options.db_paths.push_back(dbname_ + "_2"); + ASSERT_OK(DestroyDB(dbname_, options)); delete env_; delete filter_policy_; } @@ -897,6 +905,30 @@ class DBTest { return property; } + int GetSstFileCount(std::string path) { + std::vector files; + env_->GetChildren(path, &files); + + int sst_count = 0; + uint64_t number; + FileType type; + for (size_t i = 0; i < files.size(); i++) { + if (ParseFileName(files[i], &number, &type) && type == kTableFile) { + sst_count++; + } + } + return sst_count; + } + + void GenerateNewFile(Random* rnd, int* key_idx) { + for (int i = 0; i < 11; i++) { + ASSERT_OK(Put(Key(*key_idx), RandomString(rnd, (i == 10) ? 
1 : 10000))); + (*key_idx)++; + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + } + std::string IterStatus(Iterator* iter) { std::string result; if (iter->Valid()) { @@ -1037,12 +1069,6 @@ class DBTest { }; -static std::string Key(int i) { - char buf[100]; - snprintf(buf, sizeof(buf), "key%06d", i); - return std::string(buf); -} - static long TestGetTickerCount(const Options& options, Tickers ticker_type) { return options.statistics->getTickerCount(ticker_type); } @@ -3434,6 +3460,13 @@ TEST(DBTest, UniversalCompactionCompressRatio2) { ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 120000 * 12 * 0.8 + 120000 * 2); } + +TEST(DBTest, FailMoreDbPaths) { + Options options; + options.db_paths.push_back(dbname_); + options.db_paths.push_back(dbname_ + "_2"); + ASSERT_TRUE(TryReopen(&options).IsNotSupported()); +} #endif TEST(DBTest, ConvertCompactionStyle) { diff --git a/db/filename.cc b/db/filename.cc index d19f0fd53..1c2be8ffb 100644 --- a/db/filename.cc +++ b/db/filename.cc @@ -11,6 +11,7 @@ #include #include +#include #include "db/dbformat.h" #include "rocksdb/env.h" #include "util/logging.h" @@ -66,9 +67,28 @@ std::string ArchivedLogFileName(const std::string& name, uint64_t number) { return MakeFileName(name + "/" + ARCHIVAL_DIR, number, "log"); } -std::string TableFileName(const std::string& name, uint64_t number) { +std::string MakeTableFileName(const std::string& path, uint64_t number) { + return MakeFileName(path, number, "sst"); +} + +std::string TableFileName(const std::vector db_paths, + uint64_t number, uint32_t path_id) { assert(number > 0); - return MakeFileName(name, number, "sst"); + std::string path; + if (path_id >= db_paths.size()) { + path = db_paths.back(); + } else { + path = db_paths[path_id]; + } + return MakeTableFileName(path, number); +} + +std::string FormatFileNumber(uint64_t number, uint32_t path_id) { + if (path_id == 0) { + return std::to_string(number); + } else { + return std::to_string(number) + 
"(path " + std::to_string(path_id) + ")"; + } } std::string DescriptorFileName(const std::string& dbname, uint64_t number) { diff --git a/db/filename.h b/db/filename.h index c4c306946..5db434e02 100644 --- a/db/filename.h +++ b/db/filename.h @@ -11,7 +11,9 @@ #pragma once #include +#include #include +#include #include "rocksdb/slice.h" #include "rocksdb/status.h" #include "rocksdb/transaction_log.h" @@ -34,6 +36,9 @@ enum FileType { kIdentityFile }; +// map from file number to path ID. +typedef std::unordered_map FileNumToPathIdMap; + // Return the name of the log file with the specified number // in the db named by "dbname". The result will be prefixed with // "dbname". @@ -48,10 +53,15 @@ extern std::string ArchivalDirectory(const std::string& dbname); extern std::string ArchivedLogFileName(const std::string& dbname, uint64_t num); +extern std::string MakeTableFileName(const std::string& name, uint64_t number); + // Return the name of the sstable with the specified number // in the db named by "dbname". The result will be prefixed with // "dbname". -extern std::string TableFileName(const std::string& dbname, uint64_t number); +extern std::string TableFileName(const std::vector db_paths, + uint64_t number, uint32_t path_id); + +extern std::string FormatFileNumber(uint64_t number, uint32_t path_id); // Return the name of the descriptor file for the db named by // "dbname" and the specified incarnation number. 
The result will be diff --git a/db/filename_test.cc b/db/filename_test.cc index 0baa7fdae..c86d16f34 100644 --- a/db/filename_test.cc +++ b/db/filename_test.cc @@ -108,7 +108,9 @@ TEST(FileNameTest, Construction) { ASSERT_EQ(192U, number); ASSERT_EQ(kLogFile, type); - fname = TableFileName("bar", 200); + fname = TableFileName({"bar"}, 200, 0); + std::string fname1 = TableFileName({"foo", "bar"}, 200, 1); + ASSERT_EQ(fname, fname1); ASSERT_EQ("bar/", std::string(fname.data(), 4)); ASSERT_TRUE(ParseFileName(fname.c_str() + 4, &number, &type)); ASSERT_EQ(200U, number); diff --git a/db/log_and_apply_bench.cc b/db/log_and_apply_bench.cc index ab9716deb..dbb459c30 100644 --- a/db/log_and_apply_bench.cc +++ b/db/log_and_apply_bench.cc @@ -51,7 +51,7 @@ void BM_LogAndApply(int iters, int num_base_files) { for (int i = 0; i < num_base_files; i++) { InternalKey start(MakeKey(2 * fnum), 1, kTypeValue); InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion); - vbase.AddFile(2, ++fnum, 1 /* file size */, start, limit, 1, 1); + vbase.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1); } ASSERT_OK(vset->LogAndApply(default_cfd, &vbase, &mu)); } @@ -61,7 +61,7 @@ void BM_LogAndApply(int iters, int num_base_files) { vedit.DeleteFile(2, fnum); InternalKey start(MakeKey(2 * fnum), 1, kTypeValue); InternalKey limit(MakeKey(2 * fnum + 1), 1, kTypeDeletion); - vedit.AddFile(2, ++fnum, 1 /* file size */, start, limit, 1, 1); + vedit.AddFile(2, ++fnum, 0, 1 /* file size */, start, limit, 1, 1); vset->LogAndApply(default_cfd, &vedit, &mu); } } diff --git a/db/memtable_list.cc b/db/memtable_list.cc index de1a18eee..d3fc1356b 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -140,7 +140,7 @@ void MemTableList::PickMemtablesToFlush(autovector* ret) { void MemTableList::RollbackMemtableFlush(const autovector& mems, uint64_t file_number, - std::set* pending_outputs) { + FileNumToPathIdMap* pending_outputs) { assert(!mems.empty()); // If the flush was not successful, 
then just reset state. @@ -162,7 +162,7 @@ void MemTableList::RollbackMemtableFlush(const autovector& mems, Status MemTableList::InstallMemtableFlushResults( ColumnFamilyData* cfd, const autovector& mems, VersionSet* vset, port::Mutex* mu, Logger* info_log, uint64_t file_number, - std::set& pending_outputs, autovector* to_delete, + FileNumToPathIdMap* pending_outputs, autovector* to_delete, Directory* db_directory, LogBuffer* log_buffer) { mu->AssertHeld(); @@ -219,7 +219,7 @@ Status MemTableList::InstallMemtableFlushResults( // has been written to a committed version so that other concurrently // executing compaction threads do not mistakenly assume that this // file is not live. - pending_outputs.erase(m->file_number_); + pending_outputs->erase(m->file_number_); if (m->Unref() != nullptr) { to_delete->push_back(m); } @@ -233,7 +233,7 @@ Status MemTableList::InstallMemtableFlushResults( m->flush_in_progress_ = false; m->edit_.Clear(); num_flush_not_started_++; - pending_outputs.erase(m->file_number_); + pending_outputs->erase(m->file_number_); m->file_number_ = 0; imm_flush_needed.Release_Store((void *)1); } diff --git a/db/memtable_list.h b/db/memtable_list.h index e56710fc9..f4923e831 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -15,6 +15,7 @@ #include "rocksdb/iterator.h" #include "db/dbformat.h" +#include "db/filename.h" #include "db/skiplist.h" #include "db/memtable.h" #include "rocksdb/db.h" @@ -108,17 +109,14 @@ class MemTableList { // they can get picked up again on the next round of flush. 
void RollbackMemtableFlush(const autovector& mems, uint64_t file_number, - std::set* pending_outputs); + FileNumToPathIdMap* pending_outputs); // Commit a successful flush in the manifest file - Status InstallMemtableFlushResults(ColumnFamilyData* cfd, - const autovector& m, - VersionSet* vset, port::Mutex* mu, - Logger* info_log, uint64_t file_number, - std::set& pending_outputs, - autovector* to_delete, - Directory* db_directory, - LogBuffer* log_buffer); + Status InstallMemtableFlushResults( + ColumnFamilyData* cfd, const autovector& m, VersionSet* vset, + port::Mutex* mu, Logger* info_log, uint64_t file_number, + FileNumToPathIdMap* pending_outputs, autovector* to_delete, + Directory* db_directory, LogBuffer* log_buffer); // New memtables are inserted at the front of the list. // Takes ownership of the referenced held on *m by the caller of Add(). diff --git a/db/repair.cc b/db/repair.cc index 13959a920..12c275c3e 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -65,8 +65,8 @@ class Repairer { NewLRUCache(10, options_.table_cache_numshardbits, options_.table_cache_remove_scan_count_limit)), next_file_number_(1) { - table_cache_ = new TableCache(dbname_, &options_, storage_options_, - raw_table_cache_.get()); + table_cache_ = + new TableCache(&options_, storage_options_, raw_table_cache_.get()); edit_ = new VersionEdit(); } @@ -116,7 +116,7 @@ class Repairer { VersionEdit* edit_; std::vector manifests_; - std::vector table_numbers_; + std::vector table_fds_; std::vector logs_; std::vector tables_; uint64_t next_file_number_; @@ -124,35 +124,43 @@ class Repairer { Status FindFiles() { std::vector filenames; - Status status = env_->GetChildren(dbname_, &filenames); - if (!status.ok()) { - return status; - } - if (filenames.empty()) { - return Status::Corruption(dbname_, "repair found no files"); - } + bool found_file = false; + for (uint32_t path_id = 0; path_id < options_.db_paths.size(); path_id++) { + Status status = env_->GetChildren(options_.db_paths[path_id], 
&filenames); + if (!status.ok()) { + return status; + } + if (!filenames.empty()) { + found_file = true; + } - uint64_t number; - FileType type; - for (size_t i = 0; i < filenames.size(); i++) { - if (ParseFileName(filenames[i], &number, &type)) { - if (type == kDescriptorFile) { - manifests_.push_back(filenames[i]); - } else { - if (number + 1 > next_file_number_) { - next_file_number_ = number + 1; - } - if (type == kLogFile) { - logs_.push_back(number); - } else if (type == kTableFile) { - table_numbers_.push_back(number); + uint64_t number; + FileType type; + for (size_t i = 0; i < filenames.size(); i++) { + if (ParseFileName(filenames[i], &number, &type)) { + if (type == kDescriptorFile) { + assert(path_id == 0); + manifests_.push_back(filenames[i]); } else { - // Ignore other files + if (number + 1 > next_file_number_) { + next_file_number_ = number + 1; + } + if (type == kLogFile) { + assert(path_id == 0); + logs_.push_back(number); + } else if (type == kTableFile) { + table_fds_.emplace_back(number, path_id, 0); + } else { + // Ignore other files + } } } } } - return status; + if (!found_file) { + return Status::Corruption(dbname_, "repair found no files"); + } + return Status::OK(); } void ConvertLogFilesToTables() { @@ -228,7 +236,7 @@ class Repairer { // Do not record a version edit for this conversion to a Table // since ExtractMetaData() will also generate edits. 
FileMetaData meta; - meta.fd.number = next_file_number_++; + meta.fd = FileDescriptor(next_file_number_++, 0, 0); ReadOptions ro; Iterator* iter = mem->NewIterator(ro, true /* enforce_total_order */); status = BuildTable(dbname_, env_, options_, storage_options_, table_cache_, @@ -239,7 +247,7 @@ class Repairer { mem = nullptr; if (status.ok()) { if (meta.fd.GetFileSize() > 0) { - table_numbers_.push_back(meta.fd.GetNumber()); + table_fds_.push_back(meta.fd); } } Log(options_.info_log, @@ -249,14 +257,17 @@ class Repairer { } void ExtractMetaData() { - for (size_t i = 0; i < table_numbers_.size(); i++) { + for (size_t i = 0; i < table_fds_.size(); i++) { TableInfo t; - t.meta.fd.number = table_numbers_[i]; + t.meta.fd = table_fds_[i]; Status status = ScanTable(&t); if (!status.ok()) { - std::string fname = TableFileName(dbname_, table_numbers_[i]); - Log(options_.info_log, "Table #%" PRIu64 ": ignoring %s", - table_numbers_[i], status.ToString().c_str()); + std::string fname = TableFileName( + options_.db_paths, t.meta.fd.GetNumber(), t.meta.fd.GetPathId()); + Log(options_.info_log, "Table #%s: ignoring %s", + FormatFileNumber(t.meta.fd.GetNumber(), t.meta.fd.GetPathId()) + .c_str(), + status.ToString().c_str()); ArchiveFile(fname); } else { tables_.push_back(t); @@ -265,9 +276,13 @@ class Repairer { } Status ScanTable(TableInfo* t) { - std::string fname = TableFileName(dbname_, t->meta.fd.GetNumber()); + std::string fname = TableFileName(options_.db_paths, t->meta.fd.GetNumber(), + t->meta.fd.GetPathId()); int counter = 0; - Status status = env_->GetFileSize(fname, &t->meta.fd.file_size); + uint64_t file_size; + Status status = env_->GetFileSize(fname, &file_size); + t->meta.fd = FileDescriptor(t->meta.fd.GetNumber(), t->meta.fd.GetPathId(), + file_size); if (status.ok()) { Iterator* iter = table_cache_->NewIterator( ReadOptions(), storage_options_, icmp_, t->meta.fd); @@ -330,9 +345,9 @@ class Repairer { for (size_t i = 0; i < tables_.size(); i++) { // TODO(opt): 
separate out into multiple levels const TableInfo& t = tables_[i]; - edit_->AddFile(0, t.meta.fd.GetNumber(), t.meta.fd.GetFileSize(), - t.meta.smallest, t.meta.largest, t.min_sequence, - t.max_sequence); + edit_->AddFile(0, t.meta.fd.GetNumber(), t.meta.fd.GetPathId(), + t.meta.fd.GetFileSize(), t.meta.smallest, t.meta.largest, + t.min_sequence, t.max_sequence); } //fprintf(stderr, "NewDescriptor:\n%s\n", edit_.DebugString().c_str()); diff --git a/db/table_cache.cc b/db/table_cache.cc index 7a7513026..bd359f96d 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -36,10 +36,10 @@ static Slice GetSliceForFileNumber(const uint64_t* file_number) { sizeof(*file_number)); } -TableCache::TableCache(const std::string& dbname, const Options* options, +TableCache::TableCache(const Options* options, const EnvOptions& storage_options, Cache* const cache) : env_(options->env), - dbname_(dbname), + db_paths_(options->db_paths), options_(options), storage_options_(storage_options), cache_(cache) {} @@ -60,13 +60,15 @@ Status TableCache::FindTable(const EnvOptions& toptions, const FileDescriptor& fd, Cache::Handle** handle, const bool no_io) { Status s; - Slice key = GetSliceForFileNumber(&fd.number); + uint64_t number = fd.GetNumber(); + Slice key = GetSliceForFileNumber(&number); *handle = cache_->Lookup(key); if (*handle == nullptr) { if (no_io) { // Dont do IO and return a not-found status return Status::Incomplete("Table not found in table_cache, no_io is set"); } - std::string fname = TableFileName(dbname_, fd.GetNumber()); + std::string fname = + TableFileName(db_paths_, fd.GetNumber(), fd.GetPathId()); unique_ptr file; unique_ptr table_reader; s = env_->NewRandomAccessFile(fname, &file, toptions); diff --git a/db/table_cache.h b/db/table_cache.h index e912addc1..eaadc07da 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -11,6 +11,7 @@ #pragma once #include +#include #include #include "db/dbformat.h" @@ -28,8 +29,8 @@ struct FileDescriptor; class TableCache { 
public: - TableCache(const std::string& dbname, const Options* options, - const EnvOptions& storage_options, Cache* cache); + TableCache(const Options* options, const EnvOptions& storage_options, + Cache* cache); ~TableCache(); // Return an iterator for the specified file number (the corresponding @@ -84,7 +85,7 @@ class TableCache { private: Env* const env_; - const std::string dbname_; + const std::vector db_paths_; const Options* options_; const EnvOptions& storage_options_; Cache* const cache_; diff --git a/db/version_edit.cc b/db/version_edit.cc index c2b4928e0..4e2cf8f5b 100644 --- a/db/version_edit.cc +++ b/db/version_edit.cc @@ -18,25 +18,30 @@ namespace rocksdb { // Tag numbers for serialized VersionEdit. These numbers are written to // disk and should not be changed. enum Tag { - kComparator = 1, - kLogNumber = 2, - kNextFileNumber = 3, - kLastSequence = 4, - kCompactPointer = 5, - kDeletedFile = 6, - kNewFile = 7, + kComparator = 1, + kLogNumber = 2, + kNextFileNumber = 3, + kLastSequence = 4, + kCompactPointer = 5, + kDeletedFile = 6, + kNewFile = 7, // 8 was used for large value refs - kPrevLogNumber = 9, + kPrevLogNumber = 9, // these are new formats divergent from open source leveldb - kNewFile2 = 100, // store smallest & largest seqno - - kColumnFamily = 200, // specify column family for version edit - kColumnFamilyAdd = 201, - kColumnFamilyDrop = 202, - kMaxColumnFamily = 203, + kNewFile2 = 100, + kNewFile3 = 102, + kColumnFamily = 200, // specify column family for version edit + kColumnFamilyAdd = 201, + kColumnFamilyDrop = 202, + kMaxColumnFamily = 203, }; +uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id) { + assert(number <= kFileNumberMask); + return number | (path_id * (kFileNumberMask + 1)); +} + void VersionEdit::Clear() { comparator_.clear(); max_level_ = 0; @@ -93,9 +98,18 @@ void VersionEdit::EncodeTo(std::string* dst) const { for (size_t i = 0; i < new_files_.size(); i++) { const FileMetaData& f = new_files_[i].second; 
- PutVarint32(dst, kNewFile2); + if (f.fd.GetPathId() == 0) { + // Use older format to make sure user can roll back the build if they + // don't config multiple DB paths. + PutVarint32(dst, kNewFile2); + } else { + PutVarint32(dst, kNewFile3); + } PutVarint32(dst, new_files_[i].first); // level PutVarint64(dst, f.fd.GetNumber()); + if (f.fd.GetPathId() != 0) { + PutVarint32(dst, f.fd.GetPathId()); + } PutVarint64(dst, f.fd.GetFileSize()); PutLengthPrefixedSlice(dst, f.smallest.Encode()); PutLengthPrefixedSlice(dst, f.largest.Encode()); @@ -237,7 +251,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) { GetVarint64(&input, &file_size) && GetInternalKey(&input, &f.smallest) && GetInternalKey(&input, &f.largest)) { - f.fd = FileDescriptor(number, file_size); + f.fd = FileDescriptor(number, 0, file_size); new_files_.push_back(std::make_pair(level, f)); } else { if (!msg) { @@ -255,7 +269,27 @@ Status VersionEdit::DecodeFrom(const Slice& src) { GetInternalKey(&input, &f.largest) && GetVarint64(&input, &f.smallest_seqno) && GetVarint64(&input, &f.largest_seqno)) { - f.fd = FileDescriptor(number, file_size); + f.fd = FileDescriptor(number, 0, file_size); + new_files_.push_back(std::make_pair(level, f)); + } else { + if (!msg) { + msg = "new-file2 entry"; + } + } + break; + } + + case kNewFile3: { + uint64_t number; + uint32_t path_id; + uint64_t file_size; + if (GetLevel(&input, &level, &msg) && GetVarint64(&input, &number) && + GetVarint32(&input, &path_id) && GetVarint64(&input, &file_size) && + GetInternalKey(&input, &f.smallest) && + GetInternalKey(&input, &f.largest) && + GetVarint64(&input, &f.smallest_seqno) && + GetVarint64(&input, &f.largest_seqno)) { + f.fd = FileDescriptor(number, path_id, file_size); new_files_.push_back(std::make_pair(level, f)); } else { if (!msg) { diff --git a/db/version_edit.h b/db/version_edit.h index d6e62fc8c..50a24ea2d 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -19,21 +19,41 @@ namespace rocksdb { class VersionSet; 
+const uint64_t kFileNumberMask = 0x3FFFFFFFFFFFFFFF; + +extern uint64_t PackFileNumberAndPathId(uint64_t number, uint64_t path_id); + // A copyable structure contains information needed to read data from an SST // file. It can contains a pointer to a table reader opened for the file, or // file number and size, which can be used to create a new table reader for it. // The behavior is undefined when a copied of the structure is used when the // file is not in any live version any more. struct FileDescriptor { - uint64_t number; - uint64_t file_size; // File size in bytes // Table reader in table_reader_handle TableReader* table_reader; + uint64_t packed_number_and_path_id; + uint64_t file_size; // File size in bytes + + FileDescriptor() : FileDescriptor(0, 0, 0) {} - FileDescriptor(uint64_t number, uint64_t file_size) - : number(number), file_size(file_size), table_reader(nullptr) {} + FileDescriptor(uint64_t number, uint32_t path_id, uint64_t file_size) + : table_reader(nullptr), + packed_number_and_path_id(PackFileNumberAndPathId(number, path_id)), + file_size(file_size) {} - uint64_t GetNumber() const { return number; } + FileDescriptor& operator=(const FileDescriptor& fd) { + table_reader = fd.table_reader; + packed_number_and_path_id = fd.packed_number_and_path_id; + file_size = fd.file_size; + return *this; + } + + uint64_t GetNumber() const { + return packed_number_and_path_id & kFileNumberMask; + } + uint32_t GetPathId() const { + return packed_number_and_path_id / (kFileNumberMask + 1); + } uint64_t GetFileSize() const { return file_size; } }; @@ -58,7 +78,6 @@ struct FileMetaData { FileMetaData() : refs(0), - fd(0, 0), being_compacted(false), table_reader_handle(nullptr), compensated_file_size(0), @@ -103,15 +122,13 @@ class VersionEdit { // Add the specified file at the specified number. 
// REQUIRES: This version has not been saved (see VersionSet::SaveTo) // REQUIRES: "smallest" and "largest" are smallest and largest keys in file - void AddFile(int level, uint64_t file, - uint64_t file_size, - const InternalKey& smallest, - const InternalKey& largest, - const SequenceNumber& smallest_seqno, + void AddFile(int level, uint64_t file, uint64_t file_size, + uint64_t file_path_id, const InternalKey& smallest, + const InternalKey& largest, const SequenceNumber& smallest_seqno, const SequenceNumber& largest_seqno) { assert(smallest_seqno <= largest_seqno); FileMetaData f; - f.fd = FileDescriptor(file, file_size); + f.fd = FileDescriptor(file, file_size, file_path_id); f.smallest = smallest; f.largest = largest; f.smallest_seqno = smallest_seqno; diff --git a/db/version_edit_test.cc b/db/version_edit_test.cc index 7842b3263..850f242c1 100644 --- a/db/version_edit_test.cc +++ b/db/version_edit_test.cc @@ -30,11 +30,10 @@ TEST(VersionEditTest, EncodeDecode) { VersionEdit edit; for (int i = 0; i < 4; i++) { TestEncodeDecode(edit); - edit.AddFile(3, kBig + 300 + i, kBig + 400 + i, + edit.AddFile(3, kBig + 300 + i, kBig + 400 + i, 0, InternalKey("foo", kBig + 500 + i, kTypeValue), InternalKey("zoo", kBig + 600 + i, kTypeDeletion), - kBig + 500 + i, - kBig + 600 + i); + kBig + 500 + i, kBig + 600 + i); edit.DeleteFile(4, kBig + 700 + i); } diff --git a/db/version_set.cc b/db/version_set.cc index c54f0b591..b68923bc0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -16,6 +16,7 @@ #include #include #include +#include #include #include "db/filename.h" @@ -171,7 +172,7 @@ class Version::LevelFileNumIterator : public Iterator { : icmp_(icmp), flist_(flist), index_(flist->size()), - current_value_(0, 0) { // Marks as invalid + current_value_(0, 0, 0) { // Marks as invalid } virtual bool Valid() const { return index_ < flist_->size(); @@ -276,7 +277,8 @@ Status Version::GetTableProperties(std::shared_ptr* tp, *fname, &file, vset_->storage_options_); } else { s 
= options->env->NewRandomAccessFile( - TableFileName(vset_->dbname_, file_meta->fd.GetNumber()), + TableFileName(vset_->options_->db_paths, file_meta->fd.GetNumber(), + file_meta->fd.GetPathId()), &file, vset_->storage_options_); } if (!s.ok()) { @@ -303,7 +305,9 @@ Status Version::GetTableProperties(std::shared_ptr* tp, Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { for (int level = 0; level < num_levels_; level++) { for (const auto& file_meta : files_[level]) { - auto fname = TableFileName(vset_->dbname_, file_meta->fd.GetNumber()); + auto fname = + TableFileName(vset_->options_->db_paths, file_meta->fd.GetNumber(), + file_meta->fd.GetPathId()); // 1. If the table is already present in table cache, load table // properties from there. std::shared_ptr table_properties; @@ -1268,11 +1272,11 @@ int64_t Version::MaxNextLevelOverlappingBytes() { return result; } -void Version::AddLiveFiles(std::set* live) { +void Version::AddLiveFiles(std::vector* live) { for (int level = 0; level < NumberLevels(); level++) { const std::vector& files = files_[level]; for (const auto& file : files) { - live->insert(file->fd.GetNumber()); + live->push_back(file->fd); } } } @@ -1425,7 +1429,7 @@ class VersionSet::Builder { #endif } - void CheckConsistencyForDeletes(VersionEdit* edit, unsigned int number, + void CheckConsistencyForDeletes(VersionEdit* edit, uint64_t number, int level) { #ifndef NDEBUG // a file to be deleted better exist in the previous version @@ -1467,6 +1471,9 @@ class VersionSet::Builder { } } } + if (!found) { + fprintf(stderr, "not found %ld\n", number); + } assert(found); #endif } @@ -2160,17 +2167,15 @@ Status VersionSet::Recover( last_sequence_ = last_sequence; prev_log_number_ = prev_log_number; - Log(options_->info_log, "Recovered from manifest file:%s succeeded," + Log(options_->info_log, + "Recovered from manifest file:%s succeeded," "manifest_file_number is %lu, next_file_number is %lu, " "last_sequence is %lu, log_number is 
%lu," "prev_log_number is %lu," "max_column_family is %u\n", - manifest_filename.c_str(), - (unsigned long)manifest_file_number_, - (unsigned long)next_file_number_, - (unsigned long)last_sequence_, - (unsigned long)log_number, - (unsigned long)prev_log_number_, + manifest_filename.c_str(), (unsigned long)manifest_file_number_, + (unsigned long)next_file_number_, (unsigned long)last_sequence_, + (unsigned long)log_number, (unsigned long)prev_log_number_, column_family_set_->GetMaxColumnFamily()); for (auto cfd : *column_family_set_) { @@ -2557,9 +2562,9 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { for (int level = 0; level < cfd->NumberLevels(); level++) { for (const auto& f : cfd->current()->files_[level]) { - edit.AddFile(level, f->fd.GetNumber(), f->fd.GetFileSize(), - f->smallest, f->largest, f->smallest_seqno, - f->largest_seqno); + edit.AddFile(level, f->fd.GetNumber(), f->fd.GetPathId(), + f->fd.GetFileSize(), f->smallest, f->largest, + f->smallest_seqno, f->largest_seqno); } } edit.SetLogNumber(cfd->GetLogNumber()); @@ -2641,7 +2646,7 @@ uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { return result; } -void VersionSet::AddLiveFiles(std::vector* live_list) { +void VersionSet::AddLiveFiles(std::vector* live_list) { // pre-calculate space requirement int64_t total_files = 0; for (auto cfd : *column_family_set_) { @@ -2663,7 +2668,7 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { v = v->next_) { for (int level = 0; level < v->NumberLevels(); level++) { for (const auto& f : v->files_[level]) { - live_list->push_back(f->fd.GetNumber()); + live_list->push_back(f->fd); } } } @@ -2786,7 +2791,14 @@ void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { for (const auto& file : cfd->current()->files_[level]) { LiveFileMetaData filemetadata; filemetadata.column_family_name = cfd->GetName(); - filemetadata.name = TableFileName("", file->fd.GetNumber()); + uint32_t path_id = file->fd.GetPathId(); + if (path_id 
< options_->db_paths.size()) { + filemetadata.db_path = options_->db_paths[path_id]; + } else { + assert(!options_->db_paths.empty()); + filemetadata.db_path = options_->db_paths.back(); + } + filemetadata.name = MakeTableFileName("", file->fd.GetNumber()); filemetadata.level = level; filemetadata.size = file->fd.GetFileSize(); filemetadata.smallestkey = file->smallest.user_key().ToString(); diff --git a/db/version_set.h b/db/version_set.h index 04f52a508..60e9383f8 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -188,7 +188,7 @@ class Version { int64_t MaxNextLevelOverlappingBytes(); // Add all files listed in the current version to *live. - void AddLiveFiles(std::set* live); + void AddLiveFiles(std::vector* live); // Return a human readable string that describes this version's contents. std::string DebugString(bool hex = false) const; @@ -399,7 +399,7 @@ class VersionSet { // Arrange to reuse "file_number" unless a newer file number has // already been allocated. // REQUIRES: "file_number" was returned by a call to NewFileNumber(). - void ReuseFileNumber(uint64_t file_number) { + void ReuseLogFileNumber(uint64_t file_number) { if (next_file_number_ == file_number + 1) { next_file_number_ = file_number; } @@ -440,7 +440,7 @@ class VersionSet { Iterator* MakeInputIterator(Compaction* c); // Add all files listed in any live version to *live. - void AddLiveFiles(std::vector* live_list); + void AddLiveFiles(std::vector* live_list); // Return the approximate offset in the database of the data for // "key" as of version "v". 
diff --git a/db/version_set_test.cc b/db/version_set_test.cc index ef48bf927..0c548e342 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -31,7 +31,7 @@ class FindFileTest { SequenceNumber smallest_seq = 100, SequenceNumber largest_seq = 100) { FileMetaData* f = new FileMetaData; - f->fd = FileDescriptor(files_.size() + 1, 0); + f->fd = FileDescriptor(files_.size() + 1, 0, 0); f->smallest = InternalKey(smallest, smallest_seq, kTypeValue); f->largest = InternalKey(largest, largest_seq, kTypeValue); files_.push_back(f); diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 33b443f40..b1cbbc25b 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -55,6 +55,7 @@ class Env; // Metadata associated with each SST file. struct LiveFileMetaData { std::string column_family_name; // Name of the column family + std::string db_path; std::string name; // Name of the file int level; // Level at which this file resides. size_t size; // File size in bytes. diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 658be3afc..df7383d25 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -675,6 +675,13 @@ struct DBOptions { // Default value is 1800 (half an hour). int db_stats_log_interval; + // A list paths where SST files can be put into. A compaction style can + // determine which of those paths it will put the file to. + // If left empty, only one path will be used, which is db_name passed when + // opening the DB. + // Default: empty + std::vector db_paths; + // This specifies the info LOG dir. // If it is empty, the log files will be in the same dir as data. 
// If it is non empty, the log files will be in the specified dir, diff --git a/include/rocksdb/universal_compaction.h b/include/rocksdb/universal_compaction.h index eaf47e5c7..229e50b25 100644 --- a/include/rocksdb/universal_compaction.h +++ b/include/rocksdb/universal_compaction.h @@ -8,6 +8,7 @@ #include #include +#include namespace rocksdb { @@ -61,6 +62,7 @@ class CompactionOptionsUniversal { // well as the total size of C1...Ct as total_C, the compaction output file // will be compressed iff // total_C / total_size < this percentage + // Default: -1 int compression_size_percent; // The algorithm used to stop picking files into a single compaction run @@ -68,14 +70,13 @@ class CompactionOptionsUniversal { CompactionStopStyle stop_style; // Default set of parameters - CompactionOptionsUniversal() : - size_ratio(1), - min_merge_width(2), - max_merge_width(UINT_MAX), - max_size_amplification_percent(200), - compression_size_percent(-1), - stop_style(kCompactionStopStyleTotalSize) { - } + CompactionOptionsUniversal() + : size_ratio(1), + min_merge_width(2), + max_merge_width(UINT_MAX), + max_size_amplification_percent(200), + compression_size_percent(-1), + stop_style(kCompactionStopStyleTotalSize) {} }; } // namespace rocksdb diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index e623e5278..41d8d6f47 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -286,6 +286,10 @@ Options LDBCommand::PrepareOptionsForOpenDB() { } } + if (opt.db_paths.size() == 0) { + opt.db_paths.push_back(db_path_); + } + return opt; } diff --git a/util/options.cc b/util/options.cc index 17dad0f25..88d26fa01 100644 --- a/util/options.cc +++ b/util/options.cc @@ -214,6 +214,7 @@ DBOptions::DBOptions(const Options& options) disableDataSync(options.disableDataSync), use_fsync(options.use_fsync), db_stats_log_interval(options.db_stats_log_interval), + db_paths(options.db_paths), db_log_dir(options.db_log_dir), wal_dir(options.wal_dir), delete_obsolete_files_period_micros(