From 26d67e357e45e61171e13e5626b88b38bb9eacb3 Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Thu, 28 Jun 2018 12:16:10 -0700 Subject: [PATCH] Support group commits of version edits (#3944) Summary: This PR supports the group commit of multiple version edit entries corresponding to different column families. Column family drop/creation still cannot be grouped. This PR is a subset of [PR 3752](https://github.com/facebook/rocksdb/pull/3752). Closes https://github.com/facebook/rocksdb/pull/3944 Differential Revision: D8432536 Pulled By: riversand963 fbshipit-source-id: 8f11bd05193b6c0d9272d82e44b676abfac113cb --- cache/lru_cache.cc | 2 +- db/memtable_list.cc | 18 +- db/version_set.cc | 350 +++++++++++++-------- db/version_set.h | 32 +- db/version_set_test.cc | 123 ++++++++ include/rocksdb/utilities/transaction_db.h | 4 +- monitoring/statistics.h | 1 - 7 files changed, 391 insertions(+), 139 deletions(-) diff --git a/cache/lru_cache.cc b/cache/lru_cache.cc index efc423493..d4cbb9a45 100644 --- a/cache/lru_cache.cc +++ b/cache/lru_cache.cc @@ -510,7 +510,7 @@ void LRUCache::DisownData() { shards_ = nullptr; num_shards_ = 0; #endif -#else // __clang__ +#else // __clang__ #ifndef __SANITIZE_ADDRESS__ shards_ = nullptr; num_shards_ = 0; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 7cb208beb..d89e31e76 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -41,7 +41,6 @@ void MemTableListVersion::UnrefMemTable(autovector* to_delete, to_delete->push_back(m); assert(*parent_memtable_list_memory_usage_ >= m->ApproximateMemoryUsage()); *parent_memtable_list_memory_usage_ -= m->ApproximateMemoryUsage(); - } else { } } @@ -401,7 +400,22 @@ Status MemTableList::InstallMemtableFlushResults( // All the later memtables that have the same filenum // are part of the same batch. They can be committed now. uint64_t mem_id = 1; // how many memtables have been flushed. - if (s.ok()) { // commit new state + + // commit new state only if the column family is NOT dropped. + // The reason is as follows (refer to + // ColumnFamilyTest.FlushAndDropRaceCondition). + // If the column family is dropped, then according to LogAndApply, its + // corrresponding flush operation is NOT written to the MANIFEST. This + // means the DB is not aware of the L0 files generated from the flush. + // By committing the new state, we remove the memtable from the memtable + // list. Creating an iterator on this column family will not be able to + // read full data since the memtable is removed, and the DB is not aware + // of the L0 files, causing MergingIterator unable to build child + // iterators. RocksDB contract requires that the iterator can be created + // on a dropped column family, and we must be able to + // read full data as long as column family handle is not deleted, even if + // the column family is dropped. + if (s.ok() && !cfd->IsDropped()) { // commit new state while (batch_count-- > 0) { MemTable* m = current_->memlist_.back(); ROCKS_LOG_BUFFER(log_buffer, "[%s] Level-0 commit table #%" PRIu64 diff --git a/db/version_set.cc b/db/version_set.cc index cb2a8eeec..5b32737ec 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2645,11 +2645,17 @@ struct VersionSet::ManifestWriter { bool done; InstrumentedCondVar cv; ColumnFamilyData* cfd; + const MutableCFOptions mutable_cf_options; const autovector& edit_list; explicit ManifestWriter(InstrumentedMutex* mu, ColumnFamilyData* _cfd, + const MutableCFOptions& cf_options, const autovector& e) - : done(false), cv(mu), cfd(_cfd), edit_list(e) {} + : done(false), + cv(mu), + cfd(_cfd), + mutable_cf_options(cf_options), + edit_list(e) {} }; VersionSet::VersionSet(const std::string& dbname, @@ -2724,90 +2730,78 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, v->next_->prev_ = v; } -Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, - const MutableCFOptions& mutable_cf_options, - const autovector& edit_list, - InstrumentedMutex* mu, Directory* db_directory, - bool new_descriptor_log, - const ColumnFamilyOptions* new_cf_options) { - mu->AssertHeld(); - // num of edits - auto num_edits = edit_list.size(); - if (num_edits == 0) { - return Status::OK(); - } else if (num_edits > 1) { -#ifndef NDEBUG - // no group commits for column family add or drop - for (auto& edit : edit_list) { - assert(!edit->IsColumnFamilyManipulation()); - } -#endif - } - - // column_family_data can be nullptr only if this is column_family_add. - // in that case, we also need to specify ColumnFamilyOptions - if (column_family_data == nullptr) { - assert(num_edits == 1); - assert(edit_list[0]->is_column_family_add_); - assert(new_cf_options != nullptr); - } +Status VersionSet::ProcessManifestWrites( + std::deque& writers, InstrumentedMutex* mu, + Directory* db_directory, bool new_descriptor_log, + const ColumnFamilyOptions* new_cf_options) { + assert(!writers.empty()); + ManifestWriter& first_writer = writers.front(); + ManifestWriter* last_writer = &first_writer; - // queue our request - ManifestWriter w(mu, column_family_data, edit_list); - manifest_writers_.push_back(&w); - while (!w.done && &w != manifest_writers_.front()) { - w.cv.Wait(); - } - if (w.done) { - return w.status; - } - if (column_family_data != nullptr && column_family_data->IsDropped()) { - // if column family is dropped by the time we get here, no need to write - // anything to the manifest - manifest_writers_.pop_front(); - // Notify new head of write queue - if (!manifest_writers_.empty()) { - manifest_writers_.front()->cv.Signal(); - } - // we steal this code to also inform about cf-drop - return Status::ShutdownInProgress(); - } + assert(!manifest_writers_.empty()); + assert(manifest_writers_.front() == &first_writer); autovector batch_edits; - Version* v = nullptr; - std::unique_ptr builder_guard(nullptr); - - // process all requests in the queue - ManifestWriter* last_writer = &w; - assert(!manifest_writers_.empty()); - assert(manifest_writers_.front() == &w); - if (w.edit_list.front()->IsColumnFamilyManipulation()) { - // no group commits for column family add or drop - LogAndApplyCFHelper(w.edit_list.front()); - batch_edits.push_back(w.edit_list.front()); + autovector versions; + autovector mutable_cf_options_ptrs; + std::vector> builder_guards; + + if (first_writer.edit_list.front()->IsColumnFamilyManipulation()) { + // No group commits for column family add or drop + LogAndApplyCFHelper(first_writer.edit_list.front()); + batch_edits.push_back(first_writer.edit_list.front()); } else { - v = new Version(column_family_data, this, env_options_, mutable_cf_options, - current_version_number_++); - builder_guard.reset(new BaseReferencedVersionBuilder(column_family_data)); - auto* builder = builder_guard->version_builder(); - for (const auto& writer : manifest_writers_) { - if (writer->edit_list.front()->IsColumnFamilyManipulation() || - writer->cfd->GetID() != column_family_data->GetID()) { + auto it = manifest_writers_.cbegin(); + while (it != manifest_writers_.cend()) { + if ((*it)->edit_list.front()->IsColumnFamilyManipulation()) { // no group commits for column family add or drop - // also, group commits across column families are not supported break; } - last_writer = writer; - for (const auto& edit : writer->edit_list) { - LogAndApplyHelper(column_family_data, builder, v, edit, mu); - batch_edits.push_back(edit); + last_writer = *(it++); + assert(last_writer != nullptr); + if (last_writer->cfd != nullptr && last_writer->cfd->IsDropped()) { + continue; + } + // We do a linear search on versions because versions is small. + // TODO(yanqin) maybe consider unordered_map + Version* version = nullptr; + VersionBuilder* builder = nullptr; + for (int i = 0; i != static_cast(versions.size()); ++i) { + uint32_t cf_id = last_writer->cfd->GetID(); + if (versions[i]->cfd()->GetID() == cf_id) { + version = versions[i]; + assert(!builder_guards.empty() && + builder_guards.size() == versions.size()); + builder = builder_guards[i]->version_builder(); + TEST_SYNC_POINT_CALLBACK( + "VersionSet::ProcessManifestWrites:SameColumnFamily", &cf_id); + break; + } + } + if (version == nullptr) { + version = new Version(last_writer->cfd, this, env_options_, + last_writer->mutable_cf_options, + current_version_number_++); + versions.push_back(version); + mutable_cf_options_ptrs.push_back(&last_writer->mutable_cf_options); + builder_guards.emplace_back( + new BaseReferencedVersionBuilder(last_writer->cfd)); + builder = builder_guards.back()->version_builder(); + } + assert(builder != nullptr); // make checker happy + for (const auto& e : last_writer->edit_list) { + LogAndApplyHelper(last_writer->cfd, builder, version, e, mu); + batch_edits.push_back(e); } } - builder->SaveTo(v->storage_info()); + for (int i = 0; i < static_cast(versions.size()); ++i) { + assert(!builder_guards.empty() && + builder_guards.size() == versions.size()); + auto* builder = builder_guards[i]->version_builder(); + builder->SaveTo(versions[i]->storage_info()); + } } - // Initialize new descriptor log file if necessary by creating - // a temporary file that contains a snapshot of the current version. uint64_t new_manifest_file_size = 0; Status s; @@ -2822,39 +2816,39 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, } if (new_descriptor_log) { - // if we're writing out new snapshot make sure to persist max column family + // if we are writing out new snapshot make sure to persist max column + // family. if (column_family_set_->GetMaxColumnFamily() > 0) { - w.edit_list.front()->SetMaxColumnFamily( + first_writer.edit_list.front()->SetMaxColumnFamily( column_family_set_->GetMaxColumnFamily()); } } - // Unlock during expensive operations. New writes cannot get here - // because &w is ensuring that all new writes get queued. { EnvOptions opt_env_opts = env_->OptimizeForManifestWrite(env_options_); - // Before releasing mutex, make a copy of mutable_cf_options and pass to - // `PrepareApply` to avoided a potential data race with backgroundflush - MutableCFOptions mutable_cf_options_copy(mutable_cf_options); mu->Unlock(); TEST_SYNC_POINT("VersionSet::LogAndApply:WriteManifest"); - if (!w.edit_list.front()->IsColumnFamilyManipulation() && - this->GetColumnFamilySet()->get_table_cache()->GetCapacity() == + if (!first_writer.edit_list.front()->IsColumnFamilyManipulation() && + column_family_set_->get_table_cache()->GetCapacity() == TableCache::kInfiniteCapacity) { - // unlimited table cache. Pre-load table handle now. - // Need to do it out of the mutex. - builder_guard->version_builder()->LoadTableHandlers( - column_family_data->internal_stats(), - column_family_data->ioptions()->optimize_filters_for_hits, - true /* prefetch_index_and_filter_in_cache */, - mutable_cf_options.prefix_extractor.get()); + for (int i = 0; i < static_cast(versions.size()); ++i) { + assert(!builder_guards.empty() && + builder_guards.size() == versions.size()); + assert(!mutable_cf_options_ptrs.empty() && + builder_guards.size() == versions.size()); + ColumnFamilyData* cfd = versions[i]->cfd_; + builder_guards[i]->version_builder()->LoadTableHandlers( + cfd->internal_stats(), cfd->ioptions()->optimize_filters_for_hits, + true /* prefetch_index_and_filter_in_cache */, + mutable_cf_options_ptrs[i]->prefix_extractor.get()); + } } // This is fine because everything inside of this block is serialized -- // only one thread can be here at the same time if (new_descriptor_log) { - // create manifest file + // create new manifest file ROCKS_LOG_INFO(db_options_->info_log, "Creating manifest %" PRIu64 "\n", pending_manifest_file_number_); unique_ptr descriptor_file; @@ -2873,18 +2867,19 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, } } - if (!w.edit_list.front()->IsColumnFamilyManipulation()) { - // This is cpu-heavy operations, which should be called outside mutex. - v->PrepareApply(mutable_cf_options_copy, true); + if (!first_writer.edit_list.front()->IsColumnFamilyManipulation()) { + for (int i = 0; i < static_cast(versions.size()); ++i) { + versions[i]->PrepareApply(*mutable_cf_options_ptrs[i], true); + } } - // Write new record to MANIFEST log + // Write new records to MANIFEST log if (s.ok()) { for (auto& e : batch_edits) { std::string record; if (!e->EncodeTo(&record)) { - s = Status::Corruption( - "Unable to Encode VersionEdit:" + e->DebugString(true)); + s = Status::Corruption("Unable to encode VersionEdit:" + + e->DebugString(true)); break; } TEST_KILL_RANDOM("VersionSet::LogAndApply:BeforeAddRecord", @@ -2898,7 +2893,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, s = SyncManifest(env_, db_options_, descriptor_log_->file()); } if (!s.ok()) { - ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write: %s\n", + ROCKS_LOG_ERROR(db_options_->info_log, "MANIFEST write %s\n", s.ToString().c_str()); } } @@ -2915,7 +2910,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, new_manifest_file_size = descriptor_log_->file()->GetFileSize(); } - if (w.edit_list.front()->is_column_family_drop_) { + if (first_writer.edit_list.front()->is_column_family_drop_) { TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:0"); TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1"); TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2"); @@ -2926,25 +2921,24 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, mu->Lock(); } - // Append the old mainfest file to the obsolete_manifests_ list to be deleted + // Append the old manifest file to the obsolete_manifest_ list to be deleted // by PurgeObsoleteFiles later. if (s.ok() && new_descriptor_log) { obsolete_manifests_.emplace_back( DescriptorFileName("", manifest_file_number_)); } - // Install the new version + // Install the new versions if (s.ok()) { - if (w.edit_list.front()->is_column_family_add_) { - // no group commit on column family add + if (first_writer.edit_list.front()->is_column_family_add_) { assert(batch_edits.size() == 1); assert(new_cf_options != nullptr); - CreateColumnFamily(*new_cf_options, w.edit_list.front()); - } else if (w.edit_list.front()->is_column_family_drop_) { + CreateColumnFamily(*new_cf_options, first_writer.edit_list.front()); + } else if (first_writer.edit_list.front()->is_column_family_drop_) { assert(batch_edits.size() == 1); - column_family_data->SetDropped(); - if (column_family_data->Unref()) { - delete column_family_data; + first_writer.cfd->SetDropped(); + if (first_writer.cfd->Unref()) { + delete first_writer.cfd; } } else { uint64_t max_log_number_in_batch = 0; @@ -2960,60 +2954,158 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, } } if (max_log_number_in_batch != 0) { - assert(column_family_data->GetLogNumber() <= max_log_number_in_batch); - column_family_data->SetLogNumber(max_log_number_in_batch); + for (int i = 0; i < static_cast(versions.size()); ++i) { + ColumnFamilyData* cfd = versions[i]->cfd_; + assert(cfd->GetLogNumber() <= max_log_number_in_batch); + cfd->SetLogNumber(max_log_number_in_batch); + } } + if (last_min_log_number_to_keep != 0) { // Should only be set in 2PC mode. MarkMinLogNumberToKeep2PC(last_min_log_number_to_keep); } - AppendVersion(column_family_data, v); + for (int i = 0; i < static_cast(versions.size()); ++i) { + ColumnFamilyData* cfd = versions[i]->cfd_; + AppendVersion(cfd, versions[i]); + } } - manifest_file_number_ = pending_manifest_file_number_; manifest_file_size_ = new_manifest_file_size; - prev_log_number_ = w.edit_list.front()->prev_log_number_; + prev_log_number_ = first_writer.edit_list.front()->prev_log_number_; } else { std::string version_edits; for (auto& e : batch_edits) { - version_edits = version_edits + "\n" + e->DebugString(true); + version_edits += ("\n" + e->DebugString(true)); + } + ROCKS_LOG_ERROR(db_options_->info_log, + "Error in committing version edit to MANIFEST: %s", + version_edits.c_str()); + for (auto v : versions) { + delete v; } - ROCKS_LOG_ERROR( - db_options_->info_log, - "[%s] Error in committing version edit to MANIFEST: %s", - column_family_data ? column_family_data->GetName().c_str() : "", - version_edits.c_str()); - delete v; if (new_descriptor_log) { - ROCKS_LOG_INFO(db_options_->info_log, "Deleting manifest %" PRIu64 - " current manifest %" PRIu64 "\n", + ROCKS_LOG_INFO(db_options_->info_log, + "Deleting manifest %" PRIu64 " current manifest %" PRIu64 + "\n", manifest_file_number_, pending_manifest_file_number_); descriptor_log_.reset(); env_->DeleteFile( DescriptorFileName(dbname_, pending_manifest_file_number_)); } } + pending_manifest_file_number_ = 0; // wake up all the waiting writers while (true) { ManifestWriter* ready = manifest_writers_.front(); manifest_writers_.pop_front(); - if (ready != &w) { - ready->status = s; - ready->done = true; + bool need_signal = true; + for (const auto& w : writers) { + if (&w == ready) { + need_signal = false; + break; + } + } + ready->status = s; + ready->done = true; + if (need_signal) { ready->cv.Signal(); } - if (ready == last_writer) break; + if (ready == last_writer) { + break; + } } - // Notify new head of write queue if (!manifest_writers_.empty()) { manifest_writers_.front()->cv.Signal(); } return s; } +// 'datas' is gramatically incorrect. We still use this notation is to indicate +// that this variable represents a collection of column_family_data. +Status VersionSet::LogAndApply( + const std::vector& column_family_datas, + const std::vector& mutable_cf_options_list, + const std::vector>& edit_lists, + InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log, + const ColumnFamilyOptions* new_cf_options) { + mu->AssertHeld(); + int num_edits = 0; + for (const auto& elist : edit_lists) { + num_edits += static_cast(elist.size()); + } + if (num_edits == 0) { + return Status::OK(); + } else if (num_edits > 1) { +#ifndef NDEBUG + for (const auto& edit_list : edit_lists) { + for (const auto& edit : edit_list) { + assert(!edit->IsColumnFamilyManipulation()); + } + } +#endif /* ! NDEBUG */ + } + + int num_cfds = static_cast(column_family_datas.size()); + if (num_cfds == 1 && column_family_datas[0] == nullptr) { + assert(edit_lists.size() == 1 && edit_lists[0].size() == 1); + assert(edit_lists[0][0]->is_column_family_add_); + assert(new_cf_options != nullptr); + } + std::deque writers; + if (num_cfds > 0) { + assert(static_cast(num_cfds) == mutable_cf_options_list.size()); + assert(static_cast(num_cfds) == edit_lists.size()); + } + for (int i = 0; i < num_cfds; ++i) { + writers.emplace_back(mu, column_family_datas[i], mutable_cf_options_list[i], + edit_lists[i]); + manifest_writers_.push_back(&writers[i]); + } + assert(!writers.empty()); + ManifestWriter& first_writer = writers.front(); + while (!first_writer.done && &first_writer != manifest_writers_.front()) { + first_writer.cv.Wait(); + } + if (first_writer.done) { + // All non-CF-manipulation operations can be grouped together and committed + // to MANIFEST. They should all have finished. The status code is stored in + // the first manifest writer. +#ifndef NDEBUG + for (const auto& writer : writers) { + assert(writer.done); + } +#endif /* !NDEBUG */ + return first_writer.status; + } + + int num_undropped_cfds = 0; + for (auto cfd : column_family_datas) { + // if cfd == nullptr, it is a column family add. + if (cfd == nullptr || !cfd->IsDropped()) { + ++num_undropped_cfds; + } + } + if (0 == num_undropped_cfds) { + // TODO (yanqin) maybe use a different status code to denote column family + // drop other than OK and ShutdownInProgress + for (int i = 0; i != num_cfds; ++i) { + manifest_writers_.pop_front(); + } + // Notify new head of manifest write queue. + if (!manifest_writers_.empty()) { + manifest_writers_.front()->cv.Signal(); + } + return Status::OK(); + } + + return ProcessManifestWrites(writers, mu, db_directory, new_descriptor_log, + new_cf_options); +} + void VersionSet::LogAndApplyCFHelper(VersionEdit* edit) { assert(edit->IsColumnFamilyManipulation()); edit->SetNextFile(next_file_number_.load()); @@ -4023,7 +4115,7 @@ InternalIterator* VersionSet::MakeInputIterator( nullptr /* table_reader_ptr */, nullptr /* no per level latency histogram */, true /* for_compaction */, nullptr /* arena */, - false /* skip_filters */, (int)which /* level */); + false /* skip_filters */, static_cast(which) /* level */); } } else { // Create concatenating iterator for the files from this level @@ -4034,7 +4126,7 @@ InternalIterator* VersionSet::MakeInputIterator( false /* should_sample */, nullptr /* no per level latency histogram */, true /* for_compaction */, false /* skip_filters */, - (int)which /* level */, range_del_agg); + static_cast(which) /* level */, range_del_agg); } } } diff --git a/db/version_set.h b/db/version_set.h index 0a5fa504d..7abea9855 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -748,9 +748,11 @@ class VersionSet { InstrumentedMutex* mu, Directory* db_directory = nullptr, bool new_descriptor_log = false, const ColumnFamilyOptions* column_family_options = nullptr) { - autovector edit_list; - edit_list.push_back(edit); - return LogAndApply(column_family_data, mutable_cf_options, edit_list, mu, + std::vector cfds(1, column_family_data); + std::vector mutable_cf_options_list(1, + mutable_cf_options); + std::vector> edit_lists(1, {edit}); + return LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu, db_directory, new_descriptor_log, column_family_options); } // The batch version. If edit_list.size() > 1, caller must ensure that @@ -760,7 +762,24 @@ class VersionSet { const MutableCFOptions& mutable_cf_options, const autovector& edit_list, InstrumentedMutex* mu, Directory* db_directory = nullptr, bool new_descriptor_log = false, - const ColumnFamilyOptions* column_family_options = nullptr); + const ColumnFamilyOptions* column_family_options = nullptr) { + std::vector cfds(1, column_family_data); + std::vector mutable_cf_options_list(1, + mutable_cf_options); + std::vector> edit_lists(1, edit_list); + return LogAndApply(cfds, mutable_cf_options_list, edit_lists, mu, + db_directory, new_descriptor_log, column_family_options); + } + + // The across-multi-cf batch version. If edit_lists contain more than + // 1 version edits, caller must ensure that no edit in the []list is column + // family manipulation. + Status LogAndApply(const std::vector& cfds, + const std::vector& mutable_cf_options, + const std::vector>& edit_lists, + InstrumentedMutex* mu, Directory* db_directory = nullptr, + bool new_descriptor_log = false, + const ColumnFamilyOptions* new_cf_options = nullptr); // Recover the last saved descriptor from persistent storage. // If read_only == true, Recover() will not complain if some column families @@ -965,6 +984,11 @@ class VersionSet { ColumnFamilyData* CreateColumnFamily(const ColumnFamilyOptions& cf_options, VersionEdit* edit); + Status ProcessManifestWrites(std::deque& writers, + InstrumentedMutex* mu, Directory* db_directory, + bool new_descriptor_log, + const ColumnFamilyOptions* new_cf_options); + std::unique_ptr column_family_set_; Env* const env_; diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 090e074cf..996de0dd6 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -8,6 +8,8 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "db/version_set.h" +#include "db/log_writer.h" +#include "table/mock_table.h" #include "util/logging.h" #include "util/testharness.h" #include "util/testutil.h" @@ -452,6 +454,127 @@ TEST_F(FindLevelFileTest, LevelOverlappingFiles) { ASSERT_TRUE(Overlaps("600", "700")); } +class ManifestWriterTest : public testing::Test { + public: + ManifestWriterTest() + : env_(Env::Default()), + dbname_(test::TmpDir() + "/version_set_test"), + db_options_(), + mutable_cf_options_(cf_options_), + table_cache_(NewLRUCache(50000, 16)), + write_buffer_manager_(db_options_.db_write_buffer_size), + versions_(new VersionSet(dbname_, &db_options_, env_options_, + table_cache_.get(), &write_buffer_manager_, + &write_controller_)), + shutting_down_(false), + mock_table_factory_(std::make_shared()) { + EXPECT_OK(env_->CreateDirIfMissing(dbname_)); + db_options_.db_paths.emplace_back(dbname_, + std::numeric_limits::max()); + } + + // Create DB with 3 column families. + void NewDB() { + VersionEdit new_db; + new_db.SetLogNumber(0); + new_db.SetNextFile(2); + new_db.SetLastSequence(0); + + const std::vector cf_names = {kDefaultColumnFamilyName, + "alice", "bob"}; + const int kInitialNumOfCfs = static_cast(cf_names.size()); + autovector new_cfs; + uint64_t last_seq = 1; + uint32_t cf_id = 1; + for (int i = 1; i != kInitialNumOfCfs; ++i) { + VersionEdit new_cf; + new_cf.AddColumnFamily(cf_names[i]); + new_cf.SetColumnFamily(cf_id++); + new_cf.SetLogNumber(0); + new_cf.SetNextFile(2); + new_cf.SetLastSequence(last_seq++); + new_cfs.emplace_back(new_cf); + } + + const std::string manifest = DescriptorFileName(dbname_, 1); + unique_ptr file; + Status s = env_->NewWritableFile( + manifest, &file, env_->OptimizeForManifestWrite(env_options_)); + ASSERT_OK(s); + unique_ptr file_writer( + new WritableFileWriter(std::move(file), env_options_)); + { + log::Writer log(std::move(file_writer), 0, false); + std::string record; + new_db.EncodeTo(&record); + s = log.AddRecord(record); + for (const auto& e : new_cfs) { + e.EncodeTo(&record); + s = log.AddRecord(record); + ASSERT_OK(s); + } + } + ASSERT_OK(s); + // Make "CURRENT" file point to the new manifest file. + s = SetCurrentFile(env_, dbname_, 1, nullptr); + + std::vector column_families; + cf_options_.table_factory = mock_table_factory_; + for (const auto& cf_name : cf_names) { + column_families.emplace_back(cf_name, cf_options_); + } + + EXPECT_OK(versions_->Recover(column_families, false)); + EXPECT_EQ(kInitialNumOfCfs, + versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); + for (auto cfd : *versions_->GetColumnFamilySet()) { + cfds_.emplace_back(cfd); + } + } + + Env* env_; + const std::string dbname_; + EnvOptions env_options_; + ImmutableDBOptions db_options_; + ColumnFamilyOptions cf_options_; + MutableCFOptions mutable_cf_options_; + std::shared_ptr table_cache_; + WriteController write_controller_; + WriteBufferManager write_buffer_manager_; + std::unique_ptr versions_; + InstrumentedMutex mutex_; + std::atomic shutting_down_; + std::shared_ptr mock_table_factory_; + std::vector cfds_; +}; + +TEST_F(ManifestWriterTest, SameColumnFamilyGroupCommit) { + NewDB(); + const int kGroupSize = 5; + std::vector edits(kGroupSize); + std::vector cfds(kGroupSize, cfds_[0]); + std::vector all_mutable_cf_options(kGroupSize, + mutable_cf_options_); + std::vector> edit_lists(kGroupSize); + for (int i = 0; i != kGroupSize; ++i) { + edit_lists[i].emplace_back(&edits[i]); + } + + int count = 0; + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::ProcessManifestWrites:SameColumnFamily", [&](void* arg) { + uint32_t* cf_id = reinterpret_cast(arg); + EXPECT_EQ(0, *cf_id); + ++count; + }); + SyncPoint::GetInstance()->EnableProcessing(); + mutex_.Lock(); + Status s = + versions_->LogAndApply(cfds, all_mutable_cf_options, edit_lists, &mutex_); + mutex_.Unlock(); + EXPECT_OK(s); + EXPECT_EQ(kGroupSize - 1, count); +} } // namespace rocksdb int main(int argc, char** argv) { diff --git a/include/rocksdb/utilities/transaction_db.h b/include/rocksdb/utilities/transaction_db.h index 6bec03eac..92b2d98a5 100644 --- a/include/rocksdb/utilities/transaction_db.h +++ b/include/rocksdb/utilities/transaction_db.h @@ -171,8 +171,8 @@ struct DeadlockPath { bool limit_exceeded; int64_t deadlock_time; - explicit DeadlockPath( - std::vector path_entry, const int64_t& dl_time) + explicit DeadlockPath(std::vector path_entry, + const int64_t& dl_time) : path(path_entry), limit_exceeded(false), deadlock_time(dl_time) {} // empty path, limit exceeded constructor and default constructor diff --git a/monitoring/statistics.h b/monitoring/statistics.h index 8e67657a3..4427c8c54 100644 --- a/monitoring/statistics.h +++ b/monitoring/statistics.h @@ -39,7 +39,6 @@ enum HistogramsInternal : uint32_t { INTERNAL_HISTOGRAM_ENUM_MAX }; - class StatisticsImpl : public Statistics { public: StatisticsImpl(std::shared_ptr stats,