Fix bug of prematurely excluded CF in atomic flush contains unflushed data that should've been included in the atomic flush (#11148)

Summary:
**Context:**
Atomic flush should guarantee recoverability of all data of seqno up to the max seqno of the flush. It achieves this by ensuring all such data are flushed by the time this atomic flush finishes through `SelectColumnFamiliesForAtomicFlush()`. However, our crash test exposed the following case where an excluded CF from an atomic flush contains unflushed data of seqno less than the max seqno of that atomic flush and loses its data with `WriteOptions::DisableWAL=true` in face of a crash right after the atomic flush finishes .
```
./db_stress --preserve_unverified_changes=1 --reopen=0 --acquire_snapshot_one_in=0 --adaptive_readahead=1 --allow_data_in_errors=True --async_io=1 --atomic_flush=1 --avoid_flush_during_recovery=0 --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=0 --batch_protection_bytes_per_key=0 --block_size=16384 --bloom_bits=15 --bottommost_compression_type=none --bytes_per_sync=262144 --cache_index_and_filter_blocks=0 --cache_size=8388608 --cache_type=lru_cache --charge_compression_dictionary_building_buffer=0 --charge_file_metadata=1 --charge_filter_construction=0 --charge_table_reader=0 --checkpoint_one_in=0 --checksum_type=kXXH3 --clear_column_family_one_in=0 --compact_files_one_in=0 --compact_range_one_in=0 --compaction_pri=1 --compaction_ttl=100 --compression_max_dict_buffer_bytes=134217727 --compression_max_dict_bytes=16384 --compression_parallel_threads=1 --compression_type=lz4hc --compression_use_zstd_dict_trainer=0 --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --data_block_index_type=0 --db=$db --db_write_buffer_size=1048576 --delpercent=4 --delrangepercent=1 --destroy_db_initially=0 --detect_filter_construct_corruption=0 --disable_wal=1 --enable_compaction_filter=0 --enable_pipelined_write=0 --expected_values_dir=$exp --fail_if_options_file_error=0 --fifo_allow_compaction=0 --file_checksum_impl=none --flush_one_in=0 --format_version=5 --get_current_wal_file_one_in=0 --get_live_files_one_in=100 --get_property_one_in=0 --get_sorted_wal_files_one_in=0 --index_block_restart_interval=2 --index_type=0 --ingest_external_file_one_in=0 --initial_auto_readahead_size=524288 --iterpercent=10 --key_len_percent_dist=1,30,69 --level_compaction_dynamic_level_bytes=True --long_running_snapshots=1 --manual_wal_flush_one_in=100 --mark_for_compaction_one_file_in=0 --max_auto_readahead_size=0 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=10000 --max_key_len=3 --max_manifest_file_size=1073741824 --max_write_batch_group_size_bytes=64 --max_write_buffer_number=3 --max_write_buffer_size_to_maintain=0 --memtable_prefix_bloom_size_ratio=0.01 --memtable_protection_bytes_per_key=4 --memtable_whole_key_filtering=0 --memtablerep=skip_list --min_write_buffer_number_to_merge=2 --mmap_read=1 --mock_direct_io=False --nooverwritepercent=1 --num_file_reads_for_auto_readahead=0 --open_files=-1 --open_metadata_write_fault_one_in=0 --open_read_fault_one_in=0 --open_write_fault_one_in=0 --ops_per_thread=100000000 --optimize_filters_for_memory=1 --paranoid_file_checks=1 --partition_filters=0 --partition_pinning=3 --pause_background_one_in=0 --periodic_compaction_seconds=100 --prefix_size=8 --prefixpercent=5 --prepopulate_block_cache=0 --preserve_internal_time_seconds=3600 --progress_reports=0 --read_fault_one_in=32 --readahead_size=16384 --readpercent=50 --recycle_log_file_num=0 --ribbon_starting_level=6 --secondary_cache_fault_one_in=0 --set_options_one_in=10000 --snapshot_hold_ops=100000 --sst_file_manager_bytes_per_sec=104857600 --sst_file_manager_bytes_per_truncate=1048576 --stats_dump_period_sec=10 --subcompactions=1 --sync=0 --sync_fault_injection=0 --target_file_size_base=524288 --target_file_size_multiplier=2 --test_batches_snapshots=0 --top_level_index_pinning=0 --unpartitioned_pinning=1 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=0 --use_merge=0 --use_multiget=1 --use_put_entity_one_in=0 --user_timestamp_size=0 --value_size_mult=32 --verify_checksum=1 --verify_checksum_one_in=0 --verify_db_one_in=1000 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=524288 --wal_compression=none --write_buffer_size=524288 --write_dbid_to_manifest=1 --write_fault_one_in=0 --writepercent=30 &
    pid=$!
    sleep 0.2
    sleep 10
    kill $pid
    sleep 0.2
./db_stress --ops_per_thread=1 --preserve_unverified_changes=1 --reopen=0 --acquire_snapshot_one_in=0 --adaptive_readahead=1 --allow_data_in_errors=True --async_io=1 --atomic_flush=1 --avoid_flush_during_recovery=0 --avoid_unnecessary_blocking_io=0 --backup_max_size=104857600 --backup_one_in=0 --batch_protection_bytes_per_key=0 --block_size=16384 --bloom_bits=15 --bottommost_compression_type=none --bytes_per_sync=262144 --cache_index_and_filter_blocks=0 --cache_size=8388608 --cache_type=lru_cache --charge_compression_dictionary_building_buffer=0 --charge_file_metadata=1 --charge_filter_construction=0 --charge_table_reader=0 --checkpoint_one_in=0 --checksum_type=kXXH3 --clear_column_family_one_in=0 --compact_files_one_in=0 --compact_range_one_in=0 --compaction_pri=1 --compaction_ttl=100 --compression_max_dict_buffer_bytes=134217727 --compression_max_dict_bytes=16384 --compression_parallel_threads=1 --compression_type=lz4hc --compression_use_zstd_dict_trainer=0 --compression_zstd_max_train_bytes=0 --continuous_verification_interval=0 --data_block_index_type=0 --db=$db --db_write_buffer_size=1048576 --delpercent=4 --delrangepercent=1 --destroy_db_initially=0 --detect_filter_construct_corruption=0 --disable_wal=1 --enable_compaction_filter=0 --enable_pipelined_write=0 --expected_values_dir=$exp --fail_if_options_file_error=0 --fifo_allow_compaction=0 --file_checksum_impl=none --flush_one_in=0 --format_version=5 --get_current_wal_file_one_in=0 --get_live_files_one_in=100 --get_property_one_in=0 --get_sorted_wal_files_one_in=0 --index_block_restart_interval=2 --index_type=0 --ingest_external_file_one_in=0 --initial_auto_readahead_size=524288 --iterpercent=10 --key_len_percent_dist=1,30,69 --level_compaction_dynamic_level_bytes=True --long_running_snapshots=1 --manual_wal_flush_one_in=100 --mark_for_compaction_one_file_in=0 --max_auto_readahead_size=0 --max_background_compactions=20 --max_bytes_for_level_base=10485760 --max_key=10000 --max_key_len=3 --max_manifest_file_size=1073741824 --max_write_batch_group_size_bytes=64 --max_write_buffer_number=3 --max_write_buffer_size_to_maintain=0 --memtable_prefix_bloom_size_ratio=0.01 --memtable_protection_bytes_per_key=4 --memtable_whole_key_filtering=0 --memtablerep=skip_list --min_write_buffer_number_to_merge=2 --mmap_read=1 --mock_direct_io=False --nooverwritepercent=1 --num_file_reads_for_auto_readahead=0 --open_files=-1 --open_metadata_write_fault_one_in=0 --open_read_fault_one_in=0 --open_write_fault_one_in=0 --ops_per_thread=100000000 --optimize_filters_for_memory=1 --paranoid_file_checks=1 --partition_filters=0 --partition_pinning=3 --pause_background_one_in=0 --periodic_compaction_seconds=100 --prefix_size=8 --prefixpercent=5 --prepopulate_block_cache=0 --preserve_internal_time_seconds=3600 --progress_reports=0 --read_fault_one_in=32 --readahead_size=16384 --readpercent=50 --recycle_log_file_num=0 --ribbon_starting_level=6 --secondary_cache_fault_one_in=0 --set_options_one_in=10000 --snapshot_hold_ops=100000 --sst_file_manager_bytes_per_sec=104857600 --sst_file_manager_bytes_per_truncate=1048576 --stats_dump_period_sec=10 --subcompactions=1 --sync=0 --sync_fault_injection=0 --target_file_size_base=524288 --target_file_size_multiplier=2 --test_batches_snapshots=0 --top_level_index_pinning=0 --unpartitioned_pinning=1 --use_direct_io_for_flush_and_compaction=0 --use_direct_reads=0 --use_full_merge_v1=0 --use_merge=0 --use_multiget=1 --use_put_entity_one_in=0 --user_timestamp_size=0 --value_size_mult=32 --verify_checksum=1 --verify_checksum_one_in=0 --verify_db_one_in=1000 --verify_sst_unique_id_in_manifest=1 --wal_bytes_per_sync=524288 --wal_compression=none --write_buffer_size=524288 --write_dbid_to_manifest=1 --write_fault_one_in=0 --writepercent=30 &
    pid=$!
    sleep 0.2
    sleep 40
    kill $pid
    sleep 0.2

Verification failed for column family 6 key 0000000000000239000000000000012B0000000000000138 (56622): value_from_db: , value_from_expected: 4A6331754E4F4C4D42434041464744455A5B58595E5F5C5D5253505156575455, msg: Value not found: NotFound:
Crash-recovery verification failed :(
No writes or ops?
Verification failed :(
```

The bug is due to the following:
- When atomic flush is used, an empty CF is legally [excluded](https://github.com/facebook/rocksdb/blob/7.10.fb/db/db_filesnapshot.cc#L39) in `SelectColumnFamiliesForAtomicFlush` as the first step of `DBImpl::FlushForGetLiveFiles` before [passing](https://github.com/facebook/rocksdb/blob/7.10.fb/db/db_filesnapshot.cc#L42) the included CFDs to `AtomicFlushMemTables`.
- But [later](https://github.com/facebook/rocksdb/blob/7.10.fb/db/db_impl/db_impl_compaction_flush.cc#L2133) in `AtomicFlushMemTables`, `WaitUntilFlushWouldNotStallWrites` will [release the db mutex](https://github.com/facebook/rocksdb/blob/7.10.fb/db/db_impl/db_impl_compaction_flush.cc#L2403), during which data@seqno N can be inserted into the excluded CF and data@seqno M can be inserted into one of the included CFs, where M > N.
- However, data@seqno N in an already-excluded CF is thus excluded from this atomic flush while we seqno N is less than seqno M.

**Summary:**
- Replace `SelectColumnFamiliesForAtomicFlush()`-before-`AtomicFlushMemTables()` with `SelectColumnFamiliesForAtomicFlush()`-after-wait-within-`AtomicFlushMemTables()` so we ensure no write affecting the recoverability of this atomic job (i.e, change to max seqno of this atomic flush or insertion of data with less seqno than the max seqno of the atomic flush to excluded CF) can happen after calling `SelectColumnFamiliesForAtomicFlush()`.
- For above, refactored and clarified comments on `SelectColumnFamiliesForAtomicFlush()` and `AtomicFlushMemTables()` for clearer semantics of passed-in CFDs to atomic-flush

Pull Request resolved: https://github.com/facebook/rocksdb/pull/11148

Test Plan:
- New unit test failed before the fix and passes after
- Make check
- Rehearsal stress test

Reviewed By: ajkr

Differential Revision: D42799871

Pulled By: hx235

fbshipit-source-id: 13636b63e9c25c5895857afc36ea580d57f6d644
oxigraph-8.1.1
Hui Xiao 2 years ago committed by Facebook GitHub Bot
parent 2a23bee963
commit 11cb6af6e5
  1. 1
      HISTORY.md
  2. 6
      db/db_filesnapshot.cc
  3. 92
      db/db_flush_test.cc
  4. 17
      db/db_impl/db_impl.cc
  5. 18
      db/db_impl/db_impl.h
  6. 67
      db/db_impl/db_impl_compaction_flush.cc
  7. 6
      db/db_impl/db_impl_debug.cc
  8. 30
      db/db_impl/db_impl_write.cc

@ -7,6 +7,7 @@
* Fixed an issue for backward iteration when user defined timestamp is enabled in combination with BlobDB.
* Fixed a couple of cases where a Merge operand encountered during iteration wasn't reflected in the `internal_merge_count` PerfContext counter.
* Fixed a bug in CreateColumnFamilyWithImport()/ExportColumnFamily() which did not support range tombstones (#11252).
* Fixed a bug where an excluded column family from an atomic flush contains unflushed data that should've been included in this atomic flush (i.e, data of seqno less than the max seqno of this atomic flush), leading to potential data loss in this excluded column family when `WriteOptions::disableWAL == true` (#11148).
### New Features
* Add statistics rocksdb.secondary.cache.filter.hits, rocksdb.secondary.cache.index.hits, and rocksdb.secondary.cache.filter.hits

@ -34,11 +34,8 @@ Status DBImpl::FlushForGetLiveFiles() {
// flush all dirty data to disk.
Status status;
if (immutable_db_options_.atomic_flush) {
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
mutex_.Unlock();
status =
AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kGetLiveFiles);
status = AtomicFlushMemTables(FlushOptions(), FlushReason::kGetLiveFiles);
if (status.IsColumnFamilyDropped()) {
status = Status::OK();
}
@ -437,4 +434,3 @@ Status DBImpl::GetLiveFilesStorageInfo(
}
} // namespace ROCKSDB_NAMESPACE

@ -740,7 +740,97 @@ class TestFlushListener : public EventListener {
DBFlushTest* test_;
};
// RocksDB lite does not support GetLiveFiles()
TEST_F(
DBFlushTest,
FixUnrecoverableWriteDuringAtomicFlushWaitUntilFlushWouldNotStallWrites) {
Options options = CurrentOptions();
options.atomic_flush = true;
// To simulate a real-life crash where we can't flush during db's shutdown
options.avoid_flush_during_shutdown = true;
// Set 3 low thresholds (while `disable_auto_compactions=false`) here so flush
// adding one more L0 file during `GetLiveFiles()` will have to wait till such
// flush will not stall writes
options.level0_stop_writes_trigger = 2;
options.level0_slowdown_writes_trigger = 2;
// Disable level-0 compaction triggered by number of files to avoid
// stalling check being skipped (resulting in the flush mentioned above didn't
// wait)
options.level0_file_num_compaction_trigger = -1;
CreateAndReopenWithCF({"cf1"}, options);
// Manually pause compaction thread to ensure enough L0 files as
// `disable_auto_compactions=false`is needed, in order to meet the 3 low
// thresholds above
std::unique_ptr<test::SleepingBackgroundTask> sleeping_task_;
sleeping_task_.reset(new test::SleepingBackgroundTask());
env_->SetBackgroundThreads(1, Env::LOW);
env_->Schedule(&test::SleepingBackgroundTask::DoSleepTask,
sleeping_task_.get(), Env::Priority::LOW);
sleeping_task_->WaitUntilSleeping();
// Create some initial file to help meet the 3 low thresholds above
ASSERT_OK(Put(1, "dontcare", "dontcare"));
ASSERT_OK(Flush(1));
// Insert some initial data so we have something to atomic-flush later
// triggered by `GetLiveFiles()`
WriteOptions write_opts;
write_opts.disableWAL = true;
ASSERT_OK(Put(1, "k1", "v1", write_opts));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency({{
"DBImpl::WaitUntilFlushWouldNotStallWrites:StallWait",
"DBFlushTest::"
"UnrecoverableWriteInAtomicFlushWaitUntilFlushWouldNotStallWrites::Write",
}});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
// Write to db when atomic flush releases the lock to wait on write stall
// condition to be gone in `WaitUntilFlushWouldNotStallWrites()`
port::Thread write_thread([&] {
TEST_SYNC_POINT(
"DBFlushTest::"
"UnrecoverableWriteInAtomicFlushWaitUntilFlushWouldNotStallWrites::"
"Write");
// Before the fix, the empty default CF would've been prematurely excluded
// from this atomic flush. The following two writes together make default CF
// later contain data that should've been included in the atomic flush.
ASSERT_OK(Put(0, "k2", "v2", write_opts));
// The following write increases the max seqno of this atomic flush to be 3,
// which is greater than the seqno of default CF's data. This then violates
// the invariant that all entries of seqno less than the max seqno
// of this atomic flush should've been flushed by the time of this atomic
// flush finishes.
ASSERT_OK(Put(1, "k3", "v3", write_opts));
// Resume compaction threads and reduce L0 files so `GetLiveFiles()` can
// resume from the wait
sleeping_task_->WakeUp();
sleeping_task_->WaitUntilDone();
MoveFilesToLevel(1, 1);
});
// Trigger an atomic flush by `GetLiveFiles()`
std::vector<std::string> files;
uint64_t manifest_file_size;
ASSERT_OK(db_->GetLiveFiles(files, &manifest_file_size, /*flush*/ true));
write_thread.join();
ReopenWithColumnFamilies({"default", "cf1"}, options);
ASSERT_EQ(Get(1, "k3"), "v3");
// Prior to the fix, `Get()` will return `NotFound as "k2" entry in default CF
// can't be recovered from a crash right after the atomic flush finishes,
// resulting in a "recovery hole" as "k3" can be recovered. It's due to the
// invariant violation described above.
ASSERT_EQ(Get(0, "k2"), "v2");
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
}
TEST_F(DBFlushTest, FixFlushReasonRaceFromConcurrentFlushes) {
Options options = CurrentOptions();
options.atomic_flush = true;

@ -387,10 +387,8 @@ Status DBImpl::ResumeImpl(DBRecoverContext context) {
// We allow flush to stall write since we are trying to resume from error.
flush_opts.allow_write_stall = true;
if (immutable_db_options_.atomic_flush) {
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
mutex_.Unlock();
s = AtomicFlushMemTables(cfds, flush_opts, context.flush_reason);
s = AtomicFlushMemTables(flush_opts, context.flush_reason);
mutex_.Lock();
} else {
for (auto cfd : versions_->GetRefedColumnFamilySet()) {
@ -507,11 +505,8 @@ void DBImpl::CancelAllBackgroundWork(bool wait) {
has_unpersisted_data_.load(std::memory_order_relaxed) &&
!mutable_db_options_.avoid_flush_during_shutdown) {
if (immutable_db_options_.atomic_flush) {
autovector<ColumnFamilyData*> cfds;
SelectColumnFamiliesForAtomicFlush(&cfds);
mutex_.Unlock();
Status s =
AtomicFlushMemTables(cfds, FlushOptions(), FlushReason::kShutDown);
Status s = AtomicFlushMemTables(FlushOptions(), FlushReason::kShutDown);
s.PermitUncheckedError(); //**TODO: What to do on error?
mutex_.Lock();
} else {
@ -5350,12 +5345,10 @@ Status DBImpl::IngestExternalFiles(
FlushOptions flush_opts;
flush_opts.allow_write_stall = true;
if (immutable_db_options_.atomic_flush) {
autovector<ColumnFamilyData*> cfds_to_flush;
SelectColumnFamiliesForAtomicFlush(&cfds_to_flush);
mutex_.Unlock();
status = AtomicFlushMemTables(cfds_to_flush, flush_opts,
FlushReason::kExternalFileIngestion,
true /* entered_write_thread */);
status = AtomicFlushMemTables(
flush_opts, FlushReason::kExternalFileIngestion,
{} /* provided_candidate_cfds */, true /* entered_write_thread */);
mutex_.Lock();
} else {
for (size_t i = 0; i != num_cfs; ++i) {

@ -1081,7 +1081,8 @@ class DBImpl : public DB {
// is because in certain cases, we can flush column families, wait for the
// flush to complete, but delete the column family handle before the wait
// finishes. For example in CompactRange.
Status TEST_AtomicFlushMemTables(const autovector<ColumnFamilyData*>& cfds,
Status TEST_AtomicFlushMemTables(
const autovector<ColumnFamilyData*>& provided_candidate_cfds,
const FlushOptions& flush_opts);
// Wait for background threads to complete scheduled work.
@ -1886,16 +1887,27 @@ class DBImpl : public DB {
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
void SelectColumnFamiliesForAtomicFlush(autovector<ColumnFamilyData*>* cfds);
// Select and output column families qualified for atomic flush in
// `selected_cfds`. If `provided_candidate_cfds` is non-empty, it will be used
// as candidate CFs to select qualified ones from. Otherwise, all column
// families are used as candidate to select from.
//
// REQUIRES: mutex held
void SelectColumnFamiliesForAtomicFlush(
autovector<ColumnFamilyData*>* selected_cfds,
const autovector<ColumnFamilyData*>& provided_candidate_cfds = {});
// Force current memtable contents to be flushed.
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
FlushReason flush_reason,
bool entered_write_thread = false);
// Atomic-flush memtables from quanlified CFs among `provided_candidate_cfds`
// (if non-empty) or amomg all column families and atomically record the
// result to the MANIFEST.
Status AtomicFlushMemTables(
const autovector<ColumnFamilyData*>& column_family_datas,
const FlushOptions& options, FlushReason flush_reason,
const autovector<ColumnFamilyData*>& provided_candidate_cfds = {},
bool entered_write_thread = false);
// Wait until flushing this column family won't stall writes

@ -1031,15 +1031,9 @@ Status DBImpl::CompactRangeInternal(const CompactRangeOptions& options,
FlushOptions fo;
fo.allow_write_stall = options.allow_write_stall;
if (immutable_db_options_.atomic_flush) {
autovector<ColumnFamilyData*> cfds;
mutex_.Lock();
SelectColumnFamiliesForAtomicFlush(&cfds);
mutex_.Unlock();
s = AtomicFlushMemTables(cfds, fo, FlushReason::kManualCompaction,
false /* entered_write_thread */);
s = AtomicFlushMemTables(fo, FlushReason::kManualCompaction);
} else {
s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction,
false /* entered_write_thread */);
s = FlushMemTable(cfd, fo, FlushReason::kManualCompaction);
}
if (!s.ok()) {
LogFlush(immutable_db_options_.info_log);
@ -1800,8 +1794,8 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
cfh->GetName().c_str());
Status s;
if (immutable_db_options_.atomic_flush) {
s = AtomicFlushMemTables({cfh->cfd()}, flush_options,
FlushReason::kManualFlush);
s = AtomicFlushMemTables(flush_options, FlushReason::kManualFlush,
{cfh->cfd()});
} else {
s = FlushMemTable(cfh->cfd(), flush_options, FlushReason::kManualFlush);
}
@ -1839,7 +1833,7 @@ Status DBImpl::Flush(const FlushOptions& flush_options,
auto cfh = static_cast<ColumnFamilyHandleImpl*>(elem);
cfds.emplace_back(cfh->cfd());
});
s = AtomicFlushMemTables(cfds, flush_options, FlushReason::kManualFlush);
s = AtomicFlushMemTables(flush_options, FlushReason::kManualFlush, cfds);
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Manual atomic flush finished, status: %s\n"
"=====Column families:=====",
@ -2223,11 +2217,9 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
return s;
}
// Flush all elements in 'column_family_datas'
// and atomically record the result to the MANIFEST.
Status DBImpl::AtomicFlushMemTables(
const autovector<ColumnFamilyData*>& column_family_datas,
const FlushOptions& flush_options, FlushReason flush_reason,
const autovector<ColumnFamilyData*>& provided_candidate_cfds,
bool entered_write_thread) {
assert(immutable_db_options_.atomic_flush);
if (!flush_options.wait && write_controller_.IsStopped()) {
@ -2237,18 +2229,48 @@ Status DBImpl::AtomicFlushMemTables(
return Status::TryAgain(oss.str());
}
Status s;
autovector<ColumnFamilyData*> candidate_cfds;
if (provided_candidate_cfds.empty()) {
// Generate candidate cfds if not provided
{
InstrumentedMutexLock l(&mutex_);
for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->IsDropped() && cfd->initialized()) {
cfd->Ref();
candidate_cfds.push_back(cfd);
}
}
}
} else {
candidate_cfds = provided_candidate_cfds;
}
if (!flush_options.allow_write_stall) {
int num_cfs_to_flush = 0;
for (auto cfd : column_family_datas) {
for (auto cfd : candidate_cfds) {
bool flush_needed = true;
s = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
if (!s.ok()) {
// Unref the newly generated candidate cfds (when not provided) in
// `candidate_cfds`
if (provided_candidate_cfds.empty()) {
for (auto candidate_cfd : candidate_cfds) {
candidate_cfd->UnrefAndTryDelete();
}
}
return s;
} else if (flush_needed) {
++num_cfs_to_flush;
}
}
if (0 == num_cfs_to_flush) {
// Unref the newly generated candidate cfds (when not provided) in
// `candidate_cfds`
if (provided_candidate_cfds.empty()) {
for (auto candidate_cfd : candidate_cfds) {
candidate_cfd->UnrefAndTryDelete();
}
}
return s;
}
}
@ -2269,15 +2291,16 @@ Status DBImpl::AtomicFlushMemTables(
}
WaitForPendingWrites();
for (auto cfd : column_family_datas) {
if (cfd->IsDropped()) {
continue;
}
if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
!cached_recoverable_state_empty_.load()) {
cfds.emplace_back(cfd);
SelectColumnFamiliesForAtomicFlush(&cfds, candidate_cfds);
// Unref the newly generated candidate cfds (when not provided) in
// `candidate_cfds`
if (provided_candidate_cfds.empty()) {
for (auto candidate_cfd : candidate_cfds) {
candidate_cfd->UnrefAndTryDelete();
}
}
for (auto cfd : cfds) {
if ((cfd->mem()->IsEmpty() && cached_recoverable_state_empty_.load()) ||
flush_reason == FlushReason::kErrorRecoveryRetryFlush) {

@ -155,8 +155,10 @@ Status DBImpl::TEST_FlushMemTable(ColumnFamilyData* cfd,
}
Status DBImpl::TEST_AtomicFlushMemTables(
const autovector<ColumnFamilyData*>& cfds, const FlushOptions& flush_opts) {
return AtomicFlushMemTables(cfds, flush_opts, FlushReason::kTest);
const autovector<ColumnFamilyData*>& provided_candidate_cfds,
const FlushOptions& flush_opts) {
return AtomicFlushMemTables(flush_opts, FlushReason::kTest,
provided_candidate_cfds);
}
Status DBImpl::TEST_WaitForBackgroundWork() {

@ -1543,14 +1543,40 @@ Status DBImpl::WriteRecoverableState() {
}
void DBImpl::SelectColumnFamiliesForAtomicFlush(
autovector<ColumnFamilyData*>* cfds) {
autovector<ColumnFamilyData*>* selected_cfds,
const autovector<ColumnFamilyData*>& provided_candidate_cfds) {
mutex_.AssertHeld();
assert(selected_cfds);
autovector<ColumnFamilyData*> candidate_cfds;
// Generate candidate cfds if not provided
if (provided_candidate_cfds.empty()) {
for (ColumnFamilyData* cfd : *versions_->GetColumnFamilySet()) {
if (!cfd->IsDropped() && cfd->initialized()) {
cfd->Ref();
candidate_cfds.push_back(cfd);
}
}
} else {
candidate_cfds = provided_candidate_cfds;
}
for (ColumnFamilyData* cfd : candidate_cfds) {
if (cfd->IsDropped()) {
continue;
}
if (cfd->imm()->NumNotFlushed() != 0 || !cfd->mem()->IsEmpty() ||
!cached_recoverable_state_empty_.load()) {
cfds->push_back(cfd);
selected_cfds->push_back(cfd);
}
}
// Unref the newly generated candidate cfds (when not provided) in
// `candidate_cfds`
if (provided_candidate_cfds.empty()) {
for (auto candidate_cfd : candidate_cfds) {
candidate_cfd->UnrefAndTryDelete();
}
}
}

Loading…
Cancel
Save