[rocksdb] Memtable Log Referencing and Prepared Batch Recovery

Summary:
This diff is built on top of WriteBatch modification: https://reviews.facebook.net/D54093 and adds the required functionality to rocksdb core necessary for rocksdb to support 2PC.

modfication of DBImpl::WriteImpl()
- added two arguments *uint64_t log_used = nullptr, uint64_t log_ref = 0;
- *log_used is an output argument which will return the log number which the incoming batch was inserted into, 0 if no WAL insert took place.
-  log_ref is a supplied log_number which all memtables inserted into will reference after the batch insert takes place. This number will reside in 'FindMinPrepLogReferencedByMemTable()' until all Memtables insertinto have flushed.

- Recovery/writepath is now aware of prepared batches and commit and rollback markers.

Test Plan: There is currently no test on this diff. All testing of this functionality takes place in the Transaction layer/diff but I will add some testing.

Reviewers: IslamAbdelRahman, sdong

Subscribers: leveldb, santoshb, andrewkr, vasilep, dhruba, hermanlee4

Differential Revision: https://reviews.facebook.net/D56919
main
Reid Horuff 9 years ago
parent 0460e9dcce
commit 1b8a2e8fdd
  1. 153
      db/db_impl.cc
  2. 82
      db/db_impl.h
  3. 18
      db/db_impl_debug.cc
  4. 14
      db/memtable.cc
  5. 11
      db/memtable.h
  6. 20
      db/memtable_list.cc
  7. 2
      db/memtable_list.h
  8. 195
      db/write_batch.cc
  9. 7
      db/write_batch_internal.h
  10. 17
      db/write_thread.h
  11. 4
      include/rocksdb/options.h
  12. 3
      util/options_helper.h
  13. 3
      util/options_settable_test.cc

@ -90,10 +90,10 @@
#include "util/log_buffer.h" #include "util/log_buffer.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/sst_file_manager_impl.h"
#include "util/options_helper.h" #include "util/options_helper.h"
#include "util/options_parser.h" #include "util/options_parser.h"
#include "util/perf_context_imp.h" #include "util/perf_context_imp.h"
#include "util/sst_file_manager_impl.h"
#include "util/stop_watch.h" #include "util/stop_watch.h"
#include "util/string_util.h" #include "util/string_util.h"
#include "util/sync_point.h" #include "util/sync_point.h"
@ -614,6 +614,78 @@ void DBImpl::MaybeDumpStats() {
} }
} }
uint64_t DBImpl::FindMinPrepLogReferencedByMemTable() {
uint64_t min_log = 0;
// we must look through the memtables for two phase transactions
// that have been committed but not yet flushed
for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
if (loop_cfd->IsDropped()) {
continue;
}
auto log = loop_cfd->imm()->GetMinLogContainingPrepSection();
if (log > 0 && (min_log == 0 || log < min_log)) {
min_log = log;
}
log = loop_cfd->mem()->GetMinLogContainingPrepSection();
if (log > 0 && (min_log == 0 || log < min_log)) {
min_log = log;
}
}
return min_log;
}
void DBImpl::MarkLogAsHavingPrepSectionFlushed(uint64_t log) {
assert(log != 0);
std::lock_guard<std::mutex> lock(prep_heap_mutex_);
auto it = prepared_section_completed_.find(log);
assert(it != prepared_section_completed_.end());
it->second += 1;
}
void DBImpl::MarkLogAsContainingPrepSection(uint64_t log) {
assert(log != 0);
std::lock_guard<std::mutex> lock(prep_heap_mutex_);
min_log_with_prep_.push(log);
auto it = prepared_section_completed_.find(log);
if (it == prepared_section_completed_.end()) {
prepared_section_completed_[log] = 0;
}
}
uint64_t DBImpl::FindMinLogContainingOutstandingPrep() {
uint64_t min_log = 0;
// first we look in the prepared heap where we keep
// track of transactions that have been prepared (written to WAL)
// but not yet committed.
while (!min_log_with_prep_.empty()) {
min_log = min_log_with_prep_.top();
auto it = prepared_section_completed_.find(min_log);
// value was marked as 'deleted' from heap
if (it != prepared_section_completed_.end() && it->second > 0) {
it->second -= 1;
min_log_with_prep_.pop();
// back to squere one...
min_log = 0;
continue;
} else {
// found a valid value
break;
}
}
return min_log;
}
// * Returns the list of live files in 'sst_live' // * Returns the list of live files in 'sst_live'
// If it's doing full scan: // If it's doing full scan:
// * Returns the list of all files in the filesystem in // * Returns the list of all files in the filesystem in
@ -671,6 +743,32 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
job_context->pending_manifest_file_number = job_context->pending_manifest_file_number =
versions_->pending_manifest_file_number(); versions_->pending_manifest_file_number();
job_context->log_number = versions_->MinLogNumber(); job_context->log_number = versions_->MinLogNumber();
if (allow_2pc()) {
// if are 2pc we must consider logs containing prepared
// sections of outstanding transactions.
//
// We must check min logs with outstanding prep before we check
// logs referneces by memtables because a log referenced by the
// first data structure could transition to the second under us.
//
// TODO(horuff): iterating over all column families under db mutex.
// should find more optimial solution
auto min_log_in_prep_heap = FindMinLogContainingOutstandingPrep();
if (min_log_in_prep_heap != 0 &&
min_log_in_prep_heap < job_context->log_number) {
job_context->log_number = min_log_in_prep_heap;
}
auto min_log_refed_by_mem = FindMinPrepLogReferencedByMemTable();
if (min_log_refed_by_mem != 0 &&
min_log_refed_by_mem < job_context->log_number) {
job_context->log_number = min_log_refed_by_mem;
}
}
job_context->prev_log_number = versions_->prev_log_number(); job_context->prev_log_number = versions_->prev_log_number();
versions_->AddLiveFiles(&job_context->sst_live); versions_->AddLiveFiles(&job_context->sst_live);
@ -708,7 +806,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
} }
if (!alive_log_files_.empty()) { if (!alive_log_files_.empty()) {
uint64_t min_log_number = versions_->MinLogNumber(); uint64_t min_log_number = job_context->log_number;
// find newly obsoleted log files // find newly obsoleted log files
while (alive_log_files_.begin()->number < min_log_number) { while (alive_log_files_.begin()->number < min_log_number) {
auto& earliest = *alive_log_files_.begin(); auto& earliest = *alive_log_files_.begin();
@ -1378,9 +1476,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
// insert. We don't want to fail the whole write batch in that case -- // insert. We don't want to fail the whole write batch in that case --
// we just ignore the update. // we just ignore the update.
// That's why we set ignore missing column families to true // That's why we set ignore missing column families to true
status = status = WriteBatchInternal::InsertInto(
WriteBatchInternal::InsertInto(&batch, column_family_memtables_.get(), &batch, column_family_memtables_.get(), &flush_scheduler_, true,
&flush_scheduler_, true, log_number); log_number, this);
MaybeIgnoreError(&status); MaybeIgnoreError(&status);
if (!status.ok()) { if (!status.ok()) {
@ -4258,19 +4356,21 @@ Status DBImpl::SingleDelete(const WriteOptions& write_options,
} }
Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) {
return WriteImpl(write_options, my_batch, nullptr); return WriteImpl(write_options, my_batch, nullptr, nullptr);
} }
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
Status DBImpl::WriteWithCallback(const WriteOptions& write_options, Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
WriteBatch* my_batch, WriteBatch* my_batch,
WriteCallback* callback) { WriteCallback* callback) {
return WriteImpl(write_options, my_batch, callback); return WriteImpl(write_options, my_batch, callback, nullptr);
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
Status DBImpl::WriteImpl(const WriteOptions& write_options, Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback) { WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable) {
if (my_batch == nullptr) { if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!"); return Status::Corruption("Batch is nullptr!");
} }
@ -4295,8 +4395,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
w.batch = my_batch; w.batch = my_batch;
w.sync = write_options.sync; w.sync = write_options.sync;
w.disableWAL = write_options.disableWAL; w.disableWAL = write_options.disableWAL;
w.disable_memtable = disable_memtable;
w.in_batch_group = false; w.in_batch_group = false;
w.callback = callback; w.callback = callback;
w.log_ref = log_ref;
if (!write_options.disableWAL) { if (!write_options.disableWAL) {
RecordTick(stats_, WRITE_WITH_WAL); RecordTick(stats_, WRITE_WITH_WAL);
@ -4309,12 +4411,16 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// we are a non-leader in a parallel group // we are a non-leader in a parallel group
PERF_TIMER_GUARD(write_memtable_time); PERF_TIMER_GUARD(write_memtable_time);
if (!w.CallbackFailed()) { if (log_used != nullptr) {
*log_used = w.log_used;
}
if (w.ShouldWriteToMemtable()) {
ColumnFamilyMemTablesImpl column_family_memtables( ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet()); versions_->GetColumnFamilySet());
WriteBatchInternal::SetSequence(w.batch, w.sequence); WriteBatchInternal::SetSequence(w.batch, w.sequence);
w.status = WriteBatchInternal::InsertInto( w.status = WriteBatchInternal::InsertInto(
w.batch, &column_family_memtables, &flush_scheduler_, &w, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, this, write_options.ignore_missing_column_families, 0 /*log_number*/, this,
true /*dont_filter_deletes*/, true /*concurrent_memtable_writes*/); true /*dont_filter_deletes*/, true /*concurrent_memtable_writes*/);
} }
@ -4332,6 +4438,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
status = w.FinalStatus(); status = w.FinalStatus();
} }
if (w.state == WriteThread::STATE_COMPLETED) { if (w.state == WriteThread::STATE_COMPLETED) {
if (log_used != nullptr) {
*log_used = w.log_used;
}
// write is complete and leader has updated sequence // write is complete and leader has updated sequence
RecordTick(stats_, WRITE_DONE_BY_OTHER); RecordTick(stats_, WRITE_DONE_BY_OTHER);
return w.FinalStatus(); return w.FinalStatus();
@ -4489,10 +4598,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
uint64_t total_byte_size = 0; uint64_t total_byte_size = 0;
for (auto writer : write_group) { for (auto writer : write_group) {
if (writer->CheckCallback(this)) { if (writer->CheckCallback(this)) {
if (writer->ShouldWriteToMemtable()) {
total_count += WriteBatchInternal::Count(writer->batch); total_count += WriteBatchInternal::Count(writer->batch);
parallel = parallel && !writer->batch->HasMerge();
}
if (writer->ShouldWriteToWAL()) {
total_byte_size = WriteBatchInternal::AppendedByteSize( total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch)); total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
parallel = parallel && !writer->batch->HasMerge(); }
} }
} }
@ -4514,22 +4628,27 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
PERF_TIMER_GUARD(write_wal_time); PERF_TIMER_GUARD(write_wal_time);
WriteBatch* merged_batch = nullptr; WriteBatch* merged_batch = nullptr;
if (write_group.size() == 1 && !write_group[0]->CallbackFailed()) { if (write_group.size() == 1 && write_group[0]->ShouldWriteToWAL()) {
merged_batch = write_group[0]->batch; merged_batch = write_group[0]->batch;
write_group[0]->log_used = logfile_number_;
} else { } else {
// WAL needs all of the batches flattened into a single batch. // WAL needs all of the batches flattened into a single batch.
// We could avoid copying here with an iov-like AddRecord // We could avoid copying here with an iov-like AddRecord
// interface // interface
merged_batch = &tmp_batch_; merged_batch = &tmp_batch_;
for (auto writer : write_group) { for (auto writer : write_group) {
if (!writer->CallbackFailed()) { if (writer->ShouldWriteToWAL()) {
WriteBatchInternal::Append(merged_batch, writer->batch); WriteBatchInternal::Append(merged_batch, writer->batch);
} }
writer->log_used = logfile_number_;
} }
} }
WriteBatchInternal::SetSequence(merged_batch, current_sequence);
assert(WriteBatchInternal::Count(merged_batch) == total_count); if (log_used != nullptr) {
*log_used = logfile_number_;
}
WriteBatchInternal::SetSequence(merged_batch, current_sequence);
Slice log_entry = WriteBatchInternal::Contents(merged_batch); Slice log_entry = WriteBatchInternal::Contents(merged_batch);
status = logs_.back().writer->AddRecord(log_entry); status = logs_.back().writer->AddRecord(log_entry);
@ -4615,14 +4734,14 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
std::memory_order_relaxed); std::memory_order_relaxed);
write_thread_.LaunchParallelFollowers(&pg, current_sequence); write_thread_.LaunchParallelFollowers(&pg, current_sequence);
if (!w.CallbackFailed()) { if (w.ShouldWriteToMemtable()) {
// do leader write // do leader write
ColumnFamilyMemTablesImpl column_family_memtables( ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet()); versions_->GetColumnFamilySet());
assert(w.sequence == current_sequence); assert(w.sequence == current_sequence);
WriteBatchInternal::SetSequence(w.batch, w.sequence); WriteBatchInternal::SetSequence(w.batch, w.sequence);
w.status = WriteBatchInternal::InsertInto( w.status = WriteBatchInternal::InsertInto(
w.batch, &column_family_memtables, &flush_scheduler_, &w, &column_family_memtables, &flush_scheduler_,
write_options.ignore_missing_column_families, 0 /*log_number*/, write_options.ignore_missing_column_families, 0 /*log_number*/,
this, true /*dont_filter_deletes*/, this, true /*dont_filter_deletes*/,
true /*concurrent_memtable_writes*/); true /*concurrent_memtable_writes*/);

@ -10,8 +10,10 @@
#include <atomic> #include <atomic>
#include <deque> #include <deque>
#include <functional>
#include <limits> #include <limits>
#include <list> #include <list>
#include <queue>
#include <set> #include <set>
#include <string> #include <string>
#include <utility> #include <utility>
@ -296,7 +298,8 @@ class DBImpl : public DB {
bool disallow_trivial_move = false); bool disallow_trivial_move = false);
// Force current memtable contents to be flushed. // Force current memtable contents to be flushed.
Status TEST_FlushMemTable(bool wait = true); Status TEST_FlushMemTable(bool wait = true,
ColumnFamilyHandle* cfh = nullptr);
// Wait for memtable compaction // Wait for memtable compaction
Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr); Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
@ -345,6 +348,9 @@ class DBImpl : public DB {
WriteController& TEST_write_controler() { return write_controller_; } WriteController& TEST_write_controler() { return write_controller_; }
uint64_t TEST_FindMinLogContainingOutstandingPrep();
uint64_t TEST_FindMinPrepLogReferencedByMemTable();
#endif // NDEBUG #endif // NDEBUG
// Return maximum background compaction alowed to be scheduled based on // Return maximum background compaction alowed to be scheduled based on
@ -421,12 +427,57 @@ class DBImpl : public DB {
return num_running_compactions_; return num_running_compactions_;
} }
// hollow transactions shell used for recovery.
// these will then be passed to TransactionDB so that
// locks can be reacquired before writing can resume.
struct RecoveredTransaction {
uint64_t log_number_;
std::string name_;
WriteBatch* batch_;
explicit RecoveredTransaction(const uint64_t log, const std::string& name,
WriteBatch* batch)
: log_number_(log), name_(name), batch_(batch) {}
~RecoveredTransaction() { delete batch_; }
};
bool allow_2pc() const { return db_options_.allow_2pc; }
RecoveredTransaction* GetRecoveredTransaction(const std::string& name) {
auto it = recovered_transactions_.find(name);
if (it == recovered_transactions_.end()) {
return nullptr;
} else {
return it->second;
}
}
void InsertRecoveredTransaction(const uint64_t log, const std::string& name,
WriteBatch* batch) {
recovered_transactions_[name] = new RecoveredTransaction(log, name, batch);
MarkLogAsContainingPrepSection(log);
}
void DeleteRecoveredTransaction(const std::string& name) {
auto it = recovered_transactions_.find(name);
assert(it != recovered_transactions_.end());
auto* trx = it->second;
recovered_transactions_.erase(it);
MarkLogAsHavingPrepSectionFlushed(trx->log_number_);
delete trx;
}
void MarkLogAsHavingPrepSectionFlushed(uint64_t log);
void MarkLogAsContainingPrepSection(uint64_t log);
protected: protected:
Env* const env_; Env* const env_;
const std::string dbname_; const std::string dbname_;
unique_ptr<VersionSet> versions_; unique_ptr<VersionSet> versions_;
const DBOptions db_options_; const DBOptions db_options_;
Statistics* stats_; Statistics* stats_;
std::unordered_map<std::string, RecoveredTransaction*>
recovered_transactions_;
InternalIterator* NewInternalIterator(const ReadOptions&, InternalIterator* NewInternalIterator(const ReadOptions&,
ColumnFamilyData* cfd, ColumnFamilyData* cfd,
@ -460,7 +511,12 @@ class DBImpl : public DB {
void EraseThreadStatusDbInfo() const; void EraseThreadStatusDbInfo() const;
Status WriteImpl(const WriteOptions& options, WriteBatch* updates, Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback); WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false);
uint64_t FindMinLogContainingOutstandingPrep();
uint64_t FindMinPrepLogReferencedByMemTable();
private: private:
friend class DB; friend class DB;
@ -854,6 +910,28 @@ class DBImpl : public DB {
// Indicate DB was opened successfully // Indicate DB was opened successfully
bool opened_successfully_; bool opened_successfully_;
// minmum log number still containing prepared data.
// this is used by FindObsoleteFiles to determine which
// flushed logs we must keep around because they still
// contain prepared data which has not been flushed or rolled back
std::priority_queue<uint64_t, std::vector<uint64_t>, std::greater<uint64_t>>
min_log_with_prep_;
// to be used in conjunction with min_log_with_prep_.
// once a transaction with data in log L is committed or rolled back
// rather than removing the value from the heap we add that value
// to prepared_section_completed_ which maps LOG -> instance_count
// since a log could contain multiple prepared sections
//
// when trying to determine the minmum log still active we first
// consult min_log_with_prep_. while that root value maps to
// a value > 0 in prepared_section_completed_ we decrement the
// instance_count for that log and pop the root value in
// min_log_with_prep_. This will work the same as a min_heap
// where we are deleteing arbitrary elements and the up heaping.
std::unordered_map<uint64_t, uint64_t> prepared_section_completed_;
std::mutex prep_heap_mutex_;
// No copying allowed // No copying allowed
DBImpl(const DBImpl&); DBImpl(const DBImpl&);
void operator=(const DBImpl&); void operator=(const DBImpl&);

@ -74,10 +74,17 @@ Status DBImpl::TEST_CompactRange(int level, const Slice* begin,
disallow_trivial_move); disallow_trivial_move);
} }
Status DBImpl::TEST_FlushMemTable(bool wait) { Status DBImpl::TEST_FlushMemTable(bool wait, ColumnFamilyHandle* cfh) {
FlushOptions fo; FlushOptions fo;
fo.wait = wait; fo.wait = wait;
return FlushMemTable(default_cf_handle_->cfd(), fo); ColumnFamilyData* cfd;
if (cfh == nullptr) {
cfd = default_cf_handle_->cfd();
} else {
auto cfhi = reinterpret_cast<ColumnFamilyHandleImpl*>(cfh);
cfd = cfhi->cfd();
}
return FlushMemTable(cfd, fo);
} }
Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) { Status DBImpl::TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family) {
@ -154,5 +161,12 @@ Status DBImpl::TEST_GetAllImmutableCFOptions(
return Status::OK(); return Status::OK();
} }
uint64_t DBImpl::TEST_FindMinLogContainingOutstandingPrep() {
return FindMinLogContainingOutstandingPrep();
}
uint64_t DBImpl::TEST_FindMinPrepLogReferencedByMemTable() {
return FindMinPrepLogReferencedByMemTable();
}
} // namespace rocksdb } // namespace rocksdb
#endif // NDEBUG #endif // NDEBUG

@ -75,6 +75,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
first_seqno_(0), first_seqno_(0),
earliest_seqno_(earliest_seq), earliest_seqno_(earliest_seq),
mem_next_logfile_number_(0), mem_next_logfile_number_(0),
min_prep_log_referenced_(0),
locks_(moptions_.inplace_update_support locks_(moptions_.inplace_update_support
? moptions_.inplace_update_num_locks ? moptions_.inplace_update_num_locks
: 0), : 0),
@ -800,4 +801,17 @@ void MemTableRep::Get(const LookupKey& k, void* callback_args,
} }
} }
void MemTable::RefLogContainingPrepSection(uint64_t log) {
assert(log > 0);
auto cur = min_prep_log_referenced_.load();
while ((log < cur || cur == 0) &&
!min_prep_log_referenced_.compare_exchange_strong(cur, log)) {
cur = min_prep_log_referenced_.load();
}
}
uint64_t MemTable::GetMinLogContainingPrepSection() {
return min_prep_log_referenced_.load();
}
} // namespace rocksdb } // namespace rocksdb

@ -271,6 +271,13 @@ class MemTable {
// operations on the same MemTable. // operations on the same MemTable.
void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; } void SetNextLogNumber(uint64_t num) { mem_next_logfile_number_ = num; }
// if this memtable contains data from a committed
// two phase transaction we must take note of the
// log which contains that data so we can know
// when to relese that log
void RefLogContainingPrepSection(uint64_t log);
uint64_t GetMinLogContainingPrepSection();
// Notify the underlying storage that no more items will be added. // Notify the underlying storage that no more items will be added.
// REQUIRES: external synchronization to prevent simultaneous // REQUIRES: external synchronization to prevent simultaneous
// operations on the same MemTable. // operations on the same MemTable.
@ -342,6 +349,10 @@ class MemTable {
// The log files earlier than this number can be deleted. // The log files earlier than this number can be deleted.
uint64_t mem_next_logfile_number_; uint64_t mem_next_logfile_number_;
// the earliest log containing a prepared section
// which has been inserted into this memtable.
std::atomic<uint64_t> min_prep_log_referenced_;
// rw locks for inplace updates // rw locks for inplace updates
std::vector<port::RWMutex> locks_; std::vector<port::RWMutex> locks_;

@ -392,4 +392,24 @@ void MemTableList::InstallNewVersion() {
} }
} }
uint64_t MemTableList::GetMinLogContainingPrepSection() {
uint64_t min_log = 0;
for (auto& m : current_->memlist_) {
// this mem has been flushed it no longer
// needs to hold on the its prep section
if (m->flush_completed_) {
continue;
}
auto log = m->GetMinLogContainingPrepSection();
if (log > 0 && (min_log == 0 || log < min_log)) {
min_log = log;
}
}
return min_log;
}
} // namespace rocksdb } // namespace rocksdb

@ -215,6 +215,8 @@ class MemTableList {
size_t* current_memory_usage() { return &current_memory_usage_; } size_t* current_memory_usage() { return &current_memory_usage_; }
uint64_t GetMinLogContainingPrepSection();
private: private:
// DB mutex held // DB mutex held
void InstallNewVersion(); void InstallNewVersion();

@ -681,38 +681,46 @@ Status WriteBatch::RollbackToSavePoint() {
return Status::OK(); return Status::OK();
} }
namespace {
class MemTableInserter : public WriteBatch::Handler { class MemTableInserter : public WriteBatch::Handler {
public: public:
SequenceNumber sequence_; SequenceNumber sequence_;
ColumnFamilyMemTables* const cf_mems_; ColumnFamilyMemTables* const cf_mems_;
FlushScheduler* const flush_scheduler_; FlushScheduler* const flush_scheduler_;
const bool ignore_missing_column_families_; const bool ignore_missing_column_families_;
const uint64_t log_number_; const uint64_t recovering_log_number_;
// log number that all Memtables inserted into should reference
uint64_t log_number_ref_;
DBImpl* db_; DBImpl* db_;
const bool dont_filter_deletes_; const bool dont_filter_deletes_;
const bool concurrent_memtable_writes_; const bool concurrent_memtable_writes_;
// current recovered transaction we are rebuilding (recovery)
WriteBatch* rebuilding_trx_;
// cf_mems should not be shared with concurrent inserters // cf_mems should not be shared with concurrent inserters
MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems, MemTableInserter(SequenceNumber sequence, ColumnFamilyMemTables* cf_mems,
FlushScheduler* flush_scheduler, FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, uint64_t log_number, bool ignore_missing_column_families,
DB* db, const bool dont_filter_deletes, uint64_t recovering_log_number, DB* db,
const bool dont_filter_deletes,
bool concurrent_memtable_writes) bool concurrent_memtable_writes)
: sequence_(sequence), : sequence_(sequence),
cf_mems_(cf_mems), cf_mems_(cf_mems),
flush_scheduler_(flush_scheduler), flush_scheduler_(flush_scheduler),
ignore_missing_column_families_(ignore_missing_column_families), ignore_missing_column_families_(ignore_missing_column_families),
log_number_(log_number), recovering_log_number_(recovering_log_number),
log_number_ref_(0),
db_(reinterpret_cast<DBImpl*>(db)), db_(reinterpret_cast<DBImpl*>(db)),
dont_filter_deletes_(dont_filter_deletes), dont_filter_deletes_(dont_filter_deletes),
concurrent_memtable_writes_(concurrent_memtable_writes) { concurrent_memtable_writes_(concurrent_memtable_writes),
rebuilding_trx_(nullptr) {
assert(cf_mems_); assert(cf_mems_);
if (!dont_filter_deletes_) { if (!dont_filter_deletes_) {
assert(db_); assert(db_);
} }
} }
void set_log_number_ref(uint64_t log) { log_number_ref_ = log; }
bool SeekToColumnFamily(uint32_t column_family_id, Status* s) { bool SeekToColumnFamily(uint32_t column_family_id, Status* s) {
// If we are in a concurrent mode, it is the caller's responsibility // If we are in a concurrent mode, it is the caller's responsibility
// to clone the original ColumnFamilyMemTables so that each thread // to clone the original ColumnFamilyMemTables so that each thread
@ -728,16 +736,24 @@ class MemTableInserter : public WriteBatch::Handler {
} }
return false; return false;
} }
if (log_number_ != 0 && log_number_ < cf_mems_->GetLogNumber()) { if (recovering_log_number_ != 0 &&
// This is true only in recovery environment (log_number_ is always 0 in recovering_log_number_ < cf_mems_->GetLogNumber()) {
// This is true only in recovery environment (recovering_log_number_ is
// always 0 in
// non-recovery, regular write code-path) // non-recovery, regular write code-path)
// * If log_number_ < cf_mems_->GetLogNumber(), this means that column // * If recovering_log_number_ < cf_mems_->GetLogNumber(), this means that
// column
// family already contains updates from this log. We can't apply updates // family already contains updates from this log. We can't apply updates
// twice because of update-in-place or merge workloads -- ignore the // twice because of update-in-place or merge workloads -- ignore the
// update // update
*s = Status::OK(); *s = Status::OK();
return false; return false;
} }
if (log_number_ref_ > 0) {
cf_mems_->GetMemTable()->RefLogContainingPrepSection(log_number_ref_);
}
return true; return true;
} }
@ -748,6 +764,12 @@ class MemTableInserter : public WriteBatch::Handler {
++sequence_; ++sequence_;
return seek_status; return seek_status;
} }
if (rebuilding_trx_ != nullptr) {
rebuilding_trx_->Put(cf_mems_->GetColumnFamilyHandle(), key, value);
return Status::OK();
}
MemTable* mem = cf_mems_->GetMemTable(); MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetMemTableOptions(); auto* moptions = mem->GetMemTableOptions();
if (!moptions->inplace_update_support) { if (!moptions->inplace_update_support) {
@ -801,11 +823,6 @@ class MemTableInserter : public WriteBatch::Handler {
Status DeleteImpl(uint32_t column_family_id, const Slice& key, Status DeleteImpl(uint32_t column_family_id, const Slice& key,
ValueType delete_type) { ValueType delete_type) {
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
MemTable* mem = cf_mems_->GetMemTable(); MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetMemTableOptions(); auto* moptions = mem->GetMemTableOptions();
if (!dont_filter_deletes_ && moptions->filter_deletes) { if (!dont_filter_deletes_ && moptions->filter_deletes) {
@ -832,11 +849,33 @@ class MemTableInserter : public WriteBatch::Handler {
virtual Status DeleteCF(uint32_t column_family_id, virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override { const Slice& key) override {
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
if (rebuilding_trx_ != nullptr) {
rebuilding_trx_->Delete(cf_mems_->GetColumnFamilyHandle(), key);
return Status::OK();
}
return DeleteImpl(column_family_id, key, kTypeDeletion); return DeleteImpl(column_family_id, key, kTypeDeletion);
} }
virtual Status SingleDeleteCF(uint32_t column_family_id, virtual Status SingleDeleteCF(uint32_t column_family_id,
const Slice& key) override { const Slice& key) override {
Status seek_status;
if (!SeekToColumnFamily(column_family_id, &seek_status)) {
++sequence_;
return seek_status;
}
if (rebuilding_trx_ != nullptr) {
rebuilding_trx_->SingleDelete(cf_mems_->GetColumnFamilyHandle(), key);
return Status::OK();
}
return DeleteImpl(column_family_id, key, kTypeSingleDeletion); return DeleteImpl(column_family_id, key, kTypeSingleDeletion);
} }
@ -848,6 +887,10 @@ class MemTableInserter : public WriteBatch::Handler {
++sequence_; ++sequence_;
return seek_status; return seek_status;
} }
if (rebuilding_trx_ != nullptr) {
rebuilding_trx_->Merge(cf_mems_->GetColumnFamilyHandle(), key, value);
return Status::OK();
}
MemTable* mem = cf_mems_->GetMemTable(); MemTable* mem = cf_mems_->GetMemTable();
auto* moptions = mem->GetMemTableOptions(); auto* moptions = mem->GetMemTableOptions();
bool perform_merge = false; bool perform_merge = false;
@ -933,8 +976,102 @@ class MemTableInserter : public WriteBatch::Handler {
} }
} }
} }
Status MarkBeginPrepare() override {
assert(rebuilding_trx_ == nullptr);
assert(db_);
if (recovering_log_number_ != 0) {
// during recovery we rebuild a hollow transaction
// from all encountered prepare sections of the wal
if (db_->allow_2pc() == false) {
return Status::NotSupported(
"WAL contains prepared transactions. Open with "
"TransactionDB::Open().");
}
// we are now iterating through a prepared section
rebuilding_trx_ = new WriteBatch();
} else {
// in non-recovery we ignore prepare markers
// and insert the values directly. making sure we have a
// log for each insertion to reference.
assert(log_number_ref_ > 0);
}
return Status::OK();
}
Status MarkEndPrepare(const Slice& name) override {
assert(db_);
assert((rebuilding_trx_ != nullptr) == (recovering_log_number_ != 0));
if (recovering_log_number_ != 0) {
assert(db_->allow_2pc());
db_->InsertRecoveredTransaction(recovering_log_number_, name.ToString(),
rebuilding_trx_);
rebuilding_trx_ = nullptr;
} else {
assert(rebuilding_trx_ == nullptr);
assert(log_number_ref_ > 0);
}
return Status::OK();
}
Status MarkCommit(const Slice& name) override {
assert(db_);
Status s;
if (recovering_log_number_ != 0) {
// in recovery when we encounter a commit marker
// we lookup this transaction in our set of rebuilt transactions
// and commit.
auto trx = db_->GetRecoveredTransaction(name.ToString());
// the log contaiting the prepared section may have
// been released in the last incarnation because the
// data was flushed to L0
if (trx != nullptr) {
// at this point individual CF lognumbers will prevent
// duplicate re-insertion of values.
assert(log_number_ref_ == 0);
// all insertes must refernce this trx log number
log_number_ref_ = trx->log_number_;
s = trx->batch_->Iterate(this);
log_number_ref_ = 0;
if (s.ok()) {
db_->DeleteRecoveredTransaction(name.ToString());
}
}
} else {
// in non recovery we simply ignore this tag
}
return s;
}
Status MarkRollback(const Slice& name) override {
assert(db_);
if (recovering_log_number_ != 0) {
auto trx = db_->GetRecoveredTransaction(name.ToString());
// the log containing the transactions prep section
// may have been released in the previous incarnation
// because we knew it had been rolled back
if (trx != nullptr) {
db_->DeleteRecoveredTransaction(name.ToString());
}
} else {
// in non recovery we simply ignore this tag
}
return Status::OK();
}
}; };
} // namespace
// This function can only be called in these conditions: // This function can only be called in these conditions:
// 1) During Recovery() // 1) During Recovery()
@ -949,18 +1086,36 @@ Status WriteBatchInternal::InsertInto(
MemTableInserter inserter(sequence, memtables, flush_scheduler, MemTableInserter inserter(sequence, memtables, flush_scheduler,
ignore_missing_column_families, log_number, db, ignore_missing_column_families, log_number, db,
dont_filter_deletes, concurrent_memtable_writes); dont_filter_deletes, concurrent_memtable_writes);
for (size_t i = 0; i < writers.size(); i++) { for (size_t i = 0; i < writers.size(); i++) {
if (!writers[i]->CallbackFailed()) { auto w = writers[i];
writers[i]->status = writers[i]->batch->Iterate(&inserter); if (!w->ShouldWriteToMemtable()) {
if (!writers[i]->status.ok()) { continue;
return writers[i]->status;
} }
inserter.set_log_number_ref(w->log_ref);
w->status = w->batch->Iterate(&inserter);
if (!w->status.ok()) {
return w->status;
} }
} }
return Status::OK(); return Status::OK();
} }
Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families,
uint64_t log_number, DB* db,
const bool dont_filter_deletes,
bool concurrent_memtable_writes) {
MemTableInserter inserter(WriteBatchInternal::Sequence(writer->batch),
memtables, flush_scheduler,
ignore_missing_column_families, log_number, db,
dont_filter_deletes, concurrent_memtable_writes);
assert(writer->ShouldWriteToMemtable());
inserter.set_log_number_ref(writer->log_ref);
return writer->batch->Iterate(&inserter);
}
Status WriteBatchInternal::InsertInto(const WriteBatch* batch, Status WriteBatchInternal::InsertInto(const WriteBatch* batch,
ColumnFamilyMemTables* memtables, ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler, FlushScheduler* flush_scheduler,

@ -164,6 +164,13 @@ class WriteBatchInternal {
uint64_t log_number = 0, DB* db = nullptr, uint64_t log_number = 0, DB* db = nullptr,
const bool dont_filter_deletes = true, const bool dont_filter_deletes = true,
bool concurrent_memtable_writes = false); bool concurrent_memtable_writes = false);
static Status InsertInto(WriteThread::Writer* writer,
ColumnFamilyMemTables* memtables,
FlushScheduler* flush_scheduler,
bool ignore_missing_column_families = false,
uint64_t log_number = 0, DB* db = nullptr,
const bool dont_filter_deletes = true,
bool concurrent_memtable_writes = false);
static void Append(WriteBatch* dst, const WriteBatch* src); static void Append(WriteBatch* dst, const WriteBatch* src);

@ -11,11 +11,12 @@
#include <chrono> #include <chrono>
#include <condition_variable> #include <condition_variable>
#include <mutex> #include <mutex>
#include <vector>
#include <type_traits> #include <type_traits>
#include <vector>
#include "db/write_callback.h" #include "db/write_callback.h"
#include "rocksdb/types.h"
#include "rocksdb/status.h" #include "rocksdb/status.h"
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h" #include "rocksdb/write_batch.h"
#include "util/autovector.h" #include "util/autovector.h"
#include "util/instrumented_mutex.h" #include "util/instrumented_mutex.h"
@ -79,6 +80,9 @@ class WriteThread {
WriteBatch* batch; WriteBatch* batch;
bool sync; bool sync;
bool disableWAL; bool disableWAL;
bool disable_memtable;
uint64_t log_used; // log number that this batch was inserted into
uint64_t log_ref; // log number that memtable insert should reference
bool in_batch_group; bool in_batch_group;
WriteCallback* callback; WriteCallback* callback;
bool made_waitable; // records lazy construction of mutex and cv bool made_waitable; // records lazy construction of mutex and cv
@ -96,6 +100,9 @@ class WriteThread {
: batch(nullptr), : batch(nullptr),
sync(false), sync(false),
disableWAL(false), disableWAL(false),
disable_memtable(false),
log_used(0),
log_ref(0),
in_batch_group(false), in_batch_group(false),
callback(nullptr), callback(nullptr),
made_waitable(false), made_waitable(false),
@ -153,6 +160,12 @@ class WriteThread {
return (callback != nullptr) && !callback_status.ok(); return (callback != nullptr) && !callback_status.ok();
} }
bool ShouldWriteToMemtable() {
return !CallbackFailed() && !disable_memtable;
}
bool ShouldWriteToWAL() { return !CallbackFailed() && !disableWAL; }
// No other mutexes may be acquired while holding StateMutex(), it is // No other mutexes may be acquired while holding StateMutex(), it is
// always last in the order // always last in the order
std::mutex& StateMutex() { std::mutex& StateMutex() {

@ -1313,6 +1313,10 @@ struct DBOptions {
// Default: kPointInTimeRecovery // Default: kPointInTimeRecovery
WALRecoveryMode wal_recovery_mode; WALRecoveryMode wal_recovery_mode;
// if set to false then recovery will fail when a prepared
// transaction is encountered in the WAL
bool allow_2pc = false;
// A global cache for table-level rows. // A global cache for table-level rows.
// Default: nullptr (disabled) // Default: nullptr (disabled)
// Not supported in ROCKSDB_LITE mode! // Not supported in ROCKSDB_LITE mode!

@ -167,6 +167,9 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{"allow_mmap_writes", {"allow_mmap_writes",
{offsetof(struct DBOptions, allow_mmap_writes), OptionType::kBoolean, {offsetof(struct DBOptions, allow_mmap_writes), OptionType::kBoolean,
OptionVerificationType::kNormal}}, OptionVerificationType::kNormal}},
{"allow_2pc",
{offsetof(struct DBOptions, allow_2pc), OptionType::kBoolean,
OptionVerificationType::kNormal}},
{"allow_os_buffer", {"allow_os_buffer",
{offsetof(struct DBOptions, allow_os_buffer), OptionType::kBoolean, {offsetof(struct DBOptions, allow_os_buffer), OptionType::kBoolean,
OptionVerificationType::kNormal}}, OptionVerificationType::kNormal}},

@ -279,7 +279,8 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"write_thread_max_yield_usec=1000;" "write_thread_max_yield_usec=1000;"
"access_hint_on_compaction_start=NONE;" "access_hint_on_compaction_start=NONE;"
"info_log_level=DEBUG_LEVEL;" "info_log_level=DEBUG_LEVEL;"
"dump_malloc_stats=false;", "dump_malloc_stats=false;"
"allow_2pc=false;",
new_options)); new_options));
ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions), ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions),

Loading…
Cancel
Save