Adding comments to the write path

Summary:
also did minor refactoring
Closes https://github.com/facebook/rocksdb/pull/2115

Differential Revision: D4855818

Pulled By: maysamyabandeh

fbshipit-source-id: fbca6ac57e5c6677fffe8354f7291e596a50cb77
main
Maysam Yabandeh 9 years ago committed by Facebook Github Bot
parent 7124268a09
commit 20778f2f92
  1. 39
      db/db_impl_write.cc
  2. 6
      db/write_batch.cc
  3. 17
      db/write_thread.cc
  4. 2
      db/write_thread.h
  5. 2
      include/rocksdb/options.h

@ -117,7 +117,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// when it finds suitable, and finish them in the same write batch. // when it finds suitable, and finish them in the same write batch.
// This is how a write job could be done by the other writer. // This is how a write job could be done by the other writer.
WriteContext write_context; WriteContext write_context;
WriteThread::Writer* last_writer = &w; WriteThread::Writer* last_writer = &w; // Dummy intial value
autovector<WriteThread::Writer*> write_group; autovector<WriteThread::Writer*> write_group;
bool logs_getting_synced = false; bool logs_getting_synced = false;
@ -127,7 +127,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
bool need_log_dir_sync = need_log_sync && !log_dir_synced_; bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
status = PreprocessWrite(write_options, need_log_sync, &logs_getting_synced, status = PreprocessWrite(write_options, need_log_sync, &logs_getting_synced,
&write_context); &write_context);
uint64_t last_sequence = versions_->LastSequence();
log::Writer* cur_log_writer = logs_.back().writer; log::Writer* cur_log_writer = logs_.back().writer;
mutex_.Unlock(); mutex_.Unlock();
@ -145,15 +144,13 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// Rules for when we can update the memtable concurrently // Rules for when we can update the memtable concurrently
// 1. supported by memtable // 1. supported by memtable
// 2. Puts are not okay if inplace_update_support // 2. Puts are not okay if inplace_update_support
// 3. Deletes or SingleDeletes are not okay if filtering deletes // 3. Merges are not okay
// (controlled by both batch and memtable setting)
// 4. Merges are not okay
// //
// Rules 1..3 are enforced by checking the options // Rules 1..2 are enforced by checking the options
// during startup (CheckConcurrentWritesSupported), so if // during startup (CheckConcurrentWritesSupported), so if
// options.allow_concurrent_memtable_write is true then they can be // options.allow_concurrent_memtable_write is true then they can be
// assumed to be true. Rule 4 is checked for each batch. We could // assumed to be true. Rule 3 is checked for each batch. We could
// relax rules 2 and 3 if we could prevent write batches from referring // relax rules 2 if we could prevent write batches from referring
// more than once to a particular key. // more than once to a particular key.
bool parallel = immutable_db_options_.allow_concurrent_memtable_write && bool parallel = immutable_db_options_.allow_concurrent_memtable_write &&
write_group.size() > 1; write_group.size() > 1;
@ -173,6 +170,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
} }
} }
uint64_t last_sequence = versions_->LastSequence();
const SequenceNumber current_sequence = last_sequence + 1; const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += total_count; last_sequence += total_count;
@ -218,7 +216,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
status = WriteBatchInternal::InsertInto( status = WriteBatchInternal::InsertInto(
write_group, current_sequence, column_family_memtables_.get(), write_group, current_sequence, column_family_memtables_.get(),
&flush_scheduler_, write_options.ignore_missing_column_families, &flush_scheduler_, write_options.ignore_missing_column_families,
0 /*log_number*/, this); 0 /*recovery_log_number*/, this);
if (status.ok()) { if (status.ok()) {
// There were no write failures. Set leader's status // There were no write failures. Set leader's status
@ -236,8 +234,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
std::memory_order_relaxed); std::memory_order_relaxed);
write_thread_.LaunchParallelFollowers(&pg, current_sequence); write_thread_.LaunchParallelFollowers(&pg, current_sequence);
// Each parallel follower is doing each own writes. The leader should
// also do its own.
if (w.ShouldWriteToMemtable()) { if (w.ShouldWriteToMemtable()) {
// do leader write
ColumnFamilyMemTablesImpl column_family_memtables( ColumnFamilyMemTablesImpl column_family_memtables(
versions_->GetColumnFamilySet()); versions_->GetColumnFamilySet());
assert(w.sequence == current_sequence); assert(w.sequence == current_sequence);
@ -270,8 +269,12 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
// //
// Is setting bg_error_ enough here? This will at least stop // Is setting bg_error_ enough here? This will at least stop
// compaction and fail any further writes. // compaction and fail any further writes.
if (!status.ok() && bg_error_.ok() && !w.CallbackFailed()) { if (!status.ok() && !w.CallbackFailed()) {
bg_error_ = status; mutex_.Lock();
if (bg_error_.ok()) {
bg_error_ = status; // stop compaction & fail any further writes
}
mutex_.Unlock();
} }
} }
} }
@ -341,11 +344,23 @@ Status DBImpl::PreprocessWrite(const WriteOptions& write_options,
} }
if (status.ok() && need_log_sync) { if (status.ok() && need_log_sync) {
// Wait until the parallel syncs are finished. Any sync process has to sync
// the front log too so it is enough to check the status of front()
// We do a while loop since log_sync_cv_ is signalled when any sync is
// finished
// Note: there does not seem to be a reason to wait for parallel sync at
// this early step but it is not important since parallel sync (SyncWAL) and
// need_log_sync are usually not used together.
while (logs_.front().getting_synced) { while (logs_.front().getting_synced) {
log_sync_cv_.Wait(); log_sync_cv_.Wait();
} }
for (auto& log : logs_) { for (auto& log : logs_) {
assert(!log.getting_synced); assert(!log.getting_synced);
// This is just to prevent the logs to be synced by a parallel SyncWAL
// call. We will do the actual syncing later after we will write to the
// WAL.
// Note: there does not seem to be a reason to set this early before we
// actually write to the WAL
log.getting_synced = true; log.getting_synced = true;
} }
*logs_getting_synced = true; *logs_getting_synced = true;

@ -1243,11 +1243,11 @@ public:
Status WriteBatchInternal::InsertInto( Status WriteBatchInternal::InsertInto(
const autovector<WriteThread::Writer*>& writers, SequenceNumber sequence, const autovector<WriteThread::Writer*>& writers, SequenceNumber sequence,
ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
bool ignore_missing_column_families, uint64_t log_number, DB* db, bool ignore_missing_column_families, uint64_t recovery_log_number, DB* db,
bool concurrent_memtable_writes) { bool concurrent_memtable_writes) {
MemTableInserter inserter(sequence, memtables, flush_scheduler, MemTableInserter inserter(sequence, memtables, flush_scheduler,
ignore_missing_column_families, log_number, db, ignore_missing_column_families, recovery_log_number,
concurrent_memtable_writes); db, concurrent_memtable_writes);
for (size_t i = 0; i < writers.size(); i++) { for (size_t i = 0; i < writers.size(); i++) {
auto w = writers[i]; auto w = writers[i];
if (!w->ShouldWriteToMemtable()) { if (!w->ShouldWriteToMemtable()) {

@ -5,7 +5,6 @@
#include "db/write_thread.h" #include "db/write_thread.h"
#include <chrono> #include <chrono>
#include <limits>
#include <thread> #include <thread>
#include "db/column_family.h" #include "db/column_family.h"
#include "port/port.h" #include "port/port.h"
@ -195,7 +194,9 @@ void WriteThread::LinkOne(Writer* w, bool* linked_as_leader) {
// debugging and is checked by an assert in WriteImpl // debugging and is checked by an assert in WriteImpl
w->state.store(STATE_GROUP_LEADER, std::memory_order_relaxed); w->state.store(STATE_GROUP_LEADER, std::memory_order_relaxed);
} }
// Then we are the head of the queue and hence definiltly the leader
*linked_as_leader = (writers == nullptr); *linked_as_leader = (writers == nullptr);
// Otherwise we will wait for previous leader to define our status
return; return;
} }
} }
@ -223,6 +224,13 @@ void WriteThread::JoinBatchGroup(Writer* w) {
TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w); TEST_SYNC_POINT_CALLBACK("WriteThread::JoinBatchGroup:Wait", w);
if (!linked_as_leader) { if (!linked_as_leader) {
/**
* Wait util:
* 1) An existing leader pick us as the new leader when it finishes
* 2) An exisitng leader pick us as its follewer and
* 2.1) finishes the memtable writes on our behalf
* 2.2) Or tell us to finish the memtable writes it in pralallel
*/
AwaitState(w, AwaitState(w,
STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED, STATE_GROUP_LEADER | STATE_PARALLEL_FOLLOWER | STATE_COMPLETED,
&ctx); &ctx);
@ -316,19 +324,22 @@ void WriteThread::LaunchParallelFollowers(ParallelGroup* pg,
Writer* w = pg->leader; Writer* w = pg->leader;
w->sequence = sequence; w->sequence = sequence;
// Initialize and wake up the others
while (w != pg->last_writer) { while (w != pg->last_writer) {
// Writers that won't write don't get sequence allotment // Writers that won't write don't get sequence allotment
if (!w->CallbackFailed() && w->ShouldWriteToMemtable()) { if (!w->CallbackFailed() && w->ShouldWriteToMemtable()) {
// There is a sequence number of each written key
sequence += WriteBatchInternal::Count(w->batch); sequence += WriteBatchInternal::Count(w->batch);
} }
w = w->link_newer; w = w->link_newer;
w->sequence = sequence; w->sequence = sequence; // sequence number for the first key in the batch
w->parallel_group = pg; w->parallel_group = pg;
SetState(w, STATE_PARALLEL_FOLLOWER); SetState(w, STATE_PARALLEL_FOLLOWER);
} }
} }
// This method is called by both the leader and parallel followers
bool WriteThread::CompleteParallelWorker(Writer* w) { bool WriteThread::CompleteParallelWorker(Writer* w) {
static AdaptationContext ctx("CompleteParallelWorker"); static AdaptationContext ctx("CompleteParallelWorker");
@ -352,6 +363,7 @@ bool WriteThread::CompleteParallelWorker(Writer* w) {
} }
// else we're the last parallel worker // else we're the last parallel worker
// Errors (if there is any) must be handled by leader before waking up others
if (w == leader || (early_exit_allowed && pg->status.ok())) { if (w == leader || (early_exit_allowed && pg->status.ok())) {
// this thread should perform exit duties // this thread should perform exit duties
w->status = pg->status; w->status = pg->status;
@ -434,6 +446,7 @@ void WriteThread::EnterUnbatched(Writer* w, InstrumentedMutex* mu) {
if (!linked_as_leader) { if (!linked_as_leader) {
mu->Unlock(); mu->Unlock();
TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait"); TEST_SYNC_POINT("WriteThread::EnterUnbatched:Wait");
// Last leader will not pick us as a follower since our batch is nullptr
AwaitState(w, STATE_GROUP_LEADER, &ctx); AwaitState(w, STATE_GROUP_LEADER, &ctx);
mu->Lock(); mu->Lock();
} }

@ -90,7 +90,7 @@ class WriteThread {
bool made_waitable; // records lazy construction of mutex and cv bool made_waitable; // records lazy construction of mutex and cv
std::atomic<uint8_t> state; // write under StateMutex() or pre-link std::atomic<uint8_t> state; // write under StateMutex() or pre-link
ParallelGroup* parallel_group; ParallelGroup* parallel_group;
SequenceNumber sequence; // the sequence number to use SequenceNumber sequence; // the sequence number to use for the first key
Status status; // status of memtable inserter Status status; // status of memtable inserter
Status callback_status; // status returned by callback->Callback() Status callback_status; // status returned by callback->Callback()
std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes; std::aligned_storage<sizeof(std::mutex)>::type state_mutex_bytes;

@ -412,6 +412,8 @@ struct DBOptions {
// This parameter should be set to true while storing data to // This parameter should be set to true while storing data to
// filesystem like ext3 that can lose files after a reboot. // filesystem like ext3 that can lose files after a reboot.
// Default: false // Default: false
// Note: on many platforms fdatasync is defined as fsync, so this parameter
// would make no difference. Refer to fdatasync definition in this code base.
bool use_fsync = false; bool use_fsync = false;
// A list of paths where SST files can be put into, with its target size. // A list of paths where SST files can be put into, with its target size.

Loading…
Cancel
Save