Don't delete files when column family is dropped

Summary:
To understand the bug read t5943287 and check out the new test in column_family_test (ReadDroppedColumnFamily), iter 0.

RocksDB contract allowes you to read a drop column family as long as there is a live reference. However, since our iteration ignores dropped column families, AddLiveFiles() didn't mark files of a dropped column families as live. So we deleted them.

In this patch I no longer ignore dropped column families in the iteration. I think this behavior was confusing and it also led to this bug. Now if an iterator client wants to ignore dropped column families, he needs to do it explicitly.

Test Plan: Added a new unit test that is failing on master. Unit test succeeds now.

Reviewers: sdong, rven, yhchiang

Reviewed By: yhchiang

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D32535
main
Igor Canadi 10 years ago
parent 17ae3fcbca
commit b088c83e6e
  1. 1
      HISTORY.md
  2. 15
      db/column_family.h
  3. 60
      db/column_family_test.cc
  4. 6
      db/db_filesnapshot.cc
  5. 36
      db/db_impl.cc
  6. 15
      db/version_set.cc

@ -18,6 +18,7 @@
* Fixed a bug in ReadOnlyBackupEngine that deleted corrupted backups in some cases, even though the engine was ReadOnly * Fixed a bug in ReadOnlyBackupEngine that deleted corrupted backups in some cases, even though the engine was ReadOnly
* options.level_compaction_dynamic_level_bytes, a feature to allow RocksDB to pick dynamic base of bytes for levels. With this feature turned on, we will automatically adjust max bytes for each level. The goal of this feature is to have lower bound on size amplification. For more details, see comments in options.h. * options.level_compaction_dynamic_level_bytes, a feature to allow RocksDB to pick dynamic base of bytes for levels. With this feature turned on, we will automatically adjust max bytes for each level. The goal of this feature is to have lower bound on size amplification. For more details, see comments in options.h.
* Added an abstract base class WriteBatchBase for write batches * Added an abstract base class WriteBatchBase for write batches
* Fixed a bug where we start deleting files of a dropped column families even if there are still live references to it
### Public API changes ### Public API changes
* Deprecated skip_log_error_on_recovery option * Deprecated skip_log_error_on_recovery option

@ -162,11 +162,11 @@ class ColumnFamilyData {
// until client drops the column family handle. That way, client can still // until client drops the column family handle. That way, client can still
// access data from dropped column family. // access data from dropped column family.
// Column family can be dropped and still alive. In that state: // Column family can be dropped and still alive. In that state:
// *) Column family is not included in the iteration.
// *) Compaction and flush is not executed on the dropped column family. // *) Compaction and flush is not executed on the dropped column family.
// *) Client can continue reading from column family. Writes will fail unless // *) Client can continue reading from column family. Writes will fail unless
// WriteOptions::ignore_missing_column_families is true // WriteOptions::ignore_missing_column_families is true
// When the dropped column family is unreferenced, then we: // When the dropped column family is unreferenced, then we:
// *) Remove column family from the linked list maintained by ColumnFamilySet
// *) delete all memory associated with that column family // *) delete all memory associated with that column family
// *) delete all the files associated with that column family // *) delete all the files associated with that column family
void SetDropped(); void SetDropped();
@ -331,8 +331,9 @@ class ColumnFamilyData {
// This needs to be destructed before mutex_ // This needs to be destructed before mutex_
std::unique_ptr<ThreadLocalPtr> local_sv_; std::unique_ptr<ThreadLocalPtr> local_sv_;
// pointers for a circular linked list. we use it to support iterations // pointers for a circular linked list. we use it to support iterations over
// that can be concurrent with writes // all column families that are alive (note: dropped column families can also
// be alive as long as client holds a reference)
ColumnFamilyData* next_; ColumnFamilyData* next_;
ColumnFamilyData* prev_; ColumnFamilyData* prev_;
@ -383,11 +384,13 @@ class ColumnFamilySet {
explicit iterator(ColumnFamilyData* cfd) explicit iterator(ColumnFamilyData* cfd)
: current_(cfd) {} : current_(cfd) {}
iterator& operator++() { iterator& operator++() {
// dummy is never dead or dropped, so this will never be infinite // dropped column families might still be included in this iteration
// (we're only removing them when client drops the last reference to the
// column family).
// dummy is never dead, so this will never be infinite
do { do {
current_ = current_->next_; current_ = current_->next_;
} while (current_->refs_.load(std::memory_order_relaxed) == 0 || } while (current_->refs_.load(std::memory_order_relaxed) == 0);
current_->IsDropped());
return *this; return *this;
} }
bool operator!=(const iterator& other) { bool operator!=(const iterator& other) {

@ -12,8 +12,9 @@
#include <string> #include <string>
#include "db/db_impl.h" #include "db/db_impl.h"
#include "rocksdb/env.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "util/testutil.h" #include "util/testutil.h"
#include "util/coding.h" #include "util/coding.h"
@ -1040,6 +1041,63 @@ TEST_F(ColumnFamilyTest, SanitizeOptions) {
} }
} }
TEST_F(ColumnFamilyTest, ReadDroppedColumnFamily) {
// iter 0 -- drop CF, don't reopen
// iter 1 -- delete CF, reopen
for (int iter = 0; iter < 2; ++iter) {
db_options_.create_missing_column_families = true;
db_options_.max_open_files = 20;
// delete obsolete files always
db_options_.delete_obsolete_files_period_micros = 0;
Open({"default", "one", "two"});
ColumnFamilyOptions options;
options.level0_file_num_compaction_trigger = 100;
options.level0_slowdown_writes_trigger = 200;
options.level0_stop_writes_trigger = 200;
options.write_buffer_size = 100000; // small write buffer size
Reopen({options, options, options});
// 1MB should create ~10 files for each CF
int kKeysNum = 10000;
PutRandomData(0, kKeysNum, 100);
PutRandomData(1, kKeysNum, 100);
PutRandomData(2, kKeysNum, 100);
if (iter == 0) {
// Drop CF two
ASSERT_OK(db_->DropColumnFamily(handles_[2]));
} else {
// delete CF two
delete handles_[2];
handles_[2] = nullptr;
}
// Add bunch more data to other CFs
PutRandomData(0, kKeysNum, 100);
PutRandomData(1, kKeysNum, 100);
if (iter == 1) {
Reopen();
}
// Since we didn't delete CF handle, RocksDB's contract guarantees that
// we're still able to read dropped CF
for (int i = 0; i < 3; ++i) {
std::unique_ptr<Iterator> iterator(
db_->NewIterator(ReadOptions(), handles_[i]));
int count = 0;
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
ASSERT_OK(iterator->status());
++count;
}
ASSERT_EQ(count, kKeysNum * ((i == 2) ? 1 : 2));
}
Close();
Destroy();
}
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -92,6 +92,9 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
// flush all dirty data to disk. // flush all dirty data to disk.
Status status; Status status;
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->Ref(); cfd->Ref();
mutex_.Unlock(); mutex_.Unlock();
status = FlushMemTable(cfd, FlushOptions()); status = FlushMemTable(cfd, FlushOptions());
@ -114,6 +117,9 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
// Make a set of all of the live *.sst files // Make a set of all of the live *.sst files
std::vector<FileDescriptor> live; std::vector<FileDescriptor> live;
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->current()->AddLiveFiles(&live); cfd->current()->AddLiveFiles(&live);
} }

@ -265,7 +265,7 @@ DBImpl::~DBImpl() {
if (flush_on_destroy_) { if (flush_on_destroy_) {
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->mem()->IsEmpty()) { if (!cfd->IsDropped() && !cfd->mem()->IsEmpty()) {
cfd->Ref(); cfd->Ref();
mutex_.Unlock(); mutex_.Unlock();
FlushMemTable(cfd, FlushOptions()); FlushMemTable(cfd, FlushOptions());
@ -1944,8 +1944,7 @@ void DBImpl::BackgroundCallFlush() {
auto pending_outputs_inserted_elem = auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs(); CaptureCurrentFileNumberInPendingOutputs();
Status s; Status s = BackgroundFlush(&madeProgress, &job_context, &log_buffer);
s = BackgroundFlush(&madeProgress, &job_context, &log_buffer);
if (!s.ok() && !s.IsShutdownInProgress()) { if (!s.ok() && !s.IsShutdownInProgress()) {
// Wait a little bit before retrying background flush in // Wait a little bit before retrying background flush in
// case this is an environmental problem and we do not want to // case this is an environmental problem and we do not want to
@ -1967,9 +1966,9 @@ void DBImpl::BackgroundCallFlush() {
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
// If !s.ok(), this means that Flush failed. In that case, we want // If flush failed, we want to delete all temporary files that we might have
// to delete all obsolete files and we force FindObsoleteFiles() // created. Thus, we force full scan in FindObsoleteFiles()
FindObsoleteFiles(&job_context, !s.ok()); FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
// delete unnecessary files if any, this is done outside the mutex // delete unnecessary files if any, this is done outside the mutex
if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
mutex_.Unlock(); mutex_.Unlock();
@ -2011,8 +2010,7 @@ void DBImpl::BackgroundCallCompaction() {
CaptureCurrentFileNumberInPendingOutputs(); CaptureCurrentFileNumberInPendingOutputs();
assert(bg_compaction_scheduled_); assert(bg_compaction_scheduled_);
Status s; Status s = BackgroundCompaction(&madeProgress, &job_context, &log_buffer);
s = BackgroundCompaction(&madeProgress, &job_context, &log_buffer);
if (!s.ok() && !s.IsShutdownInProgress()) { if (!s.ok() && !s.IsShutdownInProgress()) {
// Wait a little bit before retrying background compaction in // Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to // case this is an environmental problem and we do not want to
@ -2034,11 +2032,10 @@ void DBImpl::BackgroundCallCompaction() {
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem); ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
// If !s.ok(), this means that Compaction failed. In that case, we want // If compaction failed, we want to delete all temporary files that we might
// to delete all obsolete files we might have created and we force // have created (they might not be all recorded in job_context in case of a
// FindObsoleteFiles(). This is because job_context does not // failure). Thus, we force full scan in FindObsoleteFiles()
// catch all created files if compaction failed. FindObsoleteFiles(&job_context, !s.ok() && !s.IsShutdownInProgress());
FindObsoleteFiles(&job_context, !s.ok());
// delete unnecessary files if any, this is done outside the mutex // delete unnecessary files if any, this is done outside the mutex
if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) { if (job_context.HaveSomethingToDelete() || !log_buffer.IsEmpty()) {
@ -2124,7 +2121,10 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
db_options_.max_background_compactions - bg_compaction_scheduled_); db_options_.max_background_compactions - bg_compaction_scheduled_);
auto flush_status = auto flush_status =
BackgroundFlush(madeProgress, job_context, log_buffer); BackgroundFlush(madeProgress, job_context, log_buffer);
if (!flush_status.ok()) { // the second condition will be false when a column family is dropped. we
// don't want to fail compaction because of that (because it might be a
// different column family)
if (!flush_status.ok() && !flush_status.IsShutdownInProgress()) {
if (is_manual) { if (is_manual) {
manual_compaction_->status = flush_status; manual_compaction_->status = flush_status;
manual_compaction_->done = true; manual_compaction_->done = true;
@ -2756,7 +2756,7 @@ Status DBImpl::DropColumnFamily(ColumnFamilyHandle* column_family) {
// is_snapshot_supported_. // is_snapshot_supported_.
bool new_is_snapshot_supported = true; bool new_is_snapshot_supported = true;
for (auto c : *versions_->GetColumnFamilySet()) { for (auto c : *versions_->GetColumnFamilySet()) {
if (!c->mem()->IsSnapshotSupported()) { if (!c->IsDropped() && !c->mem()->IsSnapshotSupported()) {
new_is_snapshot_supported = false; new_is_snapshot_supported = false;
break; break;
} }
@ -3080,6 +3080,9 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
// no need to refcount because drop is happening in write thread, so can't // no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread // happen while we're in the write thread
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (cfd->GetLogNumber() <= flush_column_family_if_log_file) { if (cfd->GetLogNumber() <= flush_column_family_if_log_file) {
status = SetNewMemtableAndNewLogFile(cfd, &context); status = SetNewMemtableAndNewLogFile(cfd, &context);
if (!status.ok()) { if (!status.ok()) {
@ -3098,6 +3101,9 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
// no need to refcount because drop is happening in write thread, so can't // no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread // happen while we're in the write thread
for (auto cfd : *versions_->GetColumnFamilySet()) { for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (!cfd->mem()->IsEmpty()) { if (!cfd->mem()->IsEmpty()) {
status = SetNewMemtableAndNewLogFile(cfd, &context); status = SetNewMemtableAndNewLogFile(cfd, &context);
if (!status.ok()) { if (!status.ok()) {

@ -2244,6 +2244,9 @@ Status VersionSet::Recover(
if (s.ok()) { if (s.ok()) {
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
}
auto builders_iter = builders.find(cfd->GetID()); auto builders_iter = builders.find(cfd->GetID());
assert(builders_iter != builders.end()); assert(builders_iter != builders.end());
auto* builder = builders_iter->second->version_builder(); auto* builder = builders_iter->second->version_builder();
@ -2279,6 +2282,9 @@ Status VersionSet::Recover(
column_family_set_->GetMaxColumnFamily()); column_family_set_->GetMaxColumnFamily());
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
}
Log(InfoLogLevel::INFO_LEVEL, db_options_->info_log, Log(InfoLogLevel::INFO_LEVEL, db_options_->info_log,
"Column family [%s] (ID %u), log number is %" PRIu64 "\n", "Column family [%s] (ID %u), log number is %" PRIu64 "\n",
cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber()); cfd->GetName().c_str(), cfd->GetID(), cfd->GetLogNumber());
@ -2585,6 +2591,9 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
if (s.ok()) { if (s.ok()) {
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
}
auto builders_iter = builders.find(cfd->GetID()); auto builders_iter = builders.find(cfd->GetID());
assert(builders_iter != builders.end()); assert(builders_iter != builders.end());
auto builder = builders_iter->second->version_builder(); auto builder = builders_iter->second->version_builder();
@ -2644,6 +2653,9 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
// LogAndApply. Column family manipulations can only happen within LogAndApply // LogAndApply. Column family manipulations can only happen within LogAndApply
// (the same single thread), so we're safe to iterate. // (the same single thread), so we're safe to iterate.
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
}
{ {
// Store column family info // Store column family info
VersionEdit edit; VersionEdit edit;
@ -2912,6 +2924,9 @@ Status VersionSet::GetMetadataForFile(uint64_t number, int* filelevel,
void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) { void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) {
continue;
}
for (int level = 0; level < cfd->NumberLevels(); level++) { for (int level = 0; level < cfd->NumberLevels(); level++) {
for (const auto& file : for (const auto& file :
cfd->current()->storage_info()->LevelFiles(level)) { cfd->current()->storage_info()->LevelFiles(level)) {

Loading…
Cancel
Save