From b088c83e6e514f03998cff68630afd22f917d016 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Thu, 19 Mar 2015 17:04:29 -0700 Subject: [PATCH] Don't delete files when column family is dropped Summary: To understand the bug read t5943287 and check out the new test in column_family_test (ReadDroppedColumnFamily), iter 0. RocksDB contract allows you to read a dropped column family as long as there is a live reference. However, since our iteration ignores dropped column families, AddLiveFiles() didn't mark files of dropped column families as live. So we deleted them. In this patch I no longer ignore dropped column families in the iteration. I think this behavior was confusing and it also led to this bug. Now if an iterator client wants to ignore dropped column families, he needs to do it explicitly. Test Plan: Added a new unit test that is failing on master. Unit test succeeds now. Reviewers: sdong, rven, yhchiang Reviewed By: yhchiang Subscribers: dhruba, leveldb Differential Revision: https://reviews.facebook.net/D32535 --- HISTORY.md | 1 + db/column_family.h | 15 ++++++---- db/column_family_test.cc | 60 +++++++++++++++++++++++++++++++++++++++- db/db_filesnapshot.cc | 6 ++++ db/db_impl.cc | 36 ++++++++++++++---------- db/version_set.cc | 15 ++++++++++ 6 files changed, 111 insertions(+), 22 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 5759dfba2..cd9818174 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -18,6 +18,7 @@ * Fixed a bug in ReadOnlyBackupEngine that deleted corrupted backups in some cases, even though the engine was ReadOnly * options.level_compaction_dynamic_level_bytes, a feature to allow RocksDB to pick dynamic base of bytes for levels. With this feature turned on, we will automatically adjust max bytes for each level. The goal of this feature is to have lower bound on size amplification. For more details, see comments in options.h. 
* Added an abstract base class WriteBatchBase for write batches +* Fixed a bug where we start deleting files of a dropped column family even if there are still live references to it ### Public API changes * Deprecated skip_log_error_on_recovery option diff --git a/db/column_family.h b/db/column_family.h index ee615e045..aad75d681 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -162,11 +162,11 @@ class ColumnFamilyData { // until client drops the column family handle. That way, client can still // access data from dropped column family. // Column family can be dropped and still alive. In that state: - // *) Column family is not included in the iteration. // *) Compaction and flush is not executed on the dropped column family. // *) Client can continue reading from column family. Writes will fail unless // WriteOptions::ignore_missing_column_families is true // When the dropped column family is unreferenced, then we: + // *) Remove column family from the linked list maintained by ColumnFamilySet // *) delete all memory associated with that column family // *) delete all the files associated with that column family void SetDropped(); @@ -331,8 +331,9 @@ class ColumnFamilyData { // This needs to be destructed before mutex_ std::unique_ptr local_sv_; - // pointers for a circular linked list. 
we use it to support iterations over + // all column families that are alive (note: dropped column families can also + // be alive as long as client holds a reference) ColumnFamilyData* next_; ColumnFamilyData* prev_; @@ -383,11 +384,13 @@ class ColumnFamilySet { explicit iterator(ColumnFamilyData* cfd) : current_(cfd) {} iterator& operator++() { - // dummy is never dead or dropped, so this will never be infinite + // dropped column families might still be included in this iteration + // (we're only removing them when client drops the last reference to the + // column family). + // dummy is never dead, so this will never be infinite do { current_ = current_->next_; - } while (current_->refs_.load(std::memory_order_relaxed) == 0 || - current_->IsDropped()); + } while (current_->refs_.load(std::memory_order_relaxed) == 0); return *this; } bool operator!=(const iterator& other) { diff --git a/db/column_family_test.cc b/db/column_family_test.cc index 0f4f94990..010933959 100644 --- a/db/column_family_test.cc +++ b/db/column_family_test.cc @@ -12,8 +12,9 @@ #include #include "db/db_impl.h" -#include "rocksdb/env.h" #include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" #include "util/testharness.h" #include "util/testutil.h" #include "util/coding.h" @@ -1040,6 +1041,63 @@ TEST_F(ColumnFamilyTest, SanitizeOptions) { } } +TEST_F(ColumnFamilyTest, ReadDroppedColumnFamily) { + // iter 0 -- drop CF, don't reopen + // iter 1 -- delete CF, reopen + for (int iter = 0; iter < 2; ++iter) { + db_options_.create_missing_column_families = true; + db_options_.max_open_files = 20; + // delete obsolete files always + db_options_.delete_obsolete_files_period_micros = 0; + Open({"default", "one", "two"}); + ColumnFamilyOptions options; + options.level0_file_num_compaction_trigger = 100; + options.level0_slowdown_writes_trigger = 200; + options.level0_stop_writes_trigger = 200; + options.write_buffer_size = 100000; // small write buffer size + Reopen({options, 
options, options}); + + // 1MB should create ~10 files for each CF + int kKeysNum = 10000; + PutRandomData(0, kKeysNum, 100); + PutRandomData(1, kKeysNum, 100); + PutRandomData(2, kKeysNum, 100); + + if (iter == 0) { + // Drop CF two + ASSERT_OK(db_->DropColumnFamily(handles_[2])); + } else { + // delete CF two + delete handles_[2]; + handles_[2] = nullptr; + } + + // Add bunch more data to other CFs + PutRandomData(0, kKeysNum, 100); + PutRandomData(1, kKeysNum, 100); + + if (iter == 1) { + Reopen(); + } + + // Since we didn't delete CF handle, RocksDB's contract guarantees that + // we're still able to read dropped CF + for (int i = 0; i < 3; ++i) { + std::unique_ptr iterator( + db_->NewIterator(ReadOptions(), handles_[i])); + int count = 0; + for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) { + ASSERT_OK(iterator->status()); + ++count; + } + ASSERT_EQ(count, kKeysNum * ((i == 2) ? 1 : 2)); + } + + Close(); + Destroy(); + } +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index b5d4866ae..c72430301 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -92,6 +92,9 @@ Status DBImpl::GetLiveFiles(std::vector& ret, // flush all dirty data to disk. 
Status status; for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } cfd->Ref(); mutex_.Unlock(); status = FlushMemTable(cfd, FlushOptions()); @@ -114,6 +117,9 @@ Status DBImpl::GetLiveFiles(std::vector& ret, // Make a set of all of the live *.sst files std::vector live; for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } cfd->current()->AddLiveFiles(&live); } diff --git a/db/db_impl.cc b/db/db_impl.cc index 5aef74569..a6761cadb 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -265,7 +265,7 @@ DBImpl::~DBImpl() { if (flush_on_destroy_) { for (auto cfd : *versions_->GetColumnFamilySet()) { - if (!cfd->mem()->IsEmpty()) { + if (!cfd->IsDropped() && !cfd->mem()->IsEmpty()) { cfd->Ref(); mutex_.Unlock(); FlushMemTable(cfd, FlushOptions()); @@ -1944,8 +1944,7 @@ void DBImpl::BackgroundCallFlush() { auto pending_outputs_inserted_elem = CaptureCurrentFileNumberInPendingOutputs(); - Status s; - s = BackgroundFlush(&madeProgress, &job_context, &log_buffer); + Status s = BackgroundFlush(&madeProgress, &job_context, &log_buffer); if (!s.ok() && !s.IsShutdownInProgress()) { // Wait a little bit before retrying background flush in // case this is an environmental problem and we do not want to @@ -1967,9 +1966,9 @@ void DBImpl::BackgroundCallFlush() { ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); - // If !s.ok(), this means that Flush failed. In that case, we want - // to delete all obsolete files and we force FindObsoleteFiles() - FindObsoleteFiles(&job_context, !s.ok()); + // If flush failed, we want to delete all temporary files that we might have + // created. 
Thus, we force full scan in FindObsoleteFiles() + FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress()); // delete unnecessary files if any, this is done outside the mutex if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { mutex_.Unlock(); @@ -2011,8 +2010,7 @@ void DBImpl::BackgroundCallCompaction() { CaptureCurrentFileNumberInPendingOutputs(); assert(bg_compaction_scheduled_); - Status s; - s = BackgroundCompaction(&madeProgress, &job_context, &log_buffer); + Status s = BackgroundCompaction(&madeProgress, &job_context, &log_buffer); if (!s.ok() && !s.IsShutdownInProgress()) { // Wait a little bit before retrying background compaction in // case this is an environmental problem and we do not want to @@ -2034,11 +2032,10 @@ void DBImpl::BackgroundCallCompaction() { ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); - // If !s.ok(), this means that Compaction failed. In that case, we want - // to delete all obsolete files we might have created and we force - // FindObsoleteFiles(). This is because job_context does not - // catch all created files if compaction failed. - FindObsoleteFiles(&job_context, !s.ok()); + // If compaction failed, we want to delete all temporary files that we might + // have created (they might not be all recorded in job_context in case of a + // failure). Thus, we force full scan in FindObsoleteFiles() + FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress()); // delete unnecessary files if any, this is done outside the mutex if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { @@ -2124,7 +2121,10 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, db_options_.max_background_compactions - bg_compaction_scheduled_); auto flush_status = BackgroundFlush(madeProgress, job_context, log_buffer); - if (!flush_status.ok()) { + // the second condition will be false when a column family is dropped. 
we + // don't want to fail compaction because of that (because it might be a + // different column family) + if (!flush_status.ok() && !flush_status.IsShutdownInProgress()) { if (is_manual) { manual_compaction_->status = flush_status; manual_compaction_->done = true; @@ -2756,7 +2756,7 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) { // is_snapshot_supported_. bool new_is_snapshot_supported = true; for (auto c : *versions_->GetColumnFamilySet()) { - if (!c->mem()->IsSnapshotSupported()) { + if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) { new_is_snapshot_supported = false; break; } @@ -3080,6 +3080,9 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } if (cfd->GetLogNumber() <= flush_column_family_if_log_file) { status = SetNewMemtableAndNewLogFile(cfd, &context); if (!status.ok()) { @@ -3098,6 +3101,9 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { // no need to refcount because drop is happening in write thread, so can't // happen while we're in the write thread for (auto cfd : *versions_->GetColumnFamilySet()) { + if (cfd->IsDropped()) { + continue; + } if (!cfd->mem()->IsEmpty()) { status = SetNewMemtableAndNewLogFile(cfd, &context); if (!status.ok()) { diff --git a/db/version_set.cc b/db/version_set.cc index fa64a1d2e..c1ad0bff2 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2244,6 +2244,9 @@ Status VersionSet::Recover( if (s.ok()) { for (auto cfd : *column_family_set_) { + if (cfd->IsDropped()) { + continue; + } auto builders_iter = builders.find(cfd->GetID()); assert(builders_iter != builders.end()); auto* builder = builders_iter->second->version_builder(); @@ -2279,6 +2282,9 @@ Status VersionSet::Recover( 
column_family_set_->GetMaxColumnFamily()); for (auto cfd : *column_family_set_) { + if (cfd->IsDropped()) { + continue; + } Log(InfoLogLevel::INFO_LEVEL, db_options_->info_log, "Column family [%s] (ID %u), log number is %" PRIu64 "\n", cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber()); @@ -2585,6 +2591,9 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, if (s.ok()) { for (auto cfd : *column_family_set_) { + if (cfd->IsDropped()) { + continue; + } auto builders_iter = builders.find(cfd->GetID()); assert(builders_iter != builders.end()); auto builder = builders_iter->second->version_builder(); @@ -2644,6 +2653,9 @@ Status VersionSet::WriteSnapshot(log::Writer* log) { // LogAndApply. Column family manipulations can only happen within LogAndApply // (the same single thread), so we're safe to iterate. for (auto cfd : *column_family_set_) { + if (cfd->IsDropped()) { + continue; + } { // Store column family info VersionEdit edit; @@ -2912,6 +2924,9 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel, void VersionSet::GetLiveFilesMetaData(std::vector* metadata) { for (auto cfd : *column_family_set_) { + if (cfd->IsDropped()) { + continue; + } for (int level = 0; level < cfd->NumberLevels(); level++) { for (const auto& file : cfd->current()->storage_info()->LevelFiles(level)) {