Enable atomic flush (#4023)

Summary:
Adds a DB option `atomic_flush` to control whether to enable this feature. This PR is a subset of [PR 3752](https://github.com/facebook/rocksdb/pull/3752).
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4023

Differential Revision: D8518381

Pulled By: riversand963

fbshipit-source-id: 1e3bb33e99bb102876a31b378d93b0138ff6634f
main
Yanqin Jin 6 years ago committed by Facebook Github Bot
parent f560c8f5c8
commit 5b4c709fad
  1. 1
      HISTORY.md
  2. 9
      db/db_filesnapshot.cc
  3. 79
      db/db_flush_test.cc
  4. 44
      db/db_impl.cc
  5. 32
      db/db_impl.h
  6. 239
      db/db_impl_compaction_flush.cc
  7. 2
      db/db_impl_debug.cc
  8. 94
      db/db_impl_write.cc
  9. 5
      db/db_test.cc
  10. 7
      db/db_test_util.cc
  11. 2
      db/db_test_util.h
  12. 15
      db/memtable_list.h
  13. 11
      include/rocksdb/db.h
  14. 14
      include/rocksdb/options.h
  15. 5
      include/rocksdb/utilities/stackable_db.h
  16. 1
      options/db_options.h
  17. 6
      options/options_helper.cc
  18. 3
      options/options_settable_test.cc
  19. 255
      tools/db_stress.cc

@ -4,6 +4,7 @@
* Introduced `Memoryllocator`, which lets the user specify custom allocator for memory in block cache.
* Introduced `PerfContextByLevel` as part of `PerfContext` which allows storing perf context at each level. Also replaced `__thread` with `thread_local` keyword for perf_context.
* With level_compaction_dynamic_level_bytes = true, level multiplier may be adjusted automatically when Level 0 to 1 compaction is lagged behind.
* Introduced DB option `atomic_flush`. If true, RocksDB supports flushing multiple column families and atomically committing the result to MANIFEST. Useful when WAL is disabled.
### Bug Fixes
* Fix corner case where a write group leader blocked due to write stall blocks other writers in queue with WriteOptions::no_slowdown set.

@ -87,6 +87,14 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
if (flush_memtable) {
// flush all dirty data to disk.
Status status;
if (atomic_flush_) {
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
mutex_.Unlock();
status = AtomicFlushMemTables(cfds, FlushOptions(),
FlushReason::kGetLiveFiles);
mutex_.Lock();
} else {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
@ -102,6 +110,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
break;
}
}
}
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
if (!status.ok()) {

@ -25,6 +25,12 @@ class DBFlushDirectIOTest : public DBFlushTest,
DBFlushDirectIOTest() : DBFlushTest() {}
};
class DBAtomicFlushTest : public DBFlushTest,
public ::testing::WithParamInterface<bool> {
public:
DBAtomicFlushTest() : DBFlushTest() {}
};
// We had issue when two background threads trying to flush at the same time,
// only one of them get committed. The test verifies the issue is fixed.
TEST_F(DBFlushTest, FlushWhileWritingManifest) {
@ -214,9 +220,82 @@ TEST_F(DBFlushTest, FlushError) {
ASSERT_NE(s, Status::OK());
}
TEST_P(DBAtomicFlushTest, ManualAtomicFlush) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.atomic_flush = GetParam();
options.write_buffer_size = (static_cast<size_t>(64) << 20);
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
size_t num_cfs = handles_.size();
ASSERT_EQ(3, num_cfs);
WriteOptions wopts;
wopts.disableWAL = true;
for (size_t i = 0; i != num_cfs; ++i) {
ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
}
std::vector<int> cf_ids;
for (size_t i = 0; i != num_cfs; ++i) {
cf_ids.emplace_back(static_cast<int>(i));
}
ASSERT_OK(Flush(cf_ids));
for (size_t i = 0; i != num_cfs; ++i) {
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
}
}
TEST_P(DBAtomicFlushTest, AtomicFlushTriggeredByMemTableFull) {
Options options = CurrentOptions();
options.create_if_missing = true;
options.atomic_flush = GetParam();
// 4KB so that we can easily trigger auto flush.
options.write_buffer_size = 4096;
SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::BackgroundCallFlush:FlushFinish:0",
"DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck"}});
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu", "eevee"}, options);
size_t num_cfs = handles_.size();
ASSERT_EQ(3, num_cfs);
WriteOptions wopts;
wopts.disableWAL = true;
for (size_t i = 0; i != num_cfs; ++i) {
ASSERT_OK(Put(static_cast<int>(i) /*cf*/, "key", "value", wopts));
}
// Keep writing to one of them column families to trigger auto flush.
for (int i = 0; i != 4000; ++i) {
ASSERT_OK(Put(static_cast<int>(num_cfs) - 1 /*cf*/,
"key" + std::to_string(i), "value" + std::to_string(i),
wopts));
}
TEST_SYNC_POINT(
"DBAtomicFlushTest::AtomicFlushTriggeredByMemTableFull:BeforeCheck");
if (options.atomic_flush) {
for (size_t i = 0; i != num_cfs - 1; ++i) {
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
ASSERT_TRUE(cfh->cfd()->mem()->IsEmpty());
}
} else {
for (size_t i = 0; i != num_cfs - 1; ++i) {
auto cfh = static_cast<ColumnFamilyHandleImpl*>(handles_[i]);
ASSERT_EQ(0, cfh->cfd()->imm()->NumNotFlushed());
ASSERT_FALSE(cfh->cfd()->mem()->IsEmpty());
}
}
SyncPoint::GetInstance()->DisableProcessing();
}
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
testing::Bool());
INSTANTIATE_TEST_CASE_P(DBAtomicFlushTest, DBAtomicFlushTest, testing::Bool());
} // namespace rocksdb
int main(int argc, char** argv) {

@ -220,6 +220,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
preserve_deletes_(options.preserve_deletes),
closed_(false),
error_handler_(this, immutable_db_options_, &mutex_),
atomic_flush_(options.atomic_flush),
atomic_flush_commit_in_progress_(false) {
// !batch_per_trx_ implies seq_per_batch_ because it is only unset for
// WriteUnprepared, which should use seq_per_batch_.
@ -305,7 +306,30 @@ Status DBImpl::ResumeImpl() {
// We cannot guarantee consistency of the WAL. So force flush Memtables of
// all the column families
if (s.ok()) {
s = FlushAllCFs(FlushReason::kErrorRecovery);
FlushOptions flush_opts;
// We allow flush to stall write since we are trying to resume from error.
flush_opts.allow_write_stall = true;
if (atomic_flush_) {
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
mutex_.Unlock();
s = AtomicFlushMemTables(cfds, flush_opts, FlushReason::kErrorRecovery);
mutex_.Lock();
} else {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->Ref();
mutex_.Unlock();
s = FlushMemTable(cfd, flush_opts, FlushReason::kErrorRecovery);
mutex_.Lock();
cfd->Unref();
if (!s.ok()) {
break;
}
}
}
if (!s.ok()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"DB resume requested but failed due to Flush failure [%s]",
@ -377,6 +401,13 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
if (!shutting_down_.load(std::memory_order_acquire) &&
has_unpersisted_data_.load(std::memory_order_relaxed) &&
!mutable_db_options_.avoid_flush_during_shutdown) {
if (atomic_flush_) {
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
mutex_.Unlock();
AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
mutex_.Lock();
} else {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->IsDropped() && cfd->initialized() && !cfd->mem()->IsEmpty()) {
cfd->Ref();
@ -386,6 +417,7 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
cfd->Unref();
}
}
}
versions_->GetColumnFamilySet()->FreeDeadColumnFamilies();
}
@ -3099,6 +3131,15 @@ Status DBImpl::IngestExternalFile(
TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
&need_flush);
if (status.ok() && need_flush) {
if (atomic_flush_) {
mutex_.Unlock();
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
status = AtomicFlushMemTables(cfds, FlushOptions(),
FlushReason::kExternalFileIngestion,
true /* writes_stopped */);
mutex_.Lock();
} else {
mutex_.Unlock();
status = FlushMemTable(cfd, FlushOptions(),
FlushReason::kExternalFileIngestion,
@ -3106,6 +3147,7 @@ Status DBImpl::IngestExternalFile(
mutex_.Lock();
}
}
}
// Run the ingestion job
if (status.ok()) {

@ -228,6 +228,9 @@ class DBImpl : public DB {
using DB::Flush;
virtual Status Flush(const FlushOptions& options,
ColumnFamilyHandle* column_family) override;
virtual Status Flush(
const FlushOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) override;
virtual Status FlushWAL(bool sync) override;
bool TEST_WALBufferIsEmpty();
virtual Status SyncWAL() override;
@ -965,10 +968,17 @@ class DBImpl : public DB {
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
void SelectColumnFamiliesForAtomicFlush(autovector<ColumnFamilyData*>* cfds);
// Force current memtable contents to be flushed.
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
FlushReason flush_reason, bool writes_stopped = false);
Status AtomicFlushMemTables(
const autovector<ColumnFamilyData*>& column_family_datas,
const FlushOptions& options, FlushReason flush_reason,
bool writes_stopped = false);
// Wait until flushing this column family won't stall writes
Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
bool* flush_needed);
@ -977,14 +987,22 @@ class DBImpl : public DB {
// If flush_memtable_id is non-null, wait until the memtable with the ID
// gets flush. Otherwise, wait until the column family don't have any
// memtable pending flush.
// resuming_from_bg_err indicates whether the caller is attempting to resume
// from background error.
Status WaitForFlushMemTable(ColumnFamilyData* cfd,
const uint64_t* flush_memtable_id = nullptr) {
return WaitForFlushMemTables({cfd}, {flush_memtable_id});
const uint64_t* flush_memtable_id = nullptr,
bool resuming_from_bg_err = false) {
return WaitForFlushMemTables({cfd}, {flush_memtable_id},
resuming_from_bg_err);
}
// Wait for memtables to be flushed for multiple column families.
Status WaitForFlushMemTables(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const uint64_t*>& flush_memtable_ids);
const autovector<const uint64_t*>& flush_memtable_ids,
bool resuming_from_bg_err);
// REQUIRES: mutex locked and in write thread.
void AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds);
// REQUIRES: mutex locked
Status SwitchWAL(WriteContext* write_context);
@ -1049,6 +1067,9 @@ class DBImpl : public DB {
// column families in this request, this flush is considered complete.
typedef std::vector<std::pair<ColumnFamilyData*, uint64_t>> FlushRequest;
void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
FlushRequest* req);
void SchedulePendingFlush(const FlushRequest& req, FlushReason flush_reason);
void SchedulePendingCompaction(ColumnFamilyData* cfd);
@ -1109,8 +1130,6 @@ class DBImpl : public DB {
Status CloseHelper();
Status FlushAllCFs(FlushReason flush_reason);
void WaitForBackgroundWork();
// table_cache_ provides its own synchronization
@ -1584,6 +1603,9 @@ class DBImpl : public DB {
ErrorHandler error_handler_;
// True if DB enables atomic flush.
bool atomic_flush_;
// True if the DB is committing atomic flush.
// TODO (yanqin) the current impl assumes that the entire DB belongs to
// a single atomic flush group. In the future we need to add a new class

@ -211,6 +211,10 @@ Status DBImpl::FlushMemTableToOutputFile(
Status DBImpl::FlushMemTablesToOutputFiles(
const autovector<BGFlushArg>& bg_flush_args, bool* made_progress,
JobContext* job_context, LogBuffer* log_buffer) {
if (atomic_flush_) {
return AtomicFlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer);
}
Status s;
for (auto& arg : bg_flush_args) {
ColumnFamilyData* cfd = arg.cfd_;
@ -318,11 +322,23 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
s = SyncClosedLogs(job_context);
}
// exec_status stores the execution status of flush_jobs as
// <bool /* executed */, Status /* status code */>
autovector<std::pair<bool, Status>> exec_status;
for (int i = 0; i != num_cfs; ++i) {
// Initially all jobs are not executed, with status OK.
std::pair<bool, Status> elem(false, Status::OK());
exec_status.emplace_back(elem);
}
if (s.ok()) {
// TODO (yanqin): parallelize jobs with threads.
for (int i = 0; i != num_cfs; ++i) {
s = jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]);
if (!s.ok()) {
exec_status[i].second =
jobs[i].Run(&logs_with_prep_tracker_, &file_meta[i]);
exec_status[i].first = true;
if (!exec_status[i].second.ok()) {
s = exec_status[i].second;
break;
}
}
@ -401,12 +417,19 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
}
if (!s.ok()) {
// Have to cancel the flush jobs that have NOT executed because we need to
// unref the versions.
for (int i = 0; i != num_cfs; ++i) {
auto& mems = jobs[i].GetMemTables();
cfds[i]->imm()->RollbackMemtableFlush(mems, file_meta[i].fd.GetNumber());
if (!exec_status[i].first) {
jobs[i].Cancel();
}
}
if (!s.IsShutdownInProgress()) {
for (int i = 0; i != num_cfs; ++i) {
auto& mems = jobs[i].GetMemTables();
cfds[i]->imm()->RollbackMemtableFlush(mems,
file_meta[i].fd.GetNumber());
}
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
}
@ -543,8 +566,15 @@ Status DBImpl::CompactRange(const CompactRangeOptions& options,
if (flush_needed) {
FlushOptions fo;
fo.allow_write_stall = options.allow_write_stall;
if (atomic_flush_) {
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
false /* writes_stopped */);
} else {
s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
false /* writes_stopped*/);
}
if (!s.ok()) {
LogFlush(immutable_db_options_.info_log);
return s;
@ -1165,72 +1195,59 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
ROCKS_LOG_INFO(immutable_db_options_.info_log, "[%s] Manual flush start.",
cfh->GetName().c_str());
Status s =
FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
Status s;
if (atomic_flush_) {
s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
FlushReason::kManualFlush);
} else {
s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"[%s] Manual flush finished, status: %s\n",
cfh->GetName().c_str(), s.ToString().c_str());
return s;
}
Status DBImpl::FlushAllCFs(FlushReason flush_reason) {
Status DBImpl::Flush(const FlushOptions& flush_options,
const std::vector<ColumnFamilyHandle*>& column_families) {
Status s;
WriteContext context;
WriteThread::Writer w;
mutex_.AssertHeld();
write_thread_.EnterUnbatched(&w, &mutex_);
FlushRequest flush_req;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->imm()->NumNotFlushed() == 0 && cfd->mem()->IsEmpty() &&
cached_recoverable_state_empty_.load()) {
// Nothing to flush
continue;
}
// SwitchMemtable() will release and reacquire mutex during execution
s = SwitchMemtable(cfd, &context);
if (!atomic_flush_) {
for (auto cfh : column_families) {
s = Flush(flush_options, cfh);
if (!s.ok()) {
break;
}
cfd->imm()->FlushRequested();
flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID());
}
// schedule flush
if (s.ok() && !flush_req.empty()) {
SchedulePendingFlush(flush_req, flush_reason);
MaybeScheduleFlushOrCompaction();
}
write_thread_.ExitUnbatched(&w);
if (s.ok()) {
for (auto& flush : flush_req) {
auto cfd = flush.first;
auto flush_memtable_id = flush.second;
while (cfd->imm()->NumNotFlushed() > 0 &&
cfd->imm()->GetEarliestMemTableID() <= flush_memtable_id) {
if (!error_handler_.GetRecoveryError().ok()) {
break;
}
if (cfd->IsDropped()) {
// FlushJob cannot flush a dropped CF, if we did not break here
// we will loop forever since cfd->imm()->NumNotFlushed() will never
// drop to zero
continue;
}
cfd->Ref();
bg_cv_.Wait();
cfd->Unref();
} else {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Manual atomic flush start.\n"
"=====Column families:=====");
for (auto cfh : column_families) {
auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
cfhi->GetName().c_str());
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"=====End of column families list=====");
autovector<ColumnFamilyData*> cfds;
std::for_each(column_families.begin(), column_families.end(),
[&cfds](ColumnFamilyHandle* elem) {
auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
cfds.emplace_back(cfh->cfd());
});
s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Manual atomic flush finished, status: %s\n",
"=====Column families:=====", s.ToString().c_str());
for (auto cfh : column_families) {
auto cfhi = static_cast<ColumnFamilyHandleImpl*>(cfh);
ROCKS_LOG_INFO(immutable_db_options_.info_log, "%s",
cfhi->GetName().c_str());
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"=====End of column families list=====");
}
flush_req.clear();
return s;
}
@ -1364,6 +1381,19 @@ Status DBImpl::RunManualCompaction(ColumnFamilyData* cfd, int input_level,
return manual.status;
}
void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
FlushRequest* req) {
assert(req != nullptr);
for (const auto cfd : cfds) {
if (nullptr == cfd) {
// cfd may be null, see DBImpl::ScheduleFlushes
continue;
}
uint64_t max_memtable_id = cfd->imm()->GetLatestMemTableID();
req->emplace_back(cfd, max_memtable_id);
}
}
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& flush_options,
FlushReason flush_reason, bool writes_stopped) {
@ -1415,12 +1445,89 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
cfds.push_back(iter.first);
flush_memtable_ids.push_back(&(iter.second));
}
s = WaitForFlushMemTables(cfds, flush_memtable_ids);
s = WaitForFlushMemTables(cfds, flush_memtable_ids,
(flush_reason == FlushReason::kErrorRecovery));
}
TEST_SYNC_POINT("FlushMemTableFinished");
return s;
}
// Flush all elments in 'column_family_datas'
// and atomically record the result to the MANIFEST.
Status DBImpl::AtomicFlushMemTables(
const autovector<ColumnFamilyData*>& column_family_datas,
const FlushOptions& flush_options, FlushReason flush_reason,
bool writes_stopped) {
Status s;
if (!flush_options.allow_write_stall) {
int num_cfs_to_flush = 0;
for (auto cfd : column_family_datas) {
bool flush_needed = true;
s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
if (!s.ok()) {
return s;
} else if (flush_needed) {
++num_cfs_to_flush;
}
}
if (0 == num_cfs_to_flush) {
return s;
}
}
FlushRequest flush_req;
autovector<ColumnFamilyData*> cfds;
{
WriteContext context;
InstrumentedMutexLock guard_lock(&mutex_);
WriteThread::Writer w;
if (!writes_stopped) {
write_thread_.EnterUnbatched(&w, &mutex_);
}
for (auto cfd : column_family_datas) {
if (cfd->IsDropped()) {
continue;
}
if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
!cached_recoverable_state_empty_.load()) {
cfds.emplace_back(cfd);
}
}
for (auto cfd : cfds) {
cfd->Ref();
s = SwitchMemtable(cfd, &context);
cfd->Unref();
if (!s.ok()) {
break;
}
}
if (s.ok()) {
AssignAtomicFlushSeq(cfds);
for (auto cfd : cfds) {
cfd->imm()->FlushRequested();
}
GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, flush_reason);
MaybeScheduleFlushOrCompaction();
}
if (!writes_stopped) {
write_thread_.ExitUnbatched(&w);
}
}
if (s.ok() && flush_options.wait) {
autovector<const uint64_t*> flush_memtable_ids;
for (auto& iter : flush_req) {
flush_memtable_ids.push_back(&(iter.second));
}
s = WaitForFlushMemTables(cfds, flush_memtable_ids,
(flush_reason == FlushReason::kErrorRecovery));
}
return s;
}
// Calling FlushMemTable(), whether from DB::Flush() or from Backup Engine, can
// cause write stall, for example if one memtable is being flushed already.
// This method tries to avoid write stall (similar to CompactRange() behavior)
@ -1492,16 +1599,25 @@ Status DBImpl::WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
// 2) if flush_memtable_ids[i] is null, then all memtables in THIS column
// family have to be flushed.
// Finish waiting when ALL column families finish flushing memtables.
// resuming_from_bg_err indicates whether the caller is trying to resume from
// background error or in normal processing.
Status DBImpl::WaitForFlushMemTables(
const autovector<ColumnFamilyData*>& cfds,
const autovector<const uint64_t*>& flush_memtable_ids) {
const autovector<const uint64_t*>& flush_memtable_ids,
bool resuming_from_bg_err) {
int num = static_cast<int>(cfds.size());
// Wait until the compaction completes
InstrumentedMutexLock l(&mutex_);
while (!error_handler_.IsDBStopped()) {
// If the caller is trying to resume from bg error, then
// error_handler_.IsDBStopped() is true.
while (resuming_from_bg_err || !error_handler_.IsDBStopped()) {
if (shutting_down_.load(std::memory_order_acquire)) {
return Status::ShutdownInProgress();
}
// If an error has occurred during resumption, then no need to wait.
if (!error_handler_.GetRecoveryError().ok()) {
break;
}
// Number of column families that have been dropped.
int num_dropped = 0;
// Number of column families that have finished flush.
@ -1527,7 +1643,9 @@ Status DBImpl::WaitForFlushMemTables(
bg_cv_.Wait();
}
Status s;
if (error_handler_.IsDBStopped()) {
// If not resuming from bg error, and an error has caused the DB to stop,
// then report the bg error to caller.
if (!resuming_from_bg_err && error_handler_.IsDBStopped()) {
s = error_handler_.GetBGError();
}
return s;
@ -1867,6 +1985,7 @@ void DBImpl::BackgroundCallFlush() {
mutex_.Lock();
}
TEST_SYNC_POINT("DBImpl::BackgroundCallFlush:FlushFinish:0");
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
// If flush failed, we want to delete all temporary files that we might have

@ -123,7 +123,7 @@ Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
cfd = cfh->cfd();
}
return WaitForFlushMemTable(cfd);
return WaitForFlushMemTable(cfd, nullptr, false);
}
Status DBImpl::TEST_WaitForCompact(bool wait_unscheduled) {

@ -1014,6 +1014,28 @@ Status DBImpl::WriteRecoverableState() {
return Status::OK();
}
void DBImpl::SelectColumnFamiliesForAtomicFlush(
autovector<ColumnFamilyData*>* cfds) {
for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
!cached_recoverable_state_empty_.load()) {
cfds->push_back(cfd);
}
}
}
// Assign sequence number for atomic flush.
void DBImpl::AssignAtomicFlushSeq(const autovector<ColumnFamilyData*>& cfds) {
assert(atomic_flush_);
auto seq = versions_->LastSequence();
for (auto cfd : cfds) {
cfd->imm()->AssignAtomicFlushSeq(seq);
}
}
Status DBImpl::SwitchWAL(WriteContext* write_context) {
mutex_.AssertHeld();
assert(write_context != nullptr);
@ -1062,21 +1084,36 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
oldest_alive_log, total_log_size_.load(), GetMaxTotalWalSize());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
FlushRequest flush_req;
autovector<ColumnFamilyData*> cfds;
if (atomic_flush_) {
SelectColumnFamiliesForAtomicFlush(&cfds);
} else {
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
if (cfd->OldestLogToKeep() <= oldest_alive_log) {
cfds.push_back(cfd);
}
}
}
for (const auto cfd : cfds) {
cfd->Ref();
status = SwitchMemtable(cfd, write_context);
cfd->Unref();
if (!status.ok()) {
break;
}
flush_req.emplace_back(cfd, cfd->imm()->GetLatestMemTableID());
cfd->imm()->FlushRequested();
}
}
if (status.ok()) {
if (atomic_flush_) {
AssignAtomicFlushSeq(cfds);
}
for (auto cfd : cfds) {
cfd->imm()->FlushRequested();
}
FlushRequest flush_req;
GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferManager);
MaybeScheduleFlushOrCompaction();
}
@ -1101,6 +1138,10 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
write_buffer_manager_->buffer_size());
// no need to refcount because drop is happening in write thread, so can't
// happen while we're in the write thread
autovector<ColumnFamilyData*> cfds;
if (atomic_flush_) {
SelectColumnFamiliesForAtomicFlush(&cfds);
} else {
ColumnFamilyData* cfd_picked = nullptr;
SequenceNumber seq_num_for_cf_picked = kMaxSequenceNumber;
@ -1118,12 +1159,11 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
}
}
}
autovector<ColumnFamilyData*> cfds;
if (cfd_picked != nullptr) {
cfds.push_back(cfd_picked);
}
FlushRequest flush_req;
}
for (const auto cfd : cfds) {
cfd->Ref();
status = SwitchMemtable(cfd, write_context);
@ -1131,11 +1171,16 @@ Status DBImpl::HandleWriteBufferFull(WriteContext* write_context) {
if (!status.ok()) {
break;
}
uint64_t flush_memtable_id = cfd->imm()->GetLatestMemTableID();
cfd->imm()->FlushRequested();
flush_req.emplace_back(cfd, flush_memtable_id);
}
if (status.ok()) {
if (atomic_flush_) {
AssignAtomicFlushSeq(cfds);
}
for (const auto cfd : cfds) {
cfd->imm()->FlushRequested();
}
FlushRequest flush_req;
GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
MaybeScheduleFlushOrCompaction();
}
@ -1258,25 +1303,36 @@ Status DBImpl::ThrottleLowPriWritesIfNeeded(const WriteOptions& write_options,
}
Status DBImpl::ScheduleFlushes(WriteContext* context) {
ColumnFamilyData* cfd;
FlushRequest flush_req;
autovector<ColumnFamilyData*> cfds;
if (atomic_flush_) {
SelectColumnFamiliesForAtomicFlush(&cfds);
for (auto cfd : cfds) {
cfd->Ref();
}
flush_scheduler_.Clear();
} else {
ColumnFamilyData* tmp_cfd;
while ((tmp_cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
cfds.push_back(tmp_cfd);
}
}
Status status;
while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
for (auto& cfd : cfds) {
status = SwitchMemtable(cfd, context);
bool should_schedule = true;
if (cfd->Unref()) {
delete cfd;
should_schedule = false;
cfd = nullptr;
}
if (!status.ok()) {
break;
}
if (should_schedule) {
uint64_t flush_memtable_id = cfd->imm()->GetLatestMemTableID();
flush_req.emplace_back(cfd, flush_memtable_id);
}
}
if (status.ok()) {
if (atomic_flush_) {
AssignAtomicFlushSeq(cfds);
}
FlushRequest flush_req;
GenerateFlushRequest(cfds, &flush_req);
SchedulePendingFlush(flush_req, FlushReason::kWriteBufferFull);
MaybeScheduleFlushOrCompaction();
}

@ -2659,6 +2659,11 @@ class ModelDB : public DB {
Status ret;
return ret;
}
virtual Status Flush(
const rocksdb::FlushOptions& /*options*/,
const std::vector<ColumnFamilyHandle*>& /*column_families*/) override {
return Status::OK();
}
virtual Status SyncWAL() override { return Status::OK(); }

@ -646,6 +646,13 @@ Status DBTestBase::Flush(int cf) {
}
}
Status DBTestBase::Flush(const std::vector<int>& cf_ids) {
std::vector<ColumnFamilyHandle*> cfhs;
std::for_each(cf_ids.begin(), cf_ids.end(),
[&cfhs, this](int id) { cfhs.emplace_back(handles_[id]); });
return db_->Flush(FlushOptions(), cfhs);
}
Status DBTestBase::Put(const Slice& k, const Slice& v, WriteOptions wo) {
if (kMergePut == option_config_) {
return db_->Merge(wo, k, v);

@ -799,6 +799,8 @@ class DBTestBase : public testing::Test {
Status Flush(int cf = 0);
Status Flush(const std::vector<int>& cf_ids);
Status Put(const Slice& k, const Slice& v, WriteOptions wo = WriteOptions());
Status Put(int cf, const Slice& k, const Slice& v,

@ -283,6 +283,21 @@ class MemTableList {
return memlist.front()->GetID();
}
void AssignAtomicFlushSeq(const SequenceNumber& seq) {
const auto& memlist = current_->memlist_;
// Scan the memtable list from new to old
for (auto it = memlist.begin(); it != memlist.end(); ++it) {
MemTable* mem = *it;
if (mem->atomic_flush_seqno_ == kMaxSequenceNumber) {
mem->atomic_flush_seqno_ = seq;
} else {
// Earlier memtables must have been assigned a atomic flush seq, no
// need to continue scan.
break;
}
}
}
private:
// DB mutex held
void InstallNewVersion();

@ -900,11 +900,22 @@ class DB {
virtual DBOptions GetDBOptions() const = 0;
// Flush all mem-table data.
// Flush a single column family, even when atomic flush is enabled. To flush
// multiple column families, use Flush(options, column_families).
virtual Status Flush(const FlushOptions& options,
ColumnFamilyHandle* column_family) = 0;
virtual Status Flush(const FlushOptions& options) {
return Flush(options, DefaultColumnFamily());
}
// Flushes multiple column families.
// If atomic flush is not enabled, Flush(options, column_families) is
// equivalent to calling Flush(options, column_family) multiple times.
// If atomic flush is enabled, Flush(options, column_families) will flush all
// column families specified in 'column_families' up to the latest sequence
// number at the time when flush is requested.
virtual Status Flush(
const FlushOptions& options,
const std::vector<ColumnFamilyHandle*>& column_families) = 0;
// Flush the WAL memory buffer to the file. If sync is true, it calls SyncWAL
// afterwards.

@ -943,6 +943,20 @@ struct DBOptions {
// relies on manual invocation of FlushWAL to write the WAL buffer to its
// file.
bool manual_wal_flush = false;
// If true, RocksDB supports flushing multiple column families and committing
// their results atomically to MANIFEST. Note that it is not
// necessary to set atomic_flush to true if WAL is always enabled since WAL
// allows the database to be restored to the last persistent state in WAL.
// This option is useful when there are column families with writes NOT
// protected by WAL.
// For manual flush, application has to specify which column families to
// flush atomically in DB::Flush.
// For auto-triggered flush, RocksDB atomically flushes ALL column families.
//
// Currently, any WAL-enabled writes after atomic flush may be replayed
// independently if the process crashes later and tries to recover.
bool atomic_flush = false;
};
// Options to control the behavior of a database (passed to DB::Open)

@ -278,6 +278,11 @@ class StackableDB : public DB {
ColumnFamilyHandle* column_family) override {
return db_->Flush(fopts, column_family);
}
virtual Status Flush(
const FlushOptions& fopts,
const std::vector<ColumnFamilyHandle*>& column_families) override {
return db_->Flush(fopts, column_families);
}
virtual Status SyncWAL() override {
return db_->SyncWAL();

@ -78,6 +78,7 @@ struct ImmutableDBOptions {
bool preserve_deletes;
bool two_write_queues;
bool manual_wal_flush;
bool atomic_flush;
};
struct MutableDBOptions {

@ -1554,7 +1554,11 @@ std::unordered_map<std::string, OptionTypeInfo>
offsetof(struct ImmutableDBOptions, manual_wal_flush)}},
{"seq_per_batch",
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated, false,
0}}};
0}},
{"atomic_flush",
{offsetof(struct DBOptions, atomic_flush), OptionType::kBoolean,
OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, atomic_flush)}}};
std::unordered_map<std::string, BlockBasedTableOptions::IndexType>
OptionsHelper::block_base_table_index_type_string_map = {

@ -291,7 +291,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"concurrent_prepare=false;"
"two_write_queues=false;"
"manual_wal_flush=false;"
"seq_per_batch=false;",
"seq_per_batch=false;"
"atomic_flush=false",
new_options));
ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions),

@ -133,6 +133,8 @@ DEFINE_bool(test_batches_snapshots, false,
"\t(b) No long validation at the end (more speed up)\n"
"\t(c) Test snapshot and atomicity of batch writes");
DEFINE_bool(atomic_flush, false, "If true, the test enables atomic flush\n");
DEFINE_int32(threads, 32, "Number of concurrent threads to run.");
DEFINE_int32(ttl, -1,
@ -2218,6 +2220,8 @@ class StressTest {
fprintf(stdout, "Format version : %d\n", FLAGS_format_version);
fprintf(stdout, "TransactionDB : %s\n",
FLAGS_use_txn ? "true" : "false");
fprintf(stdout, "Atomic flush : %s\n",
FLAGS_atomic_flush ? "true" : "false");
fprintf(stdout, "Column families : %d\n", FLAGS_column_families);
if (!FLAGS_test_batches_snapshots) {
fprintf(stdout, "Clear CFs one in : %d\n",
@ -2363,6 +2367,7 @@ class StressTest {
FLAGS_universal_max_merge_width;
options_.compaction_options_universal.max_size_amplification_percent =
FLAGS_universal_max_size_amplification_percent;
options_.atomic_flush = FLAGS_atomic_flush;
} else {
#ifdef ROCKSDB_LITE
fprintf(stderr, "--options_file not supported in lite mode\n");
@ -3327,6 +3332,252 @@ class BatchedOpsStressTest : public StressTest {
virtual void VerifyDb(ThreadState* /* thread */) const {}
};
class AtomicFlushStressTest : public StressTest {
public:
AtomicFlushStressTest() : batch_id_(0) {}
virtual ~AtomicFlushStressTest() {}
virtual Status TestPut(ThreadState* thread, WriteOptions& write_opts,
const ReadOptions& /* read_opts */,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
char (&value)[100],
std::unique_ptr<MutexLock>& /* lock */) {
std::string key_str = Key(rand_keys[0]);
Slice key = key_str;
uint64_t value_base = batch_id_.fetch_add(1);
size_t sz =
GenerateValue(static_cast<uint32_t>(value_base), value, sizeof(value));
Slice v(value, sz);
WriteBatch batch;
for (auto cf : rand_column_families) {
ColumnFamilyHandle* cfh = column_families_[cf];
if (FLAGS_use_merge) {
batch.Merge(cfh, key, v);
} else { /* !FLAGS_use_merge */
batch.Put(cfh, key, v);
}
}
Status s = db_->Write(write_opts, &batch);
if (!s.ok()) {
fprintf(stderr, "multi put or merge error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
} else {
size_t num = rand_column_families.size();
thread->stats.AddBytesForWrites(num, (sz + 1) * num);
}
return s;
}
virtual Status TestDelete(ThreadState* thread, WriteOptions& write_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
std::unique_ptr<MutexLock>& /* lock */) {
std::string key_str = Key(rand_keys[0]);
Slice key = key_str;
WriteBatch batch;
for (auto cf : rand_column_families) {
ColumnFamilyHandle* cfh = column_families_[cf];
batch.Delete(cfh, key);
}
Status s = db_->Write(write_opts, &batch);
if (!s.ok()) {
fprintf(stderr, "multidel error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
} else {
thread->stats.AddDeletes(rand_column_families.size());
}
return s;
}
virtual Status TestDeleteRange(ThreadState* thread, WriteOptions& write_opts,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys,
std::unique_ptr<MutexLock>& /* lock */) {
int64_t rand_key = rand_keys[0];
auto shared = thread->shared;
int64_t max_key = shared->GetMaxKey();
if (rand_key > max_key - FLAGS_range_deletion_width) {
rand_key =
thread->rand.Next() % (max_key - FLAGS_range_deletion_width + 1);
}
std::string key_str = Key(rand_key);
Slice key = key_str;
std::string end_key_str = Key(rand_key + FLAGS_range_deletion_width);
Slice end_key = end_key_str;
WriteBatch batch;
for (auto cf : rand_column_families) {
ColumnFamilyHandle* cfh = column_families_[rand_column_families[cf]];
batch.DeleteRange(cfh, key, end_key);
}
Status s = db_->Write(write_opts, &batch);
if (!s.ok()) {
fprintf(stderr, "multi del range error: %s\n", s.ToString().c_str());
thread->stats.AddErrors(1);
} else {
thread->stats.AddRangeDeletions(rand_column_families.size());
}
return s;
}
virtual void TestIngestExternalFile(
ThreadState* /* thread */,
const std::vector<int>& /* rand_column_families */,
const std::vector<int64_t>& /* rand_keys */,
std::unique_ptr<MutexLock>& /* lock */) {
assert(false);
fprintf(stderr,
"AtomicFlushStressTest does not support TestIngestExternalFile "
"because it's not possible to verify the result\n");
std::terminate();
}
virtual Status TestGet(ThreadState* thread, const ReadOptions& readoptions,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) {
std::string key_str = Key(rand_keys[0]);
Slice key = key_str;
auto cfh =
column_families_[rand_column_families[thread->rand.Next() %
rand_column_families.size()]];
std::string from_db;
Status s = db_->Get(readoptions, cfh, key, &from_db);
if (s.ok()) {
thread->stats.AddGets(1, 1);
} else if (s.IsNotFound()) {
thread->stats.AddGets(1, 0);
} else {
thread->stats.AddErrors(1);
}
return s;
}
virtual Status TestPrefixScan(ThreadState* thread,
const ReadOptions& readoptions,
const std::vector<int>& rand_column_families,
const std::vector<int64_t>& rand_keys) {
std::string key_str = Key(rand_keys[0]);
Slice key = key_str;
Slice prefix = Slice(key.data(), FLAGS_prefix_size);
std::string upper_bound;
Slice ub_slice;
ReadOptions ro_copy = readoptions;
if (thread->rand.OneIn(2) && GetNextPrefix(prefix, &upper_bound)) {
ub_slice = Slice(upper_bound);
ro_copy.iterate_upper_bound = &ub_slice;
}
auto cfh =
column_families_[rand_column_families[thread->rand.Next() %
rand_column_families.size()]];
Iterator* iter = db_->NewIterator(ro_copy, cfh);
int64_t count = 0;
for (iter->Seek(prefix); iter->Valid() && iter->key().starts_with(prefix);
iter->Next()) {
++count;
}
assert(count <= (static_cast<int64_t>(1) << ((8 - FLAGS_prefix_size) * 8)));
Status s = iter->status();
if (s.ok()) {
thread->stats.AddPrefixes(1, static_cast<int>(count));
} else {
thread->stats.AddErrors(1);
}
delete iter;
return s;
}
virtual void VerifyDb(ThreadState* thread) const {
ReadOptions options(FLAGS_verify_checksum, true);
// We must set total_order_seek to true because we are doing a SeekToFirst
// on a column family whose memtables may support (by default) prefix-based
// iterator. In this case, NewIterator with options.total_order_seek being
// false returns a prefix-based iterator. Calling SeekToFirst using this
// iterator causes the iterator to become invalid. That means we cannot
// iterate the memtable using this iterator any more, although the memtable
// contains the most up-to-date key-values.
options.total_order_seek = true;
assert(thread != nullptr);
auto shared = thread->shared;
std::vector<unique_ptr<Iterator> > iters(column_families_.size());
for (size_t i = 0; i != column_families_.size(); ++i) {
iters[i].reset(db_->NewIterator(options, column_families_[i]));
}
for (auto& iter : iters) {
iter->SeekToFirst();
}
size_t num = column_families_.size();
assert(num == iters.size());
do {
size_t valid_cnt = 0;
for (auto& iter : iters) {
if (iter->Valid()) {
++valid_cnt;
}
}
if (valid_cnt == 0) {
break;
} else if (valid_cnt != iters.size()) {
fprintf(stderr, "Finished iterating the following column families:\n");
for (size_t i = 0; i != num; ++i) {
if (!iters[i]->Valid()) {
fprintf(stderr, "%s ", column_families_[i]->GetName().c_str());
}
}
fprintf(stderr,
"\nThe following column families have data that have not been "
"scanned:\n");
for (size_t i = 0; i != num; ++i) {
if (iters[i]->Valid()) {
fprintf(stderr, "%s ", column_families_[i]->GetName().c_str());
}
}
fprintf(stderr, "\n");
}
// If the program reaches here, then all column families' iterators are
// still valid.
Slice key;
Slice value;
for (size_t i = 0; i != num; ++i) {
if (i == 0) {
key = iters[i]->key();
value = iters[i]->value();
} else {
if (key.compare(iters[i]->key()) != 0) {
fprintf(stderr, "Verification failed\n");
fprintf(stderr, "cf%s: %s => %s\n",
column_families_[0]->GetName().c_str(),
key.ToString(true /* hex */).c_str(),
value.ToString(/* hex */).c_str());
fprintf(stderr, "cf%s: %s => %s\n",
column_families_[i]->GetName().c_str(),
iters[i]->key().ToString(true /* hex */).c_str(),
iters[i]->value().ToString(true /* hex */).c_str());
shared->SetVerificationFailure();
}
}
}
for (auto& iter : iters) {
iter->Next();
}
} while (true);
}
virtual std::vector<int> GenerateColumnFamilies(
const int /* num_column_families */, int /* rand_column_family */) const {
std::vector<int> ret;
int num = static_cast<int>(column_families_.size());
int k = 0;
std::generate_n(back_inserter(ret), num, [&k]() -> int { return k++; });
return ret;
}
private:
std::atomic<int64_t> batch_id_;
};
} // namespace rocksdb
int main(int argc, char** argv) {
@ -3428,7 +3679,9 @@ int main(int argc, char** argv) {
rocksdb_kill_prefix_blacklist = SplitString(FLAGS_kill_prefix_blacklist);
std::unique_ptr<rocksdb::StressTest> stress;
if (FLAGS_test_batches_snapshots) {
if (FLAGS_atomic_flush) {
stress.reset(new rocksdb::AtomicFlushStressTest());
} else if (FLAGS_test_batches_snapshots) {
stress.reset(new rocksdb::BatchedOpsStressTest());
} else {
stress.reset(new rocksdb::NonBatchedOpsStressTest());

Loading…
Cancel
Save