Fix WriteImpl empty batch hanging issue

Summary: There is an issue in DBImpl::WriteImpl where if an empty writebatch comes in and sync=true then the logs will be marked as being synced yet the sync never actually happens because there is no data in the writebatch. This causes the next incoming batch to hang while waiting for the logs to complete syncing. This fix syncs logs even if the writebatch is empty.

Test Plan: DoubleEmptyBatch unit test in transaction_test.

Reviewers: yoshinorim, hermanlee4, sdong, ngbronson, anthony

Subscribers: leveldb, dhruba

Differential Revision: https://reviews.facebook.net/D54057
main
reid horuff 9 years ago
parent 871cc5f987
commit 5bcf952a87
  1. 8
      db/db_impl.cc
  2. 11
      utilities/transactions/transaction_test.cc

@ -4337,11 +4337,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
} }
} }
if (total_count == 0) {
write_thread_.ExitAsBatchGroupLeader(&w, last_writer, status);
return w.FinalStatus();
}
const SequenceNumber current_sequence = last_sequence + 1; const SequenceNumber current_sequence = last_sequence + 1;
last_sequence += total_count; last_sequence += total_count;
@ -4360,7 +4355,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
PERF_TIMER_GUARD(write_wal_time); PERF_TIMER_GUARD(write_wal_time);
WriteBatch* merged_batch = nullptr; WriteBatch* merged_batch = nullptr;
if (write_group.size() == 1) { if (write_group.size() == 1 && !write_group[0]->CallbackFailed()) {
merged_batch = write_group[0]->batch; merged_batch = write_group[0]->batch;
} else { } else {
// WAL needs all of the batches flattened into a single batch. // WAL needs all of the batches flattened into a single batch.
@ -4376,7 +4371,6 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
WriteBatchInternal::SetSequence(merged_batch, current_sequence); WriteBatchInternal::SetSequence(merged_batch, current_sequence);
assert(WriteBatchInternal::Count(merged_batch) == total_count); assert(WriteBatchInternal::Count(merged_batch) == total_count);
assert(WriteBatchInternal::ByteSize(merged_batch) == total_byte_size);
Slice log_entry = WriteBatchInternal::Contents(merged_batch); Slice log_entry = WriteBatchInternal::Contents(merged_batch);
status = logs_.back().writer->AddRecord(log_entry); status = logs_.back().writer->AddRecord(log_entry);

@ -62,6 +62,17 @@ class TransactionTest : public testing::Test {
} }
}; };
TEST_F(TransactionTest, DoubleEmptyWrite) {
WriteOptions write_options;
write_options.sync = true;
write_options.disableWAL = false;
WriteBatch batch;
ASSERT_OK(db->Write(write_options, &batch));
ASSERT_OK(db->Write(write_options, &batch));
}
TEST_F(TransactionTest, SuccessTest) { TEST_F(TransactionTest, SuccessTest) {
WriteOptions write_options; WriteOptions write_options;
ReadOptions read_options; ReadOptions read_options;

Loading…
Cancel
Save