LogAndApply() should fail if the column family has been dropped

Summary:
This patch finally fixes the ColumnFamilyTest.ReadDroppedColumnFamily test. The test has been failing very sporadically and it was hard to repro. However, I managed to write a new tests that reproes the failure deterministically.

Here's what happens:
1. We start the flush for the column family
2. We check if the column family was dropped here: a3fc49bfdd/db/flush_job.cc (L149)
3. This check goes through, ends up in InstallMemtableFlushResults() and it goes into LogAndApply()
4. At about this time, we start dropping the column family. Dropping the column family process gets to LogAndApply() at about the same time as LogAndApply() from flush process
5. Drop column family goes through LogAndApply() first, marking the column family as dropped.
6. Flush process gets woken up and gets a chance to write to the MANIFEST. However, this is where it gets stuck: a3fc49bfdd/db/version_set.cc (L1975)
7. We see that the column family was dropped, so there is no need to write to the MANIFEST. We return OK.
8. Flush gets OK back from LogAndApply() and it deletes the memtable, thinking that the data is now safely persisted to sst file.

The fix is pretty simple. Instead of OK, we return ShutdownInProgress. This is not really true, but we have been using this status code to also mean "this operation was canceled because the column family has been dropped".

The fix is only one LOC. All other code is related to tests. I added a new test that reproes the failure. I also moved SleepingBackgroundTask to util/testutil.h (because I needed it in column_family_test for my new test). There's plenty of other places where we reimplement SleepingBackgroundTask, but I'll address that in a separate commit.

Test Plan:
1. new test
2. make check
3. Make sure the ColumnFamilyTest.ReadDroppedColumnFamily doesn't fail on Travis: https://travis-ci.org/facebook/rocksdb/jobs/79952386

Reviewers: yhchiang, anthony, IslamAbdelRahman, kradhakrishnan, rven, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D46773
main
Igor Canadi 9 years ago
parent 2819a1db32
commit a7e80379b0
  1. 63
      db/column_family_test.cc
  2. 100
      db/db_test.cc
  3. 1
      db/flush_job.cc
  4. 8
      db/version_set.cc
  5. 46
      util/testutil.h

@ -10,6 +10,7 @@
#include <algorithm> #include <algorithm>
#include <vector> #include <vector>
#include <string> #include <string>
#include <thread>
#include "db/db_impl.h" #include "db/db_impl.h"
#include "rocksdb/db.h" #include "rocksdb/db.h"
@ -19,6 +20,7 @@
#include "util/testharness.h" #include "util/testharness.h"
#include "util/testutil.h" #include "util/testutil.h"
#include "util/coding.h" #include "util/coding.h"
#include "util/sync_point.h"
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"
namespace rocksdb { namespace rocksdb {
@ -1196,6 +1198,67 @@ TEST_F(ColumnFamilyTest, ReadDroppedColumnFamily) {
} }
} }
TEST_F(ColumnFamilyTest, FlushAndDropRaceCondition) {
db_options_.create_missing_column_families = true;
Open({"default", "one"});
ColumnFamilyOptions options;
options.level0_file_num_compaction_trigger = 100;
options.level0_slowdown_writes_trigger = 200;
options.level0_stop_writes_trigger = 200;
options.max_write_buffer_number = 20;
options.write_buffer_size = 100000; // small write buffer size
Reopen({options, options});
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"VersionSet::LogAndApply::ColumnFamilyDrop:1"
"FlushJob::InstallResults"},
{"FlushJob::InstallResults",
"VersionSet::LogAndApply::ColumnFamilyDrop:2", }});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
test::SleepingBackgroundTask sleeping_task;
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::HIGH);
// 1MB should create ~10 files for each CF
int kKeysNum = 10000;
PutRandomData(1, kKeysNum, 100);
std::vector<std::thread> threads;
threads.emplace_back([&] { ASSERT_OK(db_->DropColumnFamily(handles_[1])); });
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
sleeping_task.Reset();
// now we sleep again. this is just so we're certain that flush job finished
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task,
Env::Priority::HIGH);
sleeping_task.WakeUp();
sleeping_task.WaitUntilDone();
{
// Since we didn't delete CF handle, RocksDB's contract guarantees that
// we're still able to read dropped CF
std::unique_ptr<Iterator> iterator(
db_->NewIterator(ReadOptions(), handles_[1]));
int count = 0;
for (iterator->SeekToFirst(); iterator->Valid(); iterator->Next()) {
ASSERT_OK(iterator->status());
++count;
}
ASSERT_OK(iterator->status());
ASSERT_EQ(count, kKeysNum);
}
for (auto& t : threads) {
t.join();
}
Close();
Destroy();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -2219,61 +2219,16 @@ TEST_F(DBTest, NumImmutableMemTable) {
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
class SleepingBackgroundTask {
public:
SleepingBackgroundTask()
: bg_cv_(&mutex_), should_sleep_(true), done_with_sleep_(false) {}
void DoSleep() {
MutexLock l(&mutex_);
while (should_sleep_) {
bg_cv_.Wait();
}
done_with_sleep_ = true;
bg_cv_.SignalAll();
}
void WakeUp() {
MutexLock l(&mutex_);
should_sleep_ = false;
bg_cv_.SignalAll();
}
void WaitUntilDone() {
MutexLock l(&mutex_);
while (!done_with_sleep_) {
bg_cv_.Wait();
}
}
bool WokenUp() {
MutexLock l(&mutex_);
return should_sleep_ == false;
}
void Reset() {
MutexLock l(&mutex_);
should_sleep_ = true;
done_with_sleep_ = false;
}
static void DoSleepTask(void* arg) {
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
}
private:
port::Mutex mutex_;
port::CondVar bg_cv_; // Signalled when background work finishes
bool should_sleep_;
bool done_with_sleep_;
};
TEST_F(DBTest, FlushEmptyColumnFamily) { TEST_F(DBTest, FlushEmptyColumnFamily) {
// Block flush thread and disable compaction thread // Block flush thread and disable compaction thread
env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::HIGH);
env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::LOW);
SleepingBackgroundTask sleeping_task_low; test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW); Env::Priority::LOW);
SleepingBackgroundTask sleeping_task_high; test::SleepingBackgroundTask sleeping_task_high;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high, env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
Env::Priority::HIGH); &sleeping_task_high, Env::Priority::HIGH);
Options options = CurrentOptions(); Options options = CurrentOptions();
// disable compaction // disable compaction
@ -2312,12 +2267,12 @@ TEST_F(DBTest, GetProperty) {
// Set sizes to both background thread pool to be 1 and block them. // Set sizes to both background thread pool to be 1 and block them.
env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::HIGH);
env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::LOW);
SleepingBackgroundTask sleeping_task_low; test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW); Env::Priority::LOW);
SleepingBackgroundTask sleeping_task_high; test::SleepingBackgroundTask sleeping_task_high;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high, env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
Env::Priority::HIGH); &sleeping_task_high, Env::Priority::HIGH);
Options options = CurrentOptions(); Options options = CurrentOptions();
WriteOptions writeOpt = WriteOptions(); WriteOptions writeOpt = WriteOptions();
@ -2566,8 +2521,8 @@ TEST_F(DBTest, EstimatePendingCompBytes) {
// Set sizes to both background thread pool to be 1 and block them. // Set sizes to both background thread pool to be 1 and block them.
env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::HIGH);
env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::LOW);
SleepingBackgroundTask sleeping_task_low; test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW); Env::Priority::LOW);
Options options = CurrentOptions(); Options options = CurrentOptions();
@ -6181,7 +6136,7 @@ TEST_F(DBTest, TableOptionsSanitizeTest) {
TEST_F(DBTest, SanitizeNumThreads) { TEST_F(DBTest, SanitizeNumThreads) {
for (int attempt = 0; attempt < 2; attempt++) { for (int attempt = 0; attempt < 2; attempt++) {
const size_t kTotalTasks = 8; const size_t kTotalTasks = 8;
SleepingBackgroundTask sleeping_tasks[kTotalTasks]; test::SleepingBackgroundTask sleeping_tasks[kTotalTasks];
Options options = CurrentOptions(); Options options = CurrentOptions();
if (attempt == 0) { if (attempt == 0) {
@ -6193,7 +6148,8 @@ TEST_F(DBTest, SanitizeNumThreads) {
for (size_t i = 0; i < kTotalTasks; i++) { for (size_t i = 0; i < kTotalTasks; i++) {
// Insert 5 tasks to low priority queue and 5 tasks to high priority queue // Insert 5 tasks to low priority queue and 5 tasks to high priority queue
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_tasks[i], env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
&sleeping_tasks[i],
(i < 4) ? Env::Priority::LOW : Env::Priority::HIGH); (i < 4) ? Env::Priority::LOW : Env::Priority::HIGH);
} }
@ -6485,8 +6441,8 @@ TEST_F(DBTest, DynamicMemtableOptions) {
// max_background_flushes == 0, so flushes are getting executed by the // max_background_flushes == 0, so flushes are getting executed by the
// compaction thread // compaction thread
env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::LOW);
SleepingBackgroundTask sleeping_task_low; test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW); Env::Priority::LOW);
// Start from scratch and disable compaction/flush. Flush can only happen // Start from scratch and disable compaction/flush. Flush can only happen
// during compaction but trigger is pretty high // during compaction but trigger is pretty high
@ -6521,7 +6477,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
sleeping_task_low.Reset(); sleeping_task_low.Reset();
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW); Env::Priority::LOW);
count = 0; count = 0;
while (!sleeping_task_low.WokenUp() && count < 1024) { while (!sleeping_task_low.WokenUp() && count < 1024) {
@ -6544,7 +6500,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
sleeping_task_low.Reset(); sleeping_task_low.Reset();
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW); Env::Priority::LOW);
count = 0; count = 0;
@ -7351,8 +7307,8 @@ TEST_F(DBTest, DynamicCompactionOptions) {
// since level0_stop_writes_trigger = 8 // since level0_stop_writes_trigger = 8
dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr);
// Block compaction // Block compaction
SleepingBackgroundTask sleeping_task_low; test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW); Env::Priority::LOW);
rocksdb::SyncPoint::GetInstance()->SetCallBack( rocksdb::SyncPoint::GetInstance()->SetCallBack(
@ -7388,7 +7344,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
// Block compaction again // Block compaction again
sleeping_task_low.Reset(); sleeping_task_low.Reset();
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW); Env::Priority::LOW);
count = 0; count = 0;
while (count < 64) { while (count < 64) {
@ -7826,7 +7782,7 @@ TEST_F(DBTest, DeleteObsoleteFilesPendingOutputs) {
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
ASSERT_EQ("0,0,1", FilesPerLevel(0)); ASSERT_EQ("0,0,1", FilesPerLevel(0));
SleepingBackgroundTask blocking_thread; test::SleepingBackgroundTask blocking_thread;
port::Mutex mutex_; port::Mutex mutex_;
bool already_blocked(false); bool already_blocked(false);
@ -7893,12 +7849,12 @@ TEST_F(DBTest, CloseSpeedup) {
// Block background threads // Block background threads
env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::LOW);
env_->SetBackgroundThreads(1, Env::HIGH); env_->SetBackgroundThreads(1, Env::HIGH);
SleepingBackgroundTask sleeping_task_low; test::SleepingBackgroundTask sleeping_task_low;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_low, env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask, &sleeping_task_low,
Env::Priority::LOW); Env::Priority::LOW);
SleepingBackgroundTask sleeping_task_high; test::SleepingBackgroundTask sleeping_task_high;
env_->Schedule(&SleepingBackgroundTask::DoSleepTask, &sleeping_task_high, env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
Env::Priority::HIGH); &sleeping_task_high, Env::Priority::HIGH);
std::vector<std::string> filenames; std::vector<std::string> filenames;
env_->GetChildren(dbname_, &filenames); env_->GetChildren(dbname_, &filenames);

@ -154,6 +154,7 @@ Status FlushJob::Run(FileMetaData* file_meta) {
if (!s.ok()) { if (!s.ok()) {
cfd_->imm()->RollbackMemtableFlush(mems, meta.fd.GetNumber()); cfd_->imm()->RollbackMemtableFlush(mems, meta.fd.GetNumber());
} else { } else {
TEST_SYNC_POINT("FlushJob::InstallResults");
// Replace immutable memtable with the generated Table // Replace immutable memtable with the generated Table
s = cfd_->imm()->InstallMemtableFlushResults( s = cfd_->imm()->InstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems, versions_, db_mutex_, cfd_, mutable_cf_options_, mems, versions_, db_mutex_,

@ -1979,7 +1979,8 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
if (!manifest_writers_.empty()) { if (!manifest_writers_.empty()) {
manifest_writers_.front()->cv.Signal(); manifest_writers_.front()->cv.Signal();
} }
return Status::OK(); // we steal this code to also inform about cf-drop
return Status::ShutdownInProgress();
} }
std::vector<VersionEdit*> batch_edits; std::vector<VersionEdit*> batch_edits;
@ -2141,6 +2142,11 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
new_manifest_file_size = descriptor_log_->file()->GetFileSize(); new_manifest_file_size = descriptor_log_->file()->GetFileSize();
} }
if (edit->is_column_family_drop_) {
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:1");
TEST_SYNC_POINT("VersionSet::LogAndApply::ColumnFamilyDrop:2");
}
LogFlush(db_options_->info_log); LogFlush(db_options_->info_log);
mu->Lock(); mu->Lock();
} }

@ -16,6 +16,7 @@
#include "rocksdb/env.h" #include "rocksdb/env.h"
#include "rocksdb/iterator.h" #include "rocksdb/iterator.h"
#include "rocksdb/slice.h" #include "rocksdb/slice.h"
#include "util/mutexlock.h"
#include "util/random.h" #include "util/random.h"
namespace rocksdb { namespace rocksdb {
@ -276,5 +277,50 @@ class NullLogger : public Logger {
// Corrupts key by changing the type // Corrupts key by changing the type
extern void CorruptKeyType(InternalKey* ikey); extern void CorruptKeyType(InternalKey* ikey);
class SleepingBackgroundTask {
public:
SleepingBackgroundTask()
: bg_cv_(&mutex_), should_sleep_(true), done_with_sleep_(false) {}
void DoSleep() {
MutexLock l(&mutex_);
while (should_sleep_) {
bg_cv_.Wait();
}
done_with_sleep_ = true;
bg_cv_.SignalAll();
}
void WakeUp() {
MutexLock l(&mutex_);
should_sleep_ = false;
bg_cv_.SignalAll();
}
void WaitUntilDone() {
MutexLock l(&mutex_);
while (!done_with_sleep_) {
bg_cv_.Wait();
}
}
bool WokenUp() {
MutexLock l(&mutex_);
return should_sleep_ == false;
}
void Reset() {
MutexLock l(&mutex_);
should_sleep_ = true;
done_with_sleep_ = false;
}
static void DoSleepTask(void* arg) {
reinterpret_cast<SleepingBackgroundTask*>(arg)->DoSleep();
}
private:
port::Mutex mutex_;
port::CondVar bg_cv_; // Signalled when background work finishes
bool should_sleep_;
bool done_with_sleep_;
};
} // namespace test } // namespace test
} // namespace rocksdb } // namespace rocksdb

Loading…
Cancel
Save