Make db_basic_test pass assert status checked (#7452)

Summary:
Add db_basic_test status check list. Some of the warnings are suppressed. It is possible that some of them are due to real bugs.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7452

Test Plan: See CI tests pass.

Reviewed By: zhichao-cao

Differential Revision: D23979764

fbshipit-source-id: 6151570c2a9b931b0fbb3fe939a94b2bd1583cbe
main
sdong 4 years ago committed by Facebook GitHub Bot
parent 5e221a98b5
commit d08a9005b7
  1. 1
      Makefile
  2. 4
      db/builder.cc
  3. 13
      db/compacted_db_impl.cc
  4. 10
      db/compaction/compaction_job.cc
  5. 156
      db/db_basic_test.cc
  6. 66
      db/db_impl/db_impl_compaction_flush.cc
  7. 9
      db/db_impl/db_impl_files.cc
  8. 4
      db/db_impl/db_impl_open.cc
  9. 28
      db/db_impl/db_impl_write.cc
  10. 9
      db/db_iter.h
  11. 6
      db/db_test_util.cc
  12. 2
      db/error_handler.cc
  13. 34
      db/table_cache.cc
  14. 5
      db/version_builder.cc
  15. 2
      db/version_set.cc
  16. 23
      table/block_based/block_based_table_builder.cc
  17. 12
      table/block_based/block_based_table_reader.cc
  18. 5
      table/block_based/index_builder.h
  19. 4
      table/plain/plain_table_builder.cc
  20. 1
      table/plain/plain_table_key_coding.cc
  21. 7
      table/plain/plain_table_key_coding.h
  22. 2
      table/plain/plain_table_reader.cc
  23. 2
      utilities/fault_injection_env.h

@ -586,6 +586,7 @@ ifdef ASSERT_STATUS_CHECKED
coding_test \
crc32c_test \
dbformat_test \
db_basic_test \
db_options_test \
options_file_test \
defer_test \

@ -131,6 +131,7 @@ Status BuildTable(
TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes);
#endif // !NDEBUG
IOStatus io_s = NewWritableFile(fs, fname, &file, file_options);
assert(s.ok());
s = io_s;
if (io_status->ok()) {
*io_status = io_s;
@ -314,17 +315,18 @@ Status BuildTable(
constexpr IODebugContext* dbg = nullptr;
Status ignored = fs->DeleteFile(fname, IOOptions(), dbg);
ignored.PermitUncheckedError();
assert(blob_file_additions || blob_file_paths.empty());
if (blob_file_additions) {
for (const std::string& blob_file_path : blob_file_paths) {
ignored = fs->DeleteFile(blob_file_path, IOOptions(), dbg);
ignored.PermitUncheckedError();
}
blob_file_additions->clear();
}
ignored.PermitUncheckedError();
}
if (meta->fd.GetFileSize() == 0) {

@ -41,8 +41,11 @@ Status CompactedDBImpl::Get(const ReadOptions& options, ColumnFamilyHandle*,
GetContext::kNotFound, key, value, nullptr, nullptr,
nullptr, true, nullptr, nullptr);
LookupKey lkey(key, kMaxSequenceNumber);
files_.files[FindFile(key)].fd.table_reader->Get(options, lkey.internal_key(),
&get_context, nullptr);
Status s = files_.files[FindFile(key)].fd.table_reader->Get(
options, lkey.internal_key(), &get_context, nullptr);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (get_context.State() == GetContext::kFound) {
return Status::OK();
}
@ -74,12 +77,16 @@ std::vector<Status> CompactedDBImpl::MultiGet(const ReadOptions& options,
GetContext::kNotFound, keys[idx], &pinnable_val,
nullptr, nullptr, nullptr, true, nullptr, nullptr);
LookupKey lkey(keys[idx], kMaxSequenceNumber);
r->Get(options, lkey.internal_key(), &get_context, nullptr);
Status s = r->Get(options, lkey.internal_key(), &get_context, nullptr);
if (!s.ok() && !s.IsNotFound()) {
statuses[idx] = s;
} else {
value.assign(pinnable_val.data(), pinnable_val.size());
if (get_context.State() == GetContext::kFound) {
statuses[idx] = Status::OK();
}
}
}
++idx;
}
return statuses;

@ -1102,6 +1102,16 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
SetPerfLevel(prev_perf_level);
}
}
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
if (!status.ok()) {
if (sub_compact->c_iter) {
sub_compact->c_iter->status().PermitUncheckedError();
}
if (input) {
input->status().PermitUncheckedError();
}
}
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
sub_compact->c_iter.reset();
input.reset();

@ -35,9 +35,9 @@ class DBBasicTest : public DBTestBase {
TEST_F(DBBasicTest, OpenWhenOpen) {
Options options = CurrentOptions();
options.env = env_;
ROCKSDB_NAMESPACE::DB* db2 = nullptr;
ROCKSDB_NAMESPACE::Status s = DB::Open(options, dbname_, &db2);
DB* db2 = nullptr;
Status s = DB::Open(options, dbname_, &db2);
ASSERT_NOK(s);
ASSERT_EQ(Status::Code::kIOError, s.code());
ASSERT_EQ(Status::SubCode::kNone, s.subcode());
ASSERT_TRUE(strstr(s.getState(), "lock ") != nullptr);
@ -49,13 +49,13 @@ TEST_F(DBBasicTest, UniqueSession) {
Options options = CurrentOptions();
std::string sid1, sid2, sid3, sid4;
db_->GetDbSessionId(sid1);
ASSERT_OK(db_->GetDbSessionId(sid1));
Reopen(options);
db_->GetDbSessionId(sid2);
ASSERT_OK(db_->GetDbSessionId(sid2));
ASSERT_OK(Put("foo", "v1"));
db_->GetDbSessionId(sid4);
ASSERT_OK(db_->GetDbSessionId(sid4));
Reopen(options);
db_->GetDbSessionId(sid3);
ASSERT_OK(db_->GetDbSessionId(sid3));
ASSERT_NE(sid1, sid2);
ASSERT_NE(sid1, sid3);
@ -73,14 +73,14 @@ TEST_F(DBBasicTest, UniqueSession) {
#ifndef ROCKSDB_LITE
Close();
ASSERT_OK(ReadOnlyReopen(options));
db_->GetDbSessionId(sid1);
ASSERT_OK(db_->GetDbSessionId(sid1));
// Test uniqueness between readonly open (sid1) and regular open (sid3)
ASSERT_NE(sid1, sid3);
Close();
ASSERT_OK(ReadOnlyReopen(options));
db_->GetDbSessionId(sid2);
ASSERT_OK(db_->GetDbSessionId(sid2));
ASSERT_EQ("v1", Get("foo"));
db_->GetDbSessionId(sid3);
ASSERT_OK(db_->GetDbSessionId(sid3));
ASSERT_NE(sid1, sid2);
@ -88,13 +88,13 @@ TEST_F(DBBasicTest, UniqueSession) {
#endif // ROCKSDB_LITE
CreateAndReopenWithCF({"goku"}, options);
db_->GetDbSessionId(sid1);
ASSERT_OK(db_->GetDbSessionId(sid1));
ASSERT_OK(Put("bar", "e1"));
db_->GetDbSessionId(sid2);
ASSERT_OK(db_->GetDbSessionId(sid2));
ASSERT_EQ("e1", Get("bar"));
db_->GetDbSessionId(sid3);
ASSERT_OK(db_->GetDbSessionId(sid3));
ReopenWithColumnFamilies({"default", "goku"}, options);
db_->GetDbSessionId(sid4);
ASSERT_OK(db_->GetDbSessionId(sid4));
ASSERT_EQ(sid1, sid2);
ASSERT_EQ(sid2, sid3);
@ -163,7 +163,7 @@ TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) {
assert(options.env == env_);
ASSERT_OK(ReadOnlyReopen(options));
std::string db_id1;
db_->GetDbIdentity(db_id1);
ASSERT_OK(db_->GetDbIdentity(db_id1));
ASSERT_EQ("v3", Get("foo"));
ASSERT_EQ("v2", Get("bar"));
Iterator* iter = db_->NewIterator(ReadOptions());
@ -186,7 +186,7 @@ TEST_F(DBBasicTest, ReadOnlyDBWithWriteDBIdToManifestSet) {
ASSERT_EQ("v2", Get("bar"));
ASSERT_TRUE(db_->SyncWAL().IsNotSupported());
std::string db_id2;
db_->GetDbIdentity(db_id2);
ASSERT_OK(db_->GetDbIdentity(db_id2));
ASSERT_EQ(db_id1, db_id2);
}
@ -241,7 +241,7 @@ TEST_F(DBBasicTest, CompactedDB) {
ASSERT_OK(Put("hhh", DummyString(kFileSize / 2, 'h')));
ASSERT_OK(Put("iii", DummyString(kFileSize / 2, 'i')));
ASSERT_OK(Put("jjj", DummyString(kFileSize / 2, 'j')));
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ(3, NumTableFilesAtLevel(1));
Close();
@ -299,8 +299,8 @@ TEST_F(DBBasicTest, LevelLimitReopen) {
int i = 0;
while (NumTableFilesAtLevel(2, 1) == 0) {
ASSERT_OK(Put(1, Key(i++), value));
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
ASSERT_OK(dbfull()->TEST_WaitForFlushMemTable());
ASSERT_OK(dbfull()->TEST_WaitForCompact());
}
options.num_levels = 1;
@ -354,8 +354,8 @@ TEST_F(DBBasicTest, EmptyFlush) {
options.disable_auto_compactions = true;
CreateAndReopenWithCF({"pikachu"}, options);
Put(1, "a", Slice());
SingleDelete(1, "a");
ASSERT_OK(Put(1, "a", Slice()));
ASSERT_OK(SingleDelete(1, "a"));
ASSERT_OK(Flush(1));
ASSERT_EQ("[ ]", AllEntriesFor("a", 1));
@ -605,16 +605,16 @@ TEST_F(DBBasicTest, Snapshot) {
options_override.skip_policy = kSkipNoSnapshot;
do {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions(options_override));
Put(0, "foo", "0v1");
Put(1, "foo", "1v1");
ASSERT_OK(Put(0, "foo", "0v1"));
ASSERT_OK(Put(1, "foo", "1v1"));
const Snapshot* s1 = db_->GetSnapshot();
ASSERT_EQ(1U, GetNumSnapshots());
uint64_t time_snap1 = GetTimeOldestSnapshots();
ASSERT_GT(time_snap1, 0U);
ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
Put(0, "foo", "0v2");
Put(1, "foo", "1v2");
ASSERT_OK(Put(0, "foo", "0v2"));
ASSERT_OK(Put(1, "foo", "1v2"));
env_->MockSleepForSeconds(1);
@ -622,8 +622,8 @@ TEST_F(DBBasicTest, Snapshot) {
ASSERT_EQ(2U, GetNumSnapshots());
ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
Put(0, "foo", "0v3");
Put(1, "foo", "1v3");
ASSERT_OK(Put(0, "foo", "0v3"));
ASSERT_OK(Put(1, "foo", "1v3"));
{
ManagedSnapshot s3(db_);
@ -631,8 +631,8 @@ TEST_F(DBBasicTest, Snapshot) {
ASSERT_EQ(time_snap1, GetTimeOldestSnapshots());
ASSERT_EQ(GetSequenceOldestSnapshots(), s1->GetSequenceNumber());
Put(0, "foo", "0v4");
Put(1, "foo", "1v4");
ASSERT_OK(Put(0, "foo", "0v4"));
ASSERT_OK(Put(1, "foo", "1v4"));
ASSERT_EQ("0v1", Get(0, "foo", s1));
ASSERT_EQ("1v1", Get(1, "foo", s1));
ASSERT_EQ("0v2", Get(0, "foo", s2));
@ -698,14 +698,14 @@ TEST_P(DBBasicMultiConfigs, CompactBetweenSnapshots) {
Random rnd(301);
FillLevels("a", "z", 1);
Put(1, "foo", "first");
ASSERT_OK(Put(1, "foo", "first"));
const Snapshot* snapshot1 = db_->GetSnapshot();
Put(1, "foo", "second");
Put(1, "foo", "third");
Put(1, "foo", "fourth");
ASSERT_OK(Put(1, "foo", "second"));
ASSERT_OK(Put(1, "foo", "third"));
ASSERT_OK(Put(1, "foo", "fourth"));
const Snapshot* snapshot2 = db_->GetSnapshot();
Put(1, "foo", "fifth");
Put(1, "foo", "sixth");
ASSERT_OK(Put(1, "foo", "fifth"));
ASSERT_OK(Put(1, "foo", "sixth"));
// All entries (including duplicates) exist
// before any compaction or flush is triggered.
@ -723,7 +723,8 @@ TEST_P(DBBasicMultiConfigs, CompactBetweenSnapshots) {
// after we release the snapshot1, only two values left
db_->ReleaseSnapshot(snapshot1);
FillLevels("a", "z", 1);
dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr));
// We have only one valid snapshot snapshot2. Since snapshot1 is
// not valid anymore, "first" should be removed by a compaction.
@ -734,7 +735,8 @@ TEST_P(DBBasicMultiConfigs, CompactBetweenSnapshots) {
// after we release the snapshot2, only one value should be left
db_->ReleaseSnapshot(snapshot2);
FillLevels("a", "z", 1);
dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, nullptr);
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr));
ASSERT_EQ("sixth", Get(1, "foo"));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ sixth ]");
}
@ -790,18 +792,18 @@ TEST_F(DBBasicTest, CompactOnFlush) {
options.disable_auto_compactions = true;
CreateAndReopenWithCF({"pikachu"}, options);
Put(1, "foo", "v1");
ASSERT_OK(Put(1, "foo", "v1"));
ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v1 ]");
// Write two new keys
Put(1, "a", "begin");
Put(1, "z", "end");
Flush(1);
ASSERT_OK(Put(1, "a", "begin"));
ASSERT_OK(Put(1, "z", "end"));
ASSERT_OK(Flush(1));
// Case1: Delete followed by a put
Delete(1, "foo");
Put(1, "foo", "v2");
ASSERT_OK(Delete(1, "foo"));
ASSERT_OK(Put(1, "foo", "v2"));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, DEL, v1 ]");
// After the current memtable is flushed, the DEL should
@ -809,66 +811,66 @@ TEST_F(DBBasicTest, CompactOnFlush) {
ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2, v1 ]");
dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
nullptr, nullptr));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v2 ]");
// Case 2: Delete followed by another delete
Delete(1, "foo");
Delete(1, "foo");
ASSERT_OK(Delete(1, "foo"));
ASSERT_OK(Delete(1, "foo"));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, DEL, v2 ]");
ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v2 ]");
dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
nullptr, nullptr));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
// Case 3: Put followed by a delete
Put(1, "foo", "v3");
Delete(1, "foo");
ASSERT_OK(Put(1, "foo", "v3"));
ASSERT_OK(Delete(1, "foo"));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL, v3 ]");
ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ DEL ]");
dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
nullptr, nullptr));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
// Case 4: Put followed by another Put
Put(1, "foo", "v4");
Put(1, "foo", "v5");
ASSERT_OK(Put(1, "foo", "v4"));
ASSERT_OK(Put(1, "foo", "v5"));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5, v4 ]");
ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
nullptr, nullptr));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v5 ]");
// clear database
Delete(1, "foo");
dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
ASSERT_OK(Delete(1, "foo"));
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
nullptr, nullptr));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
// Case 5: Put followed by snapshot followed by another Put
// Both puts should remain.
Put(1, "foo", "v6");
ASSERT_OK(Put(1, "foo", "v6"));
const Snapshot* snapshot = db_->GetSnapshot();
Put(1, "foo", "v7");
ASSERT_OK(Put(1, "foo", "v7"));
ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v7, v6 ]");
db_->ReleaseSnapshot(snapshot);
// clear database
Delete(1, "foo");
dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr,
nullptr);
ASSERT_OK(Delete(1, "foo"));
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), handles_[1],
nullptr, nullptr));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ ]");
// Case 5: snapshot followed by a put followed by another Put
// Only the last put should remain.
const Snapshot* snapshot1 = db_->GetSnapshot();
Put(1, "foo", "v8");
Put(1, "foo", "v9");
ASSERT_OK(Put(1, "foo", "v8"));
ASSERT_OK(Put(1, "foo", "v9"));
ASSERT_OK(Flush(1));
ASSERT_EQ(AllEntriesFor("foo", 1), "[ v9 ]");
db_->ReleaseSnapshot(snapshot1);
@ -891,7 +893,7 @@ TEST_F(DBBasicTest, FlushOneColumnFamily) {
ASSERT_OK(Put(7, "popovich", "popovich"));
for (int i = 0; i < 8; ++i) {
Flush(i);
ASSERT_OK(Flush(i));
auto tables = ListTableFiles(env_, dbname_);
ASSERT_EQ(tables.size(), i + 1U);
}
@ -1033,7 +1035,7 @@ class TestEnv : public EnvWrapper {
explicit TestLogger(TestEnv* env_ptr) : Logger() { env = env_ptr; }
~TestLogger() override {
if (!closed_) {
CloseHelper();
CloseHelper().PermitUncheckedError();
}
}
void Logv(const char* /*format*/, va_list /*ap*/) override {}
@ -2231,12 +2233,14 @@ TEST_F(DBBasicTest, RecoverWithMissingFiles) {
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[0]));
iter->SeekToFirst();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
iter.reset(db_->NewIterator(read_opts, handles_[1]));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a", iter->key());
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
iter.reset(db_->NewIterator(read_opts, handles_[2]));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
@ -2246,6 +2250,7 @@ TEST_F(DBBasicTest, RecoverWithMissingFiles) {
ASSERT_EQ("b", iter->key());
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
}
}
@ -2358,12 +2363,14 @@ TEST_F(DBBasicTest, SkipWALIfMissingTableFiles) {
std::unique_ptr<Iterator> iter(db_->NewIterator(read_opts, handles_[0]));
iter->SeekToFirst();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
iter.reset(db_->NewIterator(read_opts, handles_[1]));
iter->SeekToFirst();
ASSERT_TRUE(iter->Valid());
ASSERT_EQ("a", iter->key());
iter->Next();
ASSERT_FALSE(iter->Valid());
ASSERT_OK(iter->status());
}
#endif // !ROCKSDB_LITE
@ -2893,9 +2900,8 @@ class DeadlineFS : public FileSystemWrapper {
std::unique_ptr<FSRandomAccessFile>* result,
IODebugContext* dbg) override {
std::unique_ptr<FSRandomAccessFile> file;
IOStatus s;
s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
IOStatus s = target()->NewRandomAccessFile(fname, opts, &file, dbg);
EXPECT_OK(s);
result->reset(new DeadlineRandomAccessFile(*this, file));
const std::chrono::microseconds deadline = GetDeadline();
@ -3266,7 +3272,7 @@ TEST_P(DBBasicTestDeadline, PointLookupDeadline) {
Random rnd(301);
for (int i = 0; i < 400; ++i) {
std::string key = "k" + ToString(i);
Put(key, rnd.RandomString(100));
ASSERT_OK(Put(key, rnd.RandomString(100)));
}
Flush();
@ -3349,9 +3355,9 @@ TEST_P(DBBasicTestDeadline, IteratorDeadline) {
Random rnd(301);
for (int i = 0; i < 400; ++i) {
std::string key = "k" + ToString(i);
Put(key, rnd.RandomString(100));
ASSERT_OK(Put(key, rnd.RandomString(100)));
}
Flush();
ASSERT_OK(Flush());
bool timedout = true;
// A timeout will be forced when the IO counter reaches this value

@ -126,10 +126,12 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {
MarkLogsSynced(current_log_number - 1, true, io_s);
if (!io_s.ok()) {
if (total_log_size_ > 0) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush)
.PermitUncheckedError();
} else {
// If the WAL is empty, we use different error reason
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL)
.PermitUncheckedError();
}
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Failed");
return io_s;
@ -186,7 +188,8 @@ Status DBImpl::FlushMemTableToOutputFile(
s = io_s;
if (!io_s.ok() && !io_s.IsShutdownInProgress() &&
!io_s.IsColumnFamilyDropped()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush)
.PermitUncheckedError();
}
} else {
TEST_SYNC_POINT("DBImpl::SyncClosedLogs:Skip");
@ -246,16 +249,24 @@ Status DBImpl::FlushMemTableToOutputFile(
// be pessimistic and try write to a new MANIFEST.
// TODO: distinguish between MANIFEST write and CURRENT renaming
if (!versions_->io_status().ok()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
// Should handle return error?
error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite)
.PermitUncheckedError();
} else if (total_log_size_ > 0) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
// Should handle return error?
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush)
.PermitUncheckedError();
} else {
// If the WAL is empty, we use different error reason
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
// Should handle return error?
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL)
.PermitUncheckedError();
}
} else {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
// Should handle return error?
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush)
.PermitUncheckedError();
}
} else {
// If we got here, then we decided not to care about the i_os status (either
@ -280,7 +291,9 @@ Status DBImpl::FlushMemTableToOutputFile(
TEST_SYNC_POINT_CALLBACK(
"DBImpl::FlushMemTableToOutputFile:MaxAllowedSpaceReached",
&new_bg_error);
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
// Should handle this error?
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush)
.PermitUncheckedError();
}
}
#endif // ROCKSDB_LITE
@ -628,8 +641,9 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
error_handler_.GetBGError().ok()) {
Status new_bg_error =
Status::SpaceLimit("Max allowed space was reached");
error_handler_.SetBGError(new_bg_error,
BackgroundErrorReason::kFlush);
// Should Handle this error?
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush)
.PermitUncheckedError();
}
}
}
@ -646,16 +660,24 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles(
// be pessimistic and try write to a new MANIFEST.
// TODO: distinguish between MANIFEST write and CURRENT renaming
if (!versions_->io_status().ok()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite);
// Should Handle this error?
error_handler_.SetBGError(io_s, BackgroundErrorReason::kManifestWrite)
.PermitUncheckedError();
} else if (total_log_size_ > 0) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush);
// Should Handle this error?
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush)
.PermitUncheckedError();
} else {
// If the WAL is empty, we use different error reason
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL);
// Should Handle this error?
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlushNoWAL)
.PermitUncheckedError();
}
} else {
Status new_bg_error = s;
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush);
// Should Handle this error?
error_handler_.SetBGError(new_bg_error, BackgroundErrorReason::kFlush)
.PermitUncheckedError();
}
}
@ -1178,9 +1200,11 @@ Status DBImpl::CompactFilesImpl(
job_context->job_id, status.ToString().c_str());
IOStatus io_s = compaction_job.io_status();
if (!io_s.ok()) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction);
error_handler_.SetBGError(io_s, BackgroundErrorReason::kCompaction)
.PermitUncheckedError();
} else {
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction)
.PermitUncheckedError();
}
}
@ -2928,7 +2952,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
mutex_.Unlock();
TEST_SYNC_POINT_CALLBACK(
"DBImpl::BackgroundCompaction:NonTrivial:BeforeRun", nullptr);
compaction_job.Run();
// Should handle erorr?
compaction_job.Run().PermitUncheckedError();
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial:AfterRun");
mutex_.Lock();
@ -2946,6 +2971,8 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
if (status.ok() && !io_s.ok()) {
status = io_s;
} else {
io_s.PermitUncheckedError();
}
if (c != nullptr) {
@ -2982,9 +3009,10 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
auto err_reason = versions_->io_status().ok()
? BackgroundErrorReason::kCompaction
: BackgroundErrorReason::kManifestWrite;
error_handler_.SetBGError(io_s, err_reason);
error_handler_.SetBGError(io_s, err_reason).PermitUncheckedError();
} else {
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction);
error_handler_.SetBGError(status, BackgroundErrorReason::kCompaction)
.PermitUncheckedError();
}
if (c != nullptr && !is_manual && !error_handler_.IsBGWorkStopped()) {
// Put this cfd back in the compaction queue so we can retry after some

@ -215,7 +215,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
if (immutable_db_options_.wal_dir != dbname_) {
std::vector<std::string> log_files;
env_->GetChildren(immutable_db_options_.wal_dir,
&log_files); // Ignore errors
&log_files)
.PermitUncheckedError(); // Ignore errors
for (const std::string& log_file : log_files) {
job_context->full_scan_candidate_files.emplace_back(
log_file, immutable_db_options_.wal_dir);
@ -226,7 +227,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
immutable_db_options_.db_log_dir != dbname_) {
std::vector<std::string> info_log_files;
// Ignore errors
env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files);
env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files)
.PermitUncheckedError();
for (std::string& log_file : info_log_files) {
job_context->full_scan_candidate_files.emplace_back(
log_file, immutable_db_options_.db_log_dir);
@ -762,7 +764,8 @@ Status DBImpl::FinishBestEffortsRecovery() {
std::set<std::string> files_to_delete;
for (const auto& path : paths) {
std::vector<std::string> files;
env_->GetChildren(path, &files);
// Should we handle it?
env_->GetChildren(path, &files).PermitUncheckedError();
for (const auto& fname : files) {
uint64_t number = 0;
FileType type;

@ -1757,6 +1757,10 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
"DB::Open() failed --- Unable to persist Options file",
persist_options_status.ToString());
}
} else {
ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
"Persisting Option File error: %s",
persist_options_status.ToString().c_str());
}
if (s.ok()) {
impl->StartStatsDumpScheduler();

@ -101,12 +101,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) ||
disable_memtable);
Status status;
IOStatus io_s;
if (write_options.low_pri) {
status = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
if (!status.ok()) {
return status;
Status s = ThrottleLowPriWritesIfNeeded(write_options, my_batch);
if (!s.ok()) {
return s;
}
}
@ -129,9 +127,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
uint64_t seq;
// Use a write thread to i) optimize for WAL write, ii) publish last
// sequence in in increasing order, iii) call pre_release_callback serially
status = WriteImplWALOnly(&write_thread_, write_options, my_batch, callback,
log_used, log_ref, &seq, sub_batch_cnt,
pre_release_callback, kDoAssignOrder,
Status status = WriteImplWALOnly(
&write_thread_, write_options, my_batch, callback, log_used, log_ref,
&seq, sub_batch_cnt, pre_release_callback, kDoAssignOrder,
kDoPublishLastSeq, disable_memtable);
TEST_SYNC_POINT("DBImpl::WriteImpl:UnorderedWriteAfterWriteWAL");
if (!status.ok()) {
@ -164,6 +162,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
StopWatch write_sw(env_, immutable_db_options_.statistics.get(), DB_WRITE);
write_thread_.JoinBatchGroup(&w);
Status status;
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
// we are a non-leader in a parallel group
@ -204,6 +203,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
*seq_used = w.sequence;
}
// write is complete and leader has updated sequence
// Should we handle it?
status.PermitUncheckedError();
return w.FinalStatus();
}
// else we are the leader of the write batch group
@ -252,6 +253,7 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
last_batch_group_size_ =
write_thread_.EnterAsBatchGroupLeader(&w, &write_group);
IOStatus io_s;
if (status.ok()) {
// Rules for when we can update the memtable concurrently
// 1. supported by memtable
@ -1899,7 +1901,10 @@ Status DB::Delete(const WriteOptions& opt, ColumnFamilyHandle* column_family,
Status DB::SingleDelete(const WriteOptions& opt,
ColumnFamilyHandle* column_family, const Slice& key) {
WriteBatch batch;
batch.SingleDelete(column_family, key);
Status s = batch.SingleDelete(column_family, key);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}
@ -1907,7 +1912,10 @@ Status DB::DeleteRange(const WriteOptions& opt,
ColumnFamilyHandle* column_family,
const Slice& begin_key, const Slice& end_key) {
WriteBatch batch;
batch.DeleteRange(column_family, begin_key, end_key);
Status s = batch.DeleteRange(column_family, begin_key, end_key);
if (!s.ok()) {
return s;
}
return Write(opt, &batch);
}

@ -140,7 +140,14 @@ class DBIter final : public Iterator {
}
ReadRangeDelAggregator* GetRangeDelAggregator() { return &range_del_agg_; }
bool Valid() const override { return valid_; }
bool Valid() const override {
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
if (valid_) {
status_.PermitUncheckedError();
}
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
return valid_;
}
Slice key() const override {
assert(valid_);
if (start_seqnum_ > 0 || timestamp_lb_) {

@ -1182,9 +1182,9 @@ void DBTestBase::FillLevels(const std::string& smallest,
void DBTestBase::MoveFilesToLevel(int level, int cf) {
for (int l = 0; l < level; ++l) {
if (cf > 0) {
dbfull()->TEST_CompactRange(l, nullptr, nullptr, handles_[cf]);
EXPECT_OK(dbfull()->TEST_CompactRange(l, nullptr, nullptr, handles_[cf]));
} else {
dbfull()->TEST_CompactRange(l, nullptr, nullptr);
EXPECT_OK(dbfull()->TEST_CompactRange(l, nullptr, nullptr));
}
}
}
@ -1436,7 +1436,7 @@ std::vector<std::uint64_t> DBTestBase::ListTableFiles(Env* env,
const std::string& path) {
std::vector<std::string> files;
std::vector<uint64_t> file_numbers;
env->GetChildren(path, &files);
EXPECT_OK(env->GetChildren(path, &files));
uint64_t number;
FileType type;
for (size_t i = 0; i < files.size(); ++i) {

@ -338,7 +338,7 @@ Status ErrorHandler::SetBGError(const IOStatus& bg_io_err,
}
if (BackgroundErrorReason::kManifestWrite == reason) {
// Always returns ok
db_->DisableFileDeletionsWithLock();
db_->DisableFileDeletionsWithLock().PermitUncheckedError();
}
Status new_bg_io_err = bg_io_err;

@ -162,7 +162,6 @@ Status TableCache::FindTable(const ReadOptions& ro,
int level, bool prefetch_index_and_filter_in_cache,
size_t max_file_size_for_l0_meta_pin) {
PERF_TIMER_GUARD_WITH_ENV(find_table_nanos, ioptions_.env);
Status s;
uint64_t number = fd.GetNumber();
Slice key = GetSliceForFileNumber(&number);
*handle = cache_->Lookup(key);
@ -177,13 +176,13 @@ Status TableCache::FindTable(const ReadOptions& ro,
// We check the cache again under loading mutex
*handle = cache_->Lookup(key);
if (*handle != nullptr) {
return s;
return Status::OK();
}
std::unique_ptr<TableReader> table_reader;
s = GetTableReader(ro, file_options, internal_comparator, fd,
false /* sequential mode */, record_read_stats,
file_read_hist, &table_reader, prefix_extractor,
Status s = GetTableReader(
ro, file_options, internal_comparator, fd, false /* sequential mode */,
record_read_stats, file_read_hist, &table_reader, prefix_extractor,
skip_filters, level, prefetch_index_and_filter_in_cache,
max_file_size_for_l0_meta_pin);
if (!s.ok()) {
@ -199,8 +198,9 @@ Status TableCache::FindTable(const ReadOptions& ro,
table_reader.release();
}
}
}
return s;
}
return Status::OK();
}
InternalIterator* TableCache::NewIterator(
@ -413,7 +413,8 @@ Status TableCache::Get(const ReadOptions& options,
Status s;
TableReader* t = fd.table_reader;
Cache::Handle* handle = nullptr;
if (!done && s.ok()) {
if (!done) {
assert(s.ok());
if (t == nullptr) {
s = FindTable(options, file_options_, internal_comparator, fd, &handle,
prefix_extractor,
@ -455,8 +456,11 @@ Status TableCache::Get(const ReadOptions& options,
size_t charge =
row_cache_key.Size() + row_cache_entry->size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(*row_cache_entry));
ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
&DeleteEntry<std::string>);
// If row cache is full, it's OK to continue.
ioptions_.row_cache
->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
&DeleteEntry<std::string>)
.PermitUncheckedError();
}
#endif // ROCKSDB_LITE
@ -575,8 +579,11 @@ Status TableCache::MultiGet(const ReadOptions& options,
size_t charge =
row_cache_key.Size() + row_cache_entry.size() + sizeof(std::string);
void* row_ptr = new std::string(std::move(row_cache_entry));
ioptions_.row_cache->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
&DeleteEntry<std::string>);
// If row cache is full, it's OK.
ioptions_.row_cache
->Insert(row_cache_key.GetUserKey(), row_ptr, charge,
&DeleteEntry<std::string>)
.PermitUncheckedError();
}
}
}
@ -593,17 +600,16 @@ Status TableCache::GetTableProperties(
const InternalKeyComparator& internal_comparator, const FileDescriptor& fd,
std::shared_ptr<const TableProperties>* properties,
const SliceTransform* prefix_extractor, bool no_io) {
Status s;
auto table_reader = fd.table_reader;
// table already been pre-loaded?
if (table_reader) {
*properties = table_reader->GetTableProperties();
return s;
return Status::OK();
}
Cache::Handle* table_handle = nullptr;
s = FindTable(ReadOptions(), file_options, internal_comparator, fd,
Status s = FindTable(ReadOptions(), file_options, internal_comparator, fd,
&table_handle, prefix_extractor, no_io);
if (!s.ok()) {
return s;

@ -962,6 +962,11 @@ class VersionBuilder::Rep {
for (auto& t : threads) {
t.join();
}
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
for (const auto& s : statuses) {
s.PermitUncheckedError();
}
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
for (const auto& s : statuses) {
if (!s.ok()) {
return s;

@ -1836,6 +1836,8 @@ void Version::Get(const ReadOptions& read_options, const LookupKey& k,
GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
get_perf_context()->per_level_perf_context_enabled;
StopWatchNano timer(env_, timer_enabled /* auto_start */);
// Something feels not right here. Should investigate more.
status->PermitUncheckedError();
*status = table_cache_->Get(
read_options, *internal_comparator(), *f->file_metadata, ikey,
&get_context, mutable_cf_options_.prefix_extractor.get(),

@ -496,7 +496,12 @@ struct BlockBasedTableBuilder::Rep {
Rep(const Rep&) = delete;
Rep& operator=(const Rep&) = delete;
~Rep() {}
~Rep() {
// They are supposed to be passed back to users through Finish()
// if the file finishes.
status.PermitUncheckedError();
io_status.PermitUncheckedError();
}
private:
Status status;
@ -1090,6 +1095,7 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
static_cast<char*>(trailer));
io_s = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (io_s.ok()) {
assert(s.ok());
s = InsertBlockInCache(block_contents, type, handle);
if (!s.ok()) {
r->SetStatus(s);
@ -1264,13 +1270,16 @@ Status BlockBasedTableBuilder::InsertBlockInCache(const Slice& block_contents,
static_cast<size_t>(end - r->compressed_cache_key_prefix));
// Insert into compressed block cache.
block_cache_compressed->Insert(
key, block_contents_to_cache,
// How should we deal with compressed cache full?
block_cache_compressed
->Insert(key, block_contents_to_cache,
block_contents_to_cache->ApproximateMemoryUsage(),
&DeleteCachedBlockContents);
&DeleteCachedBlockContents)
.PermitUncheckedError();
// Invalidate OS cache.
r->file->InvalidateCache(static_cast<size_t>(r->get_offset()), size);
r->file->InvalidateCache(static_cast<size_t>(r->get_offset()), size)
.PermitUncheckedError();
}
return Status::OK();
}
@ -1677,7 +1686,9 @@ Status BlockBasedTableBuilder::Finish() {
WriteFooter(metaindex_block_handle, index_block_handle);
}
r->state = Rep::State::kClosed;
return r->GetStatus();
Status ret_status = r->GetStatus();
assert(!ret_status.ok() || io_status().ok());
return ret_status;
}
void BlockBasedTableBuilder::Abandon() {

@ -1692,7 +1692,9 @@ void BlockBasedTable::RetrieveMultipleBlocks(
req.status = s;
}
} else {
file->MultiRead(opts, &read_reqs[0], read_reqs.size(), &direct_io_buf);
// How to handle this status code?
file->MultiRead(opts, &read_reqs[0], read_reqs.size(), &direct_io_buf)
.PermitUncheckedError();
}
}
@ -1800,6 +1802,7 @@ void BlockBasedTable::RetrieveMultipleBlocks(
// block cache is configured. In that case, fall
// through and set up the block explicitly
if (block_entry->GetValue() != nullptr) {
s.PermitUncheckedError();
continue;
}
}
@ -2440,6 +2443,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
CachableEntry<UncompressionDict> uncompression_dict;
Status uncompression_dict_status;
uncompression_dict_status.PermitUncheckedError();
bool uncompression_dict_inited = false;
size_t total_len = 0;
ReadOptions ro = read_options;
@ -2707,6 +2711,12 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
}
*(miter->s) = s;
}
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
// Not sure why we need to do it. Should investigate more.
for (auto& st : statuses) {
st.PermitUncheckedError();
}
#endif // ROCKSDB_ASSERT_STATUS_CHECKED
}
}

@ -307,12 +307,13 @@ class HashIndexBuilder : public IndexBuilder {
if (pending_block_num_ != 0) {
FlushPendingPrefix();
}
primary_index_builder_.Finish(index_blocks, last_partition_block_handle);
Status s = primary_index_builder_.Finish(index_blocks,
last_partition_block_handle);
index_blocks->meta_blocks.insert(
{kHashIndexPrefixesBlock.c_str(), prefix_block_});
index_blocks->meta_blocks.insert(
{kHashIndexPrefixesMetadataBlock.c_str(), prefix_meta_block_});
return Status::OK();
return s;
}
virtual size_t IndexSize() const override {

@ -116,6 +116,10 @@ PlainTableBuilder::PlainTableBuilder(
}
PlainTableBuilder::~PlainTableBuilder() {
// They are supposed to have been passed to users through Finish()
// if the file succeeds.
status_.PermitUncheckedError();
io_status_.PermitUncheckedError();
}
void PlainTableBuilder::Add(const Slice& key, const Slice& value) {

@ -487,7 +487,6 @@ Status PlainTableKeyDecoder::NextKeyNoValue(uint32_t start_offset,
if (seekable != nullptr) {
*seekable = true;
}
Status s;
if (encoding_type_ == kPlain) {
return NextPlainEncodingKey(start_offset, parsed_key, internal_key,
bytes_read, seekable);

@ -68,6 +68,12 @@ class PlainTableFileReader {
public:
explicit PlainTableFileReader(const PlainTableReaderFileInfo* _file_info)
: file_info_(_file_info), num_buf_(0) {}
~PlainTableFileReader() {
// Should fix.
status_.PermitUncheckedError();
}
// In mmaped mode, the results point to mmaped area of the file, which
// means it is always valid before closing the file.
// In non-mmap mode, the results point to an internal buffer. If the caller
@ -146,6 +152,7 @@ class PlainTableKeyDecoder {
fixed_user_key_len_(user_key_len),
prefix_extractor_(prefix_extractor),
in_prefix_(false) {}
// Find the next key.
// start: char array where the key starts.
// limit: boundary of the char array

@ -113,6 +113,8 @@ PlainTableReader::PlainTableReader(
table_properties_(nullptr) {}
PlainTableReader::~PlainTableReader() {
// Should fix?
status_.PermitUncheckedError();
}
Status PlainTableReader::Open(

@ -212,6 +212,7 @@ class FaultInjectionTestEnv : public EnvWrapper {
}
void SetFilesystemActiveNoLock(bool active,
Status error = Status::Corruption("Not active")) {
error.PermitUncheckedError();
filesystem_active_ = active;
if (!active) {
error_ = error;
@ -219,6 +220,7 @@ class FaultInjectionTestEnv : public EnvWrapper {
}
void SetFilesystemActive(bool active,
Status error = Status::Corruption("Not active")) {
error.PermitUncheckedError();
MutexLock l(&mutex_);
SetFilesystemActiveNoLock(active, error);
}

Loading…
Cancel
Save