Add further tests to ASSERT_STATUS_CHECKED (2) (#7698)

Summary:
Second batch of adding more tests to ASSERT_STATUS_CHECKED.

* external_sst_file_basic_test
* checkpoint_test
* db_wal_test
* db_block_cache_test
* db_logical_block_size_cache_test
* db_blob_index_test
* optimistic_transaction_test
* transaction_test
* point_lock_manager_test
* write_prepared_transaction_test
* write_unprepared_transaction_test

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7698

Reviewed By: cheng-chang

Differential Revision: D25441664

Pulled By: pdillinger

fbshipit-source-id: 9e78867f32321db5d4833e95eb96c5734526ef00
main
Adam Retter 4 years ago committed by Facebook GitHub Bot
parent 8e2749fd3a
commit 8ff6557e7f
  1. 11
      Makefile
  2. 6
      db/blob/db_blob_index_test.cc
  3. 6
      db/db_block_cache_test.cc
  4. 2
      db/db_impl/db_impl.h
  5. 5
      db/db_impl/db_impl_debug.cc
  6. 3
      db/db_impl/db_impl_open.cc
  7. 4
      db/db_impl/db_impl_write.cc
  8. 11
      db/db_iter.cc
  9. 16
      db/db_logical_block_size_cache_test.cc
  10. 106
      db/db_wal_test.cc
  11. 4
      db/db_write_test.cc
  12. 85
      db/external_sst_file_basic_test.cc
  13. 6
      db/external_sst_file_ingestion_job.cc
  14. 1
      db/write_thread.cc
  15. 15
      test_util/testutil.cc
  16. 6
      test_util/testutil.h
  17. 192
      utilities/checkpoint/checkpoint_impl.cc
  18. 38
      utilities/checkpoint/checkpoint_test.cc
  19. 2
      utilities/fault_injection_env.cc
  20. 18
      utilities/transactions/lock/point/point_lock_manager.cc
  21. 2
      utilities/transactions/optimistic_transaction.cc
  22. 655
      utilities/transactions/optimistic_transaction_test.cc
  23. 6
      utilities/transactions/transaction_test.cc
  24. 20
      utilities/transactions/transaction_test.h
  25. 213
      utilities/transactions/write_prepared_transaction_test.cc
  26. 23
      utilities/transactions/write_prepared_txn.cc
  27. 46
      utilities/transactions/write_unprepared_transaction_test.cc
  28. 5
      utilities/transactions/write_unprepared_txn_db.cc

@ -589,16 +589,21 @@ ifdef ASSERT_STATUS_CHECKED
cassandra_row_merge_test \
cassandra_serialize_test \
cleanable_test \
checkpoint_test \
coding_test \
crc32c_test \
dbformat_test \
db_basic_test \
db_blob_basic_test \
db_blob_index_test \
db_block_cache_test \
db_flush_test \
db_iterator_test \
db_logical_block_size_cache_test \
db_memtable_test \
db_merge_operand_test \
db_merge_operator_test \
db_wal_test \
db_with_timestamp_basic_test \
db_with_timestamp_compaction_test \
db_options_test \
@ -613,6 +618,7 @@ ifdef ASSERT_STATUS_CHECKED
env_logger_test \
event_logger_test \
error_handler_fs_test \
external_sst_file_basic_test \
auto_roll_logger_test \
file_indexer_test \
flush_job_test \
@ -628,6 +634,7 @@ ifdef ASSERT_STATUS_CHECKED
merger_test \
mock_env_test \
object_registry_test \
optimistic_transaction_test \
prefix_test \
plain_table_db_test \
repair_test \
@ -635,6 +642,7 @@ ifdef ASSERT_STATUS_CHECKED
customizable_test \
options_settable_test \
options_test \
point_lock_manager_test \
random_test \
range_del_aggregator_test \
sst_file_reader_test \
@ -648,6 +656,7 @@ ifdef ASSERT_STATUS_CHECKED
stats_history_test \
thread_local_test \
trace_analyzer_test \
transaction_test \
env_timed_test \
filelock_test \
timer_queue_test \
@ -663,6 +672,8 @@ ifdef ASSERT_STATUS_CHECKED
version_edit_test \
work_queue_test \
write_controller_test \
write_prepared_transaction_test \
write_unprepared_transaction_test \
compaction_iterator_test \
compaction_job_test \
compaction_job_stats_test \

@ -305,6 +305,7 @@ TEST_F(DBBlobIndexTest, Iterate) {
std::function<void(Iterator*)> extra_check = nullptr) {
// Seek
auto* iterator = create_iterator();
ASSERT_OK(iterator->status());
ASSERT_OK(iterator->Refresh());
iterator->Seek(get_key(index));
check_iterator(iterator, expected_status, forward_value);
@ -318,6 +319,7 @@ TEST_F(DBBlobIndexTest, Iterate) {
ASSERT_OK(iterator->Refresh());
iterator->Seek(get_key(index - 1));
ASSERT_TRUE(iterator->Valid());
ASSERT_OK(iterator->status());
iterator->Next();
check_iterator(iterator, expected_status, forward_value);
if (extra_check) {
@ -327,6 +329,7 @@ TEST_F(DBBlobIndexTest, Iterate) {
// SeekForPrev
iterator = create_iterator();
ASSERT_OK(iterator->status());
ASSERT_OK(iterator->Refresh());
iterator->SeekForPrev(get_key(index));
check_iterator(iterator, expected_status, backward_value);
@ -339,6 +342,7 @@ TEST_F(DBBlobIndexTest, Iterate) {
iterator = create_iterator();
iterator->Seek(get_key(index + 1));
ASSERT_TRUE(iterator->Valid());
ASSERT_OK(iterator->status());
iterator->Prev();
check_iterator(iterator, expected_status, backward_value);
if (extra_check) {
@ -376,7 +380,7 @@ TEST_F(DBBlobIndexTest, Iterate) {
ASSERT_OK(Write(&batch));
break;
default:
assert(false);
FAIL();
};
}
snapshots.push_back(dbfull()->GetSnapshot());

@ -677,7 +677,7 @@ TEST_F(DBBlockCacheTest, ParanoidFileChecks) {
// Create a new SST file. This will further trigger a compaction
// and generate another file.
ASSERT_OK(Flush(1));
dbfull()->TEST_WaitForCompact();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(3, /* Totally 3 files created up to now */
TestGetTickerCount(options, BLOCK_CACHE_ADD));
@ -692,7 +692,7 @@ TEST_F(DBBlockCacheTest, ParanoidFileChecks) {
ASSERT_OK(Put(1, "1_key4", "val4"));
ASSERT_OK(Put(1, "9_key4", "val4"));
ASSERT_OK(Flush(1));
dbfull()->TEST_WaitForCompact();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(3, /* Totally 3 files created up to now */
TestGetTickerCount(options, BLOCK_CACHE_ADD));
}
@ -860,7 +860,7 @@ TEST_F(DBBlockCacheTest, CacheCompressionDict) {
}
ASSERT_OK(Flush());
}
dbfull()->TEST_WaitForCompact();
ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(0, NumTableFilesAtLevel(0));
ASSERT_EQ(kNumFiles, NumTableFilesAtLevel(1));

@ -922,7 +922,7 @@ class DBImpl : public DB {
ColumnFamilyHandle* column_family = nullptr,
bool disallow_trivial_move = false);
void TEST_SwitchWAL();
Status TEST_SwitchWAL();
bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; }

@ -22,12 +22,13 @@ uint64_t DBImpl::TEST_GetLevel0TotalSize() {
return default_cf_handle_->cfd()->current()->storage_info()->NumLevelBytes(0);
}
void DBImpl::TEST_SwitchWAL() {
Status DBImpl::TEST_SwitchWAL() {
WriteContext write_context;
InstrumentedMutexLock l(&mutex_);
void* writer = TEST_BeginWrite();
SwitchWAL(&write_context);
auto s = SwitchWAL(&write_context);
TEST_EndWrite(writer);
return s;
}
bool DBImpl::TEST_WALBufferIsEmpty(bool lock) {

@ -1368,6 +1368,9 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
cfd->GetName().c_str(), meta.fd.GetNumber(),
meta.fd.GetFileSize(), s.ToString().c_str());
mutex_.Lock();
io_s.PermitUncheckedError(); // TODO(AR) is this correct, or should we
// return io_s if not ok()?
}
}
ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

@ -666,7 +666,6 @@ Status DBImpl::WriteImplWALOnly(
const uint64_t log_ref, uint64_t* seq_used, const size_t sub_batch_cnt,
PreReleaseCallback* pre_release_callback, const AssignOrder assign_order,
const PublishLastSeq publish_last_seq, const bool disable_memtable) {
Status status;
PERF_TIMER_GUARD(write_pre_and_post_process_time);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable, sub_batch_cnt, pre_release_callback);
@ -688,6 +687,8 @@ Status DBImpl::WriteImplWALOnly(
assert(w.state == WriteThread::STATE_GROUP_LEADER);
if (publish_last_seq == kDoPublishLastSeq) {
Status status;
// Currently we only use kDoPublishLastSeq in unordered_write
assert(immutable_db_options_.unordered_write);
WriteContext write_context;
@ -764,6 +765,7 @@ Status DBImpl::WriteImplWALOnly(
}
seq_inc = total_batch_cnt;
}
Status status;
IOStatus io_s;
if (!write_options.disableWAL) {
io_s = ConcurrentWriteToWAL(write_group, log_used, &last_sequence, seq_inc);

@ -545,7 +545,6 @@ bool DBIter::MergeValuesNewToOld() {
TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:PushedFirstOperand");
ParsedInternalKey ikey;
Status s;
for (iter_.Next(); iter_.Valid(); iter_.Next()) {
TEST_SYNC_POINT("DBIter::MergeValuesNewToOld:SteppedToNextOperand");
if (!ParseKey(&ikey)) {
@ -573,7 +572,7 @@ bool DBIter::MergeValuesNewToOld() {
// hit a put, merge the put value with operands and store the
// final result in saved_value_. We are done!
const Slice val = iter_.value();
s = MergeHelper::TimedFullMerge(
Status s = MergeHelper::TimedFullMerge(
merge_operator_, ikey.user_key, &val, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_, &pinned_value_, true);
if (!s.ok()) {
@ -616,10 +615,10 @@ bool DBIter::MergeValuesNewToOld() {
// a deletion marker.
// feed null as the existing value to the merge operator, such that
// client can differentiate this scenario and do things accordingly.
s = MergeHelper::TimedFullMerge(merge_operator_, saved_key_.GetUserKey(),
nullptr, merge_context_.GetOperands(),
&saved_value_, logger_, statistics_, env_,
&pinned_value_, true);
Status s = MergeHelper::TimedFullMerge(
merge_operator_, saved_key_.GetUserKey(), nullptr,
merge_context_.GetOperands(), &saved_value_, logger_, statistics_, env_,
&pinned_value_, true);
if (!s.ok()) {
valid_ = false;
status_ = s;

@ -401,7 +401,7 @@ TEST_F(DBLogicalBlockSizeCacheTest, MultiDBWithDifferentPaths) {
ColumnFamilyOptions cf_options0;
cf_options0.cf_paths = {{cf_path_0_, 1024}};
ColumnFamilyHandle* cf0;
db0->CreateColumnFamily(cf_options0, "cf", &cf0);
ASSERT_OK(db0->CreateColumnFamily(cf_options0, "cf", &cf0));
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(data_path_0_));
@ -421,7 +421,7 @@ TEST_F(DBLogicalBlockSizeCacheTest, MultiDBWithDifferentPaths) {
ColumnFamilyOptions cf_options1;
cf_options1.cf_paths = {{cf_path_1_, 1024}};
ColumnFamilyHandle* cf1;
db1->CreateColumnFamily(cf_options1, "cf", &cf1);
ASSERT_OK(db1->CreateColumnFamily(cf_options1, "cf", &cf1));
ASSERT_EQ(4, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(data_path_0_));
@ -432,7 +432,7 @@ TEST_F(DBLogicalBlockSizeCacheTest, MultiDBWithDifferentPaths) {
ASSERT_TRUE(cache_->Contains(cf_path_1_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_1_));
db0->DestroyColumnFamilyHandle(cf0);
ASSERT_OK(db0->DestroyColumnFamilyHandle(cf0));
delete db0;
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_1_));
@ -441,7 +441,7 @@ TEST_F(DBLogicalBlockSizeCacheTest, MultiDBWithDifferentPaths) {
ASSERT_EQ(1, cache_->GetRefCount(cf_path_1_));
ASSERT_OK(DestroyDB(data_path_0_, options, {{"cf", cf_options0}}));
db1->DestroyColumnFamilyHandle(cf1);
ASSERT_OK(db1->DestroyColumnFamilyHandle(cf1));
delete db1;
ASSERT_EQ(0, cache_->Size());
ASSERT_OK(DestroyDB(data_path_1_, options, {{"cf", cf_options1}}));
@ -466,7 +466,7 @@ TEST_F(DBLogicalBlockSizeCacheTest, MultiDBWithSamePaths) {
ASSERT_EQ(1, cache_->GetRefCount(data_path_0_));
ColumnFamilyHandle* cf0;
db0->CreateColumnFamily(cf_options, "cf", &cf0);
ASSERT_OK(db0->CreateColumnFamily(cf_options, "cf", &cf0));
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(data_path_0_));
@ -482,14 +482,14 @@ TEST_F(DBLogicalBlockSizeCacheTest, MultiDBWithSamePaths) {
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
ColumnFamilyHandle* cf1;
db1->CreateColumnFamily(cf_options, "cf", &cf1);
ASSERT_OK(db1->CreateColumnFamily(cf_options, "cf", &cf1));
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
ASSERT_EQ(2, cache_->GetRefCount(data_path_0_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(2, cache_->GetRefCount(cf_path_0_));
db0->DestroyColumnFamilyHandle(cf0);
ASSERT_OK(db0->DestroyColumnFamilyHandle(cf0));
delete db0;
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
@ -498,7 +498,7 @@ TEST_F(DBLogicalBlockSizeCacheTest, MultiDBWithSamePaths) {
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
ASSERT_OK(DestroyDB(dbname_ + "/db0", options, {{"cf", cf_options}}));
db1->DestroyColumnFamilyHandle(cf1);
ASSERT_OK(db1->DestroyColumnFamilyHandle(cf1));
delete db1;
ASSERT_EQ(0, cache_->Size());
ASSERT_OK(DestroyDB(dbname_ + "/db1", options, {{"cf", cf_options}}));

@ -358,16 +358,16 @@ TEST_F(DBWALTest, RecoverWithBlob) {
// There should be no files just yet since we haven't flushed.
{
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
ASSERT_NE(versions, nullptr);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
ASSERT_NE(cfd, nullptr);
Version* const current = cfd->current();
assert(current);
ASSERT_NE(current, nullptr);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
ASSERT_NE(storage_info, nullptr);
ASSERT_EQ(storage_info->num_non_empty_levels(), 0);
ASSERT_TRUE(storage_info->GetBlobFiles().empty());
@ -388,28 +388,28 @@ TEST_F(DBWALTest, RecoverWithBlob) {
ASSERT_EQ(Get("key2"), long_value);
VersionSet* const versions = dbfull()->TEST_GetVersionSet();
assert(versions);
ASSERT_NE(versions, nullptr);
ColumnFamilyData* const cfd = versions->GetColumnFamilySet()->GetDefault();
assert(cfd);
ASSERT_NE(cfd, nullptr);
Version* const current = cfd->current();
assert(current);
ASSERT_NE(current, nullptr);
const VersionStorageInfo* const storage_info = current->storage_info();
assert(storage_info);
ASSERT_NE(storage_info, nullptr);
const auto& l0_files = storage_info->LevelFiles(0);
ASSERT_EQ(l0_files.size(), 1);
const FileMetaData* const table_file = l0_files[0];
assert(table_file);
ASSERT_NE(table_file, nullptr);
const auto& blob_files = storage_info->GetBlobFiles();
ASSERT_EQ(blob_files.size(), 1);
const auto& blob_file = blob_files.begin()->second;
assert(blob_file);
ASSERT_NE(blob_file, nullptr);
ASSERT_EQ(table_file->smallest.user_key(), "key1");
ASSERT_EQ(table_file->largest.user_key(), "key2");
@ -422,7 +422,7 @@ TEST_F(DBWALTest, RecoverWithBlob) {
#ifndef ROCKSDB_LITE
const InternalStats* const internal_stats = cfd->internal_stats();
assert(internal_stats);
ASSERT_NE(internal_stats, nullptr);
const uint64_t expected_bytes =
table_file->fd.GetFileSize() + blob_file->GetTotalBlobBytes();
@ -502,12 +502,12 @@ TEST_F(DBWALTest, IgnoreRecoveredLog) {
do {
// delete old files in backup_logs directory
env_->CreateDirIfMissing(backup_logs);
ASSERT_OK(env_->CreateDirIfMissing(backup_logs));
std::vector<std::string> old_files;
env_->GetChildren(backup_logs, &old_files);
ASSERT_OK(env_->GetChildren(backup_logs, &old_files));
for (auto& file : old_files) {
if (file != "." && file != "..") {
env_->DeleteFile(backup_logs + "/" + file);
ASSERT_OK(env_->DeleteFile(backup_logs + "/" + file));
}
}
Options options = CurrentOptions();
@ -526,7 +526,7 @@ TEST_F(DBWALTest, IgnoreRecoveredLog) {
// copy the logs to backup
std::vector<std::string> logs;
env_->GetChildren(options.wal_dir, &logs);
ASSERT_OK(env_->GetChildren(options.wal_dir, &logs));
for (auto& log : logs) {
if (log != ".." && log != ".") {
CopyFile(options.wal_dir + "/" + log, backup_logs + "/" + log);
@ -557,7 +557,7 @@ TEST_F(DBWALTest, IgnoreRecoveredLog) {
Close();
// copy the logs from backup back to wal dir
env_->CreateDirIfMissing(options.wal_dir);
ASSERT_OK(env_->CreateDirIfMissing(options.wal_dir));
for (auto& log : logs) {
if (log != ".." && log != ".") {
CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
@ -572,16 +572,16 @@ TEST_F(DBWALTest, IgnoreRecoveredLog) {
// Recovery will fail if DB directory doesn't exist.
Destroy(options);
// copy the logs from backup back to wal dir
env_->CreateDirIfMissing(options.wal_dir);
ASSERT_OK(env_->CreateDirIfMissing(options.wal_dir));
for (auto& log : logs) {
if (log != ".." && log != ".") {
CopyFile(backup_logs + "/" + log, options.wal_dir + "/" + log);
// we won't be needing this file no more
env_->DeleteFile(backup_logs + "/" + log);
ASSERT_OK(env_->DeleteFile(backup_logs + "/" + log));
}
}
Status s = TryReopen(options);
ASSERT_TRUE(!s.ok());
ASSERT_NOK(s);
Destroy(options);
} while (ChangeWalOptions());
}
@ -619,9 +619,9 @@ TEST_F(DBWALTest, PreallocateBlock) {
called.fetch_add(1);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Put("", "");
Flush();
Put("", "");
ASSERT_OK(Put("", ""));
ASSERT_OK(Flush());
ASSERT_OK(Put("", ""));
Close();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ASSERT_EQ(2, called.load());
@ -638,9 +638,9 @@ TEST_F(DBWALTest, PreallocateBlock) {
called.fetch_add(1);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Put("", "");
Flush();
Put("", "");
ASSERT_OK(Put("", ""));
ASSERT_OK(Flush());
ASSERT_OK(Put("", ""));
Close();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ASSERT_EQ(2, called.load());
@ -658,9 +658,9 @@ TEST_F(DBWALTest, PreallocateBlock) {
called.fetch_add(1);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Put("", "");
Flush();
Put("", "");
ASSERT_OK(Put("", ""));
ASSERT_OK(Flush());
ASSERT_OK(Put("", ""));
Close();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ASSERT_EQ(2, called.load());
@ -679,9 +679,9 @@ TEST_F(DBWALTest, PreallocateBlock) {
called.fetch_add(1);
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Put("", "");
Flush();
Put("", "");
ASSERT_OK(Put("", ""));
ASSERT_OK(Flush());
ASSERT_OK(Put("", ""));
Close();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ASSERT_EQ(2, called.load());
@ -907,7 +907,7 @@ TEST_F(DBWALTest, RecoverCheckFileAmountWithSmallWriteBuffer) {
// Make 'dobrynia' to be flushed and new WAL file to be created
ASSERT_OK(Put(2, Key(10), DummyString(7500000)));
ASSERT_OK(Put(2, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[2]);
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[2]));
{
auto tables = ListTableFiles(env_, dbname_);
ASSERT_EQ(tables.size(), static_cast<size_t>(1));
@ -961,7 +961,7 @@ TEST_F(DBWALTest, RecoverCheckFileAmount) {
// Make 'nikitich' memtable to be flushed
ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
ASSERT_OK(Put(3, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3]));
// 4 memtable are not flushed, 1 sst file
{
auto tables = ListTableFiles(env_, dbname_);
@ -981,7 +981,7 @@ TEST_F(DBWALTest, RecoverCheckFileAmount) {
ASSERT_OK(Put(3, Key(10), DummyString(1002400)));
// make it flush
ASSERT_OK(Put(3, Key(1), DummyString(1)));
dbfull()->TEST_WaitForFlushMemTable(handles_[3]);
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable(handles_[3]));
// There are still 4 memtable not flushed, and 2 sst tables
ASSERT_OK(Put(0, Key(1), DummyString(1)));
ASSERT_OK(Put(1, Key(1), DummyString(1)));
@ -1029,10 +1029,10 @@ TEST_F(DBWALTest, SyncMultipleLogs) {
for (uint64_t b = 0; b < kNumBatches; b++) {
batch.Clear();
for (int i = 0; i < kBatchSize; i++) {
batch.Put(Key(i), DummyString(128));
ASSERT_OK(batch.Put(Key(i), DummyString(128)));
}
dbfull()->Write(wo, &batch);
ASSERT_OK(dbfull()->Write(wo, &batch));
}
ASSERT_OK(dbfull()->SyncWAL());
@ -1060,7 +1060,7 @@ TEST_F(DBWALTest, PartOfWritesWithWALDisabled) {
ASSERT_OK(Flush(0));
ASSERT_OK(Put(0, "key", "v5", wal_on)); // seq id 5
ASSERT_EQ("v5", Get(0, "key"));
dbfull()->FlushWAL(false);
ASSERT_OK(dbfull()->FlushWAL(false));
// Simulate a crash.
fault_env->SetFilesystemActive(false);
Close();
@ -1128,12 +1128,13 @@ class RecoveryTestHelper {
for (int i = 0; i < kKeysPerWALFile; i++) {
std::string key = "key" + ToString((*count)++);
std::string value = test->DummyString(kValueSize);
assert(current_log_writer.get() != nullptr);
ASSERT_NE(current_log_writer.get(), nullptr);
uint64_t seq = versions->LastSequence() + 1;
batch.Clear();
batch.Put(key, value);
ASSERT_OK(batch.Put(key, value));
WriteBatchInternal::SetSequence(&batch, seq);
current_log_writer->AddRecord(WriteBatchInternal::Contents(&batch));
ASSERT_OK(current_log_writer->AddRecord(
WriteBatchInternal::Contents(&batch)));
versions->SetLastAllocatedSequence(seq);
versions->SetLastPublishedSequence(seq);
versions->SetLastSequence(seq);
@ -1309,10 +1310,11 @@ TEST_F(DBWALTest, kPointInTimeRecoveryCFConsistency) {
ASSERT_OK(Put(1, "key3", "val3"));
// Corrupt WAL at location of key3
test::CorruptFile(env, fname, static_cast<int>(offset_to_corrupt), 4, false);
ASSERT_OK(test::CorruptFile(env, fname, static_cast<int>(offset_to_corrupt),
4, false));
ASSERT_OK(Put(2, "key4", "val4"));
ASSERT_OK(Put(1, "key5", "val5"));
Flush(2);
ASSERT_OK(Flush(2));
// PIT recovery & verify
options.wal_recovery_mode = WALRecoveryMode::kPointInTimeRecovery;
@ -1466,7 +1468,7 @@ TEST_F(DBWALTest, WalCleanupAfterAvoidFlushDuringRecovery) {
for (int i = 0; i < 2; ++i) {
if (i > 0) {
// Flush() triggers deletion of obsolete tracked files
Flush();
ASSERT_OK(Flush());
}
VectorLogPtr log_files;
ASSERT_OK(dbfull()->GetSortedWalFiles(log_files));
@ -1508,7 +1510,7 @@ TEST_F(DBWALTest, RecoverWithoutFlush) {
ASSERT_EQ(Get("foo"), "foo_v2");
ASSERT_EQ(Get("bar"), "bar_v2");
// manual flush and insert again
Flush();
ASSERT_OK(Flush());
ASSERT_EQ(Get("foo"), "foo_v2");
ASSERT_EQ(Get("bar"), "bar_v2");
ASSERT_OK(Put("foo", "foo_v3"));
@ -1529,7 +1531,9 @@ TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) {
auto countWalFiles = [this]() {
VectorLogPtr log_files;
dbfull()->GetSortedWalFiles(log_files);
if (!dbfull()->GetSortedWalFiles(log_files).ok()) {
return size_t{0};
}
return log_files.size();
};
@ -1537,11 +1541,11 @@ TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) {
CreateAndReopenWithCF({"one", "two"}, options);
ASSERT_OK(Put(0, "key1", kSmallValue));
ASSERT_OK(Put(1, "key2", kLargeValue));
Flush(1);
ASSERT_OK(Flush(1));
ASSERT_EQ(1, countWalFiles());
ASSERT_OK(Put(0, "key3", kSmallValue));
ASSERT_OK(Put(2, "key4", kLargeValue));
Flush(2);
ASSERT_OK(Flush(2));
ASSERT_EQ(2, countWalFiles());
// Reopen, insert and flush.
@ -1555,9 +1559,9 @@ TEST_F(DBWALTest, RecoverWithoutFlushMultipleCF) {
ASSERT_OK(Put(0, "key5", kLargeValue));
ASSERT_OK(Put(1, "key6", kLargeValue));
ASSERT_EQ(3, countWalFiles());
Flush(1);
ASSERT_OK(Flush(1));
ASSERT_OK(Put(2, "key7", kLargeValue));
dbfull()->FlushWAL(false);
ASSERT_OK(dbfull()->FlushWAL(false));
ASSERT_EQ(4, countWalFiles());
// Reopen twice and validate.
@ -1766,9 +1770,9 @@ TEST_F(DBWALTest, WalTermTest) {
wo.disableWAL = false;
WriteBatch batch;
batch.Put("foo", "bar");
ASSERT_OK(batch.Put("foo", "bar"));
batch.MarkWalTerminationPoint();
batch.Put("foo2", "bar2");
ASSERT_OK(batch.Put("foo2", "bar2"));
ASSERT_OK(dbfull()->Write(wo, &batch));

@ -320,7 +320,7 @@ TEST_P(DBWriteTest, ManualWalFlushInEffect) {
ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty());
// try the 2nd wal created during SwitchWAL
dbfull()->TEST_SwitchWAL();
ASSERT_OK(dbfull()->TEST_SwitchWAL());
ASSERT_TRUE(Put("key" + ToString(0), "value").ok());
ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
ASSERT_TRUE(dbfull()->FlushWAL(false).ok());
@ -395,7 +395,7 @@ TEST_P(DBWriteTest, LockWalInEffect) {
ASSERT_TRUE(dbfull()->TEST_WALBufferIsEmpty(false));
ASSERT_OK(dbfull()->UnlockWAL());
// try the 2nd wal created during SwitchWAL
dbfull()->TEST_SwitchWAL();
ASSERT_OK(dbfull()->TEST_SwitchWAL());
ASSERT_OK(Put("key" + ToString(0), "value"));
ASSERT_TRUE(options.manual_wal_flush != dbfull()->TEST_WALBufferIsEmpty());
ASSERT_OK(dbfull()->LockWAL());

@ -29,8 +29,8 @@ class ExternalSSTFileBasicTest
}
void DestroyAndRecreateExternalSSTFilesDir() {
DestroyDir(env_, sst_files_dir_);
env_->CreateDir(sst_files_dir_);
ASSERT_OK(DestroyDir(env_, sst_files_dir_));
ASSERT_OK(env_->CreateDir(sst_files_dir_));
}
Status DeprecatedAddFile(const std::vector<std::string>& files,
@ -162,7 +162,9 @@ class ExternalSSTFileBasicTest
write_global_seqno, verify_checksums_before_ingest, true_data);
}
~ExternalSSTFileBasicTest() override { DestroyDir(env_, sst_files_dir_); }
~ExternalSSTFileBasicTest() override {
DestroyDir(env_, sst_files_dir_).PermitUncheckedError();
}
protected:
std::string sst_files_dir_;
@ -186,7 +188,7 @@ TEST_F(ExternalSSTFileBasicTest, Basic) {
}
ExternalSstFileInfo file1_info;
Status s = sst_file_writer.Finish(&file1_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
// Current file size should be non-zero after success write.
ASSERT_GT(sst_file_writer.FileSize(), 0);
@ -202,14 +204,14 @@ TEST_F(ExternalSSTFileBasicTest, Basic) {
ASSERT_EQ(file1_info.file_checksum_func_name, kUnknownFileChecksumFuncName);
// sst_file_writer already finished, cannot add this value
s = sst_file_writer.Put(Key(100), "bad_val");
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(s) << s.ToString();
s = sst_file_writer.DeleteRange(Key(100), Key(200));
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(s) << s.ToString();
DestroyAndReopen(options);
// Add file using file path
s = DeprecatedAddFile({file1});
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
for (int k = 0; k < 100; k++) {
ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
@ -286,7 +288,7 @@ TEST_F(ExternalSSTFileBasicTest, BasicWithFileChecksumCrc32c) {
}
ExternalSstFileInfo file1_info;
Status s = sst_file_writer.Finish(&file1_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
std::string file_checksum, file_checksum_func_name;
ASSERT_OK(checksum_helper.GetSingleFileChecksumAndFuncName(
file1, &file_checksum, &file_checksum_func_name));
@ -305,14 +307,14 @@ TEST_F(ExternalSSTFileBasicTest, BasicWithFileChecksumCrc32c) {
ASSERT_EQ(file1_info.file_checksum_func_name, file_checksum_func_name);
// sst_file_writer already finished, cannot add this value
s = sst_file_writer.Put(Key(100), "bad_val");
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(s) << s.ToString();
s = sst_file_writer.DeleteRange(Key(100), Key(200));
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(s) << s.ToString();
DestroyAndReopen(options);
// Add file using file path
s = DeprecatedAddFile({file1});
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
for (int k = 0; k < 100; k++) {
ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
@ -338,7 +340,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
}
ExternalSstFileInfo file1_info;
Status s = sst_file_writer.Finish(&file1_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(file1_info.file_path, file1);
ASSERT_EQ(file1_info.num_entries, 100);
ASSERT_EQ(file1_info.smallest_key, Key(1000));
@ -357,7 +359,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
}
ExternalSstFileInfo file2_info;
s = sst_file_writer.Finish(&file2_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(file2_info.file_path, file2);
ASSERT_EQ(file2_info.num_entries, 200);
ASSERT_EQ(file2_info.smallest_key, Key(1100));
@ -376,7 +378,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
}
ExternalSstFileInfo file3_info;
s = sst_file_writer.Finish(&file3_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(file3_info.file_path, file3);
ASSERT_EQ(file3_info.num_entries, 200);
ASSERT_EQ(file3_info.smallest_key, Key(1300));
@ -395,7 +397,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
}
ExternalSstFileInfo file4_info;
s = sst_file_writer.Finish(&file4_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(file4_info.file_path, file4);
ASSERT_EQ(file4_info.num_entries, 300);
ASSERT_EQ(file4_info.smallest_key, Key(1500));
@ -414,7 +416,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
}
ExternalSstFileInfo file5_info;
s = sst_file_writer.Finish(&file5_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(file5_info.file_path, file5);
ASSERT_EQ(file5_info.num_entries, 200);
ASSERT_EQ(file5_info.smallest_key, Key(1800));
@ -433,7 +435,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
}
ExternalSstFileInfo file6_info;
s = sst_file_writer.Finish(&file6_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(file6_info.file_path, file6);
ASSERT_EQ(file6_info.num_entries, 200);
ASSERT_EQ(file6_info.smallest_key, Key(2000));
@ -447,7 +449,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
s = AddFileWithFileChecksum({file1}, {file_checksum1, "xyz"},
{file_checksum1}, true, false, false, false);
// does not care the checksum input since db does not enable file checksum
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_OK(env_->FileExists(file1));
std::vector<LiveFileMetaData> live_files;
dbfull()->GetLiveFilesMetaData(&live_files);
@ -465,26 +467,26 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
s = AddFileWithFileChecksum({file2}, {file_checksum2, "xyz"},
{file_checksum_func_name2}, true, false, false,
false);
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(s) << s.ToString();
// Enable verify_file_checksum option
// The checksum name does not match, fail the ingestion
s = AddFileWithFileChecksum({file2}, {file_checksum2}, {"xyz"}, true, false,
false, false);
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(s) << s.ToString();
// Enable verify_file_checksum option
// The checksum itself does not match, fail the ingestion
s = AddFileWithFileChecksum({file2}, {"xyz"}, {file_checksum_func_name2},
true, false, false, false);
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(s) << s.ToString();
// Enable verify_file_checksum option
// All matches, ingestion is successful
s = AddFileWithFileChecksum({file2}, {file_checksum2},
{file_checksum_func_name2}, true, false, false,
false);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
std::vector<LiveFileMetaData> live_files1;
dbfull()->GetLiveFilesMetaData(&live_files1);
for (auto f : live_files1) {
@ -501,7 +503,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
std::vector<std::string> checksum, checksum_func;
s = AddFileWithFileChecksum({file3}, checksum, checksum_func, true, false,
false, false);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
std::vector<LiveFileMetaData> live_files2;
dbfull()->GetLiveFilesMetaData(&live_files2);
for (auto f : live_files2) {
@ -511,20 +513,20 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
set1.insert(f.name);
}
}
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_OK(env_->FileExists(file3));
// Does not enable verify_file_checksum options
// The checksum name does not match, fail the ingestion
s = AddFileWithFileChecksum({file4}, {file_checksum4}, {"xyz"}, false, false,
false, false);
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(s) << s.ToString();
// Does not enable verify_file_checksum options
// Checksum function name matches, store the checksum being ingested.
s = AddFileWithFileChecksum({file4}, {"asd"}, {file_checksum_func_name4},
false, false, false, false);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
std::vector<LiveFileMetaData> live_files3;
dbfull()->GetLiveFilesMetaData(&live_files3);
for (auto f : live_files3) {
@ -535,7 +537,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
set1.insert(f.name);
}
}
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_OK(env_->FileExists(file4));
// enable verify_file_checksum options, DB enable checksum, and enable
@ -544,8 +546,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
s = AddFileWithFileChecksum({file5}, {file_checksum5},
{file_checksum_func_name5}, true, false, false,
true);
ASSERT_OK(s);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
std::vector<LiveFileMetaData> live_files4;
dbfull()->GetLiveFilesMetaData(&live_files4);
for (auto f : live_files4) {
@ -558,7 +559,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
set1.insert(f.name);
}
}
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_OK(env_->FileExists(file5));
// Does not enable verify_file_checksum options and also the ingested file
@ -567,7 +568,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
std::vector<std::string> files_c6, files_name6;
s = AddFileWithFileChecksum({file6}, files_c6, files_name6, false, false,
false, false);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
std::vector<LiveFileMetaData> live_files6;
dbfull()->GetLiveFilesMetaData(&live_files6);
for (auto f : live_files6) {
@ -577,7 +578,7 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithFileChecksum) {
set1.insert(f.name);
}
}
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_OK(env_->FileExists(file6));
}
@ -595,7 +596,7 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) {
}
ExternalSstFileInfo file1_info;
Status s = sst_file_writer.Finish(&file1_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(file1_info.file_path, file1);
ASSERT_EQ(file1_info.num_entries, 100);
ASSERT_EQ(file1_info.smallest_key, Key(0));
@ -609,7 +610,7 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) {
}
ExternalSstFileInfo file2_info;
s = sst_file_writer.Finish(&file2_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(file2_info.file_path, file2);
ASSERT_EQ(file2_info.num_entries, 200);
ASSERT_EQ(file2_info.smallest_key, Key(100));
@ -623,23 +624,23 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) {
}
ExternalSstFileInfo file3_info;
s = sst_file_writer.Finish(&file3_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(file3_info.file_path, file3);
ASSERT_EQ(file3_info.num_entries, 15);
ASSERT_EQ(file3_info.smallest_key, Key(110));
ASSERT_EQ(file3_info.largest_key, Key(124));
s = DeprecatedAddFile({file1}, true /* move file */);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(Status::NotFound(), env_->FileExists(file1));
s = DeprecatedAddFile({file2}, false /* copy file */);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_OK(env_->FileExists(file2));
// This file has overlapping values with the existing data
s = DeprecatedAddFile({file3}, true /* move file */);
ASSERT_FALSE(s.ok()) << s.ToString();
ASSERT_NOK(s) << s.ToString();
ASSERT_OK(env_->FileExists(file3));
for (int k = 0; k < 300; k++) {
@ -1126,7 +1127,7 @@ TEST_F(ExternalSSTFileBasicTest, SyncFailure) {
if (i == 2) {
ingest_opt.write_global_seqno = true;
}
ASSERT_FALSE(db_->IngestExternalFile({file_name}, ingest_opt).ok());
ASSERT_NOK(db_->IngestExternalFile({file_name}, ingest_opt));
db_->ReleaseSnapshot(snapshot);
SyncPoint::GetInstance()->DisableProcessing();
@ -1326,7 +1327,7 @@ TEST_F(ExternalSSTFileBasicTest, AdjacentRangeDeletionTombstones) {
ASSERT_OK(sst_file_writer.DeleteRange(Key(300), Key(400)));
ExternalSstFileInfo file8_info;
Status s = sst_file_writer.Finish(&file8_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(file8_info.file_path, file8);
ASSERT_EQ(file8_info.num_entries, 0);
ASSERT_EQ(file8_info.smallest_key, "");
@ -1341,7 +1342,7 @@ TEST_F(ExternalSSTFileBasicTest, AdjacentRangeDeletionTombstones) {
ASSERT_OK(sst_file_writer.DeleteRange(Key(400), Key(500)));
ExternalSstFileInfo file9_info;
s = sst_file_writer.Finish(&file9_info);
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(file9_info.file_path, file9);
ASSERT_EQ(file9_info.num_entries, 0);
ASSERT_EQ(file9_info.smallest_key, "");
@ -1353,7 +1354,7 @@ TEST_F(ExternalSSTFileBasicTest, AdjacentRangeDeletionTombstones) {
// Range deletion tombstones are exclusive on their end key, so these SSTs
// should not be considered as overlapping.
s = DeprecatedAddFile({file8, file9});
ASSERT_TRUE(s.ok()) << s.ToString();
ASSERT_OK(s) << s.ToString();
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
DestroyAndRecreateExternalSSTFilesDir();
}

@ -335,6 +335,12 @@ Status ExternalSstFileIngestionJob::Run() {
// with the files we are ingesting
bool need_flush = false;
status = NeedsFlush(&need_flush, super_version);
if (!status.ok()) {
return status;
}
if (need_flush) {
return Status::TryAgain();
}
assert(status.ok() && need_flush == false);
#endif

@ -208,6 +208,7 @@ uint8_t WriteThread::AwaitState(Writer* w, uint8_t goal_mask,
}
void WriteThread::SetState(Writer* w, uint8_t new_state) {
assert(w);
auto state = w->state.load(std::memory_order_acquire);
if (state == STATE_LOCKED_WAITING ||
!w->state.compare_exchange_strong(state, new_state)) {

@ -546,5 +546,20 @@ Status TruncateFile(Env* env, const std::string& fname, uint64_t new_length) {
return s;
}
// Try and delete a directory if it exists
Status TryDeleteDir(Env* env, const std::string& dirname) {
bool is_dir = false;
Status s = env->IsDirectory(dirname, &is_dir);
if (s.ok() && is_dir) {
s = env->DeleteDir(dirname);
}
return s;
}
// Delete a directory if it exists
void DeleteDir(Env* env, const std::string& dirname) {
TryDeleteDir(env, dirname).PermitUncheckedError();
}
} // namespace test
} // namespace ROCKSDB_NAMESPACE

@ -807,5 +807,11 @@ Status CorruptFile(Env* env, const std::string& fname, int offset,
int bytes_to_corrupt, bool verify_checksum = true);
Status TruncateFile(Env* env, const std::string& fname, uint64_t length);
// Try and delete a directory if it exists
Status TryDeleteDir(Env* env, const std::string& dirname);
// Delete a directory if it exists
void DeleteDir(Env* env, const std::string& dirname);
} // namespace test
} // namespace ROCKSDB_NAMESPACE

@ -51,12 +51,14 @@ void CheckpointImpl::CleanStagingDirectory(
}
ROCKS_LOG_INFO(info_log, "File exists %s -- %s",
full_private_path.c_str(), s.ToString().c_str());
db_->GetEnv()->GetChildren(full_private_path, &subchildren);
for (auto& subchild : subchildren) {
std::string subchild_path = full_private_path + "/" + subchild;
s = db_->GetEnv()->DeleteFile(subchild_path);
ROCKS_LOG_INFO(info_log, "Delete file %s -- %s",
subchild_path.c_str(), s.ToString().c_str());
s = db_->GetEnv()->GetChildren(full_private_path, &subchildren);
if (s.ok()) {
for (auto& subchild : subchildren) {
std::string subchild_path = full_private_path + "/" + subchild;
s = db_->GetEnv()->DeleteFile(subchild_path);
ROCKS_LOG_INFO(info_log, "Delete file %s -- %s", subchild_path.c_str(),
s.ToString().c_str());
}
}
// finally delete the private dir
s = db_->GetEnv()->DeleteDir(full_private_path);
@ -109,33 +111,44 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
s = db_->GetEnv()->CreateDir(full_private_path);
uint64_t sequence_number = 0;
if (s.ok()) {
db_->DisableFileDeletions();
s = CreateCustomCheckpoint(
db_options,
[&](const std::string& src_dirname, const std::string& fname,
FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s", fname.c_str());
return db_->GetFileSystem()->LinkFile(src_dirname + fname,
full_private_path + fname,
IOOptions(), nullptr);
} /* link_file_cb */,
[&](const std::string& src_dirname, const std::string& fname,
uint64_t size_limit_bytes, FileType,
const std::string& /* checksum_func_name */,
const std::string& /* checksum_val */) {
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
return CopyFile(db_->GetFileSystem(), src_dirname + fname,
full_private_path + fname, size_limit_bytes,
db_options.use_fsync);
} /* copy_file_cb */,
[&](const std::string& fname, const std::string& contents, FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
return CreateFile(db_->GetFileSystem(), full_private_path + fname,
contents, db_options.use_fsync);
} /* create_file_cb */,
&sequence_number, log_size_for_flush);
// we copied all the files, enable file deletions
db_->EnableFileDeletions(false);
// enable file deletions
s = db_->DisableFileDeletions();
const bool disabled_file_deletions = s.ok();
if (s.ok() || s.IsNotSupported()) {
s = CreateCustomCheckpoint(
db_options,
[&](const std::string& src_dirname, const std::string& fname,
FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Hard Linking %s",
fname.c_str());
return db_->GetFileSystem()->LinkFile(src_dirname + fname,
full_private_path + fname,
IOOptions(), nullptr);
} /* link_file_cb */,
[&](const std::string& src_dirname, const std::string& fname,
uint64_t size_limit_bytes, FileType,
const std::string& /* checksum_func_name */,
const std::string& /* checksum_val */) {
ROCKS_LOG_INFO(db_options.info_log, "Copying %s", fname.c_str());
return CopyFile(db_->GetFileSystem(), src_dirname + fname,
full_private_path + fname, size_limit_bytes,
db_options.use_fsync);
} /* copy_file_cb */,
[&](const std::string& fname, const std::string& contents, FileType) {
ROCKS_LOG_INFO(db_options.info_log, "Creating %s", fname.c_str());
return CreateFile(db_->GetFileSystem(), full_private_path + fname,
contents, db_options.use_fsync);
} /* create_file_cb */,
&sequence_number, log_size_for_flush);
// we copied all the files, enable file deletions
if (disabled_file_deletions) {
Status ss = db_->EnableFileDeletions(false);
assert(ss.ok());
ss.PermitUncheckedError();
}
}
}
if (s.ok()) {
@ -144,8 +157,8 @@ Status CheckpointImpl::CreateCheckpoint(const std::string& checkpoint_dir,
}
if (s.ok()) {
std::unique_ptr<Directory> checkpoint_directory;
db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
if (checkpoint_directory != nullptr) {
s = db_->GetEnv()->NewDirectory(checkpoint_dir, &checkpoint_directory);
if (s.ok() && checkpoint_directory != nullptr) {
s = checkpoint_directory->Fsync();
}
}
@ -191,68 +204,71 @@ Status CheckpointImpl::CreateCustomCheckpoint(
VectorLogPtr live_wal_files;
bool flush_memtable = true;
if (s.ok()) {
if (!db_options.allow_2pc) {
if (log_size_for_flush == port::kMaxUint64) {
flush_memtable = false;
} else if (log_size_for_flush > 0) {
// If out standing log files are small, we skip the flush.
s = db_->GetSortedWalFiles(live_wal_files);
if (!db_options.allow_2pc) {
if (log_size_for_flush == port::kMaxUint64) {
flush_memtable = false;
} else if (log_size_for_flush > 0) {
// If out standing log files are small, we skip the flush.
s = db_->GetSortedWalFiles(live_wal_files);
if (!s.ok()) {
return s;
}
if (!s.ok()) {
return s;
}
// Don't flush column families if total log size is smaller than
// log_size_for_flush. We copy the log files instead.
// We may be able to cover 2PC case too.
uint64_t total_wal_size = 0;
for (auto& wal : live_wal_files) {
total_wal_size += wal->SizeFileBytes();
}
if (total_wal_size < log_size_for_flush) {
flush_memtable = false;
}
live_wal_files.clear();
// Don't flush column families if total log size is smaller than
// log_size_for_flush. We copy the log files instead.
// We may be able to cover 2PC case too.
uint64_t total_wal_size = 0;
for (auto& wal : live_wal_files) {
total_wal_size += wal->SizeFileBytes();
}
if (total_wal_size < log_size_for_flush) {
flush_memtable = false;
}
live_wal_files.clear();
}
}
// this will return live_files prefixed with "/"
s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
// this will return live_files prefixed with "/"
s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
if (s.ok() && db_options.allow_2pc) {
// If 2PC is enabled, we need to get minimum log number after the flush.
// Need to refetch the live files to recapture the snapshot.
if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
&min_log_num)) {
return Status::InvalidArgument(
"2PC enabled but cannot fine the min log number to keep.");
}
// We need to refetch live files with flush to handle this case:
// A previous 000001.log contains the prepare record of transaction tnx1.
// The current log file is 000002.log, and sequence_number points to this
// file.
// After calling GetLiveFiles(), 000003.log is created.
// Then tnx1 is committed. The commit record is written to 000003.log.
// Now we fetch min_log_num, which will be 3.
// Then only 000002.log and 000003.log will be copied, and 000001.log will
// be skipped. 000003.log contains commit message of tnx1, but we don't
// have respective prepare record for it.
// In order to avoid this situation, we need to force flush to make sure
// all transactions committed before getting min_log_num will be flushed
// to SST files.
// We cannot get min_log_num before calling the GetLiveFiles() for the
// first time, because if we do that, all the logs files will be included,
// far more than needed.
s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
if (s.ok() && db_options.allow_2pc) {
// If 2PC is enabled, we need to get minimum log number after the flush.
// Need to refetch the live files to recapture the snapshot.
if (!db_->GetIntProperty(DB::Properties::kMinLogNumberToKeep,
&min_log_num)) {
return Status::InvalidArgument(
"2PC enabled but cannot fine the min log number to keep.");
}
// We need to refetch live files with flush to handle this case:
// A previous 000001.log contains the prepare record of transaction tnx1.
// The current log file is 000002.log, and sequence_number points to this
// file.
// After calling GetLiveFiles(), 000003.log is created.
// Then tnx1 is committed. The commit record is written to 000003.log.
// Now we fetch min_log_num, which will be 3.
// Then only 000002.log and 000003.log will be copied, and 000001.log will
// be skipped. 000003.log contains commit message of tnx1, but we don't
// have respective prepare record for it.
// In order to avoid this situation, we need to force flush to make sure
// all transactions committed before getting min_log_num will be flushed
// to SST files.
// We cannot get min_log_num before calling the GetLiveFiles() for the
// first time, because if we do that, all the logs files will be included,
// far more than needed.
s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
}
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles1");
TEST_SYNC_POINT("CheckpointImpl::CreateCheckpoint:SavedLiveFiles2");
db_->FlushWAL(false /* sync */);
if (s.ok()) {
s = db_->FlushWAL(false /* sync */);
}
TEST_SYNC_POINT("CheckpointImpl::CreateCustomCheckpoint:AfterGetLive1");
TEST_SYNC_POINT("CheckpointImpl::CreateCustomCheckpoint:AfterGetLive2");
// if we have more than one column family, we need to also get WAL files
if (s.ok()) {
s = db_->GetSortedWalFiles(live_wal_files);
@ -358,8 +374,8 @@ Status CheckpointImpl::CreateCustomCheckpoint(
}
}
if (s.ok() && !current_fname.empty() && !manifest_fname.empty()) {
create_file_cb(current_fname, manifest_fname.substr(1) + "\n",
kCurrentFile);
s = create_file_cb(current_fname, manifest_fname.substr(1) + "\n",
kCurrentFile);
}
ROCKS_LOG_INFO(db_options.info_log, "Number of log files %" ROCKSDB_PRIszt,
live_wal_files.size());

@ -66,12 +66,12 @@ class CheckpointTest : public testing::Test {
snapshot_name_ = test::PerThreadDBPath(env_, "snapshot");
std::string snapshot_tmp_name = snapshot_name_ + ".tmp";
EXPECT_OK(DestroyDB(snapshot_name_, options));
env_->DeleteDir(snapshot_name_);
test::DeleteDir(env_, snapshot_name_);
EXPECT_OK(DestroyDB(snapshot_tmp_name, options));
env_->DeleteDir(snapshot_tmp_name);
test::DeleteDir(env_, snapshot_tmp_name);
Reopen(options);
export_path_ = test::PerThreadDBPath("/export");
DestroyDir(env_, export_path_);
DestroyDir(env_, export_path_).PermitUncheckedError();
cfh_reverse_comp_ = nullptr;
metadata_ = nullptr;
}
@ -96,7 +96,7 @@ class CheckpointTest : public testing::Test {
options.db_paths.emplace_back(dbname_ + "_4", 0);
EXPECT_OK(DestroyDB(dbname_, options));
EXPECT_OK(DestroyDB(snapshot_name_, options));
DestroyDir(env_, export_path_);
DestroyDir(env_, export_path_).PermitUncheckedError();
}
// Return the current option configuration.
@ -274,7 +274,6 @@ TEST_F(CheckpointTest, GetSnapshotLink) {
ASSERT_OK(DestroyDB(dbname_, options));
// Create a database
Status s;
options.create_if_missing = true;
ASSERT_OK(DB::Open(options, dbname_, &db_));
std::string key = std::string("foo");
@ -316,7 +315,6 @@ TEST_F(CheckpointTest, GetSnapshotLink) {
TEST_F(CheckpointTest, ExportColumnFamilyWithLinks) {
// Create a database
Status s;
auto options = CurrentOptions();
options.create_if_missing = true;
CreateAndReopenWithCF({}, options);
@ -326,7 +324,7 @@ TEST_F(CheckpointTest, ExportColumnFamilyWithLinks) {
int num_files_expected) {
ASSERT_EQ(metadata.files.size(), num_files_expected);
std::vector<std::string> subchildren;
env_->GetChildren(export_path_, &subchildren);
ASSERT_OK(env_->GetChildren(export_path_, &subchildren));
int num_children = 0;
for (const auto& child : subchildren) {
if (child != "." && child != "..") {
@ -349,7 +347,7 @@ TEST_F(CheckpointTest, ExportColumnFamilyWithLinks) {
export_path_, &metadata_));
verify_files_exported(*metadata_, 1);
ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name());
DestroyDir(env_, export_path_);
ASSERT_OK(DestroyDir(env_, export_path_));
delete metadata_;
metadata_ = nullptr;
@ -360,7 +358,7 @@ TEST_F(CheckpointTest, ExportColumnFamilyWithLinks) {
export_path_, &metadata_));
verify_files_exported(*metadata_, 2);
ASSERT_EQ(metadata_->db_comparator_name, options.comparator->Name());
DestroyDir(env_, export_path_);
ASSERT_OK(DestroyDir(env_, export_path_));
delete metadata_;
metadata_ = nullptr;
delete checkpoint;
@ -390,7 +388,6 @@ TEST_F(CheckpointTest, ExportColumnFamilyWithLinks) {
TEST_F(CheckpointTest, ExportColumnFamilyNegativeTest) {
// Create a database
Status s;
auto options = CurrentOptions();
options.create_if_missing = true;
CreateAndReopenWithCF({}, options);
@ -402,11 +399,11 @@ TEST_F(CheckpointTest, ExportColumnFamilyNegativeTest) {
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
// Export onto existing directory
env_->CreateDirIfMissing(export_path_);
ASSERT_OK(env_->CreateDirIfMissing(export_path_));
ASSERT_EQ(checkpoint->ExportColumnFamily(db_->DefaultColumnFamily(),
export_path_, &metadata_),
Status::InvalidArgument("Specified export_dir exists"));
DestroyDir(env_, export_path_);
ASSERT_OK(DestroyDir(env_, export_path_));
// Export with invalid directory specification
export_path_ = "";
@ -437,7 +434,6 @@ TEST_F(CheckpointTest, CheckpointCF) {
std::string result;
std::vector<ColumnFamilyHandle*> cphandles;
Status s;
// Take a snapshot
ROCKSDB_NAMESPACE::port::Thread t([&]() {
Checkpoint* checkpoint;
@ -465,7 +461,7 @@ TEST_F(CheckpointTest, CheckpointCF) {
// Open snapshot and verify contents while DB is running
options.create_if_missing = false;
std::vector<std::string> cfs;
cfs= {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"};
cfs = {kDefaultColumnFamilyName, "one", "two", "three", "four", "five"};
std::vector<ColumnFamilyDescriptor> column_families;
for (size_t i = 0; i < cfs.size(); ++i) {
column_families.push_back(ColumnFamilyDescriptor(cfs[i], options));
@ -493,7 +489,7 @@ TEST_F(CheckpointTest, CheckpointCFNoFlush) {
ASSERT_OK(Put(0, "Default", "Default"));
ASSERT_OK(Put(1, "one", "one"));
Flush();
ASSERT_OK(Flush());
ASSERT_OK(Put(2, "two", "two"));
DB* snapshotDB;
@ -501,7 +497,6 @@ TEST_F(CheckpointTest, CheckpointCFNoFlush) {
std::string result;
std::vector<ColumnFamilyHandle*> cphandles;
Status s;
// Take a snapshot
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCallFlush:start", [&](void* /*arg*/) {
@ -590,7 +585,7 @@ TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing2PC) {
Close();
const std::string dbname = test::PerThreadDBPath("transaction_testdb");
ASSERT_OK(DestroyDB(dbname, CurrentOptions()));
env_->DeleteDir(dbname);
test::DeleteDir(env_, dbname);
Options options = CurrentOptions();
options.allow_2pc = true;
@ -599,7 +594,7 @@ TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing2PC) {
TransactionDBOptions txn_db_options;
TransactionDB* txdb;
Status s = TransactionDB::Open(options, txn_db_options, dbname, &txdb);
assert(s.ok());
ASSERT_OK(s);
ColumnFamilyHandle* cfa;
ColumnFamilyHandle* cfb;
ColumnFamilyOptions cf_options;
@ -620,6 +615,7 @@ TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing2PC) {
ASSERT_EQ(txdb->GetTransactionByName("xid"), txn);
s = txn->Put(Slice("foo"), Slice("bar"));
ASSERT_OK(s);
s = txn->Put(cfa, Slice("foocfa"), Slice("barcfa"));
ASSERT_OK(s);
// Writing prepare into middle of first WAL, then flush WALs many times
@ -631,7 +627,7 @@ TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing2PC) {
ASSERT_OK(tx->Prepare());
ASSERT_OK(tx->Commit());
if (i % 10000 == 0) {
txdb->Flush(FlushOptions());
ASSERT_OK(txdb->Flush(FlushOptions()));
}
if (i == 88888) {
ASSERT_OK(txn->Prepare());
@ -662,7 +658,7 @@ TEST_F(CheckpointTest, CurrentFileModifiedWhileCheckpointing2PC) {
// No more than two logs files should exist.
std::vector<std::string> files;
env_->GetChildren(snapshot_name_, &files);
ASSERT_OK(env_->GetChildren(snapshot_name_, &files));
int num_log_files = 0;
for (auto& file : files) {
uint64_t num;
@ -733,7 +729,7 @@ TEST_F(CheckpointTest, CheckpointWithUnsyncedDataDropped) {
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
delete checkpoint;
env->DropUnsyncedFileData();
ASSERT_OK(env->DropUnsyncedFileData());
// make sure it's openable even though whatever data that wasn't synced got
// dropped.

@ -217,7 +217,7 @@ TestRandomRWFile::TestRandomRWFile(const std::string& /*fname*/,
TestRandomRWFile::~TestRandomRWFile() {
if (file_opened_) {
Close();
Close().PermitUncheckedError();
}
}

@ -432,7 +432,14 @@ bool PointLockManager::IncrementWaiters(
extracted_info.m_waiting_key});
head = queue_parents[head];
}
env->GetCurrentTime(&deadlock_time);
if (!env->GetCurrentTime(&deadlock_time).ok()) {
/*
TODO(AR) this preserves the current behaviour whilst checking the
status of env->GetCurrentTime to ensure that ASSERT_STATUS_CHECKED
passes. Should we instead raise an error if !ok() ?
*/
deadlock_time = 0;
}
std::reverse(path.begin(), path.end());
dlock_buffer_.AddNewPath(DeadlockPath(path, deadlock_time));
deadlock_time = 0;
@ -448,7 +455,14 @@ bool PointLockManager::IncrementWaiters(
}
// Wait cycle too big, just assume deadlock.
env->GetCurrentTime(&deadlock_time);
if (!env->GetCurrentTime(&deadlock_time).ok()) {
/*
TODO(AR) this preserves the current behaviour whilst checking the status
of env->GetCurrentTime to ensure that ASSERT_STATUS_CHECKED passes.
Should we instead raise an error if !ok() ?
*/
deadlock_time = 0;
}
dlock_buffer_.AddNewPath(DeadlockPath(deadlock_time, true));
DecrementWaitersImpl(txn, wait_ids);
return true;

@ -177,8 +177,6 @@ Status OptimisticTransaction::TryLock(ColumnFamilyHandle* column_family,
// Should only be called on writer thread in order to avoid any race conditions
// in detecting write conflicts.
Status OptimisticTransaction::CheckTransactionForConflicts(DB* db) {
Status result;
auto db_impl = static_cast_with_check<DBImpl>(db);
// Since we are on the write thread and do not want to block other writers,

File diff suppressed because it is too large Load Diff

@ -102,7 +102,7 @@ TEST_P(TransactionTest, DoubleEmptyWrite) {
// Also test that it works during recovery
txn0 = db->BeginTransaction(write_options, txn_options);
ASSERT_OK(txn0->SetName("xid2"));
txn0->Put(Slice("foo0"), Slice("bar0a"));
ASSERT_OK(txn0->Put(Slice("foo0"), Slice("bar0a")));
ASSERT_OK(txn0->Prepare());
delete txn0;
reinterpret_cast<PessimisticTransactionDB*>(db)->TEST_Crash();
@ -1936,7 +1936,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
// request a flush for all column families such that the earliest
// alive log file can be killed
db_impl->TEST_SwitchWAL();
ASSERT_OK(db_impl->TEST_SwitchWAL());
// log cannot be flushed because txn2 has not been commited
ASSERT_TRUE(!db_impl->TEST_IsLogGettingFlushed());
ASSERT_TRUE(db_impl->TEST_UnableToReleaseOldestLog());
@ -1962,7 +1962,7 @@ TEST_P(TransactionTest, TwoPhaseLogRollingTest2) {
s = txn2->Commit();
ASSERT_OK(s);
db_impl->TEST_SwitchWAL();
ASSERT_OK(db_impl->TEST_SwitchWAL());
ASSERT_TRUE(!db_impl->TEST_UnableToReleaseOldestLog());
// we should see that cfb now has a flush requested

@ -68,7 +68,7 @@ class TransactionTestBase : public ::testing::Test {
options.two_write_queues = two_write_queue;
dbname = test::PerThreadDBPath("transaction_testdb");
DestroyDB(dbname, options);
EXPECT_OK(DestroyDB(dbname, options));
txn_db_options.transaction_lock_timeout = 0;
txn_db_options.default_lock_timeout = 0;
txn_db_options.write_policy = write_policy;
@ -85,7 +85,7 @@ class TransactionTestBase : public ::testing::Test {
} else {
s = OpenWithStackableDB();
}
assert(s.ok());
EXPECT_OK(s);
}
~TransactionTestBase() {
@ -96,7 +96,7 @@ class TransactionTestBase : public ::testing::Test {
// unlink-ed files. By using the default fs we simply ignore errors resulted
// from attempting to delete such files in DestroyDB.
options.env = Env::Default();
DestroyDB(dbname, options);
EXPECT_OK(DestroyDB(dbname, options));
delete env;
}
@ -391,7 +391,7 @@ class TransactionTestBase : public ::testing::Test {
if (txn_db_options.write_policy == WRITE_COMMITTED) {
options.unordered_write = false;
}
ReOpen();
ASSERT_OK(ReOpen());
for (int i = 0; i < 1024; i++) {
auto istr = std::to_string(index);
@ -410,9 +410,9 @@ class TransactionTestBase : public ::testing::Test {
case 1: {
WriteBatch wb;
committed_kvs[k] = v;
wb.Put(k, v);
ASSERT_OK(wb.Put(k, v));
committed_kvs[k] = v2;
wb.Put(k, v2);
ASSERT_OK(wb.Put(k, v2));
ASSERT_OK(db->Write(write_options, &wb));
} break;
@ -432,7 +432,7 @@ class TransactionTestBase : public ::testing::Test {
delete txn;
break;
default:
assert(0);
FAIL();
}
index++;
@ -445,9 +445,9 @@ class TransactionTestBase : public ::testing::Test {
auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
// Before upgrade/downgrade the WAL must be emptied
if (empty_wal) {
db_impl->TEST_FlushMemTable();
ASSERT_OK(db_impl->TEST_FlushMemTable());
} else {
db_impl->FlushWAL(true);
ASSERT_OK(db_impl->FlushWAL(true));
}
auto s = ReOpenNoDelete();
if (empty_wal) {
@ -461,7 +461,7 @@ class TransactionTestBase : public ::testing::Test {
db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
// Check that WAL is empty
VectorLogPtr log_files;
db_impl->GetSortedWalFiles(log_files);
ASSERT_OK(db_impl->GetSortedWalFiles(log_files));
ASSERT_EQ(0, log_files.size());
for (auto& kv : committed_kvs) {

@ -201,7 +201,7 @@ TEST(WriteBatchWithIndex, SubBatchCnt) {
Options options;
options.create_if_missing = true;
const std::string dbname = test::PerThreadDBPath("transaction_testdb");
DestroyDB(dbname, options);
EXPECT_OK(DestroyDB(dbname, options));
ASSERT_OK(DB::Open(options, dbname, &db));
ColumnFamilyHandle* cf_handle = nullptr;
ASSERT_OK(db->CreateColumnFamily(cf_options, cf_name, &cf_handle));
@ -215,18 +215,18 @@ TEST(WriteBatchWithIndex, SubBatchCnt) {
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(Slice("key"), Slice("value"));
ASSERT_OK(batch.Put(Slice("key"), Slice("value")));
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(Slice("key2"), Slice("value2"));
ASSERT_OK(batch.Put(Slice("key2"), Slice("value2")));
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
// duplicate the keys
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(Slice("key"), Slice("value3"));
ASSERT_OK(batch.Put(Slice("key"), Slice("value3")));
batch_cnt++;
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
// duplicate the 2nd key. It should not be counted duplicate since a
@ -234,14 +234,14 @@ TEST(WriteBatchWithIndex, SubBatchCnt) {
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(Slice("key2"), Slice("value4"));
ASSERT_OK(batch.Put(Slice("key2"), Slice("value4")));
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
// duplicate the keys but in a different cf. It should not be counted as
// duplicate keys
batch_cnt_at.push_back(batch_cnt);
batch.SetSavePoint();
save_points++;
batch.Put(cf_handle, Slice("key"), Slice("value5"));
ASSERT_OK(batch.Put(cf_handle, Slice("key"), Slice("value5")));
ASSERT_EQ(batch_cnt, batch.SubBatchCnt());
// Test that the number of sub-batches matches what we count with
@ -256,7 +256,7 @@ TEST(WriteBatchWithIndex, SubBatchCnt) {
// Test that RollbackToSavePoint will properly resets the number of
// sub-batches
for (size_t i = save_points; i > 0; i--) {
batch.RollbackToSavePoint();
ASSERT_OK(batch.RollbackToSavePoint());
ASSERT_EQ(batch_cnt_at[i - 1], batch.SubBatchCnt());
}
@ -277,7 +277,7 @@ TEST(WriteBatchWithIndex, SubBatchCnt) {
Slice key = Slice(keys[ki]);
std::string tmp = rnd.RandomString(16);
Slice value = Slice(tmp);
rndbatch.Put(key, value);
ASSERT_OK(rndbatch.Put(key, value));
}
SubBatchCounter batch_counter(comparators);
ASSERT_OK(rndbatch.GetWriteBatch()->Iterate(&batch_counter));
@ -526,7 +526,7 @@ class WritePreparedTransactionTestBase : public TransactionTestBase {
ASSERT_EQ(expected_versions[i].value, versions[i].value);
}
// Range delete not supported.
assert(expected_versions[i].type != kTypeRangeDeletion);
ASSERT_NE(expected_versions[i].type, kTypeRangeDeletion);
}
}
};
@ -702,8 +702,8 @@ INSTANTIATE_TEST_CASE_P(
TEST_P(WritePreparedTransactionTest, CommitMap) {
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
assert(wp_db);
assert(wp_db->db_impl_);
ASSERT_NE(wp_db, nullptr);
ASSERT_NE(wp_db->db_impl_, nullptr);
size_t size = wp_db->COMMIT_CACHE_SIZE;
CommitEntry c = {5, 12}, e;
bool evicted = wp_db->AddCommitEntry(c.prep_seq % size, c, &e);
@ -797,14 +797,13 @@ TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) {
for (int attempt = kAttemptHistoryMemtable; attempt <= kAttemptImmMemTable;
attempt++) {
options.max_write_buffer_number_to_maintain = 3;
ReOpen();
ASSERT_OK(ReOpen());
WriteOptions write_options;
ReadOptions read_options;
TransactionOptions txn_options;
txn_options.set_snapshot = true;
string value;
Status s;
ASSERT_OK(db->Put(write_options, Slice("foo"), Slice("bar")));
ASSERT_OK(db->Put(write_options, Slice("foo2"), Slice("bar")));
@ -841,9 +840,9 @@ TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) {
if (attempt == kAttemptHistoryMemtable) {
ASSERT_OK(db->Flush(flush_ops));
} else {
assert(attempt == kAttemptImmMemTable);
ASSERT_EQ(attempt, kAttemptImmMemTable);
DBImpl* db_impl = static_cast<DBImpl*>(db->GetRootDB());
db_impl->TEST_SwitchMemtable();
ASSERT_OK(db_impl->TEST_SwitchMemtable());
}
uint64_t num_imm_mems;
ASSERT_TRUE(db->GetIntProperty(DB::Properties::kNumImmutableMemTable,
@ -851,7 +850,7 @@ TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) {
if (attempt == kAttemptHistoryMemtable) {
ASSERT_EQ(0, num_imm_mems);
} else {
assert(attempt == kAttemptImmMemTable);
ASSERT_EQ(attempt, kAttemptImmMemTable);
ASSERT_EQ(1, num_imm_mems);
}
@ -893,7 +892,7 @@ TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) {
if (attempt == kAttemptHistoryMemtable) {
ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
} else {
assert(attempt == kAttemptImmMemTable);
ASSERT_EQ(attempt, kAttemptImmMemTable);
ASSERT_EQ(4, get_perf_context()->get_from_memtable_count);
}
@ -910,7 +909,7 @@ TEST_P(WritePreparedTransactionTest, CheckKeySkipOldMemtable) {
// Only active memtable will be checked in snapshot validation but
// both of active and immutable snapshot will be queried when
// getting the value.
assert(attempt == kAttemptImmMemTable);
ASSERT_EQ(attempt, kAttemptImmMemTable);
ASSERT_EQ(3, get_perf_context()->get_from_memtable_count);
}
@ -1091,7 +1090,7 @@ TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshots) {
const uint64_t cache_size = 1ul << snapshot_cache_bits;
// Safety check to express the intended size in the test. Can be adjusted if
// the snapshots lists changed.
assert((1ul << snapshot_cache_bits) * 2 + 1 == snapshots.size());
ASSERT_EQ((1ul << snapshot_cache_bits) * 2 + 1, snapshots.size());
DBImpl* mock_db = new DBImpl(options, dbname);
UpdateTransactionDBOptions(snapshot_cache_bits);
std::unique_ptr<WritePreparedTxnDBMock> wp_db(
@ -1106,7 +1105,7 @@ TEST_P(WritePreparedTransactionTest, CheckAgainstSnapshots) {
std::vector<SequenceNumber> seqs = {50l, 55l, 150l, 155l, 250l, 255l, 350l,
355l, 450l, 455l, 550l, 555l, 650l, 655l,
750l, 755l, 850l, 855l, 950l, 955l};
assert(seqs.size() > 1);
ASSERT_GT(seqs.size(), 1);
for (size_t i = 0; i + 1 < seqs.size(); i++) {
wp_db->old_commit_map_empty_ = true; // reset
CommitEntry commit_entry = {seqs[i], seqs[i + 1]};
@ -1184,7 +1183,7 @@ TEST_P(SnapshotConcurrentAccessTest, SnapshotConcurrentAccess) {
const size_t snapshot_cache_bits = 2;
// Safety check to express the intended size in the test. Can be adjusted if
// the snapshots lists changed.
assert((1ul << snapshot_cache_bits) * 2 + 2 == snapshots.size());
ASSERT_EQ((1ul << snapshot_cache_bits) * 2 + 2, snapshots.size());
SequenceNumber version = 1000l;
// Choose the cache size so that the new snapshot list could replace all the
// existing items in the cache and also have some overflow.
@ -1365,7 +1364,7 @@ TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
WriteOptions woptions;
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
@ -1378,9 +1377,9 @@ TEST_P(WritePreparedTransactionTest, MaxCatchupWithNewSnapshot) {
// is not published yet, thus causing max evicted seq go higher than last
// published.
for (int b = 0; b < batch_cnt; b++) {
batch.Put("foo", "foo");
ASSERT_OK(batch.Put("foo", "foo"));
}
db->Write(woptions, &batch);
ASSERT_OK(db->Write(woptions, &batch));
}
});
@ -1415,7 +1414,7 @@ TEST_P(WritePreparedTransactionTest, MaxCatchupWithUnbackedSnapshot) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
WriteOptions woptions;
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
@ -1423,8 +1422,8 @@ TEST_P(WritePreparedTransactionTest, MaxCatchupWithUnbackedSnapshot) {
ROCKSDB_NAMESPACE::port::Thread t1([&]() {
for (int i = 0; i < writes; i++) {
WriteBatch batch;
batch.Put("key", "foo");
db->Write(woptions, &batch);
ASSERT_OK(batch.Put("key", "foo"));
ASSERT_OK(db->Write(woptions, &batch));
}
});
@ -1474,7 +1473,7 @@ TEST_P(WritePreparedTransactionTest, CleanupSnapshotEqualToMax) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // only 1 entry => frequent eviction
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
WriteOptions woptions;
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// Insert something to increase seq
@ -1534,8 +1533,8 @@ TEST_P(WritePreparedTransactionTest, TxnInitialize) {
// udpated
ASSERT_GT(snap_impl->min_uncommitted_, kMinUnCommittedSeq);
txn0->Rollback();
txn1->Rollback();
ASSERT_OK(txn0->Rollback());
ASSERT_OK(txn1->Rollback());
delete txn0;
delete txn1;
}
@ -1548,7 +1547,7 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicates) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 1; // disable commit cache
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
ReadOptions ropt;
PinnableSlice pinnable_val;
@ -1569,10 +1568,10 @@ TEST_P(WritePreparedTransactionTest, AdvanceMaxEvictedSeqWithDuplicates) {
delete txn0;
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
wp_db->db_impl_->FlushWAL(true);
ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
wp_db->TEST_Crash();
ReOpenNoDelete();
assert(db != nullptr);
ASSERT_OK(ReOpenNoDelete());
ASSERT_NE(db, nullptr);
s = db->Get(ropt, db->DefaultColumnFamily(), "key", &pinnable_val);
ASSERT_TRUE(s.IsNotFound());
@ -1589,7 +1588,7 @@ TEST_P(WritePreparedTransactionTest, SmallestUnCommittedSeq) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 1; // disable commit cache
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
ReadOptions ropt;
PinnableSlice pinnable_val;
@ -1622,7 +1621,7 @@ TEST_P(WritePreparedTransactionTest, SmallestUnCommittedSeq) {
// Since commit cache is practically disabled, commit results in immediate
// advance in max_evicted_seq_ and subsequently moving some prepared txns
// to delayed_prepared_.
txn->Commit();
ASSERT_OK(txn->Commit());
committed_txns.push_back(txn);
}
});
@ -1651,7 +1650,7 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) {
// almost infeasible.
txn_db_options.transaction_lock_timeout = 1000;
txn_db_options.default_lock_timeout = 1000;
ReOpen();
ASSERT_OK(ReOpen());
FlushOptions fopt;
// Number of different txn types we use in this test
@ -1671,7 +1670,11 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) {
}
const size_t max_n = static_cast<size_t>(std::pow(type_cnt, txn_cnt));
printf("Number of cases being tested is %" ROCKSDB_PRIszt "\n", max_n);
for (size_t n = 0; n < max_n; n++, ReOpen()) {
for (size_t n = 0; n < max_n; n++) {
if (n > 0) {
ASSERT_OK(ReOpen());
}
if (n % split_cnt_ != split_id_) continue;
if (n % 1000 == 0) {
printf("Tested %" ROCKSDB_PRIszt " cases so far\n", n);
@ -1731,7 +1734,7 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) {
threads.emplace_back(txn_t3, bi);
break;
default:
assert(false);
FAIL();
}
// wait to be linked
while (linked.load() <= bi) {
@ -1765,22 +1768,22 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
// Check if recovery preserves the last sequence number
db_impl->FlushWAL(true);
ReOpenNoDelete();
assert(db != nullptr);
ASSERT_OK(db_impl->FlushWAL(true));
ASSERT_OK(ReOpenNoDelete());
ASSERT_NE(db, nullptr);
db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
seq = db_impl->TEST_GetLastVisibleSequence();
ASSERT_LE(exp_seq, seq + with_empty_commits);
// Check if flush preserves the last sequence number
db_impl->Flush(fopt);
ASSERT_OK(db_impl->Flush(fopt));
seq = db_impl->GetLatestSequenceNumber();
ASSERT_LE(exp_seq, seq + with_empty_commits);
// Check if recovery after flush preserves the last sequence number
db_impl->FlushWAL(true);
ReOpenNoDelete();
assert(db != nullptr);
ASSERT_OK(db_impl->FlushWAL(true));
ASSERT_OK(ReOpenNoDelete());
ASSERT_NE(db, nullptr);
db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
seq = db_impl->GetLatestSequenceNumber();
ASSERT_LE(exp_seq, seq + with_empty_commits);
@ -1792,7 +1795,7 @@ TEST_P(SeqAdvanceConcurrentTest, SeqAdvanceConcurrent) {
// properly.
TEST_P(WritePreparedTransactionTest, BasicRecovery) {
options.disable_auto_compactions = true;
ReOpen();
ASSERT_OK(ReOpen());
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
txn_t0(0);
@ -1807,6 +1810,7 @@ TEST_P(WritePreparedTransactionTest, BasicRecovery) {
s = txn0->Put(Slice("foo0" + istr0), Slice("bar0" + istr0));
ASSERT_OK(s);
s = txn0->Prepare();
ASSERT_OK(s);
auto prep_seq_0 = txn0->GetId();
txn_t1(0);
@ -1819,6 +1823,7 @@ TEST_P(WritePreparedTransactionTest, BasicRecovery) {
s = txn1->Put(Slice("foo1" + istr1), Slice("bar"));
ASSERT_OK(s);
s = txn1->Prepare();
ASSERT_OK(s);
auto prep_seq_1 = txn1->GetId();
txn_t2(0);
@ -1832,10 +1837,10 @@ TEST_P(WritePreparedTransactionTest, BasicRecovery) {
delete txn0;
delete txn1;
wp_db->db_impl_->FlushWAL(true);
ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
wp_db->TEST_Crash();
ReOpenNoDelete();
assert(db != nullptr);
ASSERT_OK(ReOpenNoDelete());
ASSERT_NE(db, nullptr);
wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// After recovery, all the uncommitted txns (0 and 1) should be inserted into
// delayed_prepared_
@ -1863,7 +1868,7 @@ TEST_P(WritePreparedTransactionTest, BasicRecovery) {
// recovery
txn1 = db->GetTransactionByName("xid" + istr1);
ASSERT_NE(txn1, nullptr);
txn1->Commit();
ASSERT_OK(txn1->Commit());
delete txn1;
index++;
@ -1874,13 +1879,14 @@ TEST_P(WritePreparedTransactionTest, BasicRecovery) {
s = txn2->Put(Slice("foo2" + istr2), Slice("bar"));
ASSERT_OK(s);
s = txn2->Prepare();
ASSERT_OK(s);
auto prep_seq_2 = txn2->GetId();
delete txn2;
wp_db->db_impl_->FlushWAL(true);
ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
wp_db->TEST_Crash();
ReOpenNoDelete();
assert(db != nullptr);
ASSERT_OK(ReOpenNoDelete());
ASSERT_NE(db, nullptr);
wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
ASSERT_TRUE(wp_db->prepared_txns_.empty());
ASSERT_FALSE(wp_db->delayed_prepared_empty_);
@ -1900,10 +1906,10 @@ TEST_P(WritePreparedTransactionTest, BasicRecovery) {
// Commit all the remaining txns
txn0 = db->GetTransactionByName("xid" + istr0);
ASSERT_NE(txn0, nullptr);
txn0->Commit();
ASSERT_OK(txn0->Commit());
txn2 = db->GetTransactionByName("xid" + istr2);
ASSERT_NE(txn2, nullptr);
txn2->Commit();
ASSERT_OK(txn2->Commit());
// Check the value is committed after commit
s = db->Get(ropt, db->DefaultColumnFamily(), "foo0" + istr0, &pinnable_val);
@ -1913,9 +1919,9 @@ TEST_P(WritePreparedTransactionTest, BasicRecovery) {
delete txn0;
delete txn2;
wp_db->db_impl_->FlushWAL(true);
ReOpenNoDelete();
assert(db != nullptr);
ASSERT_OK(wp_db->db_impl_->FlushWAL(true));
ASSERT_OK(ReOpenNoDelete());
ASSERT_NE(db, nullptr);
wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
ASSERT_TRUE(wp_db->prepared_txns_.empty());
ASSERT_TRUE(wp_db->delayed_prepared_empty_);
@ -1932,7 +1938,7 @@ TEST_P(WritePreparedTransactionTest, BasicRecovery) {
// committed data before the restart is visible to all snapshots.
TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMap) {
for (bool end_with_prepare : {false, true}) {
ReOpen();
ASSERT_OK(ReOpen());
WriteOptions woptions;
ASSERT_OK(db->Put(woptions, "key", "value"));
ASSERT_OK(db->Put(woptions, "key", "value"));
@ -1948,10 +1954,10 @@ TEST_P(WritePreparedTransactionTest, IsInSnapshotEmptyMap) {
}
dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
db_impl->FlushWAL(true);
ReOpenNoDelete();
ASSERT_OK(db_impl->FlushWAL(true));
ASSERT_OK(ReOpenNoDelete());
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
assert(wp_db != nullptr);
ASSERT_NE(wp_db, nullptr);
ASSERT_GT(wp_db->max_evicted_seq_, 0); // max after recovery
// Take a snapshot right after recovery
const Snapshot* snap = db->GetSnapshot();
@ -2190,7 +2196,7 @@ void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s,
Status s;
PinnableSlice v;
s = db->Get(roptions, db->DefaultColumnFamily(), key, &v);
ASSERT_TRUE(exp_s == s);
ASSERT_EQ(exp_s, s);
ASSERT_TRUE(s.ok() || s.IsNotFound());
if (s.ok()) {
ASSERT_TRUE(exp_v == v);
@ -2203,7 +2209,7 @@ void ASSERT_SAME(ReadOptions roptions, TransactionDB* db, Status exp_s,
ASSERT_EQ(1, values.size());
ASSERT_EQ(1, s_vec.size());
s = s_vec[0];
ASSERT_TRUE(exp_s == s);
ASSERT_EQ(exp_s, s);
ASSERT_TRUE(s.ok() || s.IsNotFound());
if (s.ok()) {
ASSERT_TRUE(exp_v == values[0]);
@ -2224,7 +2230,7 @@ TEST_P(WritePreparedTransactionTest, Rollback) {
for (size_t ikey = 1; ikey <= num_keys; ikey++) {
for (size_t ivalue = 0; ivalue < num_values; ivalue++) {
for (bool crash : {false, true}) {
ReOpen();
ASSERT_OK(ReOpen());
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
std::string key_str = "key" + ToString(ikey);
switch (ivalue) {
@ -2243,7 +2249,7 @@ TEST_P(WritePreparedTransactionTest, Rollback) {
ASSERT_OK(db->SingleDelete(woptions, key_str));
break;
default:
assert(0);
FAIL();
}
PinnableSlice v1;
@ -2286,10 +2292,10 @@ TEST_P(WritePreparedTransactionTest, Rollback) {
if (crash) {
delete txn;
auto db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
db_impl->FlushWAL(true);
ASSERT_OK(db_impl->FlushWAL(true));
dynamic_cast<WritePreparedTxnDB*>(db)->TEST_Crash();
ReOpenNoDelete();
assert(db != nullptr);
ASSERT_OK(ReOpenNoDelete());
ASSERT_NE(db, nullptr);
wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
txn = db->GetTransactionByName("xid0");
ASSERT_FALSE(wp_db->delayed_prepared_empty_);
@ -2328,7 +2334,7 @@ TEST_P(WritePreparedTransactionTest, Rollback) {
TEST_P(WritePreparedTransactionTest, DisableGCDuringRecovery) {
// Use large buffer to avoid memtable flush after 1024 insertions
options.write_buffer_size = 1024 * 1024;
ReOpen();
ASSERT_OK(ReOpen());
std::vector<KeyVersion> versions;
uint64_t seq = 0;
for (uint64_t i = 1; i <= 1024; i++) {
@ -2345,10 +2351,10 @@ TEST_P(WritePreparedTransactionTest, DisableGCDuringRecovery) {
std::reverse(std::begin(versions), std::end(versions));
VerifyInternalKeys(versions);
DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
db_impl->FlushWAL(true);
ASSERT_OK(db_impl->FlushWAL(true));
// Use small buffer to ensure memtable flush during recovery
options.write_buffer_size = 1024;
ReOpenNoDelete();
ASSERT_OK(ReOpenNoDelete());
VerifyInternalKeys(versions);
}
@ -2375,7 +2381,7 @@ TEST_P(WritePreparedTransactionTest, SequenceNumberZero) {
// proceed with older versions of the key as-if the new version doesn't exist.
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
options.disable_auto_compactions = true;
ReOpen();
ASSERT_OK(ReOpen());
DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
// Snapshots to avoid keys get evicted.
std::vector<const Snapshot*> snapshots;
@ -2466,7 +2472,7 @@ TEST_P(WritePreparedTransactionTest, CompactionShouldKeepUncommittedKeys) {
// not just prepare sequence.
TEST_P(WritePreparedTransactionTest, CompactionShouldKeepSnapshotVisibleKeys) {
options.disable_auto_compactions = true;
ReOpen();
ASSERT_OK(ReOpen());
// Keep track of expected sequence number.
SequenceNumber expected_seq = 0;
auto* txn1 = db->BeginTransaction(WriteOptions());
@ -2532,7 +2538,7 @@ TEST_P(WritePreparedTransactionTest, SmallestUncommittedOptimization) {
const size_t commit_cache_bits = 0; // disable commit cache
for (bool has_recent_prepare : {true, false}) {
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
auto* transaction =
@ -2581,7 +2587,7 @@ TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // minimum commit cache
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1_1"));
auto* transaction =
@ -2630,7 +2636,7 @@ TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction2) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // minimum commit cache
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
ASSERT_OK(db->Put(WriteOptions(), "key1", "value2"));
@ -2680,7 +2686,7 @@ TEST_P(WritePreparedTransactionTest, ReleaseSnapshotDuringCompaction3) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 1; // commit cache size = 2
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
// Add a dummy key to evict v2 commit cache, but keep v1 commit cache.
// It also advance max_evicted_seq and can trigger old_commit_map cleanup.
@ -2731,7 +2737,7 @@ TEST_P(WritePreparedTransactionTest, ReleaseEarliestSnapshotDuringCompaction) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 0; // minimum commit cache
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
auto* transaction =
@ -2795,7 +2801,7 @@ TEST_P(WritePreparedTransactionTest,
Random rnd(1103);
options.disable_auto_compactions = true;
ReOpen();
ASSERT_OK(ReOpen());
for (size_t i = 0; i < kNumTransactions; i++) {
std::string key = "key" + ToString(i);
@ -2836,7 +2842,7 @@ TEST_P(WritePreparedTransactionTest,
snapshots.push_back(db->GetSnapshot());
snapshot_data.push_back(current_data);
assert(snapshots.size() == snapshot_data.size());
ASSERT_EQ(snapshots.size(), snapshot_data.size());
for (size_t i = 0; i < snapshots.size(); i++) {
VerifyKeys(snapshot_data[i], snapshots[i]);
}
@ -2871,7 +2877,7 @@ TEST_P(WritePreparedTransactionTest,
TEST_P(WritePreparedTransactionTest,
CompactionShouldKeepSequenceForUncommittedKeys) {
options.disable_auto_compactions = true;
ReOpen();
ASSERT_OK(ReOpen());
// Keep track of expected sequence number.
SequenceNumber expected_seq = 0;
auto* transaction = db->BeginTransaction(WriteOptions());
@ -2913,7 +2919,7 @@ TEST_P(WritePreparedTransactionTest,
TEST_P(WritePreparedTransactionTest, CommitAndSnapshotDuringCompaction) {
options.disable_auto_compactions = true;
ReOpen();
ASSERT_OK(ReOpen());
const Snapshot* snapshot = nullptr;
ASSERT_OK(db->Put(WriteOptions(), "key1", "value1"));
@ -2996,6 +3002,7 @@ TEST_P(WritePreparedTransactionTest, Iterate) {
TEST_P(WritePreparedTransactionTest, IteratorRefreshNotSupported) {
Iterator* iter = db->NewIterator(ReadOptions());
ASSERT_OK(iter->status());
ASSERT_TRUE(iter->Refresh().IsNotSupported());
delete iter;
}
@ -3017,13 +3024,13 @@ TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfDelayedPrepared) {
}
for (auto split_before_mutex : split_options) {
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
DBImpl* db_impl = static_cast_with_check<DBImpl>(db->GetRootDB());
// Fill up the commit cache
std::string init_value("value1");
for (int i = 0; i < 10; i++) {
db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
}
// Prepare a transaction but do not commit it
Transaction* txn =
@ -3034,7 +3041,7 @@ TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfDelayedPrepared) {
// Commit a bunch of entries to advance max evicted seq and make the
// prepared a delayed prepared
for (int i = 0; i < 10; i++) {
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
}
// The snapshot should not see the delayed prepared entry
auto snap = db->GetSnapshot();
@ -3075,7 +3082,7 @@ TEST_P(WritePreparedTransactionTest, NonAtomicCommitOfDelayedPrepared) {
auto seq = db_impl->TEST_GetLastVisibleSequence();
size_t tries = 0;
while (wp_db->max_evicted_seq_ < seq && tries < 50) {
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
tries++;
};
ASSERT_LT(tries, 50);
@ -3115,12 +3122,12 @@ TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfDelayedPrepared) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 3; // 8 entries
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// Fill up the commit cache
std::string init_value("value1");
for (int i = 0; i < 10; i++) {
db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
}
// Prepare a transaction but do not commit it
Transaction* txn = db->BeginTransaction(WriteOptions(), TransactionOptions());
@ -3128,8 +3135,8 @@ TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfDelayedPrepared) {
ASSERT_OK(txn->Put(Slice("key1"), Slice("value2")));
ASSERT_OK(txn->Prepare());
// Create a gap between prepare seq and snapshot seq
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
// The snapshot should not see the delayed prepared entry
auto snap = db->GetSnapshot();
ASSERT_LT(txn->GetId(), snap->GetSequenceNumber());
@ -3148,7 +3155,7 @@ TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfDelayedPrepared) {
// prepared a delayed prepared
size_t tries = 0;
while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
tries++;
};
ASSERT_LT(tries, 50);
@ -3185,13 +3192,13 @@ TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
const size_t snapshot_cache_bits = 7; // same as default
const size_t commit_cache_bits = 3; // 8 entries
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
// Fill up the commit cache
std::string init_value("value1");
std::string last_value("value_final");
for (int i = 0; i < 10; i++) {
db->Put(WriteOptions(), Slice("key1"), Slice(init_value));
ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice(init_value)));
}
// Do an uncommitted write to prevent min_uncommitted optimization
Transaction* txn1 =
@ -3206,8 +3213,8 @@ TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
ASSERT_OK(txn->Prepare());
ASSERT_OK(txn->Commit());
// Create a gap between commit entry and snapshot seq
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
// The snapshot should see the last commit
auto snap = db->GetSnapshot();
ASSERT_LE(txn->GetId(), snap->GetSequenceNumber());
@ -3225,7 +3232,7 @@ TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
// Commit a bunch of entries to advance max evicted seq beyond txn->GetId()
size_t tries = 0;
while (wp_db->max_evicted_seq_ < txn->GetId() && tries < 50) {
db->Put(WriteOptions(), Slice("key3"), Slice("value3"));
ASSERT_OK(db->Put(WriteOptions(), Slice("key3"), Slice("value3")));
tries++;
};
ASSERT_LT(tries, 50);
@ -3248,7 +3255,7 @@ TEST_P(WritePreparedTransactionTest, NonAtomicUpdateOfMaxEvictedSeq) {
read_thread.join();
commit_thread.join();
delete txn;
txn1->Commit();
ASSERT_OK(txn1->Commit());
delete txn1;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
@ -3266,7 +3273,7 @@ TEST_P(WritePreparedTransactionTest, AddPreparedBeforeMax) {
// 1 entry to advance max after the 2nd commit
const size_t commit_cache_bits = 0;
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
WritePreparedTxnDB* wp_db = dynamic_cast<WritePreparedTxnDB*>(db);
std::string some_value("value_some");
std::string uncommitted_value("value_uncommitted");
@ -3347,7 +3354,7 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
for (const size_t commit_cache_bits : {0, 2, 3}) {
for (const size_t sub_batch_cnt : {1, 2, 3}) {
UpdateTransactionDBOptions(snapshot_cache_bits, commit_cache_bits);
ReOpen();
ASSERT_OK(ReOpen());
std::atomic<const Snapshot*> snap = {nullptr};
std::atomic<SequenceNumber> exp_prepare = {0};
ROCKSDB_NAMESPACE::port::Thread callback_thread;
@ -3385,7 +3392,7 @@ TEST_P(WritePreparedTransactionTest, CommitOfDelayedPrepared) {
// Too many txns might cause commit_seq - prepare_seq in another thread
// to go beyond DELTA_UPPERBOUND
for (int i = 0; i < 25 * (1 << commit_cache_bits); i++) {
db->Put(WriteOptions(), Slice("key1"), Slice("value1"));
ASSERT_OK(db->Put(WriteOptions(), Slice("key1"), Slice("value1")));
}
});
ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
@ -3448,7 +3455,7 @@ TEST_P(WritePreparedTransactionTest, AtomicCommit) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
ROCKSDB_NAMESPACE::port::Thread write_thread([&]() {
if (skip_prepare) {
db->Put(WriteOptions(), Slice("key"), Slice("value"));
ASSERT_OK(db->Put(WriteOptions(), Slice("key"), Slice("value")));
} else {
Transaction* txn =
db->BeginTransaction(WriteOptions(), TransactionOptions());

@ -70,16 +70,21 @@ Status WritePreparedTxn::Get(const ReadOptions& options,
wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
backed_by_snapshot);
auto res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
pinnable_val, &callback);
if (LIKELY(callback.valid() &&
wpt_db_->ValidateSnapshot(callback.max_visible_seq(),
backed_by_snapshot))) {
return res;
} else {
wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
return Status::TryAgain();
Status res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
pinnable_val, &callback);
const bool callback_valid =
callback.valid(); // NOTE: validity of callback must always be checked
// before it is destructed
if (res.ok()) {
if (!LIKELY(callback_valid &&
wpt_db_->ValidateSnapshot(callback.max_visible_seq(),
backed_by_snapshot))) {
wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
res = Status::TryAgain();
}
}
return res;
}
Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {

@ -73,7 +73,7 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
for (uint64_t max_skip : {0, std::numeric_limits<int>::max()}) {
options.max_sequential_skip_in_iterations = max_skip;
options.disable_auto_compactions = true;
ReOpen();
ASSERT_OK(ReOpen());
TransactionOptions txn_options;
WriteOptions woptions;
@ -90,7 +90,7 @@ TEST_P(WriteUnpreparedTransactionTest, ReadYourOwnWrite) {
std::string stored_value = "v" + ToString(i);
ASSERT_OK(txn->Put("a", stored_value));
ASSERT_OK(txn->Put("b", stored_value));
wup_txn->FlushWriteBatchToDB(false);
ASSERT_OK(wup_txn->FlushWriteBatchToDB(false));
// Test Get()
std::string value;
@ -155,7 +155,7 @@ TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
WriteOptions write_options;
txn_db_options.transaction_lock_timeout = -1;
options.disable_auto_compactions = true;
ReOpen();
ASSERT_OK(ReOpen());
std::vector<std::string> keys;
for (uint32_t k = 0; k < kNumKeys * kNumThreads; k++) {
@ -188,7 +188,7 @@ TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
}
txn = db->BeginTransaction(write_options, txn_options);
txn->SetName(ToString(id));
ASSERT_OK(txn->SetName(ToString(id)));
txn->SetSnapshot();
if (a >= RO_SNAPSHOT) {
read_options.snapshot = txn->GetSnapshot();
@ -273,23 +273,27 @@ TEST_P(WriteUnpreparedStressTest, ReadYourOwnWriteStress) {
case 1: // Validate Next()
{
Iterator* iter = txn->GetIterator(read_options);
ASSERT_OK(iter->status());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
verify_key(iter->key().ToString(), iter->value().ToString());
}
ASSERT_OK(iter->status());
delete iter;
break;
}
case 2: // Validate Prev()
{
Iterator* iter = txn->GetIterator(read_options);
ASSERT_OK(iter->status());
for (iter->SeekToLast(); iter->Valid(); iter->Prev()) {
verify_key(iter->key().ToString(), iter->value().ToString());
}
ASSERT_OK(iter->status());
delete iter;
break;
}
default:
ASSERT_TRUE(false);
FAIL();
}
if (rnd.OneIn(2)) {
@ -334,7 +338,7 @@ TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
for (int num_batches = 1; num_batches < 10; num_batches++) {
// Reset database.
prepared_trans.clear();
ReOpen();
ASSERT_OK(ReOpen());
wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
if (!empty) {
for (int i = 0; i < num_batches; i++) {
@ -346,7 +350,7 @@ TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
// Write num_batches unprepared batches.
Transaction* txn = db->BeginTransaction(write_options, txn_options);
WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
txn->SetName("xid");
ASSERT_OK(txn->SetName("xid"));
for (int i = 0; i < num_batches; i++) {
ASSERT_OK(txn->Put("k" + ToString(i), "value" + ToString(i)));
if (txn_options.write_batch_flush_threshold == 1) {
@ -365,14 +369,14 @@ TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
// test that recovery does the rollback.
wup_txn->unprep_seqs_.clear();
} else {
txn->Prepare();
ASSERT_OK(txn->Prepare());
}
delete txn;
// Crash and run recovery code paths.
wup_db->db_impl_->FlushWAL(true);
ASSERT_OK(wup_db->db_impl_->FlushWAL(true));
wup_db->TEST_Crash();
ReOpenNoDelete();
ASSERT_OK(ReOpenNoDelete());
assert(db != nullptr);
db->GetAllPreparedTransactions(&prepared_trans);
@ -386,6 +390,7 @@ TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
}
Iterator* iter = db->NewIterator(ReadOptions());
ASSERT_OK(iter->status());
iter->SeekToFirst();
// Check that DB has before values.
if (!empty || a == COMMIT) {
@ -402,6 +407,7 @@ TEST_P(WriteUnpreparedTransactionTest, RecoveryTest) {
}
}
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
delete iter;
}
}
@ -422,13 +428,13 @@ TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
txn_options.write_batch_flush_threshold = batch_size;
for (bool prepare : {false, true}) {
for (bool commit : {false, true}) {
ReOpen();
ASSERT_OK(ReOpen());
Transaction* txn = db->BeginTransaction(write_options, txn_options);
WriteUnpreparedTxn* wup_txn = dynamic_cast<WriteUnpreparedTxn*>(txn);
txn->SetName("xid");
ASSERT_OK(txn->SetName("xid"));
for (int i = 0; i < kNumKeys; i++) {
txn->Put("k" + ToString(i), "v" + ToString(i));
ASSERT_OK(txn->Put("k" + ToString(i), "v" + ToString(i)));
if (txn_options.write_batch_flush_threshold == 1) {
// WriteUnprepared will check write_batch_flush_threshold and
// possibly flush before appending to the write batch. No flush will
@ -445,9 +451,11 @@ TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
}
Iterator* iter = db->NewIterator(ReadOptions());
ASSERT_OK(iter->status());
iter->SeekToFirst();
assert(!iter->Valid());
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
delete iter;
if (commit) {
@ -458,6 +466,7 @@ TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
delete txn;
iter = db->NewIterator(ReadOptions());
ASSERT_OK(iter->status());
iter->SeekToFirst();
for (int i = 0; i < (commit ? kNumKeys : 0); i++) {
@ -467,6 +476,7 @@ TEST_P(WriteUnpreparedTransactionTest, UnpreparedBatch) {
iter->Next();
}
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
delete iter;
}
}
@ -490,7 +500,7 @@ TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) {
for (bool prepare : {false, true}) {
for (bool commit : {false, true}) {
ReOpen();
ASSERT_OK(ReOpen());
auto wup_db = dynamic_cast<WriteUnpreparedTxnDB*>(db);
auto db_impl = wup_db->db_impl_;
@ -508,7 +518,7 @@ TEST_P(WriteUnpreparedTransactionTest, MarkLogWithPrepSection) {
}
if (i > 0) {
db_impl->TEST_SwitchWAL();
ASSERT_OK(db_impl->TEST_SwitchWAL());
}
}
@ -568,12 +578,14 @@ TEST_P(WriteUnpreparedTransactionTest, NoSnapshotWrite) {
// snapshot, if iterator snapshot is fresh enough.
ReadOptions roptions;
auto iter = txn->GetIterator(roptions);
ASSERT_OK(iter->status());
int keys = 0;
for (iter->SeekToLast(); iter->Valid(); iter->Prev(), keys++) {
ASSERT_OK(iter->status());
ASSERT_EQ(iter->key().ToString(), iter->value().ToString());
}
ASSERT_EQ(keys, 3);
ASSERT_OK(iter->status());
delete iter;
delete txn;
@ -598,6 +610,7 @@ TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) {
ReadOptions roptions;
auto iter = txn->GetIterator(roptions);
ASSERT_OK(iter->status());
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ASSERT_OK(iter->status());
if (iter->key() == "9") {
@ -612,11 +625,13 @@ TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) {
ASSERT_OK(txn->Put(iter->key(), "b"));
}
}
ASSERT_OK(iter->status());
delete iter;
ASSERT_OK(txn->Commit());
iter = db->NewIterator(roptions);
ASSERT_OK(iter->status());
if (a == DO_DELETE) {
// Check that db is empty.
iter->SeekToFirst();
@ -630,6 +645,7 @@ TEST_P(WriteUnpreparedTransactionTest, IterateAndWrite) {
}
ASSERT_EQ(keys, 100);
}
ASSERT_OK(iter->status());
delete iter;
delete txn;

@ -167,7 +167,10 @@ Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
}
// The Rollback marker will be used as a batch separator
WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_);
s = WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_);
if (!s.ok()) {
return s;
}
const uint64_t kNoLogRef = 0;
const bool kDisableMemtable = true;

Loading…
Cancel
Save