Skip deleted WALs during recovery

Summary:
This patch record min log number to keep to the manifest while flushing SST files to ignore them and any WAL older than them during recovery. This is to avoid scenarios when we have a gap between the WAL files are fed to the recovery procedure. The gap could happen by for example out-of-order WAL deletion. Such gap could cause problems in 2PC recovery where the prepared and commit entry are placed into two separate WAL and gap in the WALs could result into not processing the WAL with the commit entry and hence breaking the 2PC recovery logic.

Before the commit, for 2PC case, we determined which log number to keep in FindObsoleteFiles(). We looked at the earliest logs with outstanding prepare entries, or prepare entries whose respective commit or abort are in memtable. With the commit, the same calculation is done while we apply the SST flush. Just before installing the flush file, we precompute the earliest log file to keep after the flush finishes using the same logic (but skipping the memtables just flushed), record this information to the manifest entry for this new flushed SST file. This pre-computed value is also remembered in memory, and will later be used to determine whether a log file can be deleted. This value is unlikely to change until next flush because the commit entry will stay in memtable. (In WritePrepared, we could have removed the older log files as soon as all prepared entries are committed. It's not yet done anyway. Even if we do it, the only thing we loss with this new approach is earlier log deletion between two flushes, which does not guarantee to happen anyway because the obsolete file clean-up function is only executed after flush or compaction)

This min log number to keep is stored in the manifest using the safely-ignore customized field of AddFile entry, in order to guarantee that the DB generated using newer release can be opened by previous releases no older than 4.2.
Closes https://github.com/facebook/rocksdb/pull/3765

Differential Revision: D7747618

Pulled By: siying

fbshipit-source-id: d00c92105b4f83852e9754a1b70d6b64cb590729
main
Siying Dong 7 years ago committed by Facebook Github Bot
parent cfb86659bf
commit d59549298f
  1. 1
      CMakeLists.txt
  2. 11
      TARGETS
  3. 4
      db/column_family.cc
  4. 8
      db/db_flush_test.cc
  5. 3
      db/db_impl.cc
  6. 66
      db/db_impl.h
  7. 2
      db/db_impl_compaction_flush.cc
  8. 12
      db/db_impl_debug.cc
  9. 206
      db/db_impl_files.cc
  10. 7
      db/db_impl_open.cc
  11. 38
      db/db_impl_write.cc
  12. 5
      db/db_test_util.h
  13. 102
      db/db_wal_test.cc
  14. 8
      db/external_sst_file_test.cc
  15. 5
      db/flush_job.cc
  16. 5
      db/flush_job.h
  17. 2
      db/flush_job_test.cc
  18. 2
      db/internal_stats.h
  19. 67
      db/logs_with_prep_tracker.cc
  20. 61
      db/logs_with_prep_tracker.h
  21. 34
      db/memtable_list.cc
  22. 13
      db/memtable_list.h
  23. 8
      db/memtable_list_test.cc
  24. 45
      db/version_edit.cc
  25. 11
      db/version_edit.h
  26. 10
      db/version_edit_test.cc
  27. 50
      db/version_set.cc
  28. 32
      db/version_set.h
  29. 1
      src.mk
  30. 9
      utilities/transactions/pessimistic_transaction.cc
  31. 18
      utilities/transactions/transaction_test.cc

@ -456,6 +456,7 @@ set(SOURCES
db/flush_scheduler.cc db/flush_scheduler.cc
db/forward_iterator.cc db/forward_iterator.cc
db/internal_stats.cc db/internal_stats.cc
db/logs_with_prep_tracker.cc
db/log_reader.cc db/log_reader.cc
db/log_writer.cc db/log_writer.cc
db/malloc_stats.cc db/malloc_stats.cc

@ -103,6 +103,7 @@ cpp_library(
"db/internal_stats.cc", "db/internal_stats.cc",
"db/log_reader.cc", "db/log_reader.cc",
"db/log_writer.cc", "db/log_writer.cc",
"db/logs_with_prep_tracker.cc",
"db/malloc_stats.cc", "db/malloc_stats.cc",
"db/managed_iterator.cc", "db/managed_iterator.cc",
"db/memtable.cc", "db/memtable.cc",
@ -659,11 +660,6 @@ ROCKS_TESTS = [
"utilities/document/document_db_test.cc", "utilities/document/document_db_test.cc",
"serial", "serial",
], ],
[
"obsolete_files_test",
"db/obsolete_files_test.cc",
"serial",
],
[ [
"dynamic_bloom_test", "dynamic_bloom_test",
"util/dynamic_bloom_test.cc", "util/dynamic_bloom_test.cc",
@ -834,6 +830,11 @@ ROCKS_TESTS = [
"utilities/object_registry_test.cc", "utilities/object_registry_test.cc",
"serial", "serial",
], ],
[
"obsolete_files_test",
"db/obsolete_files_test.cc",
"serial",
],
[ [
"optimistic_transaction_test", "optimistic_transaction_test",
"utilities/transactions/optimistic_transaction_test.cc", "utilities/transactions/optimistic_transaction_test.cc",

@ -558,7 +558,9 @@ uint64_t ColumnFamilyData::OldestLogToKeep() {
auto current_log = GetLogNumber(); auto current_log = GetLogNumber();
if (allow_2pc_) { if (allow_2pc_) {
auto imm_prep_log = imm()->GetMinLogContainingPrepSection(); autovector<MemTable*> empty_list;
auto imm_prep_log =
imm()->PrecomputeMinLogContainingPrepSection(empty_list);
auto mem_prep_log = mem()->GetMinLogContainingPrepSection(); auto mem_prep_log = mem()->GetMinLogContainingPrepSection();
if (imm_prep_log > 0 && imm_prep_log < current_log) { if (imm_prep_log > 0 && imm_prep_log < current_log) {

@ -72,19 +72,23 @@ TEST_F(DBFlushTest, SyncFail) {
auto* cfd = auto* cfd =
reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily()) reinterpret_cast<ColumnFamilyHandleImpl*>(db_->DefaultColumnFamily())
->cfd(); ->cfd();
int refs_before = cfd->current()->TEST_refs();
FlushOptions flush_options; FlushOptions flush_options;
flush_options.wait = false; flush_options.wait = false;
ASSERT_OK(dbfull()->Flush(flush_options)); ASSERT_OK(dbfull()->Flush(flush_options));
// Flush installs a new super-version. Get the ref count after that.
auto current_before = cfd->current();
int refs_before = cfd->current()->TEST_refs();
fault_injection_env->SetFilesystemActive(false); fault_injection_env->SetFilesystemActive(false);
TEST_SYNC_POINT("DBFlushTest::SyncFail:1"); TEST_SYNC_POINT("DBFlushTest::SyncFail:1");
TEST_SYNC_POINT("DBFlushTest::SyncFail:2"); TEST_SYNC_POINT("DBFlushTest::SyncFail:2");
fault_injection_env->SetFilesystemActive(true); fault_injection_env->SetFilesystemActive(true);
// Now the background job will do the flush; wait for it.
dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForFlushMemTable();
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
ASSERT_EQ("", FilesPerLevel()); // flush failed. ASSERT_EQ("", FilesPerLevel()); // flush failed.
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// Flush job should release ref count to current version. // Backgroun flush job should release ref count to current version.
ASSERT_EQ(current_before, cfd->current());
ASSERT_EQ(refs_before, cfd->current()->TEST_refs()); ASSERT_EQ(refs_before, cfd->current()->TEST_refs());
Destroy(options); Destroy(options);
} }

@ -183,7 +183,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
last_stats_dump_time_microsec_(0), last_stats_dump_time_microsec_(0),
next_job_id_(1), next_job_id_(1),
has_unpersisted_data_(false), has_unpersisted_data_(false),
unable_to_flush_oldest_log_(false), unable_to_release_oldest_log_(false),
env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)), env_options_(BuildDBOptions(immutable_db_options_, mutable_db_options_)),
env_options_for_compaction_(env_->OptimizeForCompactionTableWrite( env_options_for_compaction_(env_->OptimizeForCompactionTableWrite(
env_options_, immutable_db_options_)), env_options_, immutable_db_options_)),
@ -3020,5 +3020,4 @@ void DBImpl::WaitForIngestFile() {
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
} // namespace rocksdb } // namespace rocksdb

@ -27,6 +27,7 @@
#include "db/flush_scheduler.h" #include "db/flush_scheduler.h"
#include "db/internal_stats.h" #include "db/internal_stats.h"
#include "db/log_writer.h" #include "db/log_writer.h"
#include "db/logs_with_prep_tracker.h"
#include "db/pre_release_callback.h" #include "db/pre_release_callback.h"
#include "db/read_callback.h" #include "db/read_callback.h"
#include "db/snapshot_checker.h" #include "db/snapshot_checker.h"
@ -354,6 +355,10 @@ class DBImpl : public DB {
Arena* arena, RangeDelAggregator* range_del_agg, Arena* arena, RangeDelAggregator* range_del_agg,
ColumnFamilyHandle* column_family = nullptr); ColumnFamilyHandle* column_family = nullptr);
LogsWithPrepTracker* logs_with_prep_tracker() {
return &logs_with_prep_tracker_;
}
#ifndef NDEBUG #ifndef NDEBUG
// Extra methods (for testing) that are not in the public DB interface // Extra methods (for testing) that are not in the public DB interface
// Implemented in db_impl_debug.cc // Implemented in db_impl_debug.cc
@ -365,9 +370,7 @@ class DBImpl : public DB {
void TEST_SwitchWAL(); void TEST_SwitchWAL();
bool TEST_UnableToFlushOldestLog() { bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; }
return unable_to_flush_oldest_log_;
}
bool TEST_IsLogGettingFlushed() { bool TEST_IsLogGettingFlushed() {
return alive_log_files_.begin()->getting_flushed; return alive_log_files_.begin()->getting_flushed;
@ -594,7 +597,7 @@ class DBImpl : public DB {
size_t batch_cnt) { size_t batch_cnt) {
recovered_transactions_[name] = recovered_transactions_[name] =
new RecoveredTransaction(log, name, batch, seq, batch_cnt); new RecoveredTransaction(log, name, batch, seq, batch_cnt);
MarkLogAsContainingPrepSection(log); logs_with_prep_tracker_.MarkLogAsContainingPrepSection(log);
} }
void DeleteRecoveredTransaction(const std::string& name) { void DeleteRecoveredTransaction(const std::string& name) {
@ -602,7 +605,7 @@ class DBImpl : public DB {
assert(it != recovered_transactions_.end()); assert(it != recovered_transactions_.end());
auto* trx = it->second; auto* trx = it->second;
recovered_transactions_.erase(it); recovered_transactions_.erase(it);
MarkLogAsHavingPrepSectionFlushed(trx->log_number_); logs_with_prep_tracker_.MarkLogAsHavingPrepSectionFlushed(trx->log_number_);
delete trx; delete trx;
} }
@ -614,8 +617,6 @@ class DBImpl : public DB {
recovered_transactions_.clear(); recovered_transactions_.clear();
} }
void MarkLogAsHavingPrepSectionFlushed(uint64_t log);
void MarkLogAsContainingPrepSection(uint64_t log);
void AddToLogsToFreeQueue(log::Writer* log_writer) { void AddToLogsToFreeQueue(log::Writer* log_writer) {
logs_to_free_queue_.push_back(log_writer); logs_to_free_queue_.push_back(log_writer);
} }
@ -728,8 +729,6 @@ class DBImpl : public DB {
uint64_t* seq_used = nullptr, size_t batch_cnt = 0, uint64_t* seq_used = nullptr, size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr); PreReleaseCallback* pre_release_callback = nullptr);
uint64_t FindMinLogContainingOutstandingPrep();
uint64_t FindMinPrepLogReferencedByMemTable();
// write cached_recoverable_state_ to memtable if it is not empty // write cached_recoverable_state_ to memtable if it is not empty
// The writer must be the leader in write_thread_ and holding mutex_ // The writer must be the leader in write_thread_ and holding mutex_
Status WriteRecoverableState(); Status WriteRecoverableState();
@ -1302,7 +1301,7 @@ class DBImpl : public DB {
// We must attempt to free the dependent memtables again // We must attempt to free the dependent memtables again
// at a later time after the transaction in the oldest // at a later time after the transaction in the oldest
// log is fully commited. // log is fully commited.
bool unable_to_flush_oldest_log_; bool unable_to_release_oldest_log_;
static const int KEEP_LOG_FILE_NUM = 1000; static const int KEEP_LOG_FILE_NUM = 1000;
// MSVC version 1800 still does not have constexpr for ::max() // MSVC version 1800 still does not have constexpr for ::max()
@ -1339,33 +1338,7 @@ class DBImpl : public DB {
// Indicate DB was opened successfully // Indicate DB was opened successfully
bool opened_successfully_; bool opened_successfully_;
// REQUIRES: logs_with_prep_mutex_ held LogsWithPrepTracker logs_with_prep_tracker_;
//
// sorted list of log numbers still containing prepared data.
// this is used by FindObsoleteFiles to determine which
// flushed logs we must keep around because they still
// contain prepared data which has not been committed or rolled back
struct LogCnt {
uint64_t log; // the log number
uint64_t cnt; // number of prepared sections in the log
};
std::vector<LogCnt> logs_with_prep_;
std::mutex logs_with_prep_mutex_;
// REQUIRES: prepared_section_completed_mutex_ held
//
// to be used in conjunction with logs_with_prep_.
// once a transaction with data in log L is committed or rolled back
// rather than updating logs_with_prep_ directly we keep track of that
// in prepared_section_completed_ which maps LOG -> instance_count. This helps
// avoiding contention between a commit thread and the prepare threads.
//
// when trying to determine the minimum log still active we first
// consult logs_with_prep_. while that root value maps to
// an equal value in prepared_section_completed_ we erase the log from
// both logs_with_prep_ and prepared_section_completed_.
std::unordered_map<uint64_t, uint64_t> prepared_section_completed_;
std::mutex prepared_section_completed_mutex_;
// Callback for compaction to check if a key is visible to a snapshot. // Callback for compaction to check if a key is visible to a snapshot.
// REQUIRES: mutex held // REQUIRES: mutex held
@ -1461,6 +1434,25 @@ extern CompressionType GetCompressionFlush(
const ImmutableCFOptions& ioptions, const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options); const MutableCFOptions& mutable_cf_options);
// Return the earliest log file to keep after the memtable flush is
// finalized.
// `cfd_to_flush` is the column family whose memtable (specified in
// `memtables_to_flush`) will be flushed and thus will not depend on any WAL
// file.
// The function is only applicable to 2pc mode.
extern uint64_t PrecomputeMinLogNumberToKeep(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
autovector<VersionEdit*> edit_list,
const autovector<MemTable*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker);
// `cfd_to_flush` is the column family whose memtable will be flushed and thus
// will not depend on any WAL file. nullptr means no memtable is being flushed.
// The function is only applicable to 2pc mode.
extern uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
const autovector<MemTable*>& memtables_to_flush);
// Fix user-supplied options to be reasonable // Fix user-supplied options to be reasonable
template <class T, class V> template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) { static void ClipToRange(T* ptr, V minvalue, V maxvalue) {

@ -160,7 +160,7 @@ Status DBImpl::FlushMemTableToOutputFile(
// and EventListener callback will be called when the db_mutex // and EventListener callback will be called when the db_mutex
// is unlocked by the current thread. // is unlocked by the current thread.
if (s.ok()) { if (s.ok()) {
s = flush_job.Run(&file_meta); s = flush_job.Run(&logs_with_prep_tracker_, &file_meta);
} else { } else {
flush_job.Cancel(); flush_job.Cancel();
} }

@ -184,17 +184,21 @@ Status DBImpl::TEST_GetAllImmutableCFOptions(
} }
uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() { uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() {
return FindMinLogContainingOutstandingPrep(); return logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep();
} }
size_t DBImpl::TEST_PreparedSectionCompletedSize() { size_t DBImpl::TEST_PreparedSectionCompletedSize() {
return prepared_section_completed_.size(); return logs_with_prep_tracker_.TEST_PreparedSectionCompletedSize();
} }
size_t DBImpl::TEST_LogsWithPrepSize() { return logs_with_prep_.size(); } size_t DBImpl::TEST_LogsWithPrepSize() {
return logs_with_prep_tracker_.TEST_LogsWithPrepSize();
}
uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() { uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() {
return FindMinPrepLogReferencedByMemTable(); autovector<MemTable*> empty_list;
return FindMinPrepLogReferencedByMemTable(versions_.get(), nullptr,
empty_list);
} }
Status DBImpl::TEST_GetLatestMutableCFOptions( Status DBImpl::TEST_GetLatestMutableCFOptions(

@ -14,124 +14,17 @@
#include <inttypes.h> #include <inttypes.h>
#include <unordered_set> #include <unordered_set>
#include "db/event_helpers.h" #include "db/event_helpers.h"
#include "db/memtable_list.h"
#include "util/file_util.h" #include "util/file_util.h"
#include "util/sst_file_manager_impl.h" #include "util/sst_file_manager_impl.h"
namespace rocksdb { namespace rocksdb {
uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
if (!allow_2pc()) {
return 0;
}
uint64_t min_log = 0;
// we must look through the memtables for two phase transactions
// that have been committed but not yet flushed
for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
if (loop_cfd->IsDropped()) {
continue;
}
auto log = loop_cfd->imm()->GetMinLogContainingPrepSection();
if (log > 0 && (min_log == 0 || log < min_log)) {
min_log = log;
}
log = loop_cfd->mem()->GetMinLogContainingPrepSection();
if (log > 0 && (min_log == 0 || log < min_log)) {
min_log = log;
}
}
return min_log;
}
void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) {
assert(log != 0);
std::lock_guard<std::mutex> lock(prepared_section_completed_mutex_);
auto it = prepared_section_completed_.find(log);
if (UNLIKELY(it == prepared_section_completed_.end())) {
prepared_section_completed_[log] = 1;
} else {
it->second += 1;
}
}
void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
assert(log != 0);
std::lock_guard<std::mutex> lock(logs_with_prep_mutex_);
auto rit = logs_with_prep_.rbegin();
bool updated = false;
// Most probably the last log is the one that is being marked for
// having a prepare section; so search from the end.
for (; rit != logs_with_prep_.rend() && rit->log >= log; ++rit) {
if (rit->log == log) {
rit->cnt++;
updated = true;
break;
}
}
if (!updated) {
// We are either at the start, or at a position with rit->log < log
logs_with_prep_.insert(rit.base(), {log, 1});
}
}
uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
std::lock_guard<std::mutex> lock(logs_with_prep_mutex_);
auto it = logs_with_prep_.begin();
// start with the smallest log
for (; it != logs_with_prep_.end();) {
auto min_log = it->log;
{
std::lock_guard<std::mutex> lock2(prepared_section_completed_mutex_);
auto completed_it = prepared_section_completed_.find(min_log);
if (completed_it == prepared_section_completed_.end() ||
completed_it->second < it->cnt) {
return min_log;
}
assert(completed_it != prepared_section_completed_.end() &&
completed_it->second == it->cnt);
prepared_section_completed_.erase(completed_it);
}
// erase from beginning in vector is not efficient but this function is not
// on the fast path.
it = logs_with_prep_.erase(it);
}
// no such log found
return 0;
}
uint64_t DBImpl::MinLogNumberToKeep() { uint64_t DBImpl::MinLogNumberToKeep() {
uint64_t log_number = versions_->MinLogNumber();
if (allow_2pc()) { if (allow_2pc()) {
// if are 2pc we must consider logs containing prepared return versions_->min_log_number_to_keep_2pc();
// sections of outstanding transactions. } else {
// return versions_->MinLogNumberWithUnflushedData();
// We must check min logs with outstanding prep before we check
// logs references by memtables because a log referenced by the
// first data structure could transition to the second under us.
//
// TODO(horuff): iterating over all column families under db mutex.
// should find more optimal solution
auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep();
if (min_log_in_prep_heap != 0 && min_log_in_prep_heap < log_number) {
log_number = min_log_in_prep_heap;
}
auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable();
if (min_log_refed_by_mem != 0 && min_log_refed_by_mem < log_number) {
log_number = min_log_refed_by_mem;
}
} }
return log_number;
} }
// * Returns the list of live files in 'sst_live' // * Returns the list of live files in 'sst_live'
@ -200,7 +93,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
job_context->pending_manifest_file_number = job_context->pending_manifest_file_number =
versions_->pending_manifest_file_number(); versions_->pending_manifest_file_number();
job_context->log_number = MinLogNumberToKeep(); job_context->log_number = MinLogNumberToKeep();
job_context->prev_log_number = versions_->prev_log_number(); job_context->prev_log_number = versions_->prev_log_number();
versions_->AddLiveFiles(&job_context->sst_live); versions_->AddLiveFiles(&job_context->sst_live);
@ -621,4 +513,94 @@ void DBImpl::DeleteObsoleteFiles() {
mutex_.Lock(); mutex_.Lock();
} }
uint64_t FindMinPrepLogReferencedByMemTable(
VersionSet* vset, const ColumnFamilyData* cfd_to_flush,
const autovector<MemTable*>& memtables_to_flush) {
uint64_t min_log = 0;
// we must look through the memtables for two phase transactions
// that have been committed but not yet flushed
for (auto loop_cfd : *vset->GetColumnFamilySet()) {
if (loop_cfd->IsDropped() || loop_cfd == cfd_to_flush) {
continue;
}
auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection(
memtables_to_flush);
if (log > 0 && (min_log == 0 || log < min_log)) {
min_log = log;
}
log = loop_cfd->mem()->GetMinLogContainingPrepSection();
if (log > 0 && (min_log == 0 || log < min_log)) {
min_log = log;
}
}
return min_log;
}
uint64_t PrecomputeMinLogNumberToKeep(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
autovector<VersionEdit*> edit_list,
const autovector<MemTable*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker) {
assert(vset != nullptr);
assert(prep_tracker != nullptr);
// Calculate updated min_log_number_to_keep
// Since the function should only be called in 2pc mode, log number in
// the version edit should be sufficient.
// Precompute the min log number containing unflushed data for the column
// family being flushed (`cfd_to_flush`).
uint64_t cf_min_log_number_to_keep = 0;
for (auto& e : edit_list) {
if (e->has_log_number()) {
cf_min_log_number_to_keep =
std::max(cf_min_log_number_to_keep, e->log_number());
}
}
if (cf_min_log_number_to_keep == 0) {
// No version edit contains information on log number. The log number
// for this column family should stay the same as it is.
cf_min_log_number_to_keep = cfd_to_flush.GetLogNumber();
}
// Get min log number containing unflushed data for other column families.
uint64_t min_log_number_to_keep =
vset->PreComputeMinLogNumberWithUnflushedData(&cfd_to_flush);
if (cf_min_log_number_to_keep != 0) {
min_log_number_to_keep =
std::min(cf_min_log_number_to_keep, min_log_number_to_keep);
}
// if are 2pc we must consider logs containing prepared
// sections of outstanding transactions.
//
// We must check min logs with outstanding prep before we check
// logs references by memtables because a log referenced by the
// first data structure could transition to the second under us.
//
// TODO: iterating over all column families under db mutex.
// should find more optimal solution
auto min_log_in_prep_heap =
prep_tracker->FindMinLogContainingOutstandingPrep();
if (min_log_in_prep_heap != 0 &&
min_log_in_prep_heap < min_log_number_to_keep) {
min_log_number_to_keep = min_log_in_prep_heap;
}
uint64_t min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable(
vset, &cfd_to_flush, memtables_to_flush);
if (min_log_refed_by_mem != 0 &&
min_log_refed_by_mem < min_log_number_to_keep) {
min_log_number_to_keep = min_log_refed_by_mem;
}
return min_log_number_to_keep;
}
} // namespace rocksdb } // namespace rocksdb

@ -532,6 +532,13 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
bool flushed = false; bool flushed = false;
uint64_t corrupted_log_number = kMaxSequenceNumber; uint64_t corrupted_log_number = kMaxSequenceNumber;
for (auto log_number : log_numbers) { for (auto log_number : log_numbers) {
if (log_number < versions_->min_log_number_to_keep_2pc()) {
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Skipping log #%" PRIu64
" since it is older than min log to keep #%" PRIu64,
log_number, versions_->min_log_number_to_keep_2pc());
continue;
}
// The previous incarnation may not have written any MANIFEST // The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually // records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet. // update the file number allocation counter in VersionSet.

@ -1033,28 +1033,34 @@ Status DBImpl::SwitchWAL(WriteContext* write_context) {
} }
auto oldest_alive_log = alive_log_files_.begin()->number; auto oldest_alive_log = alive_log_files_.begin()->number;
auto oldest_log_with_uncommited_prep = FindMinLogContainingOutstandingPrep(); bool flush_wont_release_oldest_log = false;
if (allow_2pc()) {
if (allow_2pc() && auto oldest_log_with_uncommited_prep =
oldest_log_with_uncommited_prep > 0 && logs_with_prep_tracker_.FindMinLogContainingOutstandingPrep();
oldest_log_with_uncommited_prep <= oldest_alive_log) {
if (unable_to_flush_oldest_log_) { assert(oldest_log_with_uncommited_prep == 0 ||
oldest_log_with_uncommited_prep >= oldest_alive_log);
if (oldest_log_with_uncommited_prep > 0 &&
oldest_log_with_uncommited_prep == oldest_alive_log) {
if (unable_to_release_oldest_log_) {
// we already attempted to flush all column families dependent on // we already attempted to flush all column families dependent on
// the oldest alive log but the log still contained uncommited transactions. // the oldest alive log but the log still contained uncommited
// the oldest alive log STILL contains uncommited transaction so there // transactions so there is still nothing that we can do.
// is still nothing that we can do.
return status; return status;
} else { } else {
ROCKS_LOG_WARN( ROCKS_LOG_WARN(
immutable_db_options_.info_log, immutable_db_options_.info_log,
"Unable to release oldest log due to uncommited transaction"); "Unable to release oldest log due to uncommited transaction");
unable_to_flush_oldest_log_ = true; unable_to_release_oldest_log_ = true;
flush_wont_release_oldest_log = true;
}
} }
} else { }
if (!flush_wont_release_oldest_log) {
// we only mark this log as getting flushed if we have successfully // we only mark this log as getting flushed if we have successfully
// flushed all data in this log. If this log contains outstanding prepared // flushed all data in this log. If this log contains outstanding prepared
// transactions then we cannot flush this log until those transactions are commited. // transactions then we cannot flush this log until those transactions are commited.
unable_to_flush_oldest_log_ = false; unable_to_release_oldest_log_ = false;
alive_log_files_.begin()->getting_flushed = true; alive_log_files_.begin()->getting_flushed = true;
} }

@ -451,8 +451,9 @@ class SpecialEnv : public EnvWrapper {
return s; return s;
} }
Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r, virtual Status NewSequentialFile(const std::string& f,
const EnvOptions& soptions) override { unique_ptr<SequentialFile>* r,
const EnvOptions& soptions) override {
class CountingFile : public SequentialFile { class CountingFile : public SequentialFile {
public: public:
CountingFile(unique_ptr<SequentialFile>&& target, CountingFile(unique_ptr<SequentialFile>&& target,

@ -20,6 +20,106 @@ class DBWALTest : public DBTestBase {
DBWALTest() : DBTestBase("/db_wal_test") {} DBWALTest() : DBTestBase("/db_wal_test") {}
}; };
// A SpecialEnv enriched to give more insight about deleted files
class EnrichedSpecialEnv : public SpecialEnv {
public:
explicit EnrichedSpecialEnv(Env* base) : SpecialEnv(base) {}
Status NewSequentialFile(const std::string& f, unique_ptr<SequentialFile>* r,
const EnvOptions& soptions) override {
InstrumentedMutexLock l(&env_mutex_);
if (f == skipped_wal) {
deleted_wal_reopened = true;
if (IsWAL(f) && largetest_deleted_wal.size() != 0 &&
f.compare(largetest_deleted_wal) <= 0) {
gap_in_wals = true;
}
}
return SpecialEnv::NewSequentialFile(f, r, soptions);
}
Status DeleteFile(const std::string& fname) override {
if (IsWAL(fname)) {
deleted_wal_cnt++;
InstrumentedMutexLock l(&env_mutex_);
// If this is the first WAL, remember its name and skip deleting it. We
// remember its name partly because the application might attempt to
// delete the file again.
if (skipped_wal.size() != 0 && skipped_wal != fname) {
if (largetest_deleted_wal.size() == 0 ||
largetest_deleted_wal.compare(fname) < 0) {
largetest_deleted_wal = fname;
}
} else {
skipped_wal = fname;
return Status::OK();
}
}
return SpecialEnv::DeleteFile(fname);
}
bool IsWAL(const std::string& fname) {
// printf("iswal %s\n", fname.c_str());
return fname.compare(fname.size() - 3, 3, "log") == 0;
}
InstrumentedMutex env_mutex_;
// the wal whose actual delete was skipped by the env
std::string skipped_wal = "";
// the largest WAL that was requested to be deleted
std::string largetest_deleted_wal = "";
// number of WALs that were successfully deleted
std::atomic<size_t> deleted_wal_cnt = {0};
// the WAL whose delete from fs was skipped is reopened during recovery
std::atomic<bool> deleted_wal_reopened = {false};
// whether a gap in the WALs was detected during recovery
std::atomic<bool> gap_in_wals = {false};
};
class DBWALTestWithEnrichedEnv : public DBTestBase {
public:
DBWALTestWithEnrichedEnv() : DBTestBase("/db_wal_test") {
enriched_env_ = new EnrichedSpecialEnv(env_->target());
auto options = CurrentOptions();
options.env = enriched_env_;
Reopen(options);
delete env_;
// to be deleted by the parent class
env_ = enriched_env_;
}
protected:
EnrichedSpecialEnv* enriched_env_;
};
// Test that the recovery would successfully avoid the gaps between the logs.
// One known scenario that could cause this is that the application issue the
// WAL deletion out of order. For the sake of simplicity in the test, here we
// create the gap by manipulating the env to skip deletion of the first WAL but
// not the ones after it.
TEST_F(DBWALTestWithEnrichedEnv, SkipDeletedWALs) {
auto options = last_options_;
// To cause frequent WAL deletion
options.write_buffer_size = 128;
Reopen(options);
WriteOptions writeOpt = WriteOptions();
for (int i = 0; i < 128 * 5; i++) {
ASSERT_OK(dbfull()->Put(writeOpt, "foo", "v1"));
}
FlushOptions fo;
fo.wait = true;
ASSERT_OK(db_->Flush(fo));
// some wals are deleted
ASSERT_NE(0, enriched_env_->deleted_wal_cnt);
// but not the first one
ASSERT_NE(0, enriched_env_->skipped_wal.size());
// Test that the WAL that was not deleted will be skipped during recovery
options = last_options_;
Reopen(options);
ASSERT_FALSE(enriched_env_->deleted_wal_reopened);
ASSERT_FALSE(enriched_env_->gap_in_wals);
}
TEST_F(DBWALTest, WAL) { TEST_F(DBWALTest, WAL) {
do { do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions()); CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
@ -891,7 +991,7 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
// Record the offset at this point // Record the offset at this point
Env* env = options.env; Env* env = options.env;
int wal_file_id = RecoveryTestHelper::kWALFileOffset + 1; uint64_t wal_file_id = dbfull()->TEST_LogfileNumber();
std::string fname = LogFileName(dbname_, wal_file_id); std::string fname = LogFileName(dbname_, wal_file_id);
uint64_t offset_to_corrupt; uint64_t offset_to_corrupt;
ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt)); ASSERT_OK(env->GetFileSize(fname, &offset_to_corrupt));

@ -1413,8 +1413,12 @@ TEST_F(ExternalSSTFileTest, AddFileTrivialMoveBug) {
// fit in L3 but will overlap with compaction so will be added // fit in L3 but will overlap with compaction so will be added
// to L2 but a compaction will trivially move it to L3 // to L2 but a compaction will trivially move it to L3
// and break LSM consistency // and break LSM consistency
ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}})); static std::atomic<bool> called = {false};
ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7)); if (!called) {
called = true;
ASSERT_OK(dbfull()->SetOptions({{"max_bytes_for_level_base", "1"}}));
ASSERT_OK(GenerateAndAddExternalFile(options, {15, 16}, 7));
}
}); });
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();

@ -185,7 +185,8 @@ void FlushJob::PickMemTable() {
base_->Ref(); // it is likely that we do not need this reference base_->Ref(); // it is likely that we do not need this reference
} }
Status FlushJob::Run(FileMetaData* file_meta) { Status FlushJob::Run(LogsWithPrepTracker* prep_tracker,
FileMetaData* file_meta) {
db_mutex_->AssertHeld(); db_mutex_->AssertHeld();
assert(pick_memtable_called); assert(pick_memtable_called);
AutoThreadOperationStageUpdater stage_run( AutoThreadOperationStageUpdater stage_run(
@ -226,7 +227,7 @@ Status FlushJob::Run(FileMetaData* file_meta) {
TEST_SYNC_POINT("FlushJob::InstallResults"); TEST_SYNC_POINT("FlushJob::InstallResults");
// Replace immutable memtable with the generated Table // Replace immutable memtable with the generated Table
s = cfd_->imm()->InstallMemtableFlushResults( s = cfd_->imm()->InstallMemtableFlushResults(
cfd_, mutable_cf_options_, mems_, versions_, db_mutex_, cfd_, mutable_cf_options_, mems_, prep_tracker, versions_, db_mutex_,
meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_, meta_.fd.GetNumber(), &job_context_->memtables_to_free, db_directory_,
log_buffer_); log_buffer_);
} }

@ -22,6 +22,7 @@
#include "db/internal_stats.h" #include "db/internal_stats.h"
#include "db/job_context.h" #include "db/job_context.h"
#include "db/log_writer.h" #include "db/log_writer.h"
#include "db/logs_with_prep_tracker.h"
#include "db/memtable_list.h" #include "db/memtable_list.h"
#include "db/snapshot_impl.h" #include "db/snapshot_impl.h"
#include "db/version_edit.h" #include "db/version_edit.h"
@ -42,6 +43,7 @@
namespace rocksdb { namespace rocksdb {
class DBImpl;
class MemTable; class MemTable;
class SnapshotChecker; class SnapshotChecker;
class TableCache; class TableCache;
@ -71,7 +73,8 @@ class FlushJob {
// Require db_mutex held. // Require db_mutex held.
// Once PickMemTable() is called, either Run() or Cancel() has to be called. // Once PickMemTable() is called, either Run() or Cancel() has to be called.
void PickMemTable(); void PickMemTable();
Status Run(FileMetaData* file_meta = nullptr); Status Run(LogsWithPrepTracker* prep_tracker = nullptr,
FileMetaData* file_meta = nullptr);
void Cancel(); void Cancel();
TableProperties GetTableProperties() const { return table_properties_; } TableProperties GetTableProperties() const { return table_properties_; }

@ -150,7 +150,7 @@ TEST_F(FlushJobTest, NonEmpty) {
FileMetaData fd; FileMetaData fd;
mutex_.Lock(); mutex_.Lock();
flush_job.PickMemTable(); flush_job.PickMemTable();
ASSERT_OK(flush_job.Run(&fd)); ASSERT_OK(flush_job.Run(nullptr, &fd));
mutex_.Unlock(); mutex_.Unlock();
db_options_.statistics->histogramData(FLUSH_TIME, &hist); db_options_.statistics->histogramData(FLUSH_TIME, &hist);
ASSERT_GT(hist.average, 0.0); ASSERT_GT(hist.average, 0.0);

@ -19,8 +19,8 @@ class ColumnFamilyData;
namespace rocksdb { namespace rocksdb {
class MemTableList;
class DBImpl; class DBImpl;
class MemTableList;
// Config for retrieving a property's value. // Config for retrieving a property's value.
struct DBPropertyInfo { struct DBPropertyInfo {

@ -0,0 +1,67 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#include "db/logs_with_prep_tracker.h"
#include "port/likely.h"
namespace rocksdb {
void LogsWithPrepTracker::MarkLogAsHavingPrepSectionFlushed(uint64_t log) {
assert(log != 0);
std::lock_guard<std::mutex> lock(prepared_section_completed_mutex_);
auto it = prepared_section_completed_.find(log);
if (UNLIKELY(it == prepared_section_completed_.end())) {
prepared_section_completed_[log] = 1;
} else {
it->second += 1;
}
}
void LogsWithPrepTracker::MarkLogAsContainingPrepSection(uint64_t log) {
assert(log != 0);
std::lock_guard<std::mutex> lock(logs_with_prep_mutex_);
auto rit = logs_with_prep_.rbegin();
bool updated = false;
// Most probably the last log is the one that is being marked for
// having a prepare section; so search from the end.
for (; rit != logs_with_prep_.rend() && rit->log >= log; ++rit) {
if (rit->log == log) {
rit->cnt++;
updated = true;
break;
}
}
if (!updated) {
// We are either at the start, or at a position with rit->log < log
logs_with_prep_.insert(rit.base(), {log, 1});
}
}
uint64_t LogsWithPrepTracker::FindMinLogContainingOutstandingPrep() {
std::lock_guard<std::mutex> lock(logs_with_prep_mutex_);
auto it = logs_with_prep_.begin();
// start with the smallest log
for (; it != logs_with_prep_.end();) {
auto min_log = it->log;
{
std::lock_guard<std::mutex> lock2(prepared_section_completed_mutex_);
auto completed_it = prepared_section_completed_.find(min_log);
if (completed_it == prepared_section_completed_.end() ||
completed_it->second < it->cnt) {
return min_log;
}
assert(completed_it != prepared_section_completed_.end() &&
completed_it->second == it->cnt);
prepared_section_completed_.erase(completed_it);
}
// erase from beginning in vector is not efficient but this function is not
// on the fast path.
it = logs_with_prep_.erase(it);
}
// no such log found
return 0;
}
} // namespace rocksdb

@ -0,0 +1,61 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
//
#pragma once
#include <stdint.h>
#include <cassert>
#include <cstdlib>
#include <mutex>
#include <unordered_map>
#include <vector>
namespace rocksdb {
// This class is used to track the log files with outstanding prepare entries.
class LogsWithPrepTracker {
public:
// Called when a transaction prepared in `log` has been committed or aborted.
void MarkLogAsHavingPrepSectionFlushed(uint64_t log);
// Called when a transaction is prepared in `log`.
void MarkLogAsContainingPrepSection(uint64_t log);
// Return the earliest log file with outstanding prepare entries.
uint64_t FindMinLogContainingOutstandingPrep();
size_t TEST_PreparedSectionCompletedSize() {
return prepared_section_completed_.size();
}
size_t TEST_LogsWithPrepSize() { return logs_with_prep_.size(); }
private:
// REQUIRES: logs_with_prep_mutex_ held
//
// sorted list of log numbers still containing prepared data.
// this is used by FindObsoleteFiles to determine which
// flushed logs we must keep around because they still
// contain prepared data which has not been committed or rolled back
struct LogCnt {
uint64_t log; // the log number
uint64_t cnt; // number of prepared sections in the log
};
std::vector<LogCnt> logs_with_prep_;
std::mutex logs_with_prep_mutex_;
// REQUIRES: prepared_section_completed_mutex_ held
//
// to be used in conjunction with logs_with_prep_.
// once a transaction with data in log L is committed or rolled back
// rather than updating logs_with_prep_ directly we keep track of that
// in prepared_section_completed_ which maps LOG -> instance_count. This helps
// avoiding contention between a commit thread and the prepare threads.
//
// when trying to determine the minimum log still active we first
// consult logs_with_prep_. while that root value maps to
// an equal value in prepared_section_completed_ we erase the log from
// both logs_with_prep_ and prepared_section_completed_.
std::unordered_map<uint64_t, uint64_t> prepared_section_completed_;
std::mutex prepared_section_completed_mutex_;
};
} // namespace rocksdb

@ -12,6 +12,7 @@
#include <inttypes.h> #include <inttypes.h>
#include <limits> #include <limits>
#include <string> #include <string>
#include "db/db_impl.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/version_set.h" #include "db/version_set.h"
#include "monitoring/thread_status_util.h" #include "monitoring/thread_status_util.h"
@ -322,9 +323,10 @@ void MemTableList::RollbackMemtableFlush(const autovector<MemTable*>& mems,
// Record a successful flush in the manifest file // Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults( Status MemTableList::InstallMemtableFlushResults(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& mems, VersionSet* vset, InstrumentedMutex* mu, const autovector<MemTable*>& mems, LogsWithPrepTracker* prep_tracker,
uint64_t file_number, autovector<MemTable*>* to_delete, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
Directory* db_directory, LogBuffer* log_buffer) { autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer) {
AutoThreadOperationStageUpdater stage_updater( AutoThreadOperationStageUpdater stage_updater(
ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS);
mu->AssertHeld(); mu->AssertHeld();
@ -361,6 +363,7 @@ Status MemTableList::InstallMemtableFlushResults(
uint64_t batch_file_number = 0; uint64_t batch_file_number = 0;
size_t batch_count = 0; size_t batch_count = 0;
autovector<VersionEdit*> edit_list; autovector<VersionEdit*> edit_list;
autovector<MemTable*> memtables_to_flush;
// enumerate from the last (earliest) element to see how many batch finished // enumerate from the last (earliest) element to see how many batch finished
for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) {
MemTable* m = *it; MemTable* m = *it;
@ -373,11 +376,20 @@ Status MemTableList::InstallMemtableFlushResults(
"[%s] Level-0 commit table #%" PRIu64 " started", "[%s] Level-0 commit table #%" PRIu64 " started",
cfd->GetName().c_str(), m->file_number_); cfd->GetName().c_str(), m->file_number_);
edit_list.push_back(&m->edit_); edit_list.push_back(&m->edit_);
memtables_to_flush.push_back(m);
} }
batch_count++; batch_count++;
} }
if (batch_count > 0) { if (batch_count > 0) {
if (vset->db_options()->allow_2pc) {
assert(edit_list.size() > 0);
// We piggyback the information of earliest log file to keep in the
// manifest entry for the last file flushed.
edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
}
// this can release and reacquire the mutex. // this can release and reacquire the mutex.
s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu, s = vset->LogAndApply(cfd, mutable_cf_options, edit_list, mu,
db_directory); db_directory);
@ -468,13 +480,21 @@ void MemTableList::InstallNewVersion() {
} }
} }
uint64_t MemTableList::GetMinLogContainingPrepSection() { uint64_t MemTableList::PrecomputeMinLogContainingPrepSection(
const autovector<MemTable*>& memtables_to_flush) {
uint64_t min_log = 0; uint64_t min_log = 0;
for (auto& m : current_->memlist_) { for (auto& m : current_->memlist_) {
// this mem has been flushed it no longer // Assume the list is very short, we can live with O(m*n). We can optimize
// needs to hold on the its prep section // if the performance has some problem.
if (m->flush_completed_) { bool should_skip = false;
for (MemTable* m_to_flush : memtables_to_flush) {
if (m == m_to_flush) {
should_skip = true;
break;
}
}
if (should_skip) {
continue; continue;
} }

@ -13,6 +13,7 @@
#include <vector> #include <vector>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/logs_with_prep_tracker.h"
#include "db/memtable.h" #include "db/memtable.h"
#include "db/range_del_aggregator.h" #include "db/range_del_aggregator.h"
#include "monitoring/instrumented_mutex.h" #include "monitoring/instrumented_mutex.h"
@ -210,9 +211,10 @@ class MemTableList {
// Commit a successful flush in the manifest file // Commit a successful flush in the manifest file
Status InstallMemtableFlushResults( Status InstallMemtableFlushResults(
ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options,
const autovector<MemTable*>& m, VersionSet* vset, InstrumentedMutex* mu, const autovector<MemTable*>& m, LogsWithPrepTracker* prep_tracker,
uint64_t file_number, autovector<MemTable*>* to_delete, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number,
Directory* db_directory, LogBuffer* log_buffer); autovector<MemTable*>* to_delete, Directory* db_directory,
LogBuffer* log_buffer);
// New memtables are inserted at the front of the list. // New memtables are inserted at the front of the list.
// Takes ownership of the referenced held on *m by the caller of Add(). // Takes ownership of the referenced held on *m by the caller of Add().
@ -243,7 +245,10 @@ class MemTableList {
size_t* current_memory_usage() { return &current_memory_usage_; } size_t* current_memory_usage() { return &current_memory_usage_; }
uint64_t GetMinLogContainingPrepSection(); // Returns the min log containing the prep section after memtables listsed in
// `memtables_to_flush` are flushed and their status is persisted in manifest.
uint64_t PrecomputeMinLogContainingPrepSection(
const autovector<MemTable*>& memtables_to_flush);
uint64_t GetEarliestMemTableID() const { uint64_t GetEarliestMemTableID() const {
auto& memlist = current_->memlist_; auto& memlist = current_->memlist_;

@ -82,10 +82,10 @@ class MemTableListTest : public testing::Test {
// Create dummy mutex. // Create dummy mutex.
InstrumentedMutex mutex; InstrumentedMutex mutex;
InstrumentedMutexLock l(&mutex); InstrumentedMutexLock l(&mutex);
LogsWithPrepTracker dummy_prep_tracker;
return list->InstallMemtableFlushResults(cfd, mutable_cf_options, m, return list->InstallMemtableFlushResults(
&versions, &mutex, 1, to_delete, cfd, mutable_cf_options, m, &dummy_prep_tracker, &versions, &mutex, 1,
nullptr, &log_buffer); to_delete, nullptr, &log_buffer);
} }
}; };

@ -30,6 +30,7 @@ enum Tag {
kNewFile = 7, kNewFile = 7,
// 8 was used for large value refs // 8 was used for large value refs
kPrevLogNumber = 9, kPrevLogNumber = 9,
kMinLogNumberToKeep = 10,
// these are new formats divergent from open source leveldb // these are new formats divergent from open source leveldb
kNewFile2 = 100, kNewFile2 = 100,
@ -44,6 +45,11 @@ enum Tag {
enum CustomTag { enum CustomTag {
kTerminate = 1, // The end of customized fields kTerminate = 1, // The end of customized fields
kNeedCompaction = 2, kNeedCompaction = 2,
// Since Manifest is not entirely currently forward-compatible, and the only
// forward-compatbile part is the CutsomtTag of kNewFile, we currently encode
// kMinLogNumberToKeep as part of a CustomTag as a hack. This should be
// removed when manifest becomes forward-comptabile.
kMinLogNumberToKeepHack = 3,
kPathId = 65, kPathId = 65,
}; };
// If this bit for the custom tag is set, opening DB should fail if // If this bit for the custom tag is set, opening DB should fail if
@ -63,12 +69,14 @@ void VersionEdit::Clear() {
last_sequence_ = 0; last_sequence_ = 0;
next_file_number_ = 0; next_file_number_ = 0;
max_column_family_ = 0; max_column_family_ = 0;
min_log_number_to_keep_ = 0;
has_comparator_ = false; has_comparator_ = false;
has_log_number_ = false; has_log_number_ = false;
has_prev_log_number_ = false; has_prev_log_number_ = false;
has_next_file_number_ = false; has_next_file_number_ = false;
has_last_sequence_ = false; has_last_sequence_ = false;
has_max_column_family_ = false; has_max_column_family_ = false;
has_min_log_number_to_keep_ = false;
deleted_files_.clear(); deleted_files_.clear();
new_files_.clear(); new_files_.clear();
column_family_ = 0; column_family_ = 0;
@ -97,19 +105,19 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
if (has_max_column_family_) { if (has_max_column_family_) {
PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_); PutVarint32Varint32(dst, kMaxColumnFamily, max_column_family_);
} }
for (const auto& deleted : deleted_files_) { for (const auto& deleted : deleted_files_) {
PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */, PutVarint32Varint32Varint64(dst, kDeletedFile, deleted.first /* level */,
deleted.second /* file number */); deleted.second /* file number */);
} }
bool min_log_num_written = false;
for (size_t i = 0; i < new_files_.size(); i++) { for (size_t i = 0; i < new_files_.size(); i++) {
const FileMetaData& f = new_files_[i].second; const FileMetaData& f = new_files_[i].second;
if (!f.smallest.Valid() || !f.largest.Valid()) { if (!f.smallest.Valid() || !f.largest.Valid()) {
return false; return false;
} }
bool has_customized_fields = false; bool has_customized_fields = false;
if (f.marked_for_compaction) { if (f.marked_for_compaction || has_min_log_number_to_keep_) {
PutVarint32(dst, kNewFile4); PutVarint32(dst, kNewFile4);
has_customized_fields = true; has_customized_fields = true;
} else if (f.fd.GetPathId() == 0) { } else if (f.fd.GetPathId() == 0) {
@ -165,6 +173,13 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
char p = static_cast<char>(1); char p = static_cast<char>(1);
PutLengthPrefixedSlice(dst, Slice(&p, 1)); PutLengthPrefixedSlice(dst, Slice(&p, 1));
} }
if (has_min_log_number_to_keep_ && !min_log_num_written) {
PutVarint32(dst, CustomTag::kMinLogNumberToKeepHack);
std::string varint_log_number;
PutFixed64(&varint_log_number, min_log_number_to_keep_);
PutLengthPrefixedSlice(dst, Slice(varint_log_number));
min_log_num_written = true;
}
TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields", TEST_SYNC_POINT_CALLBACK("VersionEdit::EncodeTo:NewFile4:CustomizeFields",
dst); dst);
@ -218,6 +233,9 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
uint64_t number; uint64_t number;
uint32_t path_id = 0; uint32_t path_id = 0;
uint64_t file_size; uint64_t file_size;
// Since this is the only forward-compatible part of the code, we hack new
// extension into this record. When we do, we set this boolean to distinguish
// the record from the normal NewFile records.
if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) && if (GetLevel(input, &level, &msg) && GetVarint64(input, &number) &&
GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) && GetVarint64(input, &file_size) && GetInternalKey(input, &f.smallest) &&
GetInternalKey(input, &f.largest) && GetInternalKey(input, &f.largest) &&
@ -252,6 +270,14 @@ const char* VersionEdit::DecodeNewFile4From(Slice* input) {
} }
f.marked_for_compaction = (field[0] == 1); f.marked_for_compaction = (field[0] == 1);
break; break;
case kMinLogNumberToKeepHack:
// This is a hack to encode kMinLogNumberToKeep in a
// forward-compatbile fashion.
if (!GetFixed64(&field, &min_log_number_to_keep_)) {
return "deleted log number malformatted";
}
has_min_log_number_to_keep_ = true;
break;
default: default:
if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) { if ((custom_tag & kCustomTagNonSafeIgnoreMask) != 0) {
// Should not proceed if cannot understand it // Should not proceed if cannot understand it
@ -331,6 +357,14 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
} }
break; break;
case kMinLogNumberToKeep:
if (GetVarint64(&input, &min_log_number_to_keep_)) {
has_min_log_number_to_keep_ = true;
} else {
msg = "min log number to kee";
}
break;
case kCompactPointer: case kCompactPointer:
if (GetLevel(&input, &level, &msg) && if (GetLevel(&input, &level, &msg) &&
GetInternalKey(&input, &key)) { GetInternalKey(&input, &key)) {
@ -475,6 +509,10 @@ std::string VersionEdit::DebugString(bool hex_key) const {
r.append("\n NextFileNumber: "); r.append("\n NextFileNumber: ");
AppendNumberTo(&r, next_file_number_); AppendNumberTo(&r, next_file_number_);
} }
if (has_min_log_number_to_keep_) {
r.append("\n MinLogNumberToKeep: ");
AppendNumberTo(&r, min_log_number_to_keep_);
}
if (has_last_sequence_) { if (has_last_sequence_) {
r.append("\n LastSeq: "); r.append("\n LastSeq: ");
AppendNumberTo(&r, last_sequence_); AppendNumberTo(&r, last_sequence_);
@ -582,6 +620,9 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
if (has_max_column_family_) { if (has_max_column_family_) {
jw << "MaxColumnFamily" << max_column_family_; jw << "MaxColumnFamily" << max_column_family_;
} }
if (has_min_log_number_to_keep_) {
jw << "MinLogNumberToKeep" << min_log_number_to_keep_;
}
jw.EndObject(); jw.EndObject();

@ -199,6 +199,14 @@ class VersionEdit {
has_max_column_family_ = true; has_max_column_family_ = true;
max_column_family_ = max_column_family; max_column_family_ = max_column_family;
} }
void SetMinLogNumberToKeep(uint64_t num) {
has_min_log_number_to_keep_ = true;
min_log_number_to_keep_ = num;
}
bool has_log_number() { return has_log_number_; }
uint64_t log_number() { return log_number_; }
// Add the specified file at the specified number. // Add the specified file at the specified number.
// REQUIRES: This version has not been saved (see VersionSet::SaveTo) // REQUIRES: This version has not been saved (see VersionSet::SaveTo)
@ -285,6 +293,8 @@ class VersionEdit {
uint64_t prev_log_number_; uint64_t prev_log_number_;
uint64_t next_file_number_; uint64_t next_file_number_;
uint32_t max_column_family_; uint32_t max_column_family_;
// The most recent WAL log number that is deleted
uint64_t min_log_number_to_keep_;
SequenceNumber last_sequence_; SequenceNumber last_sequence_;
bool has_comparator_; bool has_comparator_;
bool has_log_number_; bool has_log_number_;
@ -292,6 +302,7 @@ class VersionEdit {
bool has_next_file_number_; bool has_next_file_number_;
bool has_last_sequence_; bool has_last_sequence_;
bool has_max_column_family_; bool has_max_column_family_;
bool has_min_log_number_to_keep_;
DeletedFileSet deleted_files_; DeletedFileSet deleted_files_;
std::vector<std::pair<int, FileMetaData>> new_files_; std::vector<std::pair<int, FileMetaData>> new_files_;

@ -181,6 +181,16 @@ TEST_F(VersionEditTest, ColumnFamilyTest) {
TestEncodeDecode(edit); TestEncodeDecode(edit);
} }
TEST_F(VersionEditTest, MinLogNumberToKeep) {
VersionEdit edit;
edit.SetMinLogNumberToKeep(13);
TestEncodeDecode(edit);
edit.Clear();
edit.SetMinLogNumberToKeep(23);
TestEncodeDecode(edit);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

@ -2663,16 +2663,16 @@ struct VersionSet::ManifestWriter {
}; };
VersionSet::VersionSet(const std::string& dbname, VersionSet::VersionSet(const std::string& dbname,
const ImmutableDBOptions* db_options, const ImmutableDBOptions* _db_options,
const EnvOptions& storage_options, Cache* table_cache, const EnvOptions& storage_options, Cache* table_cache,
WriteBufferManager* write_buffer_manager, WriteBufferManager* write_buffer_manager,
WriteController* write_controller) WriteController* write_controller)
: column_family_set_( : column_family_set_(
new ColumnFamilySet(dbname, db_options, storage_options, table_cache, new ColumnFamilySet(dbname, _db_options, storage_options, table_cache,
write_buffer_manager, write_controller)), write_buffer_manager, write_controller)),
env_(db_options->env), env_(_db_options->env),
dbname_(dbname), dbname_(dbname),
db_options_(db_options), db_options_(_db_options),
next_file_number_(2), next_file_number_(2),
manifest_file_number_(0), // Filled by Recover() manifest_file_number_(0), // Filled by Recover()
options_file_number_(0), options_file_number_(0),
@ -2957,16 +2957,26 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data,
} }
} else { } else {
uint64_t max_log_number_in_batch = 0; uint64_t max_log_number_in_batch = 0;
uint64_t min_log_number_to_keep = 0;
for (auto& e : batch_edits) { for (auto& e : batch_edits) {
if (e->has_log_number_) { if (e->has_log_number_) {
max_log_number_in_batch = max_log_number_in_batch =
std::max(max_log_number_in_batch, e->log_number_); std::max(max_log_number_in_batch, e->log_number_);
} }
if (e->has_min_log_number_to_keep_) {
min_log_number_to_keep =
std::max(min_log_number_to_keep, e->min_log_number_to_keep_);
}
} }
if (max_log_number_in_batch != 0) { if (max_log_number_in_batch != 0) {
assert(column_family_data->GetLogNumber() <= max_log_number_in_batch); assert(column_family_data->GetLogNumber() <= max_log_number_in_batch);
column_family_data->SetLogNumber(max_log_number_in_batch); column_family_data->SetLogNumber(max_log_number_in_batch);
} }
if (min_log_number_to_keep != 0) {
// Should only be set in 2PC mode.
MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
}
AppendVersion(column_family_data, v); AppendVersion(column_family_data, v);
} }
@ -3122,6 +3132,7 @@ Status VersionSet::Recover(
uint64_t log_number = 0; uint64_t log_number = 0;
uint64_t previous_log_number = 0; uint64_t previous_log_number = 0;
uint32_t max_column_family = 0; uint32_t max_column_family = 0;
uint64_t min_log_number_to_keep = 0;
std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders; std::unordered_map<uint32_t, BaseReferencedVersionBuilder*> builders;
// add default column family // add default column family
@ -3262,6 +3273,11 @@ Status VersionSet::Recover(
max_column_family = edit.max_column_family_; max_column_family = edit.max_column_family_;
} }
if (edit.has_min_log_number_to_keep_) {
min_log_number_to_keep =
std::max(min_log_number_to_keep, edit.min_log_number_to_keep_);
}
if (edit.has_last_sequence_) { if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_; last_sequence = edit.last_sequence_;
have_last_sequence = true; have_last_sequence = true;
@ -3284,6 +3300,9 @@ Status VersionSet::Recover(
column_family_set_->UpdateMaxColumnFamily(max_column_family); column_family_set_->UpdateMaxColumnFamily(max_column_family);
// When reading DB generated using old release, min_log_number_to_keep=0.
// All log files will be scanned for potential prepare entries.
MarkMinLogNumberToKeep2PC(min_log_number_to_keep);
MarkFileNumberUsed(previous_log_number); MarkFileNumberUsed(previous_log_number);
MarkFileNumberUsed(log_number); MarkFileNumberUsed(log_number);
} }
@ -3355,11 +3374,12 @@ Status VersionSet::Recover(
"manifest_file_number is %lu, next_file_number is %lu, " "manifest_file_number is %lu, next_file_number is %lu, "
"last_sequence is %lu, log_number is %lu," "last_sequence is %lu, log_number is %lu,"
"prev_log_number is %lu," "prev_log_number is %lu,"
"max_column_family is %u\n", "max_column_family is %u,"
"min_log_number_to_keep is %lu\n",
manifest_filename.c_str(), (unsigned long)manifest_file_number_, manifest_filename.c_str(), (unsigned long)manifest_file_number_,
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence_, (unsigned long)next_file_number_.load(), (unsigned long)last_sequence_,
(unsigned long)log_number, (unsigned long)prev_log_number_, (unsigned long)log_number, (unsigned long)prev_log_number_,
column_family_set_->GetMaxColumnFamily()); column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
if (cfd->IsDropped()) { if (cfd->IsDropped()) {
@ -3647,6 +3667,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
cfd->SetLogNumber(edit.log_number_); cfd->SetLogNumber(edit.log_number_);
} }
if (edit.has_prev_log_number_) { if (edit.has_prev_log_number_) {
previous_log_number = edit.prev_log_number_; previous_log_number = edit.prev_log_number_;
have_prev_log_number = true; have_prev_log_number = true;
@ -3665,6 +3686,10 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
if (edit.has_max_column_family_) { if (edit.has_max_column_family_) {
column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_); column_family_set_->UpdateMaxColumnFamily(edit.max_column_family_);
} }
if (edit.has_min_log_number_to_keep_) {
MarkMinLogNumberToKeep2PC(edit.min_log_number_to_keep_);
}
} }
} }
file_reader.reset(); file_reader.reset();
@ -3723,10 +3748,11 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
printf( printf(
"next_file_number %lu last_sequence " "next_file_number %lu last_sequence "
"%lu prev_log_number %lu max_column_family %u\n", "%lu prev_log_number %lu max_column_family %u min_log_number_to_keep "
"%" PRIu64 "\n",
(unsigned long)next_file_number_.load(), (unsigned long)last_sequence, (unsigned long)next_file_number_.load(), (unsigned long)last_sequence,
(unsigned long)previous_log_number, (unsigned long)previous_log_number,
column_family_set_->GetMaxColumnFamily()); column_family_set_->GetMaxColumnFamily(), min_log_number_to_keep_2pc());
} }
return s; return s;
@ -3741,6 +3767,14 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
} }
} }
// Called only either from ::LogAndApply which is protected by mutex or during
// recovery which is single-threaded.
void VersionSet::MarkMinLogNumberToKeep2PC(uint64_t number) {
if (min_log_number_to_keep_2pc_.load(std::memory_order_relaxed) < number) {
min_log_number_to_keep_2pc_.store(number, std::memory_order_relaxed);
}
}
Status VersionSet::WriteSnapshot(log::Writer* log) { Status VersionSet::WriteSnapshot(log::Writer* log) {
// TODO: Break up into multiple records to reduce memory usage on recovery? // TODO: Break up into multiple records to reduce memory usage on recovery?

@ -802,6 +802,10 @@ class VersionSet {
uint64_t current_next_file_number() const { return next_file_number_.load(); } uint64_t current_next_file_number() const { return next_file_number_.load(); }
uint64_t min_log_number_to_keep_2pc() const {
return min_log_number_to_keep_2pc_.load();
}
// Allocate and return a new file number // Allocate and return a new file number
uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); } uint64_t NewFileNumber() { return next_file_number_.fetch_add(1); }
@ -849,15 +853,31 @@ class VersionSet {
// REQUIRED: this is only called during single-threaded recovery or repair. // REQUIRED: this is only called during single-threaded recovery or repair.
void MarkFileNumberUsed(uint64_t number); void MarkFileNumberUsed(uint64_t number);
// Mark the specified log number as deleted
// REQUIRED: this is only called during single-threaded recovery or repair, or
// from ::LogAndApply where the global mutex is held.
void MarkMinLogNumberToKeep2PC(uint64_t number);
// Return the log file number for the log file that is currently // Return the log file number for the log file that is currently
// being compacted, or zero if there is no such log file. // being compacted, or zero if there is no such log file.
uint64_t prev_log_number() const { return prev_log_number_; } uint64_t prev_log_number() const { return prev_log_number_; }
// Returns the minimum log number such that all // Returns the minimum log number which still has data not flushed to any SST
// log numbers less than or equal to it can be deleted // file.
uint64_t MinLogNumber() const { // In non-2PC mode, all the log numbers smaller than this number can be safely
// deleted.
uint64_t MinLogNumberWithUnflushedData() const {
return PreComputeMinLogNumberWithUnflushedData(nullptr);
}
// Returns the minimum log number which still has data not flushed to any SST
// file, except data from `cfd_to_skip`.
uint64_t PreComputeMinLogNumberWithUnflushedData(
const ColumnFamilyData* cfd_to_skip) const {
uint64_t min_log_num = std::numeric_limits<uint64_t>::max(); uint64_t min_log_num = std::numeric_limits<uint64_t>::max();
for (auto cfd : *column_family_set_) { for (auto cfd : *column_family_set_) {
if (cfd == cfd_to_skip) {
continue;
}
// It's safe to ignore dropped column families here: // It's safe to ignore dropped column families here:
// cfd->IsDropped() becomes true after the drop is persisted in MANIFEST. // cfd->IsDropped() becomes true after the drop is persisted in MANIFEST.
if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) { if (min_log_num > cfd->GetLogNumber() && !cfd->IsDropped()) {
@ -908,6 +928,8 @@ class VersionSet {
new_options.writable_file_max_buffer_size; new_options.writable_file_max_buffer_size;
} }
const ImmutableDBOptions* db_options() const { return db_options_; }
static uint64_t GetNumLiveVersions(Version* dummy_versions); static uint64_t GetNumLiveVersions(Version* dummy_versions);
static uint64_t GetTotalSstFilesSize(Version* dummy_versions); static uint64_t GetTotalSstFilesSize(Version* dummy_versions);
@ -946,6 +968,10 @@ class VersionSet {
const std::string dbname_; const std::string dbname_;
const ImmutableDBOptions* const db_options_; const ImmutableDBOptions* const db_options_;
std::atomic<uint64_t> next_file_number_; std::atomic<uint64_t> next_file_number_;
// Any log number equal or lower than this should be ignored during recovery,
// and is qualified for being deleted in 2PC mode. In non-2PC mode, this
// number is ignored.
std::atomic<uint64_t> min_log_number_to_keep_2pc_ = {0};
uint64_t manifest_file_number_; uint64_t manifest_file_number_;
uint64_t options_file_number_; uint64_t options_file_number_;
uint64_t pending_manifest_file_number_; uint64_t pending_manifest_file_number_;

@ -33,6 +33,7 @@ LIB_SOURCES = \
db/flush_scheduler.cc \ db/flush_scheduler.cc \
db/forward_iterator.cc \ db/forward_iterator.cc \
db/internal_stats.cc \ db/internal_stats.cc \
db/logs_with_prep_tracker.cc \
db/log_reader.cc \ db/log_reader.cc \
db/log_writer.cc \ db/log_writer.cc \
db/malloc_stats.cc \ db/malloc_stats.cc \

@ -197,7 +197,8 @@ Status PessimisticTransaction::Prepare() {
s = PrepareInternal(); s = PrepareInternal();
if (s.ok()) { if (s.ok()) {
assert(log_number_ != 0); assert(log_number_ != 0);
dbimpl_->MarkLogAsContainingPrepSection(log_number_); dbimpl_->logs_with_prep_tracker()->MarkLogAsContainingPrepSection(
log_number_);
txn_state_.store(PREPARED); txn_state_.store(PREPARED);
} }
} else if (txn_state_ == LOCKS_STOLEN) { } else if (txn_state_ == LOCKS_STOLEN) {
@ -284,7 +285,8 @@ Status PessimisticTransaction::Commit() {
// to determine what prep logs must be kept around, // to determine what prep logs must be kept around,
// not the prep section heap. // not the prep section heap.
assert(log_number_ > 0); assert(log_number_ > 0);
dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_); dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
log_number_);
txn_db_impl_->UnregisterTransaction(this); txn_db_impl_->UnregisterTransaction(this);
Clear(); Clear();
@ -341,7 +343,8 @@ Status PessimisticTransaction::Rollback() {
if (s.ok()) { if (s.ok()) {
// we do not need to keep our prepared section around // we do not need to keep our prepared section around
assert(log_number_ > 0); assert(log_number_ > 0);
dbimpl_->MarkLogAsHavingPrepSectionFlushed(log_number_); dbimpl_->logs_with_prep_tracker()->MarkLogAsHavingPrepSectionFlushed(
log_number_);
Clear(); Clear();
txn_state_.store(ROLLEDBACK); txn_state_.store(ROLLEDBACK);
} }

@ -900,6 +900,7 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
switch (txn_db_options.write_policy) { switch (txn_db_options.write_policy) {
case WRITE_COMMITTED: case WRITE_COMMITTED:
// but now our memtable should be referencing the prep section // but now our memtable should be referencing the prep section
ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep());
ASSERT_EQ(log_containing_prep, ASSERT_EQ(log_containing_prep,
db_impl->TEST_FindMinPrepLogReferencedByMemTable()); db_impl->TEST_FindMinPrepLogReferencedByMemTable());
break; break;
@ -925,6 +926,7 @@ TEST_P(TransactionTest, SimpleTwoPhaseTransactionTest) {
} }
// after memtable flush we can now relese the log // after memtable flush we can now relese the log
ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
delete txn; delete txn;
@ -1279,6 +1281,8 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
// but now our memtable should be referencing the prep section // but now our memtable should be referencing the prep section
ASSERT_EQ(log_containing_prep, ASSERT_EQ(log_containing_prep,
db_impl->TEST_FindMinPrepLogReferencedByMemTable()); db_impl->TEST_FindMinPrepLogReferencedByMemTable());
ASSERT_GE(log_containing_prep, db_impl->MinLogNumberToKeep());
break; break;
case WRITE_PREPARED: case WRITE_PREPARED:
case WRITE_UNPREPARED: case WRITE_UNPREPARED:
@ -1289,9 +1293,15 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
assert(false); assert(false);
} }
// Add a dummy record to memtable before a flush. Otherwise, the
// memtable will be empty and flush will be skipped.
s = db->Put(write_options, Slice("foo3"), Slice("bar3"));
ASSERT_OK(s);
db_impl->TEST_FlushMemTable(true); db_impl->TEST_FlushMemTable(true);
// after memtable flush we can now relese the log // after memtable flush we can now release the log
ASSERT_GT(db_impl->MinLogNumberToKeep(), log_containing_prep);
ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable()); ASSERT_EQ(0, db_impl->TEST_FindMinPrepLogReferencedByMemTable());
delete txn; delete txn;
@ -1805,14 +1815,14 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
assert(false); assert(false);
} }
ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog()); ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog());
// request a flush for all column families such that the earliest // request a flush for all column families such that the earliest
// alive log file can be killed // alive log file can be killed
db_impl->TEST_SwitchWAL(); db_impl->TEST_SwitchWAL();
// log cannot be flushed because txn2 has not been commited // log cannot be flushed because txn2 has not been commited
ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed()); ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
ASSERT_TRUE(db_impl->TEST_UnableToFlushOldestLog()); ASSERT_TRUE(db_impl->TEST_UnableToReleaseOldestLog());
// assert that cfa has a flush requested // assert that cfa has a flush requested
ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested()); ASSERT_TRUE(cfh_a->cfd()->imm()->HasFlushRequested());
@ -1836,7 +1846,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
ASSERT_OK(s); ASSERT_OK(s);
db_impl->TEST_SwitchWAL(); db_impl->TEST_SwitchWAL();
ASSERT_TRUE(!db_impl->TEST_UnableToFlushOldestLog()); ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog());
// we should see that cfb now has a flush requested // we should see that cfb now has a flush requested
ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested()); ASSERT_TRUE(cfh_b->cfd()->imm()->HasFlushRequested());

Loading…
Cancel
Save