diff --git a/db/builder.cc b/db/builder.cc index 4042d9685..e0c39be84 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -92,7 +92,7 @@ Status BuildTable( return s; } - std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(), + std::string fname = TableFileName(ioptions.cf_paths, meta->fd.GetNumber(), meta->fd.GetPathId()); #ifndef ROCKSDB_LITE EventHelpers::NotifyTableFileCreationStarted( diff --git a/db/column_family.cc b/db/column_family.cc index b3e025bee..5349b637a 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -34,6 +34,7 @@ #include "table/merging_iterator.h" #include "util/autovector.h" #include "util/compression.h" +#include "util/sst_file_manager_impl.h" namespace rocksdb { @@ -159,6 +160,28 @@ Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) { return Status::OK(); } +Status CheckCFPathsSupported(const DBOptions& db_options, + const ColumnFamilyOptions& cf_options) { + // More than one cf_paths are supported only in universal + // and level compaction styles. This function also checks the case + // in which cf_paths is not specified, which results in db_paths + // being used. + if ((cf_options.compaction_style != kCompactionStyleUniversal) && + (cf_options.compaction_style != kCompactionStyleLevel)) { + if (cf_options.cf_paths.size() > 1) { + return Status::NotSupported( + "More than one CF paths are only supported in " + "universal and level compaction styles. "); + } else if (cf_options.cf_paths.empty() && + db_options.db_paths.size() > 1) { + return Status::NotSupported( + "More than one DB paths are only supported in " + "universal and level compaction styles. 
"); + } + } + return Status::OK(); +} + ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options, const ColumnFamilyOptions& src) { ColumnFamilyOptions result = src; @@ -277,9 +300,24 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options, result.hard_pending_compaction_bytes_limit; } +#ifndef ROCKSDB_LITE + // When the DB is stopped, it's possible that there are some .trash files that + // were not deleted yet, when we open the DB we will find these .trash files + // and schedule them to be deleted (or delete immediately if SstFileManager + // was not used) + auto sfm = static_cast(db_options.sst_file_manager.get()); + for (size_t i = 0; i < result.cf_paths.size(); i++) { + DeleteScheduler::CleanupDirectory(db_options.env, sfm, result.cf_paths[i].path); + } +#endif + + if (result.cf_paths.empty()) { + result.cf_paths = db_options.db_paths; + } + if (result.level_compaction_dynamic_level_bytes) { if (result.compaction_style != kCompactionStyleLevel || - db_options.db_paths.size() > 1U) { + result.cf_paths.size() > 1U) { // 1. level_compaction_dynamic_level_bytes only makes sense for // level-based compaction. // 2. 
we don't yet know how to make both of this feature and multiple @@ -1138,6 +1176,31 @@ Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) { static_cast(Env::WLTH_MEDIUM)); } +Status ColumnFamilyData::AddDirectories() { + Status s; + assert(data_dirs_.empty()); + for (auto& p : ioptions_.cf_paths) { + std::unique_ptr path_directory; + s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory); + if (!s.ok()) { + return s; + } + assert(path_directory != nullptr); + data_dirs_.emplace_back(path_directory.release()); + } + assert(data_dirs_.size() == ioptions_.cf_paths.size()); + return s; +} + +Directory* ColumnFamilyData::GetDataDir(size_t path_id) const { + if (data_dirs_.empty()) { + return nullptr; + } + + assert(path_id < data_dirs_.size()); + return data_dirs_[path_id].get(); +} + ColumnFamilySet::ColumnFamilySet(const std::string& dbname, const ImmutableDBOptions* db_options, const EnvOptions& env_options, diff --git a/db/column_family.h b/db/column_family.h index 84625d906..2858fe5e5 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -139,6 +139,9 @@ extern Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options); extern Status CheckConcurrentWritesSupported( const ColumnFamilyOptions& cf_options); +extern Status CheckCFPathsSupported(const DBOptions& db_options, + const ColumnFamilyOptions& cf_options); + extern ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options, const ColumnFamilyOptions& src); // Wrap user defined table proproties collector factories `from cf_options` @@ -376,6 +379,10 @@ class ColumnFamilyData { Env::WriteLifeTimeHint CalculateSSTWriteHint(int level); + Status AddDirectories(); + + Directory* GetDataDir(size_t path_id) const; + private: friend class ColumnFamilySet; ColumnFamilyData(uint32_t id, const std::string& name, @@ -459,6 +466,9 @@ class ColumnFamilyData { // Memtable id to track flush. 
std::atomic last_memtable_id_; + + // Directories corresponding to cf_paths. + std::vector> data_dirs_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 6d8360dcb..0355011a0 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -69,9 +69,15 @@ class ColumnFamilyTest : public testing::Test { } ~ColumnFamilyTest() { + std::vector column_families; + for (auto h : handles_) { + ColumnFamilyDescriptor cfdescriptor; + h->GetDescriptor(&cfdescriptor); + column_families.push_back(cfdescriptor); + } Close(); rocksdb::SyncPoint::GetInstance()->DisableProcessing(); - Destroy(); + Destroy(column_families); delete env_; } @@ -236,9 +242,11 @@ class ColumnFamilyTest : public testing::Test { #endif // !ROCKSDB_LITE } - void Destroy() { + void Destroy(const std::vector& column_families = + std::vector()) { Close(); - ASSERT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_))); + ASSERT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_), + column_families)); } void CreateColumnFamilies( @@ -483,6 +491,12 @@ class ColumnFamilyTest : public testing::Test { ASSERT_OK(destfile->Close()); } + int GetSstFileCount(std::string path) { + std::vector files; + DBTestBase::GetSstFiles(env_, path, &files); + return static_cast(files.size()); + } + std::vector handles_; std::vector names_; std::set keys_; @@ -3129,6 +3143,58 @@ TEST_F(ColumnFamilyTest, DISABLED_LogTruncationTest) { // cleanup env_->DeleteDir(backup_logs); } + +TEST_F(ColumnFamilyTest, DefaultCfPathsTest) { + Open(); + // Leave cf_paths for one column family to be empty. + // Files should be generated according to db_paths for that + // column family. 
+ ColumnFamilyOptions cf_opt1, cf_opt2; + cf_opt1.cf_paths.emplace_back(dbname_ + "_one_1", + std::numeric_limits::max()); + CreateColumnFamilies({"one", "two"}, {cf_opt1, cf_opt2}); + Reopen({ColumnFamilyOptions(), cf_opt1, cf_opt2}); + + // Fill Column family 1. + PutRandomData(1, 100, 100); + Flush(1); + + ASSERT_EQ(1, GetSstFileCount(cf_opt1.cf_paths[0].path)); + ASSERT_EQ(0, GetSstFileCount(dbname_)); + + // Fill column family 2 + PutRandomData(2, 100, 100); + Flush(2); + + // SST from Column family 2 should be generated in + // db_paths which is dbname_ in this case. + ASSERT_EQ(1, GetSstFileCount(dbname_)); +} + +TEST_F(ColumnFamilyTest, MultipleCFPathsTest) { + Open(); + // Configure Column family specific paths. + ColumnFamilyOptions cf_opt1, cf_opt2; + cf_opt1.cf_paths.emplace_back(dbname_ + "_one_1", + std::numeric_limits::max()); + cf_opt2.cf_paths.emplace_back(dbname_ + "_two_1", + std::numeric_limits::max()); + CreateColumnFamilies({"one", "two"}, {cf_opt1, cf_opt2}); + Reopen({ColumnFamilyOptions(), cf_opt1, cf_opt2}); + + PutRandomData(1, 100, 100); + Flush(1); + + // Check that files are generated in appropriate paths. 
+ ASSERT_EQ(1, GetSstFileCount(cf_opt1.cf_paths[0].path)); + ASSERT_EQ(0, GetSstFileCount(dbname_)); + + PutRandomData(2, 100, 100); + Flush(2); + + ASSERT_EQ(1, GetSstFileCount(cf_opt2.cf_paths[0].path)); + ASSERT_EQ(0, GetSstFileCount(dbname_)); +} } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 4702fc802..d2ffce26d 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -566,8 +566,10 @@ Status CompactionJob::Run() { TablePropertiesCollection tp; for (const auto& state : compact_->sub_compact_states) { for (const auto& output : state.outputs) { - auto fn = TableFileName(db_options_.db_paths, output.meta.fd.GetNumber(), - output.meta.fd.GetPathId()); + auto fn = TableFileName( + state.compaction->immutable_cf_options()->cf_paths, + output.meta.fd.GetNumber(), + output.meta.fd.GetPathId()); tp[fn] = output.table_properties; } } @@ -1112,7 +1114,9 @@ Status CompactionJob::FinishCompactionOutputFile( // This happens when the output level is bottom level, at the same time // the sub_compact output nothing. 
std::string fname = TableFileName( - db_options_.db_paths, meta->fd.GetNumber(), meta->fd.GetPathId()); + sub_compact->compaction->immutable_cf_options()->cf_paths, + meta->fd.GetNumber(), + meta->fd.GetPathId()); env_->DeleteFile(fname); // Also need to remove the file from outputs, or it will be added to the @@ -1165,8 +1169,10 @@ Status CompactionJob::FinishCompactionOutputFile( std::string fname; FileDescriptor output_fd; if (meta != nullptr) { - fname = TableFileName(db_options_.db_paths, meta->fd.GetNumber(), - meta->fd.GetPathId()); + fname = TableFileName( + sub_compact->compaction->immutable_cf_options()->cf_paths, + meta->fd.GetNumber(), + meta->fd.GetPathId()); output_fd = meta->fd; } else { fname = "(nil)"; @@ -1180,8 +1186,10 @@ Status CompactionJob::FinishCompactionOutputFile( auto sfm = static_cast(db_options_.sst_file_manager.get()); if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) { - auto fn = TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(), - meta->fd.GetPathId()); + auto fn = TableFileName( + sub_compact->compaction->immutable_cf_options()->cf_paths, + meta->fd.GetNumber(), + meta->fd.GetPathId()); sfm->OnAddFile(fn); if (sfm->IsMaxAllowedSpaceReached()) { // TODO(ajkr): should we return OK() if max space was reached by the final @@ -1266,8 +1274,10 @@ Status CompactionJob::OpenCompactionOutputFile( assert(sub_compact->builder == nullptr); // no need to lock because VersionSet::next_file_number_ is atomic uint64_t file_number = versions_->NewFileNumber(); - std::string fname = TableFileName(db_options_.db_paths, file_number, - sub_compact->compaction->output_path_id()); + std::string fname = TableFileName( + sub_compact->compaction->immutable_cf_options()->cf_paths, + file_number, + sub_compact->compaction->output_path_id()); // Fire events. 
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); #ifndef ROCKSDB_LITE diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 738de1724..ef73c0e5c 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -616,7 +616,7 @@ Compaction* CompactionPicker::CompactRange( } } } - assert(output_path_id < static_cast(ioptions_.db_paths.size())); + assert(output_path_id < static_cast(ioptions_.cf_paths.size())); if (ExpandInputsToCleanCut(cf_name, vstorage, &inputs) == false) { // manual compaction is now multi-threaded, so it can @@ -1336,10 +1336,10 @@ uint32_t LevelCompactionBuilder::GetPathId( const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options, int level) { uint32_t p = 0; - assert(!ioptions.db_paths.empty()); + assert(!ioptions.cf_paths.empty()); // size remaining in the most recent path - uint64_t current_path_size = ioptions.db_paths[0].target_size; + uint64_t current_path_size = ioptions.cf_paths[0].target_size; uint64_t level_size; int cur_level = 0; @@ -1349,7 +1349,7 @@ uint32_t LevelCompactionBuilder::GetPathId( level_size = mutable_cf_options.max_bytes_for_level_base; // Last path is the fallback - while (p < ioptions.db_paths.size() - 1) { + while (p < ioptions.cf_paths.size() - 1) { if (level_size <= current_path_size) { if (cur_level == level) { // Does desired level fit in this path? 
@@ -1376,7 +1376,7 @@ uint32_t LevelCompactionBuilder::GetPathId( } } p++; - current_path_size = ioptions.db_paths[p].target_size; + current_path_size = ioptions.cf_paths[p].target_size; } return p; } diff --git a/db/compaction_picker_test.cc b/db/compaction_picker_test.cc index 297949070..77e148173 100644 --- a/db/compaction_picker_test.cc +++ b/db/compaction_picker_test.cc @@ -59,7 +59,7 @@ class CompactionPickerTest : public testing::Test { vstorage_(nullptr) { fifo_options_.max_table_files_size = 1; mutable_cf_options_.RefreshDerivedOptions(ioptions_); - ioptions_.db_paths.emplace_back("dummy", + ioptions_.cf_paths.emplace_back("dummy", std::numeric_limits::max()); } diff --git a/db/compaction_picker_universal.cc b/db/compaction_picker_universal.cc index a0cbdea6e..655fc1050 100644 --- a/db/compaction_picker_universal.cc +++ b/db/compaction_picker_universal.cc @@ -406,9 +406,9 @@ uint32_t UniversalCompactionPicker::GetPathId( file_size * (100 - mutable_cf_options.compaction_options_universal.size_ratio) / 100; uint32_t p = 0; - assert(!ioptions.db_paths.empty()); - for (; p < ioptions.db_paths.size() - 1; p++) { - uint64_t target_size = ioptions.db_paths[p].target_size; + assert(!ioptions.cf_paths.empty()); + for (; p < ioptions.cf_paths.size() - 1; p++) { + uint64_t target_size = ioptions.cf_paths[p].target_size; if (target_size > file_size && accumulated_size + (target_size - file_size) > future_size) { return p; diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 47b7a1e56..b4790ef0b 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -1983,6 +1983,125 @@ TEST_P(DBCompactionTestWithParam, LevelCompactionPathUse) { Destroy(options); } +TEST_P(DBCompactionTestWithParam, LevelCompactionCFPathUse) { + Options options = CurrentOptions(); + options.db_paths.emplace_back(dbname_, 500 * 1024); + options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024); + options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024); + 
options.memtable_factory.reset( + new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1)); + options.compaction_style = kCompactionStyleLevel; + options.write_buffer_size = 110 << 10; // 110KB + options.arena_block_size = 4 << 10; + options.level0_file_num_compaction_trigger = 2; + options.num_levels = 4; + options.max_bytes_for_level_base = 400 * 1024; + options.max_subcompactions = max_subcompactions_; + + std::vector option_vector; + option_vector.emplace_back(options); + ColumnFamilyOptions cf_opt1(options), cf_opt2(options); + // Configure CF1 specific paths. + cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 500 * 1024); + cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 4 * 1024 * 1024); + cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 1024 * 1024 * 1024); + option_vector.emplace_back(DBOptions(options), cf_opt1); + CreateColumnFamilies({"one"},option_vector[1]); + + // Configure CF2 specific paths. + cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 500 * 1024); + cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 4 * 1024 * 1024); + cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 1024 * 1024 * 1024); + option_vector.emplace_back(DBOptions(options), cf_opt2); + CreateColumnFamilies({"two"},option_vector[2]); + + ReopenWithColumnFamilies({"default", "one", "two"}, option_vector); + + Random rnd(301); + int key_idx = 0; + int key_idx1 = 0; + int key_idx2 = 0; + + auto generate_file = [&]() { + GenerateNewFile(0, &rnd, &key_idx); + GenerateNewFile(1, &rnd, &key_idx1); + GenerateNewFile(2, &rnd, &key_idx2); + }; + + auto check_sstfilecount = [&](int path_id, int expected) { + ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path)); + ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path)); + ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path)); + }; + + auto check_filesperlevel = [&](const std::string& expected) { + ASSERT_EQ(expected, FilesPerLevel(0)); + ASSERT_EQ(expected, FilesPerLevel(1)); + ASSERT_EQ(expected, 
FilesPerLevel(2)); + }; + + auto check_getvalues = [&]() { + for (int i = 0; i < key_idx; i++) { + auto v = Get(0, Key(i)); + ASSERT_NE(v, "NOT_FOUND"); + ASSERT_TRUE(v.size() == 1 || v.size() == 990); + } + + for (int i = 0; i < key_idx1; i++) { + auto v = Get(1, Key(i)); + ASSERT_NE(v, "NOT_FOUND"); + ASSERT_TRUE(v.size() == 1 || v.size() == 990); + } + + for (int i = 0; i < key_idx2; i++) { + auto v = Get(2, Key(i)); + ASSERT_NE(v, "NOT_FOUND"); + ASSERT_TRUE(v.size() == 1 || v.size() == 990); + } + }; + + // Check that default column family uses db_paths. + // And Column family "one" uses cf_paths. + + // First three 110KB files are not going to second path. + // After that, (100K, 200K) + for (int num = 0; num < 3; num++) { + generate_file(); + } + + // Another 110KB triggers a compaction to 400K file to fill up first path + generate_file(); + check_sstfilecount(1, 3); + + // (1, 4) + generate_file(); + check_filesperlevel("1,4"); + check_sstfilecount(1, 4); + check_sstfilecount(0, 1); + + // (1, 4, 1) + generate_file(); + check_filesperlevel("1,4,1"); + check_sstfilecount(2, 1); + check_sstfilecount(1, 4); + check_sstfilecount(0, 1); + + // (1, 4, 2) + generate_file(); + check_filesperlevel("1,4,2"); + check_sstfilecount(2, 2); + check_sstfilecount(1, 4); + check_sstfilecount(0, 1); + + check_getvalues(); + + ReopenWithColumnFamilies({"default", "one", "two"}, option_vector); + + check_getvalues(); + + Destroy(options, true); +} + TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) { Random rnd(301); int max_key_level_insert = 200; diff --git a/db/db_impl.cc b/db/db_impl.cc index a8412f21e..35ae46935 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -514,7 +514,16 @@ void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) { } } -Directory* DBImpl::Directories::GetDataDir(size_t path_id) { +Directory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const { + assert(cfd); + Directory* ret_dir = cfd->GetDataDir(path_id); + if (ret_dir == 
nullptr) { + return directories_.GetDataDir(path_id); + } + return ret_dir; +} + +Directory* DBImpl::Directories::GetDataDir(size_t path_id) const { assert(path_id < data_dirs_.size()); Directory* ret_dir = data_dirs_[path_id].get(); if (ret_dir == nullptr) { @@ -860,12 +869,11 @@ void DBImpl::BackgroundCallPurge() { auto fname = purge_file->fname; auto type = purge_file->type; auto number = purge_file->number; - auto path_id = purge_file->path_id; auto job_id = purge_file->job_id; purge_queue_.pop_front(); mutex_.Unlock(); - DeleteObsoleteFileImpl(job_id, fname, type, number, path_id); + DeleteObsoleteFileImpl(job_id, fname, type, number); mutex_.Lock(); } else { assert(!logs_to_free_queue_.empty()); @@ -1306,6 +1314,17 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, if (s.ok() && immutable_db_options_.allow_concurrent_memtable_write) { s = CheckConcurrentWritesSupported(cf_options); } + if (s.ok()) { + s = CheckCFPathsSupported(initial_db_options_, cf_options); + } + if (s.ok()) { + for (auto& cf_path : cf_options.cf_paths) { + s = env_->CreateDirIfMissing(cf_path.path); + if (!s.ok()) { + break; + } + } + } if (!s.ok()) { return s; } @@ -1337,6 +1356,12 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, &cf_options); write_thread_.ExitUnbatched(&w); } + if (s.ok()) { + auto* cfd = + versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); + assert(cfd != nullptr); + s = cfd->AddDirectories(); + } if (s.ok()) { single_column_family_mode_ = false; auto* cfd = @@ -2391,7 +2416,8 @@ Status DB::ListColumnFamilies(const DBOptions& db_options, Snapshot::~Snapshot() { } -Status DestroyDB(const std::string& dbname, const Options& options) { +Status DestroyDB(const std::string& dbname, const Options& options, + const std::vector& column_families) { ImmutableDBOptions soptions(SanitizeOptions(dbname, options)); Env* env = soptions.env; std::vector filenames; @@ -2417,7 +2443,7 @@ Status 
DestroyDB(const std::string& dbname, const Options& options) { if (type == kMetaDatabase) { del = DestroyDB(path_to_delete, options); } else if (type == kTableFile) { - del = DeleteSSTFile(&soptions, path_to_delete, 0); + del = DeleteSSTFile(&soptions, path_to_delete); } else { del = env->DeleteFile(path_to_delete); } @@ -2427,15 +2453,32 @@ Status DestroyDB(const std::string& dbname, const Options& options) { + std::vector paths; + for (size_t path_id = 0; path_id < options.db_paths.size(); path_id++) { - const auto& db_path = options.db_paths[path_id]; - env->GetChildren(db_path.path, &filenames); + paths.emplace_back(options.db_paths[path_id].path); + } + for (auto& cf : column_families) { + for (size_t path_id = 0; path_id < cf.options.cf_paths.size(); + path_id++) { + paths.emplace_back(cf.options.cf_paths[path_id].path); + } + } + + // Remove duplicate paths. + // Note that we compare only the actual paths but not path ids. + // The reason is that the same path can appear at different path_ids + // for different column families. 
+ std::sort(paths.begin(), paths.end()); + paths.erase(std::unique(paths.begin(), paths.end()), paths.end()); + + for (auto& path : paths) { + env->GetChildren(path, &filenames); for (size_t i = 0; i < filenames.size(); i++) { if (ParseFileName(filenames[i], &number, &type) && type == kTableFile) { // Lock file will be deleted at end - std::string table_path = db_path.path + "/" + filenames[i]; - Status del = DeleteSSTFile(&soptions, table_path, - static_cast(path_id)); + std::string table_path = path + "/" + filenames[i]; + Status del = DeleteSSTFile(&soptions, table_path); if (result.ok() && !del.ok()) { result = del; } @@ -2921,11 +2964,12 @@ Status DBImpl::VerifyChecksum() { } for (auto& sv : sv_list) { VersionStorageInfo* vstorage = sv->current->storage_info(); + ColumnFamilyData* cfd = sv->current->cfd(); for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) { for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok(); j++) { const auto& fd = vstorage->LevelFilesBrief(i).files[j].fd; - std::string fname = TableFileName(immutable_db_options_.db_paths, + std::string fname = TableFileName(cfd->ioptions()->cf_paths, fd.GetNumber(), fd.GetPathId()); s = rocksdb::VerifySstFileChecksum(options, env_options, fname); } diff --git a/db/db_impl.h b/db/db_impl.h index 357458cbb..2d386faa7 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -478,7 +478,7 @@ class DBImpl : public DB { // It is not necessary to hold the mutex when invoking this method. 
// If FindObsoleteFiles() was run, we need to also run // PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true - void PurgeObsoleteFiles(const JobContext& background_contet, + void PurgeObsoleteFiles(JobContext& background_contet, bool schedule_only = false); void SchedulePurge(); @@ -636,6 +636,9 @@ class DBImpl : public DB { virtual Status Close() override; + static Status CreateAndNewDirectory(Env* env, const std::string& dirname, + std::unique_ptr* directory); + protected: Env* const env_; const std::string dbname_; @@ -789,7 +792,7 @@ class DBImpl : public DB { void DeleteObsoleteFiles(); // Delete obsolete files and log status and information of file deletion void DeleteObsoleteFileImpl(int job_id, const std::string& fname, - FileType type, uint64_t number, uint32_t path_id); + FileType type, uint64_t number); // Background process needs to call // auto x = CaptureCurrentFileNumberInPendingOutputs() @@ -911,7 +914,7 @@ class DBImpl : public DB { void SchedulePendingFlush(ColumnFamilyData* cfd, FlushReason flush_reason); void SchedulePendingCompaction(ColumnFamilyData* cfd); void SchedulePendingPurge(std::string fname, FileType type, uint64_t number, - uint32_t path_id, int job_id); + int job_id); static void BGWorkCompaction(void* arg); // Runs a pre-chosen universal compaction involving bottom level in a // separate, bottom-pri thread pool. 
@@ -960,6 +963,8 @@ class DBImpl : public DB { uint64_t GetMaxTotalWalSize() const; + Directory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const; + Status CloseHelper(); // table_cache_ provides its own synchronization @@ -1097,7 +1102,7 @@ class DBImpl : public DB { const std::string& wal_dir, const std::vector& data_paths); - Directory* GetDataDir(size_t path_id); + Directory* GetDataDir(size_t path_id) const; Directory* GetWalDir() { if (wal_dir_) { @@ -1112,9 +1117,6 @@ class DBImpl : public DB { std::unique_ptr db_dir_; std::vector> data_dirs_; std::unique_ptr wal_dir_; - - Status CreateAndNewDirectory(Env* env, const std::string& dirname, - std::unique_ptr* directory) const; }; Directories directories_; @@ -1158,11 +1160,9 @@ class DBImpl : public DB { std::string fname; FileType type; uint64_t number; - uint32_t path_id; int job_id; - PurgeFileInfo(std::string fn, FileType t, uint64_t num, uint32_t pid, - int jid) - : fname(fn), type(t), number(num), path_id(pid), job_id(jid) {} + PurgeFileInfo(std::string fn, FileType t, uint64_t num, int jid) + : fname(fn), type(t), number(num), job_id(jid) {} }; // flush_queue_ and compaction_queue_ hold column families that we need to diff --git a/db/db_impl_compaction_flush.cc b/db/db_impl_compaction_flush.cc index 230b1f57b..421f6f72d 100644 --- a/db/db_impl_compaction_flush.cc +++ b/db/db_impl_compaction_flush.cc @@ -122,7 +122,7 @@ Status DBImpl::FlushMemTableToOutputFile( env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_, snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, job_context, log_buffer, directories_.GetDbDir(), - directories_.GetDataDir(0U), + GetDataDir(cfd, 0U), GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_, &event_logger_, mutable_cf_options.report_bg_io_stats); @@ -195,7 +195,7 @@ Status DBImpl::FlushMemTableToOutputFile( if (sfm) { // Notify sst_file_manager that a new file was added std::string file_path = MakeTableFileName( - 
immutable_db_options_.db_paths[0].path, file_meta.fd.GetNumber()); + cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber()); sfm->OnAddFile(file_path); if (sfm->IsMaxAllowedSpaceReached() && bg_error_.ok()) { Status new_bg_error = Status::NoSpace("Max allowed space was reached"); @@ -240,7 +240,7 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta, info.cf_name = cfd->GetName(); // TODO(yhchiang): make db_paths dynamic in case flush does not // go to L0 in the future. - info.file_path = MakeTableFileName(immutable_db_options_.db_paths[0].path, + info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_meta->fd.GetNumber()); info.thread_id = env_->GetThreadID(); info.job_id = job_id; @@ -285,7 +285,7 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, info.cf_name = cfd->GetName(); // TODO(yhchiang): make db_paths dynamic in case flush does not // go to L0 in the future. - info.file_path = MakeTableFileName(immutable_db_options_.db_paths[0].path, + info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path, file_meta->fd.GetNumber()); info.thread_id = env_->GetThreadID(); info.job_id = job_id; @@ -308,12 +308,13 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd, Status DBImpl::CompactRange(const CompactRangeOptions& options, ColumnFamilyHandle* column_family, const Slice* begin, const Slice* end) { - if (options.target_path_id >= immutable_db_options_.db_paths.size()) { + auto cfh = reinterpret_cast(column_family); + auto cfd = cfh->cfd(); + + if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) { return Status::InvalidArgument("Invalid target path ID"); } - auto cfh = reinterpret_cast(column_family); - auto cfd = cfh->cfd(); bool exclusive = options.exclusive_manual_compaction; bool flush_needed = true; @@ -579,7 +580,7 @@ Status DBImpl::CompactFilesImpl( version->GetColumnFamilyMetaData(&cf_meta); if (output_path_id < 0) { - if (immutable_db_options_.db_paths.size() == 
1U) { + if (cfd->ioptions()->cf_paths.size() == 1U) { output_path_id = 0; } else { return Status::NotSupported( @@ -651,8 +652,9 @@ Status DBImpl::CompactFilesImpl( job_context->job_id, c.get(), immutable_db_options_, env_options_for_compaction_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), - directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_, - snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, + GetDataDir(c->column_family_data(), c->output_path_id()), + stats_, &mutex_, &bg_error_, snapshot_seqs, + earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, @@ -729,7 +731,7 @@ Status DBImpl::CompactFilesImpl( if (output_file_names != nullptr) { for (const auto newf : c->edit()->GetNewFiles()) { (*output_file_names).push_back(TableFileName( - immutable_db_options_.db_paths, newf.second.fd.GetNumber(), + c->immutable_cf_options()->cf_paths, newf.second.fd.GetNumber(), newf.second.fd.GetPathId()) ); } } @@ -805,7 +807,7 @@ void DBImpl::NotifyOnCompactionCompleted( info.compression = c->output_compression(); for (size_t i = 0; i < c->num_input_levels(); ++i) { for (const auto fmd : *c->inputs(i)) { - auto fn = TableFileName(immutable_db_options_.db_paths, + auto fn = TableFileName(c->immutable_cf_options()->cf_paths, fmd->fd.GetNumber(), fmd->fd.GetPathId()); info.input_files.push_back(fn); if (info.table_properties.count(fn) == 0) { @@ -818,9 +820,10 @@ void DBImpl::NotifyOnCompactionCompleted( } } for (const auto newf : c->edit()->GetNewFiles()) { - info.output_files.push_back(TableFileName(immutable_db_options_.db_paths, - newf.second.fd.GetNumber(), - newf.second.fd.GetPathId())); + info.output_files.push_back(TableFileName( + c->immutable_cf_options()->cf_paths, + newf.second.fd.GetNumber(), + newf.second.fd.GetPathId())); } for (auto listener 
: immutable_db_options_.listeners) { listener->OnCompactionCompleted(this, info); @@ -1292,10 +1295,9 @@ void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) { } void DBImpl::SchedulePendingPurge(std::string fname, FileType type, - uint64_t number, uint32_t path_id, - int job_id) { + uint64_t number, int job_id) { mutex_.AssertHeld(); - PurgeFileInfo file_info(fname, type, number, path_id, job_id); + PurgeFileInfo file_info(fname, type, number, job_id); purge_queue_.push_back(std::move(file_info)); } @@ -1876,8 +1878,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress, job_context->job_id, c.get(), immutable_db_options_, env_options_for_compaction_, versions_.get(), &shutting_down_, preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(), - directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, - &bg_error_, snapshot_seqs, earliest_write_conflict_snapshot, + GetDataDir(c->column_family_data(), c->output_path_id()), + stats_, &mutex_, &bg_error_, + snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker, table_cache_, &event_logger_, c->mutable_cf_options()->paranoid_file_checks, c->mutable_cf_options()->report_bg_io_stats, dbname_, diff --git a/db/db_impl_files.cc b/db/db_impl_files.cc index c3e698e98..e3cc6722d 100644 --- a/db/db_impl_files.cc +++ b/db/db_impl_files.cc @@ -191,8 +191,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // Mark the elements in job_context->sst_delete_files as grabbedForPurge // so that other threads calling FindObsoleteFiles with full_scan=true // will not add these files to candidate list for purge. 
- for (const auto sst_to_del : job_context->sst_delete_files) { - MarkAsGrabbedForPurge(sst_to_del->fd.GetNumber()); + for (const auto& sst_to_del : job_context->sst_delete_files) { + MarkAsGrabbedForPurge(sst_to_del.metadata->fd.GetNumber()); } // store the current filenum, lognum, etc @@ -207,13 +207,29 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, if (doing_the_full_scan) { InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(), dbname_); + std::vector paths; for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size(); path_id++) { + paths.emplace_back(immutable_db_options_.db_paths[path_id].path); + } + + // Note that if cf_paths is not specified in the ColumnFamilyOptions + // of a particular column family, we use db_paths as the cf_paths + // setting. Hence, there can be multiple duplicates of files from db_paths + // in the following code. The duplicate are removed while identifying + // unique files in PurgeObsoleteFiles. + for (auto cfd : *versions_->GetColumnFamilySet()) { + for (size_t path_id = 0; path_id < cfd->ioptions()->cf_paths.size(); + path_id++) { + paths.emplace_back(cfd->ioptions()->cf_paths[path_id].path); + } + } + + for (auto& path : paths) { // set of all files in the directory. We'll exclude files that are still // alive in the subsequent processings. 
std::vector files; - env_->GetChildren(immutable_db_options_.db_paths[path_id].path, - &files); // Ignore errors + env_->GetChildren(path, &files); // Ignore errors for (const std::string& file : files) { uint64_t number; FileType type; @@ -231,7 +247,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, // TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes job_context->full_scan_candidate_files.emplace_back( - "/" + file, static_cast(path_id)); + "/" + file, path); } } @@ -241,7 +257,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, env_->GetChildren(immutable_db_options_.wal_dir, &log_files); // Ignore errors for (const std::string& log_file : log_files) { - job_context->full_scan_candidate_files.emplace_back(log_file, 0); + job_context->full_scan_candidate_files.emplace_back(log_file, + immutable_db_options_.wal_dir); } } // Add info log files in db_log_dir @@ -250,8 +267,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, std::vector info_log_files; // Ignore errors env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files); - for (std::string log_file : info_log_files) { - job_context->full_scan_candidate_files.emplace_back(log_file, 0); + for (std::string& log_file : info_log_files) { + job_context->full_scan_candidate_files.emplace_back(log_file, + immutable_db_options_.db_log_dir); } } } @@ -326,7 +344,7 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first, } else if (first.file_name < second.file_name) { return false; } else { - return (first.path_id > second.path_id); + return (first.file_path > second.file_path); } } }; // namespace @@ -335,12 +353,11 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first, // Note: All WAL files must be deleted through this function (unelss they are // archived) to ensure that maniefest is updated properly. 
void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname, - FileType type, uint64_t number, - uint32_t path_id) { + FileType type, uint64_t number) { Status file_deletion_status; if (type == kTableFile) { file_deletion_status = - DeleteSSTFile(&immutable_db_options_, fname, path_id); + DeleteSSTFile(&immutable_db_options_, fname); } else { if (type == kLogFile) { // Before deleting the file, mark file as deleted in the manifest @@ -385,7 +402,7 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname, // belong to live files are possibly removed. Also, removes all the // files in sst_delete_files and log_delete_files. // It is not necessary to hold the mutex when invoking this method. -void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) { +void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin"); // we'd better have sth to delete assert(state.HaveSomethingToDelete()); @@ -408,23 +425,23 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) { state.log_delete_files.size() + state.manifest_delete_files.size()); // We may ignore the dbname when generating the file names. 
const char* kDumbDbName = ""; - for (auto file : state.sst_delete_files) { + for (auto& file : state.sst_delete_files) { candidate_files.emplace_back( - MakeTableFileName(kDumbDbName, file->fd.GetNumber()), - file->fd.GetPathId()); - if (file->table_reader_handle) { - table_cache_->Release(file->table_reader_handle); + MakeTableFileName(kDumbDbName, file.metadata->fd.GetNumber()), file.path); + if (file.metadata->table_reader_handle) { + table_cache_->Release(file.metadata->table_reader_handle); } - delete file; + file.DeleteMetadata(); } for (auto file_num : state.log_delete_files) { if (file_num > 0) { - candidate_files.emplace_back(LogFileName(kDumbDbName, file_num), 0); + candidate_files.emplace_back(LogFileName(kDumbDbName, file_num), + immutable_db_options_.wal_dir); } } for (const auto& filename : state.manifest_delete_files) { - candidate_files.emplace_back(filename, 0); + candidate_files.emplace_back(filename, dbname_); } // dedup state.candidate_files so we don't try to delete the same @@ -450,7 +467,6 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) { std::unordered_set files_to_del; for (const auto& candidate_file : candidate_files) { std::string to_delete = candidate_file.file_name; - uint32_t path_id = candidate_file.path_id; uint64_t number; FileType type; // Ignore file if we cannot recognize it. @@ -517,7 +533,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) { if (type == kTableFile) { // evict from cache TableCache::Evict(table_cache_.get(), number); - fname = TableFileName(immutable_db_options_.db_paths, number, path_id); + fname = MakeTableFileName(candidate_file.file_path, number); } else { fname = ((type == kLogFile) ? 
immutable_db_options_.wal_dir : dbname_) + "/" + to_delete; @@ -534,9 +550,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) { Status file_deletion_status; if (schedule_only) { InstrumentedMutexLock guard_lock(&mutex_); - SchedulePendingPurge(fname, type, number, path_id, state.job_id); + SchedulePendingPurge(fname, type, number, state.job_id); } else { - DeleteObsoleteFileImpl(state.job_id, fname, type, number, path_id); + DeleteObsoleteFileImpl(state.job_id, fname, type, number); } } diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 78e871c58..5830f0474 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -163,17 +163,13 @@ static Status ValidateOptions( if (s.ok() && db_options.allow_concurrent_memtable_write) { s = CheckConcurrentWritesSupported(cfd.options); } + if (s.ok()) { + s = CheckCFPathsSupported(db_options, cfd.options); + } if (!s.ok()) { return s; } - if (db_options.db_paths.size() > 1) { - if ((cfd.options.compaction_style != kCompactionStyleUniversal) && - (cfd.options.compaction_style != kCompactionStyleLevel)) { - return Status::NotSupported( - "More than one DB paths are only supported in " - "universal and level compaction styles. "); - } - } + if (cfd.options.ttl > 0 || cfd.options.compaction_options_fifo.ttl > 0) { if (db_options.max_open_files != -1) { return Status::NotSupported( @@ -253,9 +249,9 @@ Status DBImpl::NewDB() { return s; } -Status DBImpl::Directories::CreateAndNewDirectory( +Status DBImpl::CreateAndNewDirectory( Env* env, const std::string& dirname, - std::unique_ptr* directory) const { + std::unique_ptr* directory) { // We call CreateDirIfMissing() as the directory may already exist (if we // are reopening a DB), when this happens we don't want creating the // directory to cause an error. 
However, we need to check if creating the @@ -273,12 +269,12 @@ Status DBImpl::Directories::CreateAndNewDirectory( Status DBImpl::Directories::SetDirectories( Env* env, const std::string& dbname, const std::string& wal_dir, const std::vector& data_paths) { - Status s = CreateAndNewDirectory(env, dbname, &db_dir_); + Status s = DBImpl::CreateAndNewDirectory(env, dbname, &db_dir_); if (!s.ok()) { return s; } if (!wal_dir.empty() && dbname != wal_dir) { - s = CreateAndNewDirectory(env, wal_dir, &wal_dir_); + s = DBImpl::CreateAndNewDirectory(env, wal_dir, &wal_dir_); if (!s.ok()) { return s; } @@ -291,7 +287,7 @@ Status DBImpl::Directories::SetDirectories( data_dirs_.emplace_back(nullptr); } else { std::unique_ptr path_directory; - s = CreateAndNewDirectory(env, db_path, &path_directory); + s = DBImpl::CreateAndNewDirectory(env, db_path, &path_directory); if (!s.ok()) { return s; } @@ -384,6 +380,14 @@ Status DBImpl::Recover( if (immutable_db_options_.paranoid_checks && s.ok()) { s = CheckConsistency(); } + if (s.ok() && !read_only) { + for (auto cfd : *versions_->GetColumnFamilySet()) { + s = cfd->AddDirectories(); + if (!s.ok()) { + return s; + } + } + } if (s.ok()) { SequenceNumber next_sequence(kMaxSequenceNumber); default_cf_handle_ = new ColumnFamilyHandleImpl( @@ -1030,8 +1034,17 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch); s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir); if (s.ok()) { - for (auto db_path : impl->immutable_db_options_.db_paths) { - s = impl->env_->CreateDirIfMissing(db_path.path); + std::vector paths; + for (auto& db_path : impl->immutable_db_options_.db_paths) { + paths.emplace_back(db_path.path); + } + for (auto& cf : column_families) { + for (auto& cf_path : cf.options.cf_paths) { + paths.emplace_back(cf_path.path); + } + } + for (auto& path : paths) { + s = impl->env_->CreateDirIfMissing(path); if (!s.ok()) { break; } @@ 
-1174,17 +1187,28 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, impl->immutable_db_options_.sst_file_manager.get()); if (s.ok() && sfm) { // Notify SstFileManager about all sst files that already exist in - // db_paths[0] when the DB is opened. - auto& db_path = impl->immutable_db_options_.db_paths[0]; - std::vector existing_files; - impl->immutable_db_options_.env->GetChildren(db_path.path, &existing_files); - for (auto& file_name : existing_files) { - uint64_t file_number; - FileType file_type; - std::string file_path = db_path.path + "/" + file_name; - if (ParseFileName(file_name, &file_number, &file_type) && - file_type == kTableFile) { - sfm->OnAddFile(file_path); + // db_paths[0] and cf_paths[0] when the DB is opened. + std::vector paths; + paths.emplace_back(impl->immutable_db_options_.db_paths[0].path); + for (auto& cf : column_families) { + if (!cf.options.cf_paths.empty()) { + paths.emplace_back(cf.options.cf_paths[0].path); + } + } + // Remove duplicate paths. 
+ std::sort(paths.begin(), paths.end()); + paths.erase(std::unique(paths.begin(), paths.end()), paths.end()); + for (auto& path : paths) { + std::vector existing_files; + impl->immutable_db_options_.env->GetChildren(path, &existing_files); + for (auto& file_name : existing_files) { + uint64_t file_number; + FileType file_type; + std::string file_path = path + "/" + file_name; + if (ParseFileName(file_name, &file_number, &file_type) && + file_type == kTableFile) { + sfm->OnAddFile(file_path); + } } } } diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index 408cbd435..a67145942 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -103,7 +103,7 @@ TEST_F(DBSSTTest, SSTsWithLdbSuffixHandling) { ASSERT_GT(num_files, 0); std::vector filenames; - GetSstFiles(dbname_, &filenames); + GetSstFiles(env_, dbname_, &filenames); int num_ldb_files = 0; for (size_t i = 0; i < filenames.size(); ++i) { if (i & 1) { diff --git a/db/db_test2.cc b/db/db_test2.cc index 7af5379e2..b1e07d106 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -1112,7 +1112,7 @@ TEST_F(DBTest2, PresetCompressionDict) { size_t out_bytes = 0; std::vector files; - GetSstFiles(dbname_, &files); + GetSstFiles(env_, dbname_, &files); for (const auto& file : files) { uint64_t curr_bytes; env_->GetFileSize(dbname_ + "/" + file, &curr_bytes); diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 79f364159..107a7f167 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -575,9 +575,17 @@ void DBTestBase::DestroyAndReopen(const Options& options) { ASSERT_OK(TryReopen(options)); } -void DBTestBase::Destroy(const Options& options) { +void DBTestBase::Destroy(const Options& options, bool delete_cf_paths) { + std::vector column_families; + if (delete_cf_paths) { + for (size_t i = 0; i < handles_.size(); ++i) { + ColumnFamilyDescriptor cfdescriptor; + handles_[i]->GetDescriptor(&cfdescriptor); + column_families.push_back(cfdescriptor); + } + } Close(); - ASSERT_OK(DestroyDB(dbname_, options)); + 
ASSERT_OK(DestroyDB(dbname_, options, column_families)); } Status DBTestBase::ReadOnlyReopen(const Options& options) { @@ -1017,9 +1025,9 @@ std::string DBTestBase::DumpSSTableList() { return property; } -void DBTestBase::GetSstFiles(std::string path, +void DBTestBase::GetSstFiles(Env* env, std::string path, std::vector* files) { - env_->GetChildren(path, files); + env->GetChildren(path, files); files->erase( std::remove_if(files->begin(), files->end(), [](std::string name) { @@ -1031,7 +1039,7 @@ void DBTestBase::GetSstFiles(std::string path, int DBTestBase::GetSstFileCount(std::string path) { std::vector files; - GetSstFiles(path, &files); + DBTestBase::GetSstFiles(env_, path, &files); return static_cast(files.size()); } diff --git a/db/db_test_util.h b/db/db_test_util.h index 0c3da91ce..df1e7cfb2 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -804,7 +804,7 @@ class DBTestBase : public testing::Test { void DestroyAndReopen(const Options& options); - void Destroy(const Options& options); + void Destroy(const Options& options, bool delete_cf_paths = false); Status ReadOnlyReopen(const Options& options); @@ -904,7 +904,8 @@ class DBTestBase : public testing::Test { std::string DumpSSTableList(); - void GetSstFiles(std::string path, std::vector* files); + static void GetSstFiles(Env* env, std::string path, + std::vector* files); int GetSstFileCount(std::string path); diff --git a/db/db_universal_compaction_test.cc b/db/db_universal_compaction_test.cc index 80d17de21..ad672d91b 100644 --- a/db/db_universal_compaction_test.cc +++ b/db/db_universal_compaction_test.cc @@ -1343,6 +1343,146 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionFourPaths) { Destroy(options); } +TEST_P(DBTestUniversalCompaction, UniversalCompactionCFPathUse) { + Options options = CurrentOptions(); + options.db_paths.emplace_back(dbname_, 300 * 1024); + options.db_paths.emplace_back(dbname_ + "_2", 300 * 1024); + options.db_paths.emplace_back(dbname_ + "_3", 500 * 1024); + 
options.db_paths.emplace_back(dbname_ + "_4", 1024 * 1024 * 1024);
+ options.memtable_factory.reset(
+ new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
+ options.compaction_style = kCompactionStyleUniversal;
+ options.compaction_options_universal.size_ratio = 5;
+ options.write_buffer_size = 111 << 10; // 114KB
+ options.arena_block_size = 4 << 10;
+ options.level0_file_num_compaction_trigger = 2;
+ options.num_levels = 1;
+
+ std::vector<Options> option_vector;
+ option_vector.emplace_back(options);
+ ColumnFamilyOptions cf_opt1(options), cf_opt2(options);
+ // Configure CF1 specific paths.
+ cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 300 * 1024);
+ cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 300 * 1024);
+ cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 500 * 1024);
+ cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_4", 1024 * 1024 * 1024);
+ option_vector.emplace_back(DBOptions(options), cf_opt1);
+ CreateColumnFamilies({"one"},option_vector[1]);
+
+ // Configure CF2 specific paths.
+ cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 300 * 1024); + cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 300 * 1024); + cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 500 * 1024); + cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_4", 1024 * 1024 * 1024); + option_vector.emplace_back(DBOptions(options), cf_opt2); + CreateColumnFamilies({"two"},option_vector[2]); + + ReopenWithColumnFamilies({"default", "one", "two"}, option_vector); + + Random rnd(301); + int key_idx = 0; + int key_idx1 = 0; + int key_idx2 = 0; + + auto generate_file = [&]() { + GenerateNewFile(0, &rnd, &key_idx); + GenerateNewFile(1, &rnd, &key_idx1); + GenerateNewFile(2, &rnd, &key_idx2); + }; + + auto check_sstfilecount = [&](int path_id, int expected) { + ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path)); + ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path)); + ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path)); + }; + + auto check_getvalues = [&]() { + for (int i = 0; i < key_idx; i++) { + auto v = Get(0, Key(i)); + ASSERT_NE(v, "NOT_FOUND"); + ASSERT_TRUE(v.size() == 1 || v.size() == 990); + } + + for (int i = 0; i < key_idx1; i++) { + auto v = Get(1, Key(i)); + ASSERT_NE(v, "NOT_FOUND"); + ASSERT_TRUE(v.size() == 1 || v.size() == 990); + } + + for (int i = 0; i < key_idx2; i++) { + auto v = Get(2, Key(i)); + ASSERT_NE(v, "NOT_FOUND"); + ASSERT_TRUE(v.size() == 1 || v.size() == 990); + } + }; + + // First three 110KB files are not going to second path. 
+ // After that, (100K, 200K) + for (int num = 0; num < 3; num++) { + generate_file(); + } + + // Another 110KB triggers a compaction to 400K file to second path + generate_file(); + check_sstfilecount(2, 1); + + // (1, 4) + generate_file(); + check_sstfilecount(2, 1); + check_sstfilecount(0, 1); + + // (1,1,4) -> (2, 4) + generate_file(); + check_sstfilecount(2, 1); + check_sstfilecount(1, 1); + check_sstfilecount(0, 0); + + // (1, 2, 4) -> (3, 4) + generate_file(); + check_sstfilecount(2, 1); + check_sstfilecount(1, 1); + check_sstfilecount(0, 0); + + // (1, 3, 4) -> (8) + generate_file(); + check_sstfilecount(3, 1); + + // (1, 8) + generate_file(); + check_sstfilecount(3, 1); + check_sstfilecount(0, 1); + + // (1, 1, 8) -> (2, 8) + generate_file(); + check_sstfilecount(3, 1); + check_sstfilecount(1, 1); + + // (1, 2, 8) -> (3, 8) + generate_file(); + check_sstfilecount(3, 1); + check_sstfilecount(1, 1); + check_sstfilecount(0, 0); + + // (1, 3, 8) -> (4, 8) + generate_file(); + check_sstfilecount(2, 1); + check_sstfilecount(3, 1); + + // (1, 4, 8) -> (5, 8) + generate_file(); + check_sstfilecount(3, 1); + check_sstfilecount(2, 1); + check_sstfilecount(0, 0); + + check_getvalues(); + + ReopenWithColumnFamilies({"default", "one", "two"}, option_vector); + + check_getvalues(); + + Destroy(options, true); +} + TEST_P(DBTestUniversalCompaction, IncreaseUniversalCompactionNumLevels) { std::function verify_func = [&](int num_keys_in_db) { std::string keys_in_db; diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index b37440e47..91ef46988 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -94,7 +94,8 @@ Status ExternalSstFileIngestionJob::Prepare( const std::string path_outside_db = f.external_file_path; const std::string path_inside_db = - TableFileName(db_options_.db_paths, f.fd.GetNumber(), f.fd.GetPathId()); + TableFileName(cfd_->ioptions()->cf_paths, f.fd.GetNumber(), + 
f.fd.GetPathId()); if (ingestion_options_.move_files) { status = env_->LinkFile(path_outside_db, path_inside_db); diff --git a/db/job_context.h b/db/job_context.h index 0700cfcee..4669e1714 100644 --- a/db/job_context.h +++ b/db/job_context.h @@ -92,11 +92,12 @@ struct JobContext { // Structure to store information for candidate files to delete. struct CandidateFileInfo { std::string file_name; - uint32_t path_id; - CandidateFileInfo(std::string name, uint32_t path) - : file_name(std::move(name)), path_id(path) {} + std::string file_path; + CandidateFileInfo(std::string name, std::string path) + : file_name(std::move(name)), file_path(path) {} bool operator==(const CandidateFileInfo& other) const { - return file_name == other.file_name && path_id == other.path_id; + return file_name == other.file_name && + file_path == other.file_path; } }; @@ -113,7 +114,7 @@ struct JobContext { std::vector sst_live; // a list of sst files that we need to delete - std::vector sst_delete_files; + std::vector sst_delete_files; // a list of log files that we need to delete std::vector log_delete_files; diff --git a/db/repair.cc b/db/repair.cc index 583a3dbe7..4e59924b0 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -101,10 +101,12 @@ class Repairer { db_options_(SanitizeOptions(dbname_, db_options)), immutable_db_options_(ImmutableDBOptions(db_options_)), icmp_(default_cf_opts.comparator), - default_cf_opts_(default_cf_opts), + default_cf_opts_( + SanitizeOptions(immutable_db_options_, default_cf_opts)), default_cf_iopts_( - ImmutableCFOptions(immutable_db_options_, default_cf_opts)), - unknown_cf_opts_(unknown_cf_opts), + ImmutableCFOptions(immutable_db_options_, default_cf_opts_)), + unknown_cf_opts_( + SanitizeOptions(immutable_db_options_, unknown_cf_opts)), create_unknown_cfs_(create_unknown_cfs), raw_table_cache_( // TableCache can be small since we expect each table to be opened diff --git a/db/table_cache.cc b/db/table_cache.cc index a4f2b2ceb..18246f4f8 100644 --- 
a/db/table_cache.cc +++ b/db/table_cache.cc @@ -92,7 +92,7 @@ Status TableCache::GetTableReader( bool skip_filters, int level, bool prefetch_index_and_filter_in_cache, bool for_compaction) { std::string fname = - TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId()); + TableFileName(ioptions_.cf_paths, fd.GetNumber(), fd.GetPathId()); unique_ptr file; Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options); diff --git a/db/version_set.cc b/db/version_set.cc index b66b3436e..f31fa6792 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -355,7 +355,11 @@ Version::~Version() { assert(f->refs > 0); f->refs--; if (f->refs <= 0) { - vset_->obsolete_files_.push_back(f); + assert(cfd_ != nullptr); + uint32_t path_id = f->fd.GetPathId(); + assert(path_id < cfd_->ioptions()->cf_paths.size()); + vset_->obsolete_files_.push_back( + ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path)); } } } @@ -756,7 +760,7 @@ Status Version::GetTableProperties(std::shared_ptr* tp, file_name = *fname; } else { file_name = - TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(), + TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(), file_meta->fd.GetPathId()); } s = ioptions->env->NewRandomAccessFile(file_name, &file, env_options_); @@ -797,7 +801,7 @@ Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props, int level) { for (const auto& file_meta : storage_info_.files_[level]) { auto fname = - TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(), + TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(), file_meta->fd.GetPathId()); // 1. If the table is already present in table cache, load table // properties from there. 
@@ -825,7 +829,7 @@ Status Version::GetPropertiesOfTablesInRange( false); for (const auto& file_meta : files) { auto fname = - TableFileName(vset_->db_options_->db_paths, + TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(), file_meta->fd.GetPathId()); if (props->count(fname) == 0) { // 1. If the table is already present in table cache, load table @@ -897,11 +901,11 @@ void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) { for (const auto& file : vstorage->LevelFiles(level)) { uint32_t path_id = file->fd.GetPathId(); std::string file_path; - if (path_id < ioptions->db_paths.size()) { - file_path = ioptions->db_paths[path_id].path; + if (path_id < ioptions->cf_paths.size()) { + file_path = ioptions->cf_paths[path_id].path; } else { - assert(!ioptions->db_paths.empty()); - file_path = ioptions->db_paths.back().path; + assert(!ioptions->cf_paths.empty()); + file_path = ioptions->cf_paths.back().path; } files.emplace_back( MakeTableFileName("", file->fd.GetNumber()), file_path, @@ -2687,12 +2691,12 @@ VersionSet::~VersionSet() { Cache* table_cache = column_family_set_->get_table_cache(); table_cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */); column_family_set_.reset(); - for (auto file : obsolete_files_) { - if (file->table_reader_handle) { - table_cache->Release(file->table_reader_handle); - TableCache::Evict(table_cache, file->fd.GetNumber()); + for (auto& file : obsolete_files_) { + if (file.metadata->table_reader_handle) { + table_cache->Release(file.metadata->table_reader_handle); + TableCache::Evict(table_cache, file.metadata->fd.GetNumber()); } - delete file; + file.DeleteMetadata(); } obsolete_files_.clear(); } @@ -4101,11 +4105,11 @@ void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { LiveFileMetaData filemetadata; filemetadata.column_family_name = cfd->GetName(); uint32_t path_id = file->fd.GetPathId(); - if (path_id < db_options_->db_paths.size()) { - filemetadata.db_path = 
db_options_->db_paths[path_id].path; + if (path_id < cfd->ioptions()->cf_paths.size()) { + filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path; } else { - assert(!db_options_->db_paths.empty()); - filemetadata.db_path = db_options_->db_paths.back().path; + assert(!cfd->ioptions()->cf_paths.empty()); + filemetadata.db_path = cfd->ioptions()->cf_paths.back().path; } filemetadata.name = MakeTableFileName("", file->fd.GetNumber()); filemetadata.level = level; @@ -4120,17 +4124,17 @@ void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { } } -void VersionSet::GetObsoleteFiles(std::vector* files, +void VersionSet::GetObsoleteFiles(std::vector* files, std::vector* manifest_filenames, uint64_t min_pending_output) { assert(manifest_filenames->empty()); obsolete_manifests_.swap(*manifest_filenames); - std::vector pending_files; - for (auto f : obsolete_files_) { - if (f->fd.GetNumber() < min_pending_output) { - files->push_back(f); + std::vector pending_files; + for (auto& f : obsolete_files_) { + if (f.metadata->fd.GetNumber() < min_pending_output) { + files->push_back(std::move(f)); } else { - pending_files.push_back(f); + pending_files.push_back(std::move(f)); } } obsolete_files_.swap(pending_files); diff --git a/db/version_set.h b/db/version_set.h index 3783da314..df0ef5ae8 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -695,6 +695,36 @@ class Version { void operator=(const Version&); }; +struct ObsoleteFileInfo { + FileMetaData* metadata; + std::string path; + + ObsoleteFileInfo() noexcept : metadata(nullptr) {} + ObsoleteFileInfo(FileMetaData* f, const std::string& file_path) + : metadata(f), path(file_path) {} + + ObsoleteFileInfo(const ObsoleteFileInfo&) = delete; + ObsoleteFileInfo& operator=(const ObsoleteFileInfo&) = delete; + + ObsoleteFileInfo(ObsoleteFileInfo&& rhs) noexcept : + ObsoleteFileInfo() { + *this = std::move(rhs); + } + + ObsoleteFileInfo& operator=(ObsoleteFileInfo&& rhs) noexcept { + path = std::move(rhs.path); + 
metadata = rhs.metadata; + rhs.metadata = nullptr; + + return *this; + } + + void DeleteMetadata() { + delete metadata; + metadata = nullptr; + } +}; + class VersionSet { public: VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options, @@ -876,7 +906,7 @@ class VersionSet { // This function doesn't support leveldb SST filenames void GetLiveFilesMetaData(std::vector *metadata); - void GetObsoleteFiles(std::vector* files, + void GetObsoleteFiles(std::vector* files, std::vector* manifest_filenames, uint64_t min_pending_output); @@ -959,7 +989,7 @@ class VersionSet { // Current size of manifest file uint64_t manifest_file_size_; - std::vector obsolete_files_; + std::vector obsolete_files_; std::vector obsolete_manifests_; // env options for all reads and writes except compactions diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index dc74398c3..5d8f8f699 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1167,7 +1167,9 @@ class DB { // Destroy the contents of the specified database. // Be very careful using this method. -Status DestroyDB(const std::string& name, const Options& options); +Status DestroyDB(const std::string& name, const Options& options, + const std::vector& column_families = + std::vector()); #ifndef ROCKSDB_LITE // If a DB cannot be opened, you may attempt to call this method to diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 24f5f8fd0..0e1ecb9de 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -77,6 +77,7 @@ enum CompressionType : unsigned char { }; struct Options; +struct DbPath; struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions { // The function recovers options to a previous version. Only 4.6 or later @@ -263,6 +264,20 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions { // BlockBasedTableOptions. std::shared_ptr table_factory; + // A list of paths where SST files for this column family + // can be put into, with its target size. 
Similar to db_paths,
+ // newer data is placed into paths specified earlier in the
+ // vector while older data gradually moves to paths specified
+ // later in the vector.
+ // Note that, if a path is supplied to multiple column
+ // families, it would have files and total size from all
+ // the column families combined. User should provision for the
+ // total size (from all the column families) in such cases.
+ //
+ // If left empty, db_paths will be used.
+ // Default: empty
+ std::vector<DbPath> cf_paths;
+
// Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions();
// Create ColumnFamilyOptions from Options
diff --git a/options/cf_options.cc b/options/cf_options.cc
index c7e12f587..055102b17 100644
--- a/options/cf_options.cc
+++ b/options/cf_options.cc
@@ -75,7 +75,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
 max_subcompactions(db_options.max_subcompactions),
 memtable_insert_with_hint_prefix_extractor(
 cf_options.memtable_insert_with_hint_prefix_extractor.get()),
- ttl(cf_options.ttl) {}
+ ttl(cf_options.ttl),
+ cf_paths(cf_options.cf_paths) {}

// Multiple two operands. If they overflow, return op1.
uint64_t MultiplyCheckOverflow(uint64_t op1, double op2) { diff --git a/options/cf_options.h b/options/cf_options.h index fec659652..9cc8ff704 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -120,6 +120,8 @@ struct ImmutableCFOptions { const SliceTransform* memtable_insert_with_hint_prefix_extractor; uint64_t ttl; + + std::vector cf_paths; }; struct MutableCFOptions { diff --git a/options/options_helper.cc b/options/options_helper.cc index 9088b589e..6f6c8e5ed 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -1601,6 +1601,7 @@ std::unordered_map uint34_t* existing_value_size, Slice delta_value, std::string* merged_value); + std::vector cf_paths; */ {"report_bg_io_stats", {offset_of(&ColumnFamilyOptions::report_bg_io_stats), diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 08ae53986..b88cd0215 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -345,6 +345,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { sizeof(std::shared_ptr)}, {offset_of(&ColumnFamilyOptions::table_factory), sizeof(std::shared_ptr)}, + {offset_of(&ColumnFamilyOptions::cf_paths), + sizeof(std::vector)}, }; char* options_ptr = new char[sizeof(ColumnFamilyOptions)]; diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 1105b2a66..513588c17 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -1017,8 +1017,10 @@ struct ThreadState { class DbStressListener : public EventListener { public: DbStressListener(const std::string& db_name, - const std::vector& db_paths) - : db_name_(db_name), db_paths_(db_paths) {} + const std::vector& db_paths, + const std::vector& column_families) + : db_name_(db_name), db_paths_(db_paths), + column_families_(column_families) {} virtual ~DbStressListener() {} #ifndef ROCKSDB_LITE virtual void OnFlushCompleted(DB* db, const FlushJobInfo& info) override { @@ -1085,6 +1087,13 @@ class DbStressListener : public EventListener { 
return; } } + for (auto& cf : column_families_) { + for (const auto& cf_path : cf.options.cf_paths) { + if (cf_path.path == file_dir) { + return; + } + } + } assert(false); #endif // !NDEBUG } @@ -1117,6 +1126,7 @@ class DbStressListener : public EventListener { private: std::string db_name_; std::vector db_paths_; + std::vector column_families_; }; } // namespace @@ -2547,7 +2557,7 @@ class StressTest { } options_.listeners.clear(); options_.listeners.emplace_back( - new DbStressListener(FLAGS_db, options_.db_paths)); + new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors)); options_.create_missing_column_families = true; if (!FLAGS_use_txn) { s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors, diff --git a/util/file_util.cc b/util/file_util.cc index 8a1adf2bd..19465efbc 100644 --- a/util/file_util.cc +++ b/util/file_util.cc @@ -83,7 +83,7 @@ Status CreateFile(Env* env, const std::string& destination, } Status DeleteSSTFile(const ImmutableDBOptions* db_options, - const std::string& fname, uint32_t path_id) { + const std::string& fname) { #ifndef ROCKSDB_LITE auto sfm = static_cast(db_options->sst_file_manager.get()); diff --git a/util/file_util.h b/util/file_util.h index e59377ab1..df220256f 100644 --- a/util/file_util.h +++ b/util/file_util.h @@ -22,6 +22,6 @@ extern Status CreateFile(Env* env, const std::string& destination, const std::string& contents); extern Status DeleteSSTFile(const ImmutableDBOptions* db_options, - const std::string& fname, uint32_t path_id); + const std::string& fname); } // namespace rocksdb