Optimize for serial commits in 2PC

Summary:
Throughput: 46k tps in our sysbench settings (filling the details later)

The idea is to have the simplest change that gives us a reasonable boost
in 2PC throughput.

Major design changes:
1. The WAL file internal buffer is not flushed after each write. Instead
it is flushed before critical operations (WAL copy via fs) or when
FlushWAL is called by MySQL. Flushing the WAL buffer is also protected
via mutex_.
2. Use two sequence numbers: last seq, and last seq for write. Last seq
is the last visible sequence number for reads. Last seq for write is the
next sequence number that should be used to write to WAL/memtable. This
allows to have a memtable write be in parallel to WAL writes.
3. BatchGroup is not used for writes. This means that we can have
parallel writers which changes a major assumption in the code base. To
accommodate for that i) allow only 1 WriteImpl that intends to write to
memtable via mem_mutex_--which is fine since in 2PC almost all of the memtable writes
come via group commit phase which is serial anyway, ii) make all the
parts in the code base that assumed to be the only writer (via
EnterUnbatched) to also acquire mem_mutex_, iii) stat updates are
protected via a stat_mutex_.

Note: the first commit has the approach figured out but is not clean.
Submitting the PR anyway to get the early feedback on the approach. If
we are ok with the approach I will go ahead with this updates:
0) Rebase with Yi's pipelining changes
1) Currently batching is disabled by default to make sure that it will be
consistent with all unit tests. Will make this optional via a config.
2) A couple of unit tests are disabled. They need to be updated with the
serial commit of 2PC taken into account.
3) Replacing BatchGroup with mem_mutex_ got a bit ugly as it requires
releasing mutex_ beforehand (the same way EnterUnbatched does). This
needs to be cleaned up.
Closes https://github.com/facebook/rocksdb/pull/2345

Differential Revision: D5210732

Pulled By: maysamyabandeh

fbshipit-source-id: 78653bd95a35cd1e831e555e0e57bdfd695355a4
main
Maysam Yabandeh 8 years ago committed by Facebook Github Bot
parent 0ac4afb975
commit 499ebb3ab5
  1. 3
      db/column_family_test.cc
  2. 1
      db/compaction_job_test.cc
  3. 36
      db/db_impl.cc
  4. 45
      db/db_impl.h
  5. 3
      db/db_impl_files.cc
  6. 1
      db/db_impl_open.cc
  7. 1
      db/db_impl_readonly.cc
  8. 249
      db/db_impl_write.cc
  9. 4
      db/db_log_iter_test.cc
  10. 6
      db/db_test_util.cc
  11. 15
      db/db_test_util.h
  12. 9
      db/db_wal_test.cc
  13. 1
      db/db_write_test.cc
  14. 5
      db/external_sst_file_ingestion_job.cc
  15. 3
      db/fault_injection_test.cc
  16. 14
      db/log_writer.cc
  17. 10
      db/log_writer.h
  18. 1
      db/repair.cc
  19. 3
      db/version_set.cc
  20. 22
      db/version_set.h
  21. 1
      db/wal_manager_test.cc
  22. 10
      db/write_callback_test.cc
  23. 5
      include/rocksdb/db.h
  24. 12
      include/rocksdb/options.h
  25. 2
      include/rocksdb/utilities/stackable_db.h
  26. 8
      options/db_options.cc
  27. 2
      options/db_options.h
  28. 10
      options/options_helper.h
  29. 15
      options/options_settable_test.cc
  30. 1
      table/block_based_table_factory.cc
  31. 4
      util/murmurhash.cc
  32. 4
      util/murmurhash.h
  33. 8
      util/transaction_test_util.cc
  34. 2
      util/transaction_test_util.h
  35. 3
      utilities/backupable/backupable_db_test.cc
  36. 1
      utilities/checkpoint/checkpoint_impl.cc
  37. 2
      utilities/transactions/transaction_impl.cc
  38. 30
      utilities/transactions/transaction_test.cc

@ -303,6 +303,7 @@ class ColumnFamilyTest : public testing::Test {
ASSERT_OK(Put(cf, key, RandomString(&rnd_, key_value_size - 10))); ASSERT_OK(Put(cf, key, RandomString(&rnd_, key_value_size - 10)));
} }
} }
db_->FlushWAL(false);
} }
#ifndef ROCKSDB_LITE // TEST functions in DB are not supported in lite #ifndef ROCKSDB_LITE // TEST functions in DB are not supported in lite
@ -580,6 +581,7 @@ TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest) {
Flush(0); Flush(0);
ASSERT_OK(Put(1, "bar", "v3")); // seqID 4 ASSERT_OK(Put(1, "bar", "v3")); // seqID 4
ASSERT_OK(Put(1, "foo", "v4")); // seqID 5 ASSERT_OK(Put(1, "foo", "v4")); // seqID 5
db_->FlushWAL(false);
// Preserve file system state up to here to simulate a crash condition. // Preserve file system state up to here to simulate a crash condition.
fault_env->SetFilesystemActive(false); fault_env->SetFilesystemActive(false);
@ -642,6 +644,7 @@ TEST_P(FlushEmptyCFTestWithParam, FlushEmptyCFTest2) {
// Write to log file D // Write to log file D
ASSERT_OK(Put(1, "bar", "v4")); // seqID 7 ASSERT_OK(Put(1, "bar", "v4")); // seqID 7
ASSERT_OK(Put(1, "bar", "v5")); // seqID 8 ASSERT_OK(Put(1, "bar", "v5")); // seqID 8
db_->FlushWAL(false);
// Preserve file system state up to here to simulate a crash condition. // Preserve file system state up to here to simulate a crash condition.
fault_env->SetFilesystemActive(false); fault_env->SetFilesystemActive(false);
std::vector<std::string> names; std::vector<std::string> names;

@ -144,6 +144,7 @@ class CompactionJobTest : public testing::Test {
} }
void SetLastSequence(const SequenceNumber sequence_number) { void SetLastSequence(const SequenceNumber sequence_number) {
versions_->SetLastToBeWrittenSequence(sequence_number + 1);
versions_->SetLastSequence(sequence_number + 1); versions_->SetLastSequence(sequence_number + 1);
} }

@ -160,6 +160,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
is_snapshot_supported_(true), is_snapshot_supported_(true),
write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()), write_buffer_manager_(immutable_db_options_.write_buffer_manager.get()),
write_thread_(immutable_db_options_), write_thread_(immutable_db_options_),
nonmem_write_thread_(immutable_db_options_),
write_controller_(mutable_db_options_.delayed_write_rate), write_controller_(mutable_db_options_.delayed_write_rate),
// Use delayed_write_rate as a base line to determine the initial // Use delayed_write_rate as a base line to determine the initial
// low pri write rate limit. It may be adjusted later. // low pri write rate limit. It may be adjusted later.
@ -189,7 +190,9 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname)
bg_work_paused_(0), bg_work_paused_(0),
bg_compaction_paused_(0), bg_compaction_paused_(0),
refitting_level_(false), refitting_level_(false),
opened_successfully_(false) { opened_successfully_(false),
concurrent_prepare_(options.concurrent_prepare),
manual_wal_flush_(options.manual_wal_flush) {
env_->GetAbsolutePath(dbname, &db_absolute_path_); env_->GetAbsolutePath(dbname, &db_absolute_path_);
// Reserve ten files or so for other uses and give the rest to TableCache. // Reserve ten files or so for other uses and give the rest to TableCache.
@ -612,6 +615,26 @@ int DBImpl::FindMinimumEmptyLevelFitting(ColumnFamilyData* cfd,
return minimum_level; return minimum_level;
} }
Status DBImpl::FlushWAL(bool sync) {
{
// We need to lock log_write_mutex_ since logs_ might change concurrently
InstrumentedMutexLock wl(&log_write_mutex_);
log::Writer* cur_log_writer = logs_.back().writer;
auto s = cur_log_writer->WriteBuffer();
if (!s.ok()) {
ROCKS_LOG_ERROR(immutable_db_options_.info_log, "WAL flush error %s",
s.ToString().c_str());
}
if (!sync) {
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=false");
return s;
}
}
// sync = true
ROCKS_LOG_DEBUG(immutable_db_options_.info_log, "FlushWAL sync=true");
return SyncWAL();
}
Status DBImpl::SyncWAL() { Status DBImpl::SyncWAL() {
autovector<log::Writer*, 1> logs_to_sync; autovector<log::Writer*, 1> logs_to_sync;
bool need_log_dir_sync; bool need_log_dir_sync;
@ -650,6 +673,7 @@ Status DBImpl::SyncWAL() {
need_log_dir_sync = !log_dir_synced_; need_log_dir_sync = !log_dir_synced_;
} }
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
RecordTick(stats_, WAL_FILE_SYNCED); RecordTick(stats_, WAL_FILE_SYNCED);
Status status; Status status;
for (log::Writer* log : logs_to_sync) { for (log::Writer* log : logs_to_sync) {
@ -661,6 +685,7 @@ Status DBImpl::SyncWAL() {
if (status.ok() && need_log_dir_sync) { if (status.ok() && need_log_dir_sync) {
status = directories_.GetWalDir()->Fsync(); status = directories_.GetWalDir()->Fsync();
} }
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1"); TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
{ {
@ -2634,9 +2659,13 @@ Status DBImpl::IngestExternalFile(
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
TEST_SYNC_POINT("DBImpl::AddFile:MutexLock"); TEST_SYNC_POINT("DBImpl::AddFile:MutexLock");
// Stop writes to the DB // Stop writes to the DB by entering both write threads
WriteThread::Writer w; WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_); write_thread_.EnterUnbatched(&w, &mutex_);
WriteThread::Writer nonmem_w;
if (concurrent_prepare_) {
nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
}
num_running_ingest_file_++; num_running_ingest_file_++;
@ -2677,6 +2706,9 @@ Status DBImpl::IngestExternalFile(
} }
// Resume writes to the DB // Resume writes to the DB
if (concurrent_prepare_) {
nonmem_write_thread_.ExitUnbatched(&nonmem_w);
}
write_thread_.ExitUnbatched(&w); write_thread_.ExitUnbatched(&w);
// Update stats // Update stats

@ -199,6 +199,7 @@ class DBImpl : public DB {
using DB::Flush; using DB::Flush;
virtual Status Flush(const FlushOptions& options, virtual Status Flush(const FlushOptions& options,
ColumnFamilyHandle* column_family) override; ColumnFamilyHandle* column_family) override;
virtual Status FlushWAL(bool sync) override;
virtual Status SyncWAL() override; virtual Status SyncWAL() override;
virtual SequenceNumber GetLatestSequenceNumber() const override; virtual SequenceNumber GetLatestSequenceNumber() const override;
@ -621,6 +622,10 @@ class DBImpl : public DB {
uint64_t* log_used = nullptr, uint64_t log_ref = 0, uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false); bool disable_memtable = false);
Status WriteImplWALOnly(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0);
uint64_t FindMinLogContainingOutstandingPrep(); uint64_t FindMinLogContainingOutstandingPrep();
uint64_t FindMinPrepLogReferencedByMemTable(); uint64_t FindMinPrepLogReferencedByMemTable();
@ -746,9 +751,20 @@ class DBImpl : public DB {
Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync, Status PreprocessWrite(const WriteOptions& write_options, bool* need_log_sync,
WriteContext* write_context); WriteContext* write_context);
WriteBatch* MergeBatch(const WriteThread::WriteGroup& write_group,
WriteBatch* tmp_batch, size_t* write_with_wal);
Status WriteToWAL(const WriteBatch& merged_batch, log::Writer* log_writer,
uint64_t* log_used, uint64_t* log_size);
Status WriteToWAL(const WriteThread::WriteGroup& write_group, Status WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, bool need_log_sync, log::Writer* log_writer, uint64_t* log_used,
bool need_log_dir_sync, SequenceNumber sequence); bool need_log_sync, bool need_log_dir_sync,
SequenceNumber sequence);
Status ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
uint64_t* log_used, SequenceNumber* last_sequence,
int total_count);
// Used by WriteImpl to update bg_error_ if paranoid check is enabled. // Used by WriteImpl to update bg_error_ if paranoid check is enabled.
void WriteCallbackStatusCheck(const Status& status); void WriteCallbackStatusCheck(const Status& status);
@ -827,10 +843,12 @@ class DBImpl : public DB {
// Lock over the persistent DB state. Non-nullptr iff successfully acquired. // Lock over the persistent DB state. Non-nullptr iff successfully acquired.
FileLock* db_lock_; FileLock* db_lock_;
// The mutex for options file related operations. // It is used to concurrently update stats in the write threads
// NOTE: should never acquire options_file_mutex_ and mutex_ at the InstrumentedMutex stat_mutex_;
// same time. // It protects the back() of logs_ and alive_log_files_. Any push_back to
InstrumentedMutex options_files_mutex_; // these must be under log_write_mutex_ and any access that requires the
// back() to remain the same must also lock log_write_mutex_.
InstrumentedMutex log_write_mutex_;
// State below is protected by mutex_ // State below is protected by mutex_
mutable InstrumentedMutex mutex_; mutable InstrumentedMutex mutex_;
@ -891,6 +909,10 @@ class DBImpl : public DB {
// - back() and items with getting_synced=true are not popped, // - back() and items with getting_synced=true are not popped,
// - it follows that write thread with unlocked mutex_ can safely access // - it follows that write thread with unlocked mutex_ can safely access
// back() and items with getting_synced=true. // back() and items with getting_synced=true.
// -- Update: apparently this was a mistake. back() should be called under
// mute_: https://github.com/facebook/rocksdb/pull/1774
// - When concurrent write threads is enabled, back(), push_back(), and
// pop_front() must be called within log_write_mutex_
std::deque<LogWriterNumber> logs_; std::deque<LogWriterNumber> logs_;
// Signaled when getting_synced becomes false for some of the logs_. // Signaled when getting_synced becomes false for some of the logs_.
InstrumentedCondVar log_sync_cv_; InstrumentedCondVar log_sync_cv_;
@ -939,8 +961,10 @@ class DBImpl : public DB {
WriteBufferManager* write_buffer_manager_; WriteBufferManager* write_buffer_manager_;
WriteThread write_thread_; WriteThread write_thread_;
WriteBatch tmp_batch_; WriteBatch tmp_batch_;
// The write thread when the writers have no memtable write. This will be used
// in 2PC to batch the prepares separately from the serial commit.
WriteThread nonmem_write_thread_;
WriteController write_controller_; WriteController write_controller_;
@ -948,6 +972,8 @@ class DBImpl : public DB {
// Size of the last batch group. In slowdown mode, next write needs to // Size of the last batch group. In slowdown mode, next write needs to
// sleep if it uses up the quota. // sleep if it uses up the quota.
// Note: This is to protect memtable and compaction. If the batch only writes
// to the WAL its size need not to be included in this.
uint64_t last_batch_group_size_; uint64_t last_batch_group_size_;
FlushScheduler flush_scheduler_; FlushScheduler flush_scheduler_;
@ -1190,6 +1216,11 @@ class DBImpl : public DB {
bool MCOverlap(ManualCompaction* m, ManualCompaction* m1); bool MCOverlap(ManualCompaction* m, ManualCompaction* m1);
size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
// When set, we use a seprate queue for writes that dont write to memtable. In
// 2PC these are the writes at Prepare phase.
const bool concurrent_prepare_;
const bool manual_wal_flush_;
}; };
extern Options SanitizeOptions(const std::string& db, extern Options SanitizeOptions(const std::string& db,

@ -265,8 +265,11 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
continue; continue;
} }
logs_to_free_.push_back(log.ReleaseWriter()); logs_to_free_.push_back(log.ReleaseWriter());
{
InstrumentedMutexLock wl(&log_write_mutex_);
logs_.pop_front(); logs_.pop_front();
} }
}
// Current log cannot be obsolete. // Current log cannot be obsolete.
assert(!logs_.empty()); assert(!logs_.empty());
} }

@ -725,6 +725,7 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& log_numbers,
auto last_sequence = *next_sequence - 1; auto last_sequence = *next_sequence - 1;
if ((*next_sequence != kMaxSequenceNumber) && if ((*next_sequence != kMaxSequenceNumber) &&
(versions_->LastSequence() <= last_sequence)) { (versions_->LastSequence() <= last_sequence)) {
versions_->SetLastToBeWrittenSequence(last_sequence);
versions_->SetLastSequence(last_sequence); versions_->SetLastSequence(last_sequence);
} }
} }

@ -5,7 +5,6 @@
// This source code is also licensed under the GPLv2 license found in the // This source code is also licensed under the GPLv2 license found in the
// COPYING file in the root directory of this source tree. // COPYING file in the root directory of this source tree.
#include "db/db_impl_readonly.h" #include "db/db_impl_readonly.h"
#include "db/compacted_db_impl.h" #include "db/compacted_db_impl.h"

@ -66,6 +66,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
if (my_batch == nullptr) { if (my_batch == nullptr) {
return Status::Corruption("Batch is nullptr!"); return Status::Corruption("Batch is nullptr!");
} }
if (concurrent_prepare_ && immutable_db_options_.enable_pipelined_write) {
return Status::NotSupported(
"pipelined_writes is not compatible with concurrent prepares");
}
Status status; Status status;
if (write_options.low_pri) { if (write_options.low_pri) {
@ -75,6 +79,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
} }
} }
if (concurrent_prepare_ && disable_memtable) {
return WriteImplWALOnly(write_options, my_batch, callback, log_used,
log_ref);
}
if (immutable_db_options_.enable_pipelined_write) { if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, log_used, return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
log_ref, disable_memtable); log_ref, disable_memtable);
@ -133,14 +142,22 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteContext write_context; WriteContext write_context;
WriteThread::WriteGroup write_group; WriteThread::WriteGroup write_group;
bool in_parallel_group = false; bool in_parallel_group = false;
uint64_t last_sequence = versions_->LastSequence(); uint64_t last_sequence;
if (!concurrent_prepare_) {
last_sequence = versions_->LastSequence();
}
mutex_.Lock(); mutex_.Lock();
bool need_log_sync = !write_options.disableWAL && write_options.sync; bool need_log_sync = !write_options.disableWAL && write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_; bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
if (!concurrent_prepare_ || !disable_memtable) {
// With concurrent writes we do preprocess only in the write thread that
// also does write to memtable to avoid sync issue on shared data structure
// with the other thread
status = PreprocessWrite(write_options, &need_log_sync, &write_context); status = PreprocessWrite(write_options, &need_log_sync, &write_context);
log::Writer* cur_log_writer = logs_.back().writer; }
log::Writer* log_writer = logs_.back().writer;
mutex_.Unlock(); mutex_.Unlock();
@ -180,9 +197,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
} }
} }
const SequenceNumber current_sequence = last_sequence + 1; if (concurrent_prepare_) {
last_sequence += total_count; stat_mutex_.Lock();
}
// Update stats while we are an exclusive group leader, so we know // Update stats while we are an exclusive group leader, so we know
// that nobody else can be writing to these particular stats. // that nobody else can be writing to these particular stats.
// We're optimistic, updating the stats before we successfully // We're optimistic, updating the stats before we successfully
@ -201,6 +218,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other); RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
} }
MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size); MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
if (concurrent_prepare_) {
stat_mutex_.Unlock();
}
if (write_options.disableWAL) { if (write_options.disableWAL) {
has_unpersisted_data_.store(true, std::memory_order_relaxed); has_unpersisted_data_.store(true, std::memory_order_relaxed);
@ -208,14 +228,27 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
PERF_TIMER_STOP(write_pre_and_post_process_time); PERF_TIMER_STOP(write_pre_and_post_process_time);
if (!concurrent_prepare_) {
if (status.ok() && !write_options.disableWAL) { if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time); PERF_TIMER_GUARD(write_wal_time);
status = WriteToWAL(write_group, cur_log_writer, need_log_sync, status = WriteToWAL(write_group, log_writer, log_used, need_log_sync,
need_log_dir_sync, current_sequence); need_log_dir_sync, last_sequence + 1);
if (log_used != nullptr) { }
*log_used = logfile_number_; } else {
assert(!need_log_sync && !need_log_dir_sync);
if (status.ok() && !write_options.disableWAL) {
PERF_TIMER_GUARD(write_wal_time);
// LastToBeWrittenSequence is increased inside WriteToWAL under
// wal_write_mutex_ to ensure ordered events in WAL
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
total_count);
} else {
// Otherwise we inc seq number for memtable writes
last_sequence = versions_->FetchAddLastToBeWrittenSequence(total_count);
} }
} }
const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += total_count;
if (status.ok()) { if (status.ok()) {
PERF_TIMER_GUARD(write_memtable_time); PERF_TIMER_GUARD(write_memtable_time);
@ -263,6 +296,15 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
mutex_.Lock(); mutex_.Lock();
MarkLogsSynced(logfile_number_, need_log_dir_sync, status); MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
mutex_.Unlock(); mutex_.Unlock();
// Requesting sync with concurrent_prepare_ is expected to be very rare. We
// hance provide a simple implementation that is not necessarily efficient.
if (concurrent_prepare_) {
if (manual_wal_flush_) {
status = FlushWAL(true);
} else {
status = SyncWAL();
}
}
} }
bool should_exit_batch_group = true; bool should_exit_batch_group = true;
@ -272,7 +314,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w); should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w);
} }
if (should_exit_batch_group) { if (should_exit_batch_group) {
if (status.ok()) {
versions_->SetLastSequence(last_sequence); versions_->SetLastSequence(last_sequence);
}
MemTableInsertStatusCheck(w.status); MemTableInsertStatusCheck(w.status);
write_thread_.ExitAsBatchGroupLeader(write_group, w.status); write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
} }
@ -304,7 +348,7 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
bool need_log_sync = !write_options.disableWAL && write_options.sync; bool need_log_sync = !write_options.disableWAL && write_options.sync;
bool need_log_dir_sync = need_log_sync && !log_dir_synced_; bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
w.status = PreprocessWrite(write_options, &need_log_sync, &write_context); w.status = PreprocessWrite(write_options, &need_log_sync, &write_context);
log::Writer* cur_log_writer = logs_.back().writer; log::Writer* log_writer = logs_.back().writer;
mutex_.Unlock(); mutex_.Unlock();
// This can set non-OK status if callback fail. // This can set non-OK status if callback fail.
@ -352,8 +396,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
wal_write_group.size - 1); wal_write_group.size - 1);
RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1); RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
} }
w.status = WriteToWAL(wal_write_group, cur_log_writer, need_log_sync, w.status = WriteToWAL(wal_write_group, log_writer, log_used,
need_log_dir_sync, current_sequence); need_log_sync, need_log_dir_sync, current_sequence);
} }
if (!w.CallbackFailed()) { if (!w.CallbackFailed()) {
@ -403,11 +447,91 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
} }
assert(w.state == WriteThread::STATE_COMPLETED); assert(w.state == WriteThread::STATE_COMPLETED);
return w.FinalStatus();
}
Status DBImpl::WriteImplWALOnly(const WriteOptions& write_options,
WriteBatch* my_batch, WriteCallback* callback,
uint64_t* log_used, uint64_t log_ref) {
Status status;
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
true /* disable_memtable */);
if (write_options.disableWAL) {
return status;
}
RecordTick(stats_, WRITE_WITH_WAL);
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
nonmem_write_thread_.JoinBatchGroup(&w);
assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER);
if (w.state == WriteThread::STATE_COMPLETED) {
if (log_used != nullptr) { if (log_used != nullptr) {
*log_used = w.log_used; *log_used = w.log_used;
} }
return w.FinalStatus(); return w.FinalStatus();
}
// else we are the leader of the write batch group
assert(w.state == WriteThread::STATE_GROUP_LEADER);
WriteContext write_context;
WriteThread::WriteGroup write_group;
uint64_t last_sequence;
nonmem_write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
// Note: no need to update last_batch_group_size_ here since the batch writes
// to WAL only
uint64_t total_byte_size = 0;
for (auto* writer : write_group) {
if (writer->CheckCallback(this)) {
total_byte_size = WriteBatchInternal::AppendedByteSize(
total_byte_size, WriteBatchInternal::ByteSize(writer->batch));
}
}
stat_mutex_.Lock();
// Update stats while we are an exclusive group leader, so we know
// that nobody else can be writing to these particular stats.
// We're optimistic, updating the stats before we successfully
// commit. That lets us release our leader status early.
auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::BYTES_WRITTEN, total_byte_size);
RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
stats->AddDBStats(InternalStats::WRITE_DONE_BY_SELF, 1);
RecordTick(stats_, WRITE_DONE_BY_SELF);
auto write_done_by_other = write_group.size - 1;
if (write_done_by_other > 0) {
stats->AddDBStats(InternalStats::WRITE_DONE_BY_OTHER, write_done_by_other);
RecordTick(stats_, WRITE_DONE_BY_OTHER, write_done_by_other);
}
MeasureTime(stats_, BYTES_PER_WRITE, total_byte_size);
stat_mutex_.Unlock();
PERF_TIMER_STOP(write_pre_and_post_process_time);
PERF_TIMER_GUARD(write_wal_time);
// LastToBeWrittenSequence is increased inside WriteToWAL under
// wal_write_mutex_ to ensure ordered events in WAL
status = ConcurrentWriteToWAL(write_group, log_used, &last_sequence,
0 /*total_count*/);
if (status.ok() && write_options.sync) {
// Requesting sync with concurrent_prepare_ is expected to be very rare. We
// hance provide a simple implementation that is not necessarily efficient.
if (manual_wal_flush_) {
status = FlushWAL(true);
} else {
status = SyncWAL();
}
}
PERF_TIMER_START(write_pre_and_post_process_time);
if (!w.CallbackFailed()) {
ParanoidCheck(status);
}
nonmem_write_thread_.ExitAsBatchGroupLeader(write_group, w.status);
if (status.ok()) {
status = w.FinalStatus();
}
return status;
} }
void DBImpl::WriteCallbackStatusCheck(const Status& status) { void DBImpl::WriteCallbackStatusCheck(const Status& status) {
@ -519,13 +643,12 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
return status; return status;
} }
Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, WriteBatch* DBImpl::MergeBatch(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, bool need_log_sync, WriteBatch* tmp_batch, size_t* write_with_wal) {
bool need_log_dir_sync, SequenceNumber sequence) { assert(write_with_wal != nullptr);
Status status; assert(tmp_batch != nullptr);
WriteBatch* merged_batch = nullptr; WriteBatch* merged_batch = nullptr;
size_t write_with_wal = 0; *write_with_wal = 0;
auto* leader = write_group.leader; auto* leader = write_group.leader;
if (write_group.size == 1 && leader->ShouldWriteToWAL() && if (write_group.size == 1 && leader->ShouldWriteToWAL() &&
leader->batch->GetWalTerminationPoint().is_cleared()) { leader->batch->GetWalTerminationPoint().is_cleared()) {
@ -534,30 +657,54 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
// and the batch is not wanting to be truncated // and the batch is not wanting to be truncated
merged_batch = leader->batch; merged_batch = leader->batch;
leader->log_used = logfile_number_; leader->log_used = logfile_number_;
write_with_wal = 1; *write_with_wal = 1;
} else { } else {
// WAL needs all of the batches flattened into a single batch. // WAL needs all of the batches flattened into a single batch.
// We could avoid copying here with an iov-like AddRecord // We could avoid copying here with an iov-like AddRecord
// interface // interface
merged_batch = &tmp_batch_; merged_batch = tmp_batch;
for (auto writer : write_group) { for (auto writer : write_group) {
if (writer->ShouldWriteToWAL()) { if (writer->ShouldWriteToWAL()) {
WriteBatchInternal::Append(merged_batch, writer->batch, WriteBatchInternal::Append(merged_batch, writer->batch,
/*WAL_only*/ true); /*WAL_only*/ true);
write_with_wal++; (*write_with_wal)++;
} }
writer->log_used = logfile_number_; writer->log_used = logfile_number_;
} }
} }
return merged_batch;
}
WriteBatchInternal::SetSequence(merged_batch, sequence); Status DBImpl::WriteToWAL(const WriteBatch& merged_batch,
log::Writer* log_writer, uint64_t* log_used,
Slice log_entry = WriteBatchInternal::Contents(merged_batch); uint64_t* log_size) {
status = log_writer->AddRecord(log_entry); assert(log_size != nullptr);
Slice log_entry = WriteBatchInternal::Contents(&merged_batch);
*log_size = log_entry.size();
Status status = log_writer->AddRecord(log_entry);
if (log_used != nullptr) {
*log_used = logfile_number_;
}
total_log_size_ += log_entry.size(); total_log_size_ += log_entry.size();
alive_log_files_.back().AddSize(log_entry.size()); alive_log_files_.back().AddSize(log_entry.size());
log_empty_ = false; log_empty_ = false;
uint64_t log_size = log_entry.size(); return status;
}
Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
log::Writer* log_writer, uint64_t* log_used,
bool need_log_sync, bool need_log_dir_sync,
SequenceNumber sequence) {
Status status;
size_t write_with_wal = 0;
WriteBatch* merged_batch =
MergeBatch(write_group, &tmp_batch_, &write_with_wal);
WriteBatchInternal::SetSequence(merged_batch, sequence);
uint64_t log_size;
status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
if (status.ok() && need_log_sync) { if (status.ok() && need_log_sync) {
StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS); StopWatch sw(env_, stats_, WAL_FILE_SYNC_MICROS);
@ -599,6 +746,41 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
return status; return status;
} }
Status DBImpl::ConcurrentWriteToWAL(const WriteThread::WriteGroup& write_group,
uint64_t* log_used,
SequenceNumber* last_sequence,
int total_count) {
Status status;
WriteBatch tmp_batch;
size_t write_with_wal = 0;
WriteBatch* merged_batch =
MergeBatch(write_group, &tmp_batch, &write_with_wal);
// We need to lock log_write_mutex_ since logs_ and alive_log_files might be
// pushed back concurrently
log_write_mutex_.Lock();
*last_sequence = versions_->FetchAddLastToBeWrittenSequence(total_count);
auto sequence = *last_sequence + 1;
WriteBatchInternal::SetSequence(merged_batch, sequence);
log::Writer* log_writer = logs_.back().writer;
uint64_t log_size;
status = WriteToWAL(*merged_batch, log_writer, log_used, &log_size);
log_write_mutex_.Unlock();
if (status.ok()) {
stat_mutex_.Lock();
auto stats = default_cf_internal_stats_;
stats->AddDBStats(InternalStats::WAL_FILE_BYTES, log_size);
RecordTick(stats_, WAL_FILE_BYTES, log_size);
stats->AddDBStats(InternalStats::WRITE_WITH_WAL, write_with_wal);
RecordTick(stats_, WRITE_WITH_WAL, write_with_wal);
stat_mutex_.Unlock();
}
return status;
}
Status DBImpl::HandleWALFull(WriteContext* write_context) { Status DBImpl::HandleWALFull(WriteContext* write_context) {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(write_context != nullptr); assert(write_context != nullptr);
@ -895,9 +1077,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
lfile->SetPreallocationBlockSize(preallocate_block_size); lfile->SetPreallocationBlockSize(preallocate_block_size);
unique_ptr<WritableFileWriter> file_writer( unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(lfile), opt_env_opt)); new WritableFileWriter(std::move(lfile), opt_env_opt));
new_log = new_log = new log::Writer(
new log::Writer(std::move(file_writer), new_log_number, std::move(file_writer), new_log_number,
immutable_db_options_.recycle_log_file_num > 0); immutable_db_options_.recycle_log_file_num > 0, manual_wal_flush_);
} }
} }
@ -931,8 +1113,15 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
assert(new_log != nullptr); assert(new_log != nullptr);
log_empty_ = true; log_empty_ = true;
log_dir_synced_ = false; log_dir_synced_ = false;
log_write_mutex_.Lock();
if (!logs_.empty()) {
// Alway flush the buffer of the last log before switching to a new one
log::Writer* cur_log_writer = logs_.back().writer;
cur_log_writer->WriteBuffer();
}
logs_.emplace_back(logfile_number_, new_log); logs_.emplace_back(logfile_number_, new_log);
alive_log_files_.push_back(LogFileNumberSize(logfile_number_)); alive_log_files_.push_back(LogFileNumberSize(logfile_number_));
log_write_mutex_.Unlock();
} }
for (auto loop_cfd : *versions_->GetColumnFamilySet()) { for (auto loop_cfd : *versions_->GetColumnFamilySet()) {
// all this is just optimization to delete logs that // all this is just optimization to delete logs that

@ -118,6 +118,7 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) {
dbfull()->Flush(FlushOptions()); dbfull()->Flush(FlushOptions());
Put("key4", DummyString(1024)); Put("key4", DummyString(1024));
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U); ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U);
dbfull()->FlushWAL(false);
{ {
auto iter = OpenTransactionLogIter(0); auto iter = OpenTransactionLogIter(0);
@ -134,6 +135,7 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorRace) {
// "key5" would be written in a new memtable and log // "key5" would be written in a new memtable and log
Put("key5", DummyString(1024)); Put("key5", DummyString(1024));
dbfull()->FlushWAL(false);
{ {
// this iter would miss "key4" if not fixed // this iter would miss "key4" if not fixed
auto iter = OpenTransactionLogIter(0); auto iter = OpenTransactionLogIter(0);
@ -183,6 +185,7 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) {
Put("key"+ToString(i), DummyString(10)); Put("key"+ToString(i), DummyString(10));
} }
dbfull()->Flush(FlushOptions()); dbfull()->Flush(FlushOptions());
dbfull()->FlushWAL(false);
// Corrupt this log to create a gap // Corrupt this log to create a gap
rocksdb::VectorLogPtr wal_files; rocksdb::VectorLogPtr wal_files;
ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files)); ASSERT_OK(dbfull()->GetSortedWalFiles(wal_files));
@ -196,6 +199,7 @@ TEST_F(DBTestXactLogIterator, TransactionLogIteratorCorruptedLog) {
// Insert a new entry to a new log file // Insert a new entry to a new log file
Put("key1025", DummyString(10)); Put("key1025", DummyString(10));
dbfull()->FlushWAL(false);
// Try to read from the beginning. Should stop before the gap and read less // Try to read from the beginning. Should stop before the gap and read less
// than 1025 entries // than 1025 entries
auto iter = OpenTransactionLogIter(0); auto iter = OpenTransactionLogIter(0);

@ -469,6 +469,12 @@ Options DBTestBase::GetOptions(
options.enable_pipelined_write = true; options.enable_pipelined_write = true;
break; break;
} }
case kConcurrentWALWrites: {
// This options optimize 2PC commit path
options.concurrent_prepare = true;
options.manual_wal_flush = true;
break;
}
default: default:
break; break;

@ -641,13 +641,14 @@ class DBTestBase : public testing::Test {
kRecycleLogFiles = 28, kRecycleLogFiles = 28,
kConcurrentSkipList = 29, kConcurrentSkipList = 29,
kPipelinedWrite = 30, kPipelinedWrite = 30,
kEnd = 31, kConcurrentWALWrites = 31,
kDirectIO = 32, kEnd = 32,
kLevelSubcompactions = 33, kDirectIO = 33,
kUniversalSubcompactions = 34, kLevelSubcompactions = 34,
kBlockBasedTableWithIndexRestartInterval = 35, kUniversalSubcompactions = 35,
kBlockBasedTableWithPartitionedIndex = 36, kBlockBasedTableWithIndexRestartInterval = 36,
kPartitionedFilterWithNewTableReaderForCompactions = 37, kBlockBasedTableWithPartitionedIndex = 37,
kPartitionedFilterWithNewTableReaderForCompactions = 38,
}; };
public: public:

@ -122,9 +122,11 @@ TEST_F(DBWALTest, SyncWALNotWaitWrite) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
rocksdb::port::Thread thread([&]() { ASSERT_OK(Put("foo2", "bar2")); }); rocksdb::port::Thread thread([&]() { ASSERT_OK(Put("foo2", "bar2")); });
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1"); // Moving this to SyncWAL before the actual fsync
// TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:1");
ASSERT_OK(db_->SyncWAL()); ASSERT_OK(db_->SyncWAL());
TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); // Moving this to SyncWAL after actual fsync
// TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2");
thread.join(); thread.join();
@ -660,6 +662,7 @@ TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
ASSERT_OK(Flush(0)); ASSERT_OK(Flush(0));
ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5 ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5
ASSERT_EQ("v5", Get(0, "key")); ASSERT_EQ("v5", Get(0, "key"));
dbfull()->FlushWAL(false);
// Simulate a crash. // Simulate a crash.
fault_env->SetFilesystemActive(false); fault_env->SetFilesystemActive(false);
Close(); Close();
@ -729,6 +732,7 @@ class RecoveryTestHelper {
batch.Put(key, value); batch.Put(key, value);
WriteBatchInternal::SetSequence(&batch, seq); WriteBatchInternal::SetSequence(&batch, seq);
current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch)); current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch));
versions->SetLastToBeWrittenSequence(seq);
versions->SetLastSequence(seq); versions->SetLastSequence(seq);
} }
} }
@ -1113,6 +1117,7 @@ TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) {
ASSERT_EQ(3, countWalFiles()); ASSERT_EQ(3, countWalFiles());
Flush(1); Flush(1);
ASSERT_OK(Put(2, "key7", kLargeValue)); ASSERT_OK(Put(2, "key7", kLargeValue));
dbfull()->FlushWAL(false);
ASSERT_EQ(4, countWalFiles()); ASSERT_EQ(4, countWalFiles());
// Reopen twice and validate. // Reopen twice and validate.

@ -71,6 +71,7 @@ TEST_P(DBWriteTest, ReturnSeuqneceNumberMultiThreaded) {
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest, INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
testing::Values(DBTestBase::kDefault, testing::Values(DBTestBase::kDefault,
DBTestBase::kConcurrentWALWrites,
DBTestBase::kPipelinedWrite)); DBTestBase::kPipelinedWrite));
} // namespace rocksdb } // namespace rocksdb

@ -146,6 +146,8 @@ Status ExternalSstFileIngestionJob::NeedsFlush(bool* flush_needed) {
return status; return status;
} }
// REQUIRES: we have become the only writer by entering both write_thread_ and
// nonmem_write_thread_
Status ExternalSstFileIngestionJob::Run() { Status ExternalSstFileIngestionJob::Run() {
Status status; Status status;
#ifndef NDEBUG #ifndef NDEBUG
@ -164,6 +166,8 @@ Status ExternalSstFileIngestionJob::Run() {
// if the dont overlap with any ranges since we have snapshots // if the dont overlap with any ranges since we have snapshots
force_global_seqno = true; force_global_seqno = true;
} }
// It is safe to use this instead of LastToBeWrittenSequence since we are
// the only active writer, and hence they are equal
const SequenceNumber last_seqno = versions_->LastSequence(); const SequenceNumber last_seqno = versions_->LastSequence();
SuperVersion* super_version = cfd_->GetSuperVersion(); SuperVersion* super_version = cfd_->GetSuperVersion();
edit_.SetColumnFamily(cfd_->GetID()); edit_.SetColumnFamily(cfd_->GetID());
@ -197,6 +201,7 @@ Status ExternalSstFileIngestionJob::Run() {
} }
if (consumed_seqno) { if (consumed_seqno) {
versions_->SetLastToBeWrittenSequence(last_seqno + 1);
versions_->SetLastSequence(last_seqno + 1); versions_->SetLastSequence(last_seqno + 1);
} }

@ -412,6 +412,7 @@ TEST_P(FaultInjectionTest, WriteOptionSyncTest) {
write_options.sync = true; write_options.sync = true;
ASSERT_OK( ASSERT_OK(
db_->Put(write_options, Key(2, &key_space), Value(2, &value_space))); db_->Put(write_options, Key(2, &key_space), Value(2, &value_space)));
db_->FlushWAL(false);
env_->SetFilesystemActive(false); env_->SetFilesystemActive(false);
NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);
@ -496,7 +497,7 @@ TEST_P(FaultInjectionTest, ManualLogSyncTest) {
ASSERT_OK(db_->Flush(flush_options)); ASSERT_OK(db_->Flush(flush_options));
ASSERT_OK( ASSERT_OK(
db_->Put(write_options, Key(2, &key_space), Value(2, &value_space))); db_->Put(write_options, Key(2, &key_space), Value(2, &value_space)));
ASSERT_OK(db_->SyncWAL()); ASSERT_OK(db_->FlushWAL(true));
env_->SetFilesystemActive(false); env_->SetFilesystemActive(false);
NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced);

@ -20,20 +20,22 @@
namespace rocksdb { namespace rocksdb {
namespace log { namespace log {
Writer::Writer(unique_ptr<WritableFileWriter>&& dest, Writer::Writer(unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
uint64_t log_number, bool recycle_log_files) bool recycle_log_files, bool manual_flush)
: dest_(std::move(dest)), : dest_(std::move(dest)),
block_offset_(0), block_offset_(0),
log_number_(log_number), log_number_(log_number),
recycle_log_files_(recycle_log_files) { recycle_log_files_(recycle_log_files),
manual_flush_(manual_flush) {
for (int i = 0; i <= kMaxRecordType; i++) { for (int i = 0; i <= kMaxRecordType; i++) {
char t = static_cast<char>(i); char t = static_cast<char>(i);
type_crc_[i] = crc32c::Value(&t, 1); type_crc_[i] = crc32c::Value(&t, 1);
} }
} }
Writer::~Writer() { Writer::~Writer() { WriteBuffer(); }
}
Status Writer::WriteBuffer() { return dest_->Flush(); }
Status Writer::AddRecord(const Slice& slice) { Status Writer::AddRecord(const Slice& slice) {
const char* ptr = slice.data(); const char* ptr = slice.data();
@ -129,9 +131,11 @@ Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
if (s.ok()) { if (s.ok()) {
s = dest_->Append(Slice(ptr, n)); s = dest_->Append(Slice(ptr, n));
if (s.ok()) { if (s.ok()) {
if (!manual_flush_) {
s = dest_->Flush(); s = dest_->Flush();
} }
} }
}
block_offset_ += header_size + n; block_offset_ += header_size + n;
return s; return s;
} }

@ -74,8 +74,8 @@ class Writer {
// Create a writer that will append data to "*dest". // Create a writer that will append data to "*dest".
// "*dest" must be initially empty. // "*dest" must be initially empty.
// "*dest" must remain live while this Writer is in use. // "*dest" must remain live while this Writer is in use.
explicit Writer(unique_ptr<WritableFileWriter>&& dest, explicit Writer(unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
uint64_t log_number, bool recycle_log_files); bool recycle_log_files, bool manual_flush = false);
~Writer(); ~Writer();
Status AddRecord(const Slice& slice); Status AddRecord(const Slice& slice);
@ -85,6 +85,8 @@ class Writer {
uint64_t get_log_number() const { return log_number_; } uint64_t get_log_number() const { return log_number_; }
Status WriteBuffer();
private: private:
unique_ptr<WritableFileWriter> dest_; unique_ptr<WritableFileWriter> dest_;
size_t block_offset_; // Current offset in block size_t block_offset_; // Current offset in block
@ -98,6 +100,10 @@ class Writer {
Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length); Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);
// If true, it does not flush after each write. Instead it relies on the upper
// layer to manually does the flush by calling ::WriteBuffer()
bool manual_flush_;
// No copying allowed // No copying allowed
Writer(const Writer&); Writer(const Writer&);
void operator=(const Writer&); void operator=(const Writer&);

@ -524,6 +524,7 @@ class Repairer {
max_sequence = tables_[i].max_sequence; max_sequence = tables_[i].max_sequence;
} }
} }
vset_.SetLastToBeWrittenSequence(max_sequence);
vset_.SetLastSequence(max_sequence); vset_.SetLastSequence(max_sequence);
for (const auto& cf_id_and_tables : cf_id_to_tables) { for (const auto& cf_id_and_tables : cf_id_to_tables) {

@ -2286,6 +2286,7 @@ VersionSet::VersionSet(const std::string& dbname,
manifest_file_number_(0), // Filled by Recover() manifest_file_number_(0), // Filled by Recover()
pending_manifest_file_number_(0), pending_manifest_file_number_(0),
last_sequence_(0), last_sequence_(0),
last_to_be_written_sequence_(0),
prev_log_number_(0), prev_log_number_(0),
current_version_number_(0), current_version_number_(0),
manifest_file_size_(0), manifest_file_size_(0),
@ -2922,6 +2923,7 @@ Status VersionSet::Recover(
manifest_file_size_ = current_manifest_file_size; manifest_file_size_ = current_manifest_file_size;
next_file_number_.store(next_file + 1); next_file_number_.store(next_file + 1);
last_to_be_written_sequence_ = last_sequence;
last_sequence_ = last_sequence; last_sequence_ = last_sequence;
prev_log_number_ = previous_log_number; prev_log_number_ = previous_log_number;
@ -3291,6 +3293,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname,
} }
next_file_number_.store(next_file + 1); next_file_number_.store(next_file + 1);
last_to_be_written_sequence_ = last_sequence;
last_sequence_ = last_sequence; last_sequence_ = last_sequence;
prev_log_number_ = previous_log_number; prev_log_number_ = previous_log_number;

@ -699,12 +699,31 @@ class VersionSet {
return last_sequence_.load(std::memory_order_acquire); return last_sequence_.load(std::memory_order_acquire);
} }
// Note: memory_order_acquire must be sufficient.
uint64_t LastToBeWrittenSequence() const {
return last_to_be_written_sequence_.load(std::memory_order_seq_cst);
}
// Set the last sequence number to s. // Set the last sequence number to s.
void SetLastSequence(uint64_t s) { void SetLastSequence(uint64_t s) {
assert(s >= last_sequence_); assert(s >= last_sequence_);
// Last visible seqeunce must always be less than last written seq
assert(!db_options_->concurrent_prepare ||
s <= last_to_be_written_sequence_);
last_sequence_.store(s, std::memory_order_release); last_sequence_.store(s, std::memory_order_release);
} }
// Note: memory_order_release must be sufficient
void SetLastToBeWrittenSequence(uint64_t s) {
assert(s >= last_to_be_written_sequence_);
last_to_be_written_sequence_.store(s, std::memory_order_seq_cst);
}
// Note: memory_order_release must be sufficient
uint64_t FetchAddLastToBeWrittenSequence(uint64_t s) {
return last_to_be_written_sequence_.fetch_add(s, std::memory_order_seq_cst);
}
// Mark the specified file number as used. // Mark the specified file number as used.
// REQUIRED: this is only called during single-threaded recovery // REQUIRED: this is only called during single-threaded recovery
void MarkFileNumberUsedDuringRecovery(uint64_t number); void MarkFileNumberUsedDuringRecovery(uint64_t number);
@ -804,7 +823,10 @@ class VersionSet {
uint64_t manifest_file_number_; uint64_t manifest_file_number_;
uint64_t options_file_number_; uint64_t options_file_number_;
uint64_t pending_manifest_file_number_; uint64_t pending_manifest_file_number_;
// The last seq visible to reads
std::atomic<uint64_t> last_sequence_; std::atomic<uint64_t> last_sequence_;
// The last seq with which a writer has written/will write.
std::atomic<uint64_t> last_to_be_written_sequence_;
uint64_t prev_log_number_; // 0 or backing store for memtable being compacted uint64_t prev_log_number_; // 0 or backing store for memtable being compacted
// Opened lazily // Opened lazily

@ -69,6 +69,7 @@ class WalManagerTest : public testing::Test {
batch.Put(key, value); batch.Put(key, value);
WriteBatchInternal::SetSequence(&batch, seq); WriteBatchInternal::SetSequence(&batch, seq);
current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch)); current_log_writer_->AddRecord(WriteBatchInternal::Contents(&batch));
versions_->SetLastToBeWrittenSequence(seq);
versions_->SetLastSequence(seq); versions_->SetLastSequence(seq);
} }

@ -125,6 +125,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
{false, false, true, false, true}, {false, false, true, false, true},
}; };
for (auto& two_queues : {true, false}) {
for (auto& allow_parallel : {true, false}) { for (auto& allow_parallel : {true, false}) {
for (auto& allow_batching : {true, false}) { for (auto& allow_batching : {true, false}) {
for (auto& enable_WAL : {true, false}) { for (auto& enable_WAL : {true, false}) {
@ -134,6 +135,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
options.create_if_missing = true; options.create_if_missing = true;
options.allow_concurrent_memtable_write = allow_parallel; options.allow_concurrent_memtable_write = allow_parallel;
options.enable_pipelined_write = enable_pipelined_write; options.enable_pipelined_write = enable_pipelined_write;
options.concurrent_prepare = two_queues;
ReadOptions read_options; ReadOptions read_options;
DB* db; DB* db;
@ -200,11 +202,12 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
ASSERT_TRUE(writer->state == ASSERT_TRUE(writer->state ==
WriteThread::State::STATE_GROUP_LEADER); WriteThread::State::STATE_GROUP_LEADER);
} else if (!allow_parallel) { } else if (!allow_parallel) {
ASSERT_TRUE( ASSERT_TRUE(writer->state ==
writer->state == WriteThread::State::STATE_COMPLETED || WriteThread::State::STATE_COMPLETED ||
(enable_pipelined_write && (enable_pipelined_write &&
writer->state == writer->state ==
WriteThread::State::STATE_MEMTABLE_WRITER_LEADER)); WriteThread::State::
STATE_MEMTABLE_WRITER_LEADER));
} }
}); });
@ -296,6 +299,7 @@ TEST_F(WriteCallbackTest, WriteWithCallbackTest) {
} }
} }
} }
}
TEST_F(WriteCallbackTest, WriteCallBackTest) { TEST_F(WriteCallbackTest, WriteCallBackTest) {
Options options; Options options;

@ -853,6 +853,11 @@ class DB {
return Flush(options, DefaultColumnFamily()); return Flush(options, DefaultColumnFamily());
} }
// Flush the WAL memory buffer to the file. If sync is true, it calls SyncWAL
// afterwards.
virtual Status FlushWAL(bool sync) {
return Status::NotSupported("FlushWAL not implemented");
}
// Sync the wal. Note that Write() followed by SyncWAL() is not exactly the // Sync the wal. Note that Write() followed by SyncWAL() is not exactly the
// same as Write() with sync=true: in the latter case the changes won't be // same as Write() with sync=true: in the latter case the changes won't be
// visible until the sync is done. // visible until the sync is done.

@ -887,6 +887,18 @@ struct DBOptions {
// DEFAULT: false // DEFAULT: false
// Immutable. // Immutable.
bool allow_ingest_behind = false; bool allow_ingest_behind = false;
// If enabled it uses two queues for writes, one for the ones with
// disable_memtable and one for the ones that also write to memtable. This
// allows the memtable writes not to lag behind other writes. It can be used
// to optimize MySQL 2PC in which only the commits, which are serial, write to
// memtable.
bool concurrent_prepare = false;
// If true WAL is not flushed automatically after each write. Instead it
// relies on manual invocation of FlushWAL to write the WAL buffer to its
// file.
bool manual_wal_flush = false;
}; };
// Options to control the behavior of a database (passed to DB::Open) // Options to control the behavior of a database (passed to DB::Open)

@ -268,6 +268,8 @@ class StackableDB : public DB {
return db_->SyncWAL(); return db_->SyncWAL();
} }
virtual Status FlushWAL(bool sync) override { return db_->FlushWAL(sync); }
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
virtual Status DisableFileDeletions() override { virtual Status DisableFileDeletions() override {

@ -86,7 +86,9 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
fail_if_options_file_error(options.fail_if_options_file_error), fail_if_options_file_error(options.fail_if_options_file_error),
dump_malloc_stats(options.dump_malloc_stats), dump_malloc_stats(options.dump_malloc_stats),
avoid_flush_during_recovery(options.avoid_flush_during_recovery), avoid_flush_during_recovery(options.avoid_flush_during_recovery),
allow_ingest_behind(options.allow_ingest_behind) { allow_ingest_behind(options.allow_ingest_behind),
concurrent_prepare(options.concurrent_prepare),
manual_wal_flush(options.manual_wal_flush) {
} }
void ImmutableDBOptions::Dump(Logger* log) const { void ImmutableDBOptions::Dump(Logger* log) const {
@ -219,6 +221,10 @@ void ImmutableDBOptions::Dump(Logger* log) const {
avoid_flush_during_recovery); avoid_flush_during_recovery);
ROCKS_LOG_HEADER(log, " Options.allow_ingest_behind: %d", ROCKS_LOG_HEADER(log, " Options.allow_ingest_behind: %d",
allow_ingest_behind); allow_ingest_behind);
ROCKS_LOG_HEADER(log, " Options.concurrent_prepare: %d",
concurrent_prepare);
ROCKS_LOG_HEADER(log, " Options.manual_wal_flush: %d",
manual_wal_flush);
} }
MutableDBOptions::MutableDBOptions() MutableDBOptions::MutableDBOptions()

@ -79,6 +79,8 @@ struct ImmutableDBOptions {
bool dump_malloc_stats; bool dump_malloc_stats;
bool avoid_flush_during_recovery; bool avoid_flush_during_recovery;
bool allow_ingest_behind; bool allow_ingest_behind;
bool concurrent_prepare;
bool manual_wal_flush;
}; };
struct MutableDBOptions { struct MutableDBOptions {

@ -349,7 +349,15 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{"allow_ingest_behind", {"allow_ingest_behind",
{offsetof(struct DBOptions, allow_ingest_behind), OptionType::kBoolean, {offsetof(struct DBOptions, allow_ingest_behind), OptionType::kBoolean,
OptionVerificationType::kNormal, false, OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, allow_ingest_behind)}}}; offsetof(struct ImmutableDBOptions, allow_ingest_behind)}},
{"concurrent_prepare",
{offsetof(struct DBOptions, concurrent_prepare), OptionType::kBoolean,
OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, concurrent_prepare)}},
{"manual_wal_flush",
{offsetof(struct DBOptions, manual_wal_flush), OptionType::kBoolean,
OptionVerificationType::kNormal, false,
offsetof(struct ImmutableDBOptions, manual_wal_flush)}}};
// offset_of is used to get the offset of a class data member // offset_of is used to get the offset of a class data member
// ex: offset_of(&ColumnFamilyOptions::num_levels) // ex: offset_of(&ColumnFamilyOptions::num_levels)

@ -13,22 +13,11 @@
#define __STDC_FORMAT_MACROS #define __STDC_FORMAT_MACROS
#endif #endif
#include <inttypes.h>
#include <cctype>
#include <cstring> #include <cstring>
#include <unordered_map>
#include "options/options_helper.h"
#include "options/options_parser.h" #include "options/options_parser.h"
#include "options/options_sanity_check.h"
#include "rocksdb/cache.h"
#include "rocksdb/convenience.h" #include "rocksdb/convenience.h"
#include "rocksdb/memtablerep.h"
#include "rocksdb/utilities/leveldb_options.h"
#include "util/random.h"
#include "util/stderr_logger.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "util/testutil.h"
#ifndef GFLAGS #ifndef GFLAGS
bool FLAGS_enable_print = false; bool FLAGS_enable_print = false;
@ -294,7 +283,9 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"allow_2pc=false;" "allow_2pc=false;"
"avoid_flush_during_recovery=false;" "avoid_flush_during_recovery=false;"
"avoid_flush_during_shutdown=false;" "avoid_flush_during_shutdown=false;"
"allow_ingest_behind=false;", "allow_ingest_behind=false;"
"concurrent_prepare=false;"
"manual_wal_flush=false;",
new_options)); new_options));
ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions), ASSERT_EQ(unset_bytes_base, NumUnsetBytes(new_options_ptr, sizeof(DBOptions),

@ -9,7 +9,6 @@
// Use of this source code is governed by a BSD-style license that can be // Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "table/block_based_table_factory.h" #include "table/block_based_table_factory.h"
#include <memory> #include <memory>

@ -8,8 +8,8 @@
/* /*
Murmurhash from http://sites.google.com/site/murmurhash/ Murmurhash from http://sites.google.com/site/murmurhash/
All code is released to the public domain. For business purposes, Murmurhash is All code is released to the public domain. For business purposes, Murmurhash
under the MIT license. is under the MIT license.
*/ */
#include "murmurhash.h" #include "murmurhash.h"

@ -8,8 +8,8 @@
/* /*
Murmurhash from http://sites.google.com/site/murmurhash/ Murmurhash from http://sites.google.com/site/murmurhash/
All code is released to the public domain. For business purposes, Murmurhash is All code is released to the public domain. For business purposes, Murmurhash
under the MIT license. is under the MIT license.
*/ */
#pragma once #pragma once
#include <stdint.h> #include <stdint.h>

@ -12,6 +12,7 @@
#include <inttypes.h> #include <inttypes.h>
#include <string> #include <string>
#include <thread>
#include "rocksdb/db.h" #include "rocksdb/db.h"
#include "rocksdb/utilities/optimistic_transaction_db.h" #include "rocksdb/utilities/optimistic_transaction_db.h"
@ -135,6 +136,13 @@ bool RandomTransactionInserter::DoInsert(DB* db, Transaction* txn,
if (s.ok()) { if (s.ok()) {
if (txn != nullptr) { if (txn != nullptr) {
std::hash<std::thread::id> hasher;
char name[64];
snprintf(name, 64, "txn%zu-%d", hasher(std::this_thread::get_id()),
txn_id_++);
assert(strlen(name) < 64 - 1);
txn->SetName(name);
s = txn->Prepare();
s = txn->Commit(); s = txn->Commit();
if (!s.ok()) { if (!s.ok()) {

@ -104,6 +104,8 @@ class RandomTransactionInserter {
Transaction* txn_ = nullptr; Transaction* txn_ = nullptr;
Transaction* optimistic_txn_ = nullptr; Transaction* optimistic_txn_ = nullptr;
std::atomic<int> txn_id_;
bool DoInsert(DB* db, Transaction* txn, bool is_optimistic); bool DoInsert(DB* db, Transaction* txn, bool is_optimistic);
}; };

@ -136,6 +136,9 @@ class DummyDB : public StackableDB {
return Status::OK(); return Status::OK();
} }
// To avoid FlushWAL called on stacked db which is nullptr
virtual Status FlushWAL(bool sync) override { return Status::OK(); }
std::vector<std::string> live_files_; std::vector<std::string> live_files_;
// pair<filename, alive?> // pair<filename, alive?>
std::vector<std::pair<std::string, bool>> wal_files_; std::vector<std::pair<std::string, bool>> wal_files_;

@ -209,6 +209,7 @@ Status CheckpointImpl::CreateCustomCheckpoint(
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1"); TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2"); TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
db_->FlushWAL(false /* sync */);
} }
// if we have more than one column family, we need to also get WAL files // if we have more than one column family, we need to also get WAL files
if (s.ok()) { if (s.ok()) {

@ -274,6 +274,8 @@ Status TransactionImpl::Commit() {
s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr, s = db_impl_->WriteImpl(write_options_, working_batch, nullptr, nullptr,
log_number_); log_number_);
if (!s.ok()) { if (!s.ok()) {
ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
"Commit write failed");
return s; return s;
} }

@ -35,7 +35,8 @@ using std::string;
namespace rocksdb { namespace rocksdb {
class TransactionTest : public ::testing::TestWithParam<bool> { class TransactionTest
: public ::testing::TestWithParam<std::tuple<bool, bool>> {
public: public:
TransactionDB* db; TransactionDB* db;
FaultInjectionTestEnv* env; FaultInjectionTestEnv* env;
@ -52,13 +53,14 @@ class TransactionTest : public ::testing::TestWithParam<bool> {
options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); options.merge_operator = MergeOperators::CreateFromStringId("stringappend");
env = new FaultInjectionTestEnv(Env::Default()); env = new FaultInjectionTestEnv(Env::Default());
options.env = env; options.env = env;
options.concurrent_prepare = std::get<1>(GetParam());
dbname = test::TmpDir() + "/transaction_testdb"; dbname = test::TmpDir() + "/transaction_testdb";
DestroyDB(dbname, options); DestroyDB(dbname, options);
txn_db_options.transaction_lock_timeout = 0; txn_db_options.transaction_lock_timeout = 0;
txn_db_options.default_lock_timeout = 0; txn_db_options.default_lock_timeout = 0;
Status s; Status s;
if (GetParam() == false) { if (std::get<0>(GetParam()) == false) {
s = TransactionDB::Open(options, txn_db_options, dbname, &db); s = TransactionDB::Open(options, txn_db_options, dbname, &db);
} else { } else {
s = OpenWithStackableDB(); s = OpenWithStackableDB();
@ -79,7 +81,7 @@ class TransactionTest : public ::testing::TestWithParam<bool> {
env->DropUnsyncedFileData(); env->DropUnsyncedFileData();
env->ResetState(); env->ResetState();
Status s; Status s;
if (GetParam() == false) { if (std::get<0>(GetParam()) == false) {
s = TransactionDB::Open(options, txn_db_options, dbname, &db); s = TransactionDB::Open(options, txn_db_options, dbname, &db);
} else { } else {
s = OpenWithStackableDB(); s = OpenWithStackableDB();
@ -91,7 +93,7 @@ class TransactionTest : public ::testing::TestWithParam<bool> {
delete db; delete db;
DestroyDB(dbname, options); DestroyDB(dbname, options);
Status s; Status s;
if (GetParam() == false) { if (std::get<0>(GetParam()) == false) {
s = TransactionDB::Open(options, txn_db_options, dbname, &db); s = TransactionDB::Open(options, txn_db_options, dbname, &db);
} else { } else {
s = OpenWithStackableDB(); s = OpenWithStackableDB();
@ -122,9 +124,17 @@ class TransactionTest : public ::testing::TestWithParam<bool> {
} }
}; };
INSTANTIATE_TEST_CASE_P(DBAsBaseDB, TransactionTest, ::testing::Values(false)); class MySQLStyleTransactionTest : public TransactionTest {};
INSTANTIATE_TEST_CASE_P(DBAsBaseDB, TransactionTest,
::testing::Values(std::make_tuple(false, false)));
INSTANTIATE_TEST_CASE_P(StackableDBAsBaseDB, TransactionTest, INSTANTIATE_TEST_CASE_P(StackableDBAsBaseDB, TransactionTest,
::testing::Values(true)); ::testing::Values(std::make_tuple(true, false)));
INSTANTIATE_TEST_CASE_P(MySQLStyleTransactionTest, MySQLStyleTransactionTest,
::testing::Values(std::make_tuple(false, false),
std::make_tuple(false, true),
std::make_tuple(true, false),
std::make_tuple(true, true)));
TEST_P(TransactionTest, DoubleEmptyWrite) { TEST_P(TransactionTest, DoubleEmptyWrite) {
WriteOptions write_options; WriteOptions write_options;
@ -957,6 +967,7 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
s = db->Get(read_options, Slice("foo"), &value); s = db->Get(read_options, Slice("foo"), &value);
ASSERT_TRUE(s.IsNotFound()); ASSERT_TRUE(s.IsNotFound());
db->FlushWAL(false);
delete txn; delete txn;
// kill and reopen // kill and reopen
s = ReOpenNoDelete(); s = ReOpenNoDelete();
@ -1021,7 +1032,8 @@ TEST_P(TransactionTest, PersistentTwoPhaseTransactionTest) {
ASSERT_EQ(db->GetTransactionByName("xid"), nullptr); ASSERT_EQ(db->GetTransactionByName("xid"), nullptr);
} }
TEST_P(TransactionTest, TwoPhaseMultiThreadTest) { // TODO this test needs to be updated with serial commits
TEST_P(TransactionTest, DISABLED_TwoPhaseMultiThreadTest) {
// mix transaction writes and regular writes // mix transaction writes and regular writes
const uint32_t NUM_TXN_THREADS = 50; const uint32_t NUM_TXN_THREADS = 50;
std::atomic<uint32_t> txn_thread_num(0); std::atomic<uint32_t> txn_thread_num(0);
@ -1546,6 +1558,8 @@ TEST_P(TransactionTest, TwoPhaseOutOfOrderDelete) {
s = db->Put(wal_on, "cats", "dogs4"); s = db->Put(wal_on, "cats", "dogs4");
ASSERT_OK(s); ASSERT_OK(s);
db->FlushWAL(false);
// kill and reopen // kill and reopen
env->SetFilesystemActive(false); env->SetFilesystemActive(false);
ReOpenNoDelete(); ReOpenNoDelete();
@ -4487,7 +4501,7 @@ Status TransactionStressTestInserter(TransactionDB* db,
} }
} // namespace } // namespace
TEST_P(TransactionTest, TransactionStressTest) { TEST_P(MySQLStyleTransactionTest, TransactionStressTest) {
const size_t num_threads = 4; const size_t num_threads = 4;
const size_t num_transactions_per_thread = 10000; const size_t num_transactions_per_thread = 10000;
const size_t num_sets = 3; const size_t num_sets = 3;

Loading…
Cancel
Save