Fixing race condition in DBTest.DynamicMemtableOptions

Summary:
This patch fixes a race condition in DBTEst.DynamicMemtableOptions. In rare cases,
it was possible that the main thread would fill up both memtables before the flush
job acquired its work. Then, the flush job was flushing both memtables together,
producing only one L0 file while the test expected two. Now, the test waits for
flushes to finish earlier, to make sure that the memtables are flushed in separate
flush jobs.

Test Plan:
Insert "usleep(10000);" after "IOSTATS_SET_THREAD_POOL_ID(Env::Priority::HIGH);" in BGWorkFlush()
to make the issue more likely. Then test with:
make db_test && time while ./db_test --gtest_filter=*DynamicMemtableOptions; do true; done

Reviewers: rven, sdong, yhchiang, anthony, igor

Reviewed By: igor

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D45429
main
Andres Noetzli 9 years ago
parent e46bcc08b9
commit 2050832974
  1. 13
      db/db_bench.cc
  2. 35
      db/db_impl.cc
  3. 12
      db/db_test.cc
  4. 1
      db/flush_job.cc
  5. 4
      db/memtable_list.cc

@ -223,9 +223,7 @@ static bool ValidateKeySize(const char* flagname, int32_t value) {
static bool ValidateUint32Range(const char* flagname, uint64_t value) {
if (value > std::numeric_limits<uint32_t>::max()) {
fprintf(stderr,
"Invalid value for --%s: %lu, overflow\n",
flagname,
fprintf(stderr, "Invalid value for --%s: %lu, overflow\n", flagname,
(unsigned long)value);
return false;
}
@ -298,10 +296,11 @@ DEFINE_int32(max_background_compactions,
" that can occur in parallel.");
DEFINE_uint64(subcompactions, 1,
"Maximum number of subcompactions to divide L0-L1 compactions "
"into.");
static const bool FLAGS_subcompactions_dummy __attribute__((unused)) =
RegisterFlagValidator(&FLAGS_subcompactions, &ValidateUint32Range);
"Maximum number of subcompactions to divide L0-L1 compactions "
"into.");
static const bool FLAGS_subcompactions_dummy
__attribute__((unused)) = RegisterFlagValidator(&FLAGS_subcompactions,
&ValidateUint32Range);
DEFINE_int32(max_background_flushes,
rocksdb::Options().max_background_flushes,

@ -1338,7 +1338,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
Status DBImpl::FlushMemTableToOutputFile(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
bool* madeProgress, JobContext* job_context, LogBuffer* log_buffer) {
bool* made_progress, JobContext* job_context, LogBuffer* log_buffer) {
mutex_.AssertHeld();
assert(cfd->imm()->NumNotFlushed() != 0);
assert(cfd->imm()->IsFlushPending());
@ -1363,8 +1363,8 @@ Status DBImpl::FlushMemTableToOutputFile(
if (s.ok()) {
InstallSuperVersionAndScheduleWorkWrapper(cfd, job_context,
mutable_cf_options);
if (madeProgress) {
*madeProgress = 1;
if (made_progress) {
*made_progress = 1;
}
VersionStorageInfo::LevelSummaryStorage tmp;
LogToBuffer(log_buffer, "[%s] Level summary: %s\n", cfd->GetName().c_str(),
@ -2276,7 +2276,7 @@ void DBImpl::BGWorkCompaction(void* db) {
reinterpret_cast<DBImpl*>(db)->BackgroundCallCompaction();
}
Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context,
Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
LogBuffer* log_buffer) {
mutex_.AssertHeld();
@ -2317,7 +2317,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context,
cfd->GetName().c_str(),
db_options_.max_background_flushes - bg_flush_scheduled_,
db_options_.max_background_compactions - bg_compaction_scheduled_);
status = FlushMemTableToOutputFile(cfd, mutable_cf_options, madeProgress,
status = FlushMemTableToOutputFile(cfd, mutable_cf_options, made_progress,
job_context, log_buffer);
if (cfd->Unref()) {
delete cfd;
@ -2327,7 +2327,7 @@ Status DBImpl::BackgroundFlush(bool* madeProgress, JobContext* job_context,
}
void DBImpl::BackgroundCallFlush() {
bool madeProgress = false;
bool made_progress = false;
JobContext job_context(next_job_id_.fetch_add(1), true);
assert(bg_flush_scheduled_);
@ -2338,7 +2338,7 @@ void DBImpl::BackgroundCallFlush() {
auto pending_outputs_inserted_elem =
CaptureCurrentFileNumberInPendingOutputs();
Status s = BackgroundFlush(&madeProgress, &job_context, &log_buffer);
Status s = BackgroundFlush(&made_progress, &job_context, &log_buffer);
if (!s.ok() && !s.IsShutdownInProgress()) {
// Wait a little bit before retrying background flush in
// case this is an environmental problem and we do not want to
@ -2392,7 +2392,7 @@ void DBImpl::BackgroundCallFlush() {
}
void DBImpl::BackgroundCallCompaction() {
bool madeProgress = false;
bool made_progress = false;
JobContext job_context(next_job_id_.fetch_add(1), true);
MaybeDumpStats();
@ -2404,7 +2404,7 @@ void DBImpl::BackgroundCallCompaction() {
CaptureCurrentFileNumberInPendingOutputs();
assert(bg_compaction_scheduled_);
Status s = BackgroundCompaction(&madeProgress, &job_context, &log_buffer);
Status s = BackgroundCompaction(&made_progress, &job_context, &log_buffer);
if (!s.ok() && !s.IsShutdownInProgress()) {
// Wait a little bit before retrying background compaction in
// case this is an environmental problem and we do not want to
@ -2453,9 +2453,9 @@ void DBImpl::BackgroundCallCompaction() {
// See if there's more work to be done
MaybeScheduleFlushOrCompaction();
if (madeProgress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) {
if (made_progress || bg_compaction_scheduled_ == 0 || bg_manual_only_ > 0) {
// signal if
// * madeProgress -- need to wakeup DelayWrite
// * made_progress -- need to wakeup DelayWrite
// * bg_compaction_scheduled_ == 0 -- need to wakeup ~DBImpl
// * bg_manual_only_ > 0 -- need to wakeup RunManualCompaction
// If none of this is true, there is no need to signal since nobody is
@ -2469,9 +2469,10 @@ void DBImpl::BackgroundCallCompaction() {
}
}
Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
Status DBImpl::BackgroundCompaction(bool* made_progress,
JobContext* job_context,
LogBuffer* log_buffer) {
*madeProgress = false;
*made_progress = false;
mutex_.AssertHeld();
bool is_manual = (manual_compaction_ != nullptr) &&
@ -2608,7 +2609,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
LogToBuffer(log_buffer, "[%s] Deleted %d files\n",
c->column_family_data()->GetName().c_str(),
c->num_input_files(0));
*madeProgress = true;
*made_progress = true;
} else if (!trivial_move_disallowed && c->IsTrivialMove()) {
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
// Instrument for event update
@ -2665,7 +2666,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
c->column_family_data()->GetName().c_str(), moved_files,
c->output_level(), moved_bytes, status.ToString().c_str(),
c->column_family_data()->current()->storage_info()->LevelSummary(&tmp));
*madeProgress = true;
*made_progress = true;
// Clear Instrument
ThreadStatusUtil::ResetThreadStatus();
@ -2694,14 +2695,14 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
InstallSuperVersionAndScheduleWorkWrapper(
c->column_family_data(), job_context, *c->mutable_cf_options());
}
*madeProgress = true;
*made_progress = true;
}
if (c != nullptr) {
NotifyOnCompactionCompleted(
c->column_family_data(), c.get(), status,
compaction_job_stats, job_context->job_id);
c->ReleaseCompactionFiles(status);
*madeProgress = true;
*made_progress = true;
}
// this will unref its input_version and column_family_data
c.reset();

@ -6113,6 +6113,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
const uint64_t k64KB = 1 << 16;
const uint64_t k128KB = 1 << 17;
const uint64_t k5KB = 5 * 1024;
const int kNumPutsBeforeWaitForFlush = 64;
Options options;
options.env = env_;
options.create_if_missing = true;
@ -6130,6 +6131,15 @@ TEST_F(DBTest, DynamicMemtableOptions) {
Random rnd(301);
for (int i = 0; i < size; i++) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
// The following condition prevents a race condition between flush jobs
// acquiring work and this thread filling up multiple memtables. Without
// this, the flush might produce less files than expected because
// multiple memtables are flushed into a single L0 file. This race
// condition affects assertion (A).
if (i % kNumPutsBeforeWaitForFlush == kNumPutsBeforeWaitForFlush - 1) {
dbfull()->TEST_WaitForFlushMemTable();
}
}
dbfull()->TEST_WaitForFlushMemTable();
};
@ -6153,7 +6163,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
// the next memtable will be 128KB in size. Write 256KB total, we should
// have a 64KB L0 file, a 128KB L0 file, and a memtable with 64KB data
gen_l0_kb(256);
ASSERT_EQ(NumTableFilesAtLevel(0), 2);
ASSERT_EQ(NumTableFilesAtLevel(0), 2); // (A)
ASSERT_LT(SizeAtLevel(0), k128KB + k64KB + 2 * k5KB);
ASSERT_GT(SizeAtLevel(0), k128KB + k64KB - 2 * k5KB);

@ -218,6 +218,7 @@ Status FlushJob::WriteLevel0Table(const autovector<MemTable*>& mems,
<< total_num_entries << "num_deletes"
<< total_num_deletes << "memory_usage"
<< total_memory_usage;
TableFileCreationInfo info;
{
ScopedArenaIterator iter(

@ -324,7 +324,7 @@ Status MemTableList::InstallMemtableFlushResults(
// All the later memtables that have the same filenum
// are part of the same batch. They can be committed now.
uint64_t mem_id = 1; // how many memtables has been flushed.
uint64_t mem_id = 1; // how many memtables have been flushed.
do {
if (s.ok()) { // commit new state
LogToBuffer(log_buffer, "[%s] Level-0 commit table #%" PRIu64
@ -333,7 +333,7 @@ Status MemTableList::InstallMemtableFlushResults(
assert(m->file_number_ > 0);
current_->Remove(m, to_delete);
} else {
//commit failed. setup state so that we can flush again.
// commit failed. setup state so that we can flush again.
LogToBuffer(log_buffer, "Level-0 commit table #%" PRIu64
": memtable #%" PRIu64 " failed",
m->file_number_, mem_id);

Loading…
Cancel
Save