Fix+clean up handling of mock sleeps (#7101)

Summary:
We have a number of tests hanging on MacOS and windows due to
mishandling of code for mock sleeps. In addition, the code was in
terrible shape because the same variable (addon_time_) would sometimes
refer to microseconds and sometimes to seconds. One test even assumed it
was nanoseconds but was written to pass anyway.

This has been cleaned up so that DB tests generally use a SpecialEnv
function to mock sleep, for either some number of microseconds or seconds
depending on the function called. But to call one of these, the test must first
call SetMockSleep (precondition enforced with assertion), which also turns
sleeps in RocksDB into mock sleeps. To also removes accounting for actual
clock time, call SetTimeElapseOnlySleepOnReopen, which implies
SetMockSleep (on DB re-open). This latter setting only works by applying
on DB re-open, otherwise havoc can ensue if Env goes back in time with
DB open.

More specifics:

Removed some unused test classes, and updated comments on the general
problem.

Fixed DBSSTTest.GetTotalSstFilesSize using a sync point callback instead
of mock time. For this we have the only modification to production code,
inserting a sync point callback in flush_job.cc, which is not a change to
production behavior.

Removed unnecessary resetting of mock times to 0 in many tests. RocksDB
deals in relative time. Any behaviors relying on absolute date/time are likely
a bug. (The above test DBSSTTest.GetTotalSstFilesSize was the only one
clearly injecting a specific absolute time for actual testing convenience.) Just
in case I misunderstood some test, I put this note in each replacement:
// NOTE: Presumed unnecessary and removed: resetting mock time in env

Strengthened some tests like MergeTestTime, MergeCompactionTimeTest, and
FilterCompactionTimeTest in db_test.cc

stats_history_test and blob_db_test are each their own beast, rather deeply
dependent on MockTimeEnv. Each gets its own variant of a work-around for
TimedWait in a mock time environment. (Reduces redundancy and
inconsistency in stats_history_test.)

Intended follow-up:

Remove TimedWait from the public API of InstrumentedCondVar, and only
make that accessible through Env by passing in an InstrumentedCondVar and
a deadline. Then the Env implementations mocking time can fix this problem
without using sync points. (Test infrastructure using sync points interferes
with individual tests' control over sync points.)

With that change, we can simplify/consolidate the scattered work-arounds.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/7101

Test Plan: make check on Linux and MacOS

Reviewed By: zhichao-cao

Differential Revision: D23032815

Pulled By: pdillinger

fbshipit-source-id: 7f33967ada8b83011fb54e8279365c008bd6610b
main
Peter Dillinger 4 years ago committed by Facebook GitHub Bot
parent a99fb67233
commit 6ac1d25fd0
  1. 15
      db/db_basic_test.cc
  2. 30
      db/db_compaction_filter_test.cc
  3. 65
      db/db_compaction_test.cc
  4. 2
      db/db_io_failure_test.cc
  5. 20
      db/db_options_test.cc
  6. 36
      db/db_properties_test.cc
  7. 24
      db/db_sst_test.cc
  8. 80
      db/db_test.cc
  9. 28
      db/db_test2.cc
  10. 43
      db/db_test_util.cc
  11. 61
      db/db_test_util.h
  12. 38
      db/db_universal_compaction_test.cc
  13. 8
      db/flush_job.cc
  14. 6
      db/listener_test.cc
  15. 137
      monitoring/stats_history_test.cc
  16. 3
      test_util/mock_time_env.h
  17. 4
      util/rate_limiter_test.cc
  18. 1
      util/timer.h
  19. 13
      util/timer_test.cc
  20. 12
      utilities/blob_db/blob_db_test.cc

@ -592,6 +592,7 @@ TEST_F(DBBasicTest, IdentityAcrossRestarts2) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
TEST_F(DBBasicTest, Snapshot) { TEST_F(DBBasicTest, Snapshot) {
env_->SetMockSleep();
anon::OptionsOverride options_override; anon::OptionsOverride options_override;
options_override.skip_policy = kSkipNoSnapshot; options_override.skip_policy = kSkipNoSnapshot;
do { do {
@ -607,7 +608,7 @@ TEST_F(DBBasicTest, Snapshot) {
Put(0, "foo", "0v2"); Put(0, "foo", "0v2");
Put(1, "foo", "1v2"); Put(1, "foo", "1v2");
env_->addon_time_.fetch_add(1); env_->MockSleepForSeconds(1);
const Snapshot* s2 = db_->GetSnapshot(); const Snapshot* s2 = db_->GetSnapshot();
ASSERT_EQ(2U, GetNumSnapshots()); ASSERT_EQ(2U, GetNumSnapshots());
@ -3026,13 +3027,13 @@ TEST_F(DBBasicTestMultiGetDeadline, MultiGetDeadlineExceeded) {
std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, false); std::shared_ptr<DeadlineFS> fs = std::make_shared<DeadlineFS>(env_, false);
std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs)); std::unique_ptr<Env> env(new CompositeEnvWrapper(env_, fs));
Options options = CurrentOptions(); Options options = CurrentOptions();
env_->SetTimeElapseOnlySleep(&options);
std::shared_ptr<Cache> cache = NewLRUCache(1048576); std::shared_ptr<Cache> cache = NewLRUCache(1048576);
BlockBasedTableOptions table_options; BlockBasedTableOptions table_options;
table_options.block_cache = cache; table_options.block_cache = cache;
options.table_factory.reset(new BlockBasedTableFactory(table_options)); options.table_factory.reset(new BlockBasedTableFactory(table_options));
options.env = env.get(); options.env = env.get();
SetTimeElapseOnlySleepOnReopen(&options);
ReopenWithColumnFamilies(GetCFNames(), options); ReopenWithColumnFamilies(GetCFNames(), options);
// Test the non-batched version of MultiGet with multiple column // Test the non-batched version of MultiGet with multiple column
@ -3194,9 +3195,6 @@ TEST_P(DBBasicTestDeadline, PointLookupDeadline) {
bool set_deadline = std::get<0>(GetParam()); bool set_deadline = std::get<0>(GetParam());
bool set_timeout = std::get<1>(GetParam()); bool set_timeout = std::get<1>(GetParam());
// Since we call SetTimeElapseOnlySleep, Close() later on may not work
// properly for the DB that's opened by the DBTestBase constructor.
Close();
for (int option_config = kDefault; option_config < kEnd; ++option_config) { for (int option_config = kDefault; option_config < kEnd; ++option_config) {
if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) { if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) {
continue; continue;
@ -3209,7 +3207,6 @@ TEST_P(DBBasicTestDeadline, PointLookupDeadline) {
options.env = env.get(); options.env = env.get();
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
Cache* block_cache = nullptr; Cache* block_cache = nullptr;
env_->SetTimeElapseOnlySleep(&options);
// Fileter block reads currently don't cause the request to get // Fileter block reads currently don't cause the request to get
// aborted on a read timeout, so its possible those block reads // aborted on a read timeout, so its possible those block reads
// may get issued even if the deadline is past // may get issued even if the deadline is past
@ -3233,6 +3230,7 @@ TEST_P(DBBasicTestDeadline, PointLookupDeadline) {
}); });
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
SetTimeElapseOnlySleepOnReopen(&options);
Reopen(options); Reopen(options);
if (options.table_factory && if (options.table_factory &&
@ -3293,9 +3291,6 @@ TEST_P(DBBasicTestDeadline, IteratorDeadline) {
bool set_deadline = std::get<0>(GetParam()); bool set_deadline = std::get<0>(GetParam());
bool set_timeout = std::get<1>(GetParam()); bool set_timeout = std::get<1>(GetParam());
// Since we call SetTimeElapseOnlySleep, Close() later on may not work
// properly for the DB that's opened by the DBTestBase constructor.
Close();
for (int option_config = kDefault; option_config < kEnd; ++option_config) { for (int option_config = kDefault; option_config < kEnd; ++option_config) {
if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) { if (ShouldSkipOptions(option_config, kSkipPlainTable | kSkipMmapReads)) {
continue; continue;
@ -3307,7 +3302,6 @@ TEST_P(DBBasicTestDeadline, IteratorDeadline) {
options.env = env.get(); options.env = env.get();
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
Cache* block_cache = nullptr; Cache* block_cache = nullptr;
env_->SetTimeElapseOnlySleep(&options);
// DB open will create table readers unless we reduce the table cache // DB open will create table readers unless we reduce the table cache
// capacity. // capacity.
// SanitizeOptions will set max_open_files to minimum of 20. Table cache // SanitizeOptions will set max_open_files to minimum of 20. Table cache
@ -3322,6 +3316,7 @@ TEST_P(DBBasicTestDeadline, IteratorDeadline) {
}); });
SyncPoint::GetInstance()->EnableProcessing(); SyncPoint::GetInstance()->EnableProcessing();
SetTimeElapseOnlySleepOnReopen(&options);
Reopen(options); Reopen(options);
if (options.table_factory && if (options.table_factory &&

@ -126,22 +126,6 @@ class SkipEvenFilter : public CompactionFilter {
const char* Name() const override { return "DeleteFilter"; } const char* Name() const override { return "DeleteFilter"; }
}; };
class DelayFilter : public CompactionFilter {
public:
explicit DelayFilter(DBTestBase* d) : db_test(d) {}
bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
std::string* /*new_value*/,
bool* /*value_changed*/) const override {
db_test->env_->addon_time_.fetch_add(1000);
return true;
}
const char* Name() const override { return "DelayFilter"; }
private:
DBTestBase* db_test;
};
class ConditionalFilter : public CompactionFilter { class ConditionalFilter : public CompactionFilter {
public: public:
explicit ConditionalFilter(const std::string* filtered_value) explicit ConditionalFilter(const std::string* filtered_value)
@ -248,20 +232,6 @@ class SkipEvenFilterFactory : public CompactionFilterFactory {
const char* Name() const override { return "SkipEvenFilterFactory"; } const char* Name() const override { return "SkipEvenFilterFactory"; }
}; };
class DelayFilterFactory : public CompactionFilterFactory {
public:
explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override {
return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
}
const char* Name() const override { return "DelayFilterFactory"; }
private:
DBTestBase* db_test;
};
class ConditionalFilterFactory : public CompactionFilterFactory { class ConditionalFilterFactory : public CompactionFilterFactory {
public: public:
explicit ConditionalFilterFactory(const Slice& filtered_value) explicit ConditionalFilterFactory(const Slice& filtered_value)

@ -3597,10 +3597,11 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
options.compression = kNoCompression; options.compression = kNoCompression;
options.ttl = 24 * 60 * 60; // 24 hours options.ttl = 24 * 60 * 60; // 24 hours
options.max_open_files = -1; options.max_open_files = -1;
env_->time_elapse_only_sleep_ = false; env_->SetMockSleep();
options.env = env_; options.env = env_;
env_->addon_time_.store(0); // NOTE: Presumed unnecessary and removed: resetting mock time in env
DestroyAndReopen(options); DestroyAndReopen(options);
Random rnd(301); Random rnd(301);
@ -3627,7 +3628,7 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
MoveFilesToLevel(1); MoveFilesToLevel(1);
ASSERT_EQ("0,2,0,2", FilesPerLevel()); ASSERT_EQ("0,2,0,2", FilesPerLevel());
env_->addon_time_.fetch_add(36 * 60 * 60); // 36 hours env_->MockSleepForSeconds(36 * 60 * 60); // 36 hours
ASSERT_EQ("0,2,0,2", FilesPerLevel()); ASSERT_EQ("0,2,0,2", FilesPerLevel());
// Just do a simple write + flush so that the Ttl expired files get // Just do a simple write + flush so that the Ttl expired files get
@ -3647,7 +3648,8 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
// Test dynamically changing ttl. // Test dynamically changing ttl.
env_->addon_time_.store(0); // NOTE: Presumed unnecessary and removed: resetting mock time in env
DestroyAndReopen(options); DestroyAndReopen(options);
for (int i = 0; i < kNumLevelFiles; ++i) { for (int i = 0; i < kNumLevelFiles; ++i) {
@ -3675,7 +3677,7 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
// Move time forward by 12 hours, and make sure that compaction still doesn't // Move time forward by 12 hours, and make sure that compaction still doesn't
// trigger as ttl is set to 24 hours. // trigger as ttl is set to 24 hours.
env_->addon_time_.fetch_add(12 * 60 * 60); env_->MockSleepForSeconds(12 * 60 * 60);
ASSERT_OK(Put("a", "1")); ASSERT_OK(Put("a", "1"));
Flush(); Flush();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
@ -3698,6 +3700,7 @@ TEST_F(DBCompactionTest, LevelCompactExpiredTtlFiles) {
} }
TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) { TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
env_->SetMockSleep();
const int kValueSize = 100; const int kValueSize = 100;
for (bool if_restart : {false, true}) { for (bool if_restart : {false, true}) {
@ -3728,10 +3731,10 @@ TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
} }
}); });
env_->time_elapse_only_sleep_ = false;
options.env = env_; options.env = env_;
env_->addon_time_.store(0); // NOTE: Presumed unnecessary and removed: resetting mock time in env
DestroyAndReopen(options); DestroyAndReopen(options);
int ttl_compactions = 0; int ttl_compactions = 0;
@ -3759,7 +3762,7 @@ TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
&level_to_files); &level_to_files);
uint64_t oldest_time = level_to_files[0][0].oldest_ancester_time; uint64_t oldest_time = level_to_files[0][0].oldest_ancester_time;
// Add 1 hour and do another flush. // Add 1 hour and do another flush.
env_->addon_time_.fetch_add(1 * 60 * 60); env_->MockSleepForSeconds(1 * 60 * 60);
for (int i = 101; i <= 200; ++i) { for (int i = 101; i <= 200; ++i) {
ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize))); ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize)));
} }
@ -3767,13 +3770,13 @@ TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
MoveFilesToLevel(6); MoveFilesToLevel(6);
ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel()); ASSERT_EQ("0,0,0,0,0,0,2", FilesPerLevel());
env_->addon_time_.fetch_add(1 * 60 * 60); env_->MockSleepForSeconds(1 * 60 * 60);
// Add two L4 files with key ranges: [1 .. 50], [51 .. 150]. // Add two L4 files with key ranges: [1 .. 50], [51 .. 150].
for (int i = 1; i <= 50; ++i) { for (int i = 1; i <= 50; ++i) {
ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize))); ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize)));
} }
Flush(); Flush();
env_->addon_time_.fetch_add(1 * 60 * 60); env_->MockSleepForSeconds(1 * 60 * 60);
for (int i = 51; i <= 150; ++i) { for (int i = 51; i <= 150; ++i) {
ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize))); ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize)));
} }
@ -3781,7 +3784,7 @@ TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
MoveFilesToLevel(4); MoveFilesToLevel(4);
ASSERT_EQ("0,0,0,0,2,0,2", FilesPerLevel()); ASSERT_EQ("0,0,0,0,2,0,2", FilesPerLevel());
env_->addon_time_.fetch_add(1 * 60 * 60); env_->MockSleepForSeconds(1 * 60 * 60);
// Add one L1 file with key range: [26, 75]. // Add one L1 file with key range: [26, 75].
for (int i = 26; i <= 75; ++i) { for (int i = 26; i <= 75; ++i) {
ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize))); ASSERT_OK(Put(Key(i), rnd.RandomString(kValueSize)));
@ -3811,7 +3814,7 @@ TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
// 4. A TTL compaction happens between L5 and L6 files. Ouptut in L6. // 4. A TTL compaction happens between L5 and L6 files. Ouptut in L6.
// Add 25 hours and do a write // Add 25 hours and do a write
env_->addon_time_.fetch_add(25 * 60 * 60); env_->MockSleepForSeconds(25 * 60 * 60);
ASSERT_OK(Put(Key(1), "1")); ASSERT_OK(Put(Key(1), "1"));
if (if_restart) { if (if_restart) {
@ -3827,7 +3830,7 @@ TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
&level_to_files); &level_to_files);
ASSERT_EQ(oldest_time, level_to_files[6][0].oldest_ancester_time); ASSERT_EQ(oldest_time, level_to_files[6][0].oldest_ancester_time);
env_->addon_time_.fetch_add(25 * 60 * 60); env_->MockSleepForSeconds(25 * 60 * 60);
ASSERT_OK(Put(Key(2), "1")); ASSERT_OK(Put(Key(2), "1"));
if (if_restart) { if (if_restart) {
Reopen(options); Reopen(options);
@ -3844,6 +3847,7 @@ TEST_F(DBCompactionTest, LevelTtlCascadingCompactions) {
} }
TEST_F(DBCompactionTest, LevelPeriodicCompaction) { TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
env_->SetMockSleep();
const int kNumKeysPerFile = 32; const int kNumKeysPerFile = 32;
const int kNumLevelFiles = 2; const int kNumLevelFiles = 2;
const int kValueSize = 100; const int kValueSize = 100;
@ -3875,10 +3879,10 @@ TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
} }
}); });
env_->time_elapse_only_sleep_ = false;
options.env = env_; options.env = env_;
env_->addon_time_.store(0); // NOTE: Presumed unnecessary and removed: resetting mock time in env
DestroyAndReopen(options); DestroyAndReopen(options);
int periodic_compactions = 0; int periodic_compactions = 0;
@ -3906,7 +3910,7 @@ TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
ASSERT_EQ(0, periodic_compactions); ASSERT_EQ(0, periodic_compactions);
// Add 50 hours and do a write // Add 50 hours and do a write
env_->addon_time_.fetch_add(50 * 60 * 60); env_->MockSleepForSeconds(50 * 60 * 60);
ASSERT_OK(Put("a", "1")); ASSERT_OK(Put("a", "1"));
Flush(); Flush();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
@ -3919,7 +3923,7 @@ TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
ASSERT_EQ("0,3", FilesPerLevel()); ASSERT_EQ("0,3", FilesPerLevel());
// Add another 50 hours and do another write // Add another 50 hours and do another write
env_->addon_time_.fetch_add(50 * 60 * 60); env_->MockSleepForSeconds(50 * 60 * 60);
ASSERT_OK(Put("b", "2")); ASSERT_OK(Put("b", "2"));
if (if_restart) { if (if_restart) {
Reopen(options); Reopen(options);
@ -3933,7 +3937,7 @@ TEST_F(DBCompactionTest, LevelPeriodicCompaction) {
ASSERT_EQ(5, periodic_compactions); ASSERT_EQ(5, periodic_compactions);
// Add another 50 hours and do another write // Add another 50 hours and do another write
env_->addon_time_.fetch_add(50 * 60 * 60); env_->MockSleepForSeconds(50 * 60 * 60);
ASSERT_OK(Put("c", "3")); ASSERT_OK(Put("c", "3"));
Flush(); Flush();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
@ -3957,10 +3961,11 @@ TEST_F(DBCompactionTest, LevelPeriodicCompactionWithOldDB) {
const int kValueSize = 100; const int kValueSize = 100;
Options options = CurrentOptions(); Options options = CurrentOptions();
env_->time_elapse_only_sleep_ = false; env_->SetMockSleep();
options.env = env_; options.env = env_;
env_->addon_time_.store(0); // NOTE: Presumed unnecessary and removed: resetting mock time in env
DestroyAndReopen(options); DestroyAndReopen(options);
int periodic_compactions = 0; int periodic_compactions = 0;
@ -4008,7 +4013,7 @@ TEST_F(DBCompactionTest, LevelPeriodicCompactionWithOldDB) {
set_file_creation_time_to_zero = false; set_file_creation_time_to_zero = false;
// Forward the clock by 2 days. // Forward the clock by 2 days.
env_->addon_time_.fetch_add(2 * 24 * 60 * 60); env_->MockSleepForSeconds(2 * 24 * 60 * 60);
options.periodic_compaction_seconds = 1 * 24 * 60 * 60; // 1 day options.periodic_compaction_seconds = 1 * 24 * 60 * 60; // 1 day
Reopen(options); Reopen(options);
@ -4029,10 +4034,11 @@ TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
options.ttl = 10 * 60 * 60; // 10 hours options.ttl = 10 * 60 * 60; // 10 hours
options.periodic_compaction_seconds = 48 * 60 * 60; // 2 days options.periodic_compaction_seconds = 48 * 60 * 60; // 2 days
options.max_open_files = -1; // needed for both periodic and ttl compactions options.max_open_files = -1; // needed for both periodic and ttl compactions
env_->time_elapse_only_sleep_ = false; env_->SetMockSleep();
options.env = env_; options.env = env_;
env_->addon_time_.store(0); // NOTE: Presumed unnecessary and removed: resetting mock time in env
DestroyAndReopen(options); DestroyAndReopen(options);
int periodic_compactions = 0; int periodic_compactions = 0;
@ -4066,7 +4072,7 @@ TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
ASSERT_EQ(0, ttl_compactions); ASSERT_EQ(0, ttl_compactions);
// Add some time greater than periodic_compaction_time. // Add some time greater than periodic_compaction_time.
env_->addon_time_.fetch_add(50 * 60 * 60); env_->MockSleepForSeconds(50 * 60 * 60);
ASSERT_OK(Put("a", "1")); ASSERT_OK(Put("a", "1"));
Flush(); Flush();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
@ -4076,7 +4082,7 @@ TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
ASSERT_EQ(0, ttl_compactions); ASSERT_EQ(0, ttl_compactions);
// Add a little more time than ttl // Add a little more time than ttl
env_->addon_time_.fetch_add(11 * 60 * 60); env_->MockSleepForSeconds(11 * 60 * 60);
ASSERT_OK(Put("b", "1")); ASSERT_OK(Put("b", "1"));
Flush(); Flush();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
@ -4088,7 +4094,7 @@ TEST_F(DBCompactionTest, LevelPeriodicAndTtlCompaction) {
ASSERT_EQ(3, ttl_compactions); ASSERT_EQ(3, ttl_compactions);
// Add some time greater than periodic_compaction_time. // Add some time greater than periodic_compaction_time.
env_->addon_time_.fetch_add(50 * 60 * 60); env_->MockSleepForSeconds(50 * 60 * 60);
ASSERT_OK(Put("c", "1")); ASSERT_OK(Put("c", "1"));
Flush(); Flush();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
@ -4121,9 +4127,10 @@ TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) {
Options options = CurrentOptions(); Options options = CurrentOptions();
TestCompactionFilter test_compaction_filter; TestCompactionFilter test_compaction_filter;
env_->time_elapse_only_sleep_ = false; env_->SetMockSleep();
options.env = env_; options.env = env_;
env_->addon_time_.store(0);
// NOTE: Presumed unnecessary and removed: resetting mock time in env
enum CompactionFilterType { enum CompactionFilterType {
kUseCompactionFilter, kUseCompactionFilter,
@ -4174,7 +4181,7 @@ TEST_F(DBCompactionTest, LevelPeriodicCompactionWithCompactionFilters) {
ASSERT_EQ(0, periodic_compactions); ASSERT_EQ(0, periodic_compactions);
// Add 31 days and do a write // Add 31 days and do a write
env_->addon_time_.fetch_add(31 * 24 * 60 * 60); env_->MockSleepForSeconds(31 * 24 * 60 * 60);
ASSERT_OK(Put("a", "1")); ASSERT_OK(Put("a", "1"));
Flush(); Flush();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();

@ -34,7 +34,7 @@ TEST_F(DBIOFailureTest, DropWrites) {
// Force out-of-space errors // Force out-of-space errors
env_->drop_writes_.store(true, std::memory_order_release); env_->drop_writes_.store(true, std::memory_order_release);
env_->sleep_counter_.Reset(); env_->sleep_counter_.Reset();
env_->no_slowdown_ = true; env_->SetMockSleep();
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
if (option_config_ != kUniversalCompactionMultiLevel && if (option_config_ != kUniversalCompactionMultiLevel &&
option_config_ != kUniversalSubcompactions) { option_config_ != kUniversalSubcompactions) {

@ -563,10 +563,9 @@ static void assert_candidate_files_empty(DBImpl* dbfull, const bool empty) {
} }
TEST_F(DBOptionsTest, DeleteObsoleteFilesPeriodChange) { TEST_F(DBOptionsTest, DeleteObsoleteFilesPeriodChange) {
SpecialEnv env(env_);
env.time_elapse_only_sleep_ = true;
Options options; Options options;
options.env = &env; options.env = env_;
SetTimeElapseOnlySleepOnReopen(&options);
options.create_if_missing = true; options.create_if_missing = true;
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
@ -585,10 +584,10 @@ TEST_F(DBOptionsTest, DeleteObsoleteFilesPeriodChange) {
assert_candidate_files_empty(dbfull(), true); assert_candidate_files_empty(dbfull(), true);
env.addon_time_.store(20); env_->MockSleepForMicroseconds(20);
assert_candidate_files_empty(dbfull(), true); assert_candidate_files_empty(dbfull(), true);
env.addon_time_.store(21); env_->MockSleepForMicroseconds(1);
assert_candidate_files_empty(dbfull(), false); assert_candidate_files_empty(dbfull(), false);
Close(); Close();
@ -702,11 +701,12 @@ TEST_F(DBOptionsTest, SetFIFOCompactionOptions) {
options.compression = kNoCompression; options.compression = kNoCompression;
options.create_if_missing = true; options.create_if_missing = true;
options.compaction_options_fifo.allow_compaction = false; options.compaction_options_fifo.allow_compaction = false;
env_->time_elapse_only_sleep_ = false; env_->SetMockSleep();
options.env = env_; options.env = env_;
// NOTE: Presumed unnecessary and removed: resetting mock time in env
// Test dynamically changing ttl. // Test dynamically changing ttl.
env_->addon_time_.store(0);
options.ttl = 1 * 60 * 60; // 1 hour options.ttl = 1 * 60 * 60; // 1 hour
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
@ -721,8 +721,7 @@ TEST_F(DBOptionsTest, SetFIFOCompactionOptions) {
ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumTableFilesAtLevel(0), 10); ASSERT_EQ(NumTableFilesAtLevel(0), 10);
// Add 61 seconds to the time. env_->MockSleepForSeconds(61);
env_->addon_time_.fetch_add(61);
// No files should be compacted as ttl is set to 1 hour. // No files should be compacted as ttl is set to 1 hour.
ASSERT_EQ(dbfull()->GetOptions().ttl, 3600); ASSERT_EQ(dbfull()->GetOptions().ttl, 3600);
@ -736,8 +735,9 @@ TEST_F(DBOptionsTest, SetFIFOCompactionOptions) {
ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumTableFilesAtLevel(0), 0); ASSERT_EQ(NumTableFilesAtLevel(0), 0);
// NOTE: Presumed unnecessary and removed: resetting mock time in env
// Test dynamically changing compaction_options_fifo.max_table_files_size // Test dynamically changing compaction_options_fifo.max_table_files_size
env_->addon_time_.store(0);
options.compaction_options_fifo.max_table_files_size = 500 << 10; // 00KB options.compaction_options_fifo.max_table_files_size = 500 << 10; // 00KB
options.ttl = 0; options.ttl = 0;
DestroyAndReopen(options); DestroyAndReopen(options);

@ -1415,13 +1415,11 @@ TEST_F(DBPropertiesTest, EstimateNumKeysUnderflow) {
} }
TEST_F(DBPropertiesTest, EstimateOldestKeyTime) { TEST_F(DBPropertiesTest, EstimateOldestKeyTime) {
std::unique_ptr<MockTimeEnv> mock_env(new MockTimeEnv(Env::Default()));
uint64_t oldest_key_time = 0; uint64_t oldest_key_time = 0;
Options options; Options options = CurrentOptions();
options.env = mock_env.get(); SetTimeElapseOnlySleepOnReopen(&options);
// "rocksdb.estimate-oldest-key-time" only available to fifo compaction. // "rocksdb.estimate-oldest-key-time" only available to fifo compaction.
mock_env->set_current_time(100);
for (auto compaction : {kCompactionStyleLevel, kCompactionStyleUniversal, for (auto compaction : {kCompactionStyleLevel, kCompactionStyleUniversal,
kCompactionStyleNone}) { kCompactionStyleNone}) {
options.compaction_style = compaction; options.compaction_style = compaction;
@ -1432,60 +1430,60 @@ TEST_F(DBPropertiesTest, EstimateOldestKeyTime) {
DB::Properties::kEstimateOldestKeyTime, &oldest_key_time)); DB::Properties::kEstimateOldestKeyTime, &oldest_key_time));
} }
int64_t mock_start_time;
ASSERT_OK(env_->GetCurrentTime(&mock_start_time));
options.compaction_style = kCompactionStyleFIFO; options.compaction_style = kCompactionStyleFIFO;
options.ttl = 300; options.ttl = 300;
options.compaction_options_fifo.allow_compaction = false; options.compaction_options_fifo.allow_compaction = false;
DestroyAndReopen(options); DestroyAndReopen(options);
mock_env->set_current_time(100); env_->MockSleepForSeconds(100);
ASSERT_OK(Put("k1", "v1")); ASSERT_OK(Put("k1", "v1"));
ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime,
&oldest_key_time)); &oldest_key_time));
ASSERT_EQ(100, oldest_key_time); ASSERT_EQ(100, oldest_key_time - mock_start_time);
ASSERT_OK(Flush()); ASSERT_OK(Flush());
ASSERT_EQ("1", FilesPerLevel()); ASSERT_EQ("1", FilesPerLevel());
ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime,
&oldest_key_time)); &oldest_key_time));
ASSERT_EQ(100, oldest_key_time); ASSERT_EQ(100, oldest_key_time - mock_start_time);
mock_env->set_current_time(200); env_->MockSleepForSeconds(100); // -> 200
ASSERT_OK(Put("k2", "v2")); ASSERT_OK(Put("k2", "v2"));
ASSERT_OK(Flush()); ASSERT_OK(Flush());
ASSERT_EQ("2", FilesPerLevel()); ASSERT_EQ("2", FilesPerLevel());
ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime,
&oldest_key_time)); &oldest_key_time));
ASSERT_EQ(100, oldest_key_time); ASSERT_EQ(100, oldest_key_time - mock_start_time);
mock_env->set_current_time(300); env_->MockSleepForSeconds(100); // -> 300
ASSERT_OK(Put("k3", "v3")); ASSERT_OK(Put("k3", "v3"));
ASSERT_OK(Flush()); ASSERT_OK(Flush());
ASSERT_EQ("3", FilesPerLevel()); ASSERT_EQ("3", FilesPerLevel());
ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime,
&oldest_key_time)); &oldest_key_time));
ASSERT_EQ(100, oldest_key_time); ASSERT_EQ(100, oldest_key_time - mock_start_time);
mock_env->set_current_time(450); env_->MockSleepForSeconds(150); // -> 450
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("2", FilesPerLevel()); ASSERT_EQ("2", FilesPerLevel());
ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime,
&oldest_key_time)); &oldest_key_time));
ASSERT_EQ(200, oldest_key_time); ASSERT_EQ(200, oldest_key_time - mock_start_time);
mock_env->set_current_time(550); env_->MockSleepForSeconds(100); // -> 550
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("1", FilesPerLevel()); ASSERT_EQ("1", FilesPerLevel());
ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime,
&oldest_key_time)); &oldest_key_time));
ASSERT_EQ(300, oldest_key_time); ASSERT_EQ(300, oldest_key_time - mock_start_time);
mock_env->set_current_time(650); env_->MockSleepForSeconds(100); // -> 650
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("", FilesPerLevel()); ASSERT_EQ("", FilesPerLevel());
ASSERT_FALSE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, ASSERT_FALSE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime,
&oldest_key_time)); &oldest_key_time));
// Close before mock_env destructs.
Close();
} }
TEST_F(DBPropertiesTest, SstFilesSize) { TEST_F(DBPropertiesTest, SstFilesSize) {

@ -359,12 +359,11 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
uint64_t* abs_time_us = static_cast<uint64_t*>(arg); uint64_t* abs_time_us = static_cast<uint64_t*>(arg);
uint64_t cur_time = env_->NowMicros(); uint64_t cur_time = env_->NowMicros();
if (*abs_time_us > cur_time) { if (*abs_time_us > cur_time) {
env_->addon_time_.fetch_add(*abs_time_us - cur_time); env_->MockSleepForMicroseconds(*abs_time_us - cur_time);
} }
// Randomly sleep shortly // Plus an additional short, random amount
env_->addon_time_.fetch_add( env_->MockSleepForMicroseconds(Random::GetTLSInstance()->Uniform(10));
static_cast<uint64_t>(Random::GetTLSInstance()->Uniform(10)));
// Set wait until time to before (actual) current time to force not // Set wait until time to before (actual) current time to force not
// to sleep // to sleep
@ -374,7 +373,7 @@ TEST_F(DBSSTTest, RateLimitedDelete) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions(); Options options = CurrentOptions();
env_->SetTimeElapseOnlySleep(&options); SetTimeElapseOnlySleepOnReopen(&options);
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
options.env = env_; options.env = env_;
options.statistics = CreateDBStatistics(); options.statistics = CreateDBStatistics();
@ -442,8 +441,6 @@ TEST_F(DBSSTTest, RateLimitedWALDelete) {
"DeleteScheduler::BackgroundEmptyTrash:Wait", "DeleteScheduler::BackgroundEmptyTrash:Wait",
[&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); }); [&](void* arg) { penalties.push_back(*(static_cast<uint64_t*>(arg))); });
env_->no_slowdown_ = true;
env_->time_elapse_only_sleep_ = true;
Options options = CurrentOptions(); Options options = CurrentOptions();
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
options.compression = kNoCompression; options.compression = kNoCompression;
@ -457,6 +454,7 @@ TEST_F(DBSSTTest, RateLimitedWALDelete) {
options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec); options.sst_file_manager->SetDeleteRateBytesPerSecond(rate_bytes_per_sec);
auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get()); auto sfm = static_cast<SstFileManagerImpl*>(options.sst_file_manager.get());
sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1); sfm->delete_scheduler()->SetMaxTrashDBRatio(3.1);
SetTimeElapseOnlySleepOnReopen(&options);
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
@ -1032,13 +1030,16 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) {
// we encode table properties as varint64. Force time to be 0 to work around // we encode table properties as varint64. Force time to be 0 to work around
// it. Should remove the workaround after we propagate the property on // it. Should remove the workaround after we propagate the property on
// compaction. // compaction.
std::unique_ptr<MockTimeEnv> mock_env(new MockTimeEnv(Env::Default())); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
mock_env->set_current_time(0); "FlushJob::WriteLevel0Table:oldest_ancester_time", [&](void* arg) {
uint64_t* current_time = static_cast<uint64_t*>(arg);
*current_time = 0;
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Options options = CurrentOptions(); Options options = CurrentOptions();
options.disable_auto_compactions = true; options.disable_auto_compactions = true;
options.compression = kNoCompression; options.compression = kNoCompression;
options.env = mock_env.get();
DestroyAndReopen(options); DestroyAndReopen(options);
// Generate 5 files in L0 // Generate 5 files in L0
for (int i = 0; i < 5; i++) { for (int i = 0; i < 5; i++) {
@ -1126,8 +1127,7 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) {
// Total SST files = 0 // Total SST files = 0
ASSERT_EQ(total_sst_files_size, 0); ASSERT_EQ(total_sst_files_size, 0);
// Close db before mock_env destruct. ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
Close();
} }
TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) { TEST_F(DBSSTTest, GetTotalSstFilesSizeVersionsFilesShared) {

@ -974,7 +974,7 @@ class DelayFilter : public CompactionFilter {
bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/, bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
std::string* /*new_value*/, std::string* /*new_value*/,
bool* /*value_changed*/) const override { bool* /*value_changed*/) const override {
db_test->env_->addon_time_.fetch_add(1000); db_test->env_->MockSleepForMicroseconds(1000);
return true; return true;
} }
@ -1719,6 +1719,7 @@ TEST_F(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
TEST_F(DBTest, Snapshot) { TEST_F(DBTest, Snapshot) {
env_->SetMockSleep();
anon::OptionsOverride options_override; anon::OptionsOverride options_override;
options_override.skip_policy = kSkipNoSnapshot; options_override.skip_policy = kSkipNoSnapshot;
do { do {
@ -1734,7 +1735,7 @@ TEST_F(DBTest, Snapshot) {
Put(0, "foo", "0v2"); Put(0, "foo", "0v2");
Put(1, "foo", "1v2"); Put(1, "foo", "1v2");
env_->addon_time_.fetch_add(1); env_->MockSleepForSeconds(1);
const Snapshot* s2 = db_->GetSnapshot(); const Snapshot* s2 = db_->GetSnapshot();
ASSERT_EQ(2U, GetNumSnapshots()); ASSERT_EQ(2U, GetNumSnapshots());
@ -3545,13 +3546,14 @@ TEST_F(DBTest, FIFOCompactionWithTTLTest) {
options.arena_block_size = 4096; options.arena_block_size = 4096;
options.compression = kNoCompression; options.compression = kNoCompression;
options.create_if_missing = true; options.create_if_missing = true;
env_->time_elapse_only_sleep_ = false; env_->SetMockSleep();
options.env = env_; options.env = env_;
// Test to make sure that all files with expired ttl are deleted on next // Test to make sure that all files with expired ttl are deleted on next
// manual compaction. // manual compaction.
{ {
env_->addon_time_.store(0); // NOTE: Presumed unnecessary and removed: resetting mock time in env
options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB options.compaction_options_fifo.max_table_files_size = 150 << 10; // 150KB
options.compaction_options_fifo.allow_compaction = false; options.compaction_options_fifo.allow_compaction = false;
options.ttl = 1 * 60 * 60 ; // 1 hour options.ttl = 1 * 60 * 60 ; // 1 hour
@ -3570,10 +3572,7 @@ TEST_F(DBTest, FIFOCompactionWithTTLTest) {
ASSERT_EQ(NumTableFilesAtLevel(0), 10); ASSERT_EQ(NumTableFilesAtLevel(0), 10);
// Sleep for 2 hours -- which is much greater than TTL. // Sleep for 2 hours -- which is much greater than TTL.
// Note: Couldn't use SleepForMicroseconds because it takes an int instead env_->MockSleepForSeconds(2 * 60 * 60);
// of uint64_t. Hence used addon_time_ directly.
// env_->SleepForMicroseconds(2 * 60 * 60 * 1000 * 1000);
env_->addon_time_.fetch_add(2 * 60 * 60);
// Since no flushes and compactions have run, the db should still be in // Since no flushes and compactions have run, the db should still be in
// the same state even after considerable time has passed. // the same state even after considerable time has passed.
@ -3605,7 +3604,7 @@ TEST_F(DBTest, FIFOCompactionWithTTLTest) {
ASSERT_EQ(NumTableFilesAtLevel(0), 10); ASSERT_EQ(NumTableFilesAtLevel(0), 10);
// Sleep for 2 hours -- which is much greater than TTL. // Sleep for 2 hours -- which is much greater than TTL.
env_->addon_time_.fetch_add(2 * 60 * 60); env_->MockSleepForSeconds(2 * 60 * 60);
// Just to make sure that we are in the same state even after sleeping. // Just to make sure that we are in the same state even after sleeping.
ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumTableFilesAtLevel(0), 10); ASSERT_EQ(NumTableFilesAtLevel(0), 10);
@ -3647,7 +3646,7 @@ TEST_F(DBTest, FIFOCompactionWithTTLTest) {
ASSERT_EQ(NumTableFilesAtLevel(0), 3); ASSERT_EQ(NumTableFilesAtLevel(0), 3);
// Sleep for 2 hours -- which is much greater than TTL. // Sleep for 2 hours -- which is much greater than TTL.
env_->addon_time_.fetch_add(2 * 60 * 60); env_->MockSleepForSeconds(2 * 60 * 60);
// Just to make sure that we are in the same state even after sleeping. // Just to make sure that we are in the same state even after sleeping.
ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumTableFilesAtLevel(0), 3); ASSERT_EQ(NumTableFilesAtLevel(0), 3);
@ -3688,7 +3687,7 @@ TEST_F(DBTest, FIFOCompactionWithTTLTest) {
ASSERT_EQ(NumTableFilesAtLevel(0), 5); ASSERT_EQ(NumTableFilesAtLevel(0), 5);
// Sleep for 2 hours -- which is much greater than TTL. // Sleep for 2 hours -- which is much greater than TTL.
env_->addon_time_.fetch_add(2 * 60 * 60); env_->MockSleepForSeconds(2 * 60 * 60);
// Just to make sure that we are in the same state even after sleeping. // Just to make sure that we are in the same state even after sleeping.
ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(NumTableFilesAtLevel(0), 5); ASSERT_EQ(NumTableFilesAtLevel(0), 5);
@ -5444,9 +5443,10 @@ class DelayedMergeOperator : public MergeOperator {
public: public:
explicit DelayedMergeOperator(DBTest* d) : db_test_(d) {} explicit DelayedMergeOperator(DBTest* d) : db_test_(d) {}
bool FullMergeV2(const MergeOperationInput& /*merge_in*/, bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override { MergeOperationOutput* merge_out) const override {
db_test_->env_->addon_time_.fetch_add(1000); db_test_->env_->MockSleepForMicroseconds(1000 *
merge_in.operand_list.size());
merge_out->new_value = ""; merge_out->new_value = "";
return true; return true;
} }
@ -5454,8 +5454,6 @@ class DelayedMergeOperator : public MergeOperator {
const char* Name() const override { return "DelayedMergeOperator"; } const char* Name() const override { return "DelayedMergeOperator"; }
}; };
// TODO: hangs in CircleCI's Windows env
#ifndef OS_WIN
TEST_F(DBTest, MergeTestTime) { TEST_F(DBTest, MergeTestTime) {
std::string one, two, three; std::string one, two, three;
PutFixed64(&one, 1); PutFixed64(&one, 1);
@ -5464,14 +5462,14 @@ TEST_F(DBTest, MergeTestTime) {
// Enable time profiling // Enable time profiling
SetPerfLevel(kEnableTime); SetPerfLevel(kEnableTime);
this->env_->addon_time_.store(0);
this->env_->time_elapse_only_sleep_ = true;
this->env_->no_slowdown_ = true;
Options options = CurrentOptions(); Options options = CurrentOptions();
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.merge_operator.reset(new DelayedMergeOperator(this)); options.merge_operator.reset(new DelayedMergeOperator(this));
SetTimeElapseOnlySleepOnReopen(&options);
DestroyAndReopen(options); DestroyAndReopen(options);
// NOTE: Presumed unnecessary and removed: resetting mock time in env
ASSERT_EQ(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 0); ASSERT_EQ(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 0);
db_->Put(WriteOptions(), "foo", one); db_->Put(WriteOptions(), "foo", one);
ASSERT_OK(Flush()); ASSERT_OK(Flush());
@ -5486,7 +5484,7 @@ TEST_F(DBTest, MergeTestTime) {
std::string result; std::string result;
db_->Get(opt, "foo", &result); db_->Get(opt, "foo", &result);
ASSERT_EQ(1000000, TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME)); ASSERT_EQ(2000000, TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME));
ReadOptions read_options; ReadOptions read_options;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options)); std::unique_ptr<Iterator> iter(db_->NewIterator(read_options));
@ -5497,13 +5495,11 @@ TEST_F(DBTest, MergeTestTime) {
} }
ASSERT_EQ(1, count); ASSERT_EQ(1, count);
ASSERT_EQ(2000000, TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME)); ASSERT_EQ(4000000, TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME));
#ifdef ROCKSDB_USING_THREAD_STATUS #ifdef ROCKSDB_USING_THREAD_STATUS
ASSERT_GT(TestGetTickerCount(options, FLUSH_WRITE_BYTES), 0); ASSERT_GT(TestGetTickerCount(options, FLUSH_WRITE_BYTES), 0);
#endif // ROCKSDB_USING_THREAD_STATUS #endif // ROCKSDB_USING_THREAD_STATUS
this->env_->time_elapse_only_sleep_ = false;
} }
#endif // OS_WIN
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
TEST_P(DBTestWithParam, MergeCompactionTimeTest) { TEST_P(DBTestWithParam, MergeCompactionTimeTest) {
@ -5513,18 +5509,24 @@ TEST_P(DBTestWithParam, MergeCompactionTimeTest) {
options.compaction_filter_factory = std::make_shared<KeepFilterFactory>(); options.compaction_filter_factory = std::make_shared<KeepFilterFactory>();
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.merge_operator.reset(new DelayedMergeOperator(this)); options.merge_operator.reset(new DelayedMergeOperator(this));
options.compaction_style = kCompactionStyleUniversal; options.disable_auto_compactions = true;
options.max_subcompactions = max_subcompactions_; options.max_subcompactions = max_subcompactions_;
SetTimeElapseOnlySleepOnReopen(&options);
DestroyAndReopen(options); DestroyAndReopen(options);
for (int i = 0; i < 1000; i++) { constexpr unsigned n = 1000;
for (unsigned i = 0; i < n; i++) {
ASSERT_OK(db_->Merge(WriteOptions(), "foo", "TEST")); ASSERT_OK(db_->Merge(WriteOptions(), "foo", "TEST"));
ASSERT_OK(Flush()); ASSERT_OK(Flush());
} }
dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
ASSERT_NE(TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME), 0); CompactRangeOptions cro;
cro.exclusive_manual_compaction = exclusive_manual_compaction_;
ASSERT_OK(db_->CompactRange(cro, nullptr, nullptr));
ASSERT_EQ(uint64_t{n} * 1000000U,
TestGetTickerCount(options, MERGE_OPERATION_TOTAL_TIME));
} }
TEST_P(DBTestWithParam, FilterCompactionTimeTest) { TEST_P(DBTestWithParam, FilterCompactionTimeTest) {
@ -5536,12 +5538,15 @@ TEST_P(DBTestWithParam, FilterCompactionTimeTest) {
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics();
options.statistics->set_stats_level(kExceptTimeForMutex); options.statistics->set_stats_level(kExceptTimeForMutex);
options.max_subcompactions = max_subcompactions_; options.max_subcompactions = max_subcompactions_;
SetTimeElapseOnlySleepOnReopen(&options);
DestroyAndReopen(options); DestroyAndReopen(options);
unsigned n = 0;
// put some data // put some data
for (int table = 0; table < 4; ++table) { for (int table = 0; table < 4; ++table) {
for (int i = 0; i < 10 + table; ++i) { for (int i = 0; i < 10 + table; ++i) {
Put(ToString(table * 100 + i), "val"); Put(ToString(table * 100 + i), "val");
++n;
} }
Flush(); Flush();
} }
@ -5555,7 +5560,8 @@ TEST_P(DBTestWithParam, FilterCompactionTimeTest) {
Iterator* itr = db_->NewIterator(ReadOptions()); Iterator* itr = db_->NewIterator(ReadOptions());
itr->SeekToFirst(); itr->SeekToFirst();
ASSERT_NE(TestGetTickerCount(options, FILTER_OPERATION_TOTAL_TIME), 0); ASSERT_EQ(uint64_t{n} * 1000000U,
TestGetTickerCount(options, FILTER_OPERATION_TOTAL_TIME));
delete itr; delete itr;
} }
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
@ -6007,7 +6013,6 @@ TEST_F(DBTest, DelayedWriteRate) {
Options options = CurrentOptions(); Options options = CurrentOptions();
env_->SetBackgroundThreads(1, Env::LOW); env_->SetBackgroundThreads(1, Env::LOW);
options.env = env_; options.env = env_;
env_->no_slowdown_ = true;
options.write_buffer_size = 100000000; options.write_buffer_size = 100000000;
options.max_write_buffer_number = 256; options.max_write_buffer_number = 256;
options.max_background_compactions = 1; options.max_background_compactions = 1;
@ -6018,6 +6023,7 @@ TEST_F(DBTest, DelayedWriteRate) {
options.memtable_factory.reset( options.memtable_factory.reset(
new SpecialSkipListFactory(kEntriesPerMemTable)); new SpecialSkipListFactory(kEntriesPerMemTable));
SetTimeElapseOnlySleepOnReopen(&options);
CreateAndReopenWithCF({"pikachu"}, options); CreateAndReopenWithCF({"pikachu"}, options);
// Block compactions // Block compactions
@ -6056,12 +6062,9 @@ TEST_F(DBTest, DelayedWriteRate) {
kIncSlowdownRatio * kIncSlowdownRatio); kIncSlowdownRatio * kIncSlowdownRatio);
} }
// Estimate the total sleep time fall into the rough range. // Estimate the total sleep time fall into the rough range.
ASSERT_GT(env_->addon_time_.load(), ASSERT_GT(env_->NowMicros(), estimated_sleep_time / 2);
static_cast<int64_t>(estimated_sleep_time / 2)); ASSERT_LT(env_->NowMicros(), estimated_sleep_time * 2);
ASSERT_LT(env_->addon_time_.load(),
static_cast<int64_t>(estimated_sleep_time * 2));
env_->no_slowdown_ = false;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
sleeping_task_low.WakeUp(); sleeping_task_low.WakeUp();
sleeping_task_low.WaitUntilDone(); sleeping_task_low.WaitUntilDone();
@ -6575,10 +6578,11 @@ TEST_F(DBTest, CreationTimeOfOldestFile) {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.max_open_files = -1; options.max_open_files = -1;
env_->time_elapse_only_sleep_ = false; env_->SetMockSleep();
options.env = env_; options.env = env_;
env_->addon_time_.store(0); // NOTE: Presumed unnecessary and removed: resetting mock time in env
DestroyAndReopen(options); DestroyAndReopen(options);
bool set_file_creation_time_to_zero = true; bool set_file_creation_time_to_zero = true;
@ -6589,7 +6593,7 @@ TEST_F(DBTest, CreationTimeOfOldestFile) {
const uint64_t uint_time_1 = static_cast<uint64_t>(time_1); const uint64_t uint_time_1 = static_cast<uint64_t>(time_1);
// Add 50 hours // Add 50 hours
env_->addon_time_.fetch_add(50 * 60 * 60); env_->MockSleepForSeconds(50 * 60 * 60);
int64_t time_2 = 0; int64_t time_2 = 0;
env_->GetCurrentTime(&time_2); env_->GetCurrentTime(&time_2);
@ -6643,10 +6647,10 @@ TEST_F(DBTest, CreationTimeOfOldestFile) {
set_file_creation_time_to_zero = false; set_file_creation_time_to_zero = false;
options = CurrentOptions(); options = CurrentOptions();
options.max_open_files = -1; options.max_open_files = -1;
env_->time_elapse_only_sleep_ = false;
options.env = env_; options.env = env_;
env_->addon_time_.store(0); // NOTE: Presumed unnecessary and removed: resetting mock time in env
DestroyAndReopen(options); DestroyAndReopen(options);
for (int i = 0; i < kNumLevelFiles; ++i) { for (int i = 0; i < kNumLevelFiles; ++i) {

@ -2158,6 +2158,9 @@ TEST_F(DBTest2, TestPerfContextGetCpuTime) {
ASSERT_OK(Put("foo", "bar")); ASSERT_OK(Put("foo", "bar"));
ASSERT_OK(Flush()); ASSERT_OK(Flush());
env_->now_cpu_count_.store(0); env_->now_cpu_count_.store(0);
env_->SetMockSleep();
// NOTE: Presumed unnecessary and removed: resetting mock time in env
// CPU timing is not enabled with kEnableTimeExceptForMutex // CPU timing is not enabled with kEnableTimeExceptForMutex
SetPerfLevel(PerfLevel::kEnableTimeExceptForMutex); SetPerfLevel(PerfLevel::kEnableTimeExceptForMutex);
@ -2165,19 +2168,20 @@ TEST_F(DBTest2, TestPerfContextGetCpuTime) {
ASSERT_EQ(0, get_perf_context()->get_cpu_nanos); ASSERT_EQ(0, get_perf_context()->get_cpu_nanos);
ASSERT_EQ(0, env_->now_cpu_count_.load()); ASSERT_EQ(0, env_->now_cpu_count_.load());
uint64_t kDummyAddonTime = uint64_t{1000000000000}; constexpr uint64_t kDummyAddonSeconds = uint64_t{1000000};
constexpr uint64_t kDummyAddonNanos = 1000000000U * kDummyAddonSeconds;
// Add time to NowNanos() reading. // Add time to NowNanos() reading.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TableCache::FindTable:0", "TableCache::FindTable:0",
[&](void* /*arg*/) { env_->addon_time_.fetch_add(kDummyAddonTime); }); [&](void* /*arg*/) { env_->MockSleepForSeconds(kDummyAddonSeconds); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
ASSERT_EQ("bar", Get("foo")); ASSERT_EQ("bar", Get("foo"));
ASSERT_GT(env_->now_cpu_count_.load(), 2); ASSERT_GT(env_->now_cpu_count_.load(), 2);
ASSERT_LT(get_perf_context()->get_cpu_nanos, kDummyAddonTime); ASSERT_LT(get_perf_context()->get_cpu_nanos, kDummyAddonNanos);
ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonTime); ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonNanos);
SetPerfLevel(PerfLevel::kDisable); SetPerfLevel(PerfLevel::kDisable);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
@ -2200,6 +2204,9 @@ TEST_F(DBTest2, TestPerfContextIterCpuTime) {
std::string last_key = "k" + ToString(kNumEntries - 1); std::string last_key = "k" + ToString(kNumEntries - 1);
std::string last_value = "v" + ToString(kNumEntries - 1); std::string last_value = "v" + ToString(kNumEntries - 1);
env_->now_cpu_count_.store(0); env_->now_cpu_count_.store(0);
env_->SetMockSleep();
// NOTE: Presumed unnecessary and removed: resetting mock time in env
// CPU timing is not enabled with kEnableTimeExceptForMutex // CPU timing is not enabled with kEnableTimeExceptForMutex
SetPerfLevel(PerfLevel::kEnableTimeExceptForMutex); SetPerfLevel(PerfLevel::kEnableTimeExceptForMutex);
@ -2227,12 +2234,13 @@ TEST_F(DBTest2, TestPerfContextIterCpuTime) {
ASSERT_EQ(0, env_->now_cpu_count_.load()); ASSERT_EQ(0, env_->now_cpu_count_.load());
delete iter; delete iter;
uint64_t kDummyAddonTime = uint64_t{1000000000000}; constexpr uint64_t kDummyAddonSeconds = uint64_t{1000000};
constexpr uint64_t kDummyAddonNanos = 1000000000U * kDummyAddonSeconds;
// Add time to NowNanos() reading. // Add time to NowNanos() reading.
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"TableCache::FindTable:0", "TableCache::FindTable:0",
[&](void* /*arg*/) { env_->addon_time_.fetch_add(kDummyAddonTime); }); [&](void* /*arg*/) { env_->MockSleepForSeconds(kDummyAddonSeconds); });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
@ -2249,19 +2257,19 @@ TEST_F(DBTest2, TestPerfContextIterCpuTime) {
ASSERT_TRUE(iter->Valid()); ASSERT_TRUE(iter->Valid());
ASSERT_EQ("v0", iter->value().ToString()); ASSERT_EQ("v0", iter->value().ToString());
ASSERT_GT(get_perf_context()->iter_seek_cpu_nanos, 0); ASSERT_GT(get_perf_context()->iter_seek_cpu_nanos, 0);
ASSERT_LT(get_perf_context()->iter_seek_cpu_nanos, kDummyAddonTime); ASSERT_LT(get_perf_context()->iter_seek_cpu_nanos, kDummyAddonNanos);
iter->Next(); iter->Next();
ASSERT_TRUE(iter->Valid()); ASSERT_TRUE(iter->Valid());
ASSERT_EQ("v1", iter->value().ToString()); ASSERT_EQ("v1", iter->value().ToString());
ASSERT_GT(get_perf_context()->iter_next_cpu_nanos, 0); ASSERT_GT(get_perf_context()->iter_next_cpu_nanos, 0);
ASSERT_LT(get_perf_context()->iter_next_cpu_nanos, kDummyAddonTime); ASSERT_LT(get_perf_context()->iter_next_cpu_nanos, kDummyAddonNanos);
iter->Prev(); iter->Prev();
ASSERT_TRUE(iter->Valid()); ASSERT_TRUE(iter->Valid());
ASSERT_EQ("v0", iter->value().ToString()); ASSERT_EQ("v0", iter->value().ToString());
ASSERT_GT(get_perf_context()->iter_prev_cpu_nanos, 0); ASSERT_GT(get_perf_context()->iter_prev_cpu_nanos, 0);
ASSERT_LT(get_perf_context()->iter_prev_cpu_nanos, kDummyAddonTime); ASSERT_LT(get_perf_context()->iter_prev_cpu_nanos, kDummyAddonNanos);
ASSERT_GE(env_->now_cpu_count_.load(), 12); ASSERT_GE(env_->now_cpu_count_.load(), 12);
ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonTime); ASSERT_GT(get_perf_context()->find_table_nanos, kDummyAddonNanos);
SetPerfLevel(PerfLevel::kDisable); SetPerfLevel(PerfLevel::kDisable);
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();

@ -26,14 +26,13 @@ int64_t MaybeCurrentTime(Env* env) {
// Special Env used to delay background operations // Special Env used to delay background operations
SpecialEnv::SpecialEnv(Env* base) SpecialEnv::SpecialEnv(Env* base, bool time_elapse_only_sleep)
: EnvWrapper(base), : EnvWrapper(base),
maybe_starting_time_(MaybeCurrentTime(base)), maybe_starting_time_(MaybeCurrentTime(base)),
rnd_(301), rnd_(301),
sleep_counter_(this), sleep_counter_(this),
addon_time_(0), time_elapse_only_sleep_(time_elapse_only_sleep),
time_elapse_only_sleep_(false), no_slowdown_(time_elapse_only_sleep) {
no_slowdown_(false) {
delay_sstable_sync_.store(false, std::memory_order_release); delay_sstable_sync_.store(false, std::memory_order_release);
drop_writes_.store(false, std::memory_order_release); drop_writes_.store(false, std::memory_order_release);
no_space_.store(false, std::memory_order_release); no_space_.store(false, std::memory_order_release);
@ -606,6 +605,39 @@ void DBTestBase::ReopenWithColumnFamilies(const std::vector<std::string>& cfs,
ASSERT_OK(TryReopenWithColumnFamilies(cfs, options)); ASSERT_OK(TryReopenWithColumnFamilies(cfs, options));
} }
void DBTestBase::SetTimeElapseOnlySleepOnReopen(DBOptions* options) {
time_elapse_only_sleep_on_reopen_ = true;
// Need to disable stats dumping and persisting which also use
// RepeatableThread, which uses InstrumentedCondVar::TimedWaitInternal.
// With time_elapse_only_sleep_, this can hang on some platforms (MacOS)
// because (a) on some platforms, pthread_cond_timedwait does not appear
// to release the lock for other threads to operate if the deadline time
// is already passed, and (b) TimedWait calls are currently a bad abstraction
// because the deadline parameter is usually computed from Env time,
// but is interpreted in real clock time.
options->stats_dump_period_sec = 0;
options->stats_persist_period_sec = 0;
}
void DBTestBase::MaybeInstallTimeElapseOnlySleep(const DBOptions& options) {
if (time_elapse_only_sleep_on_reopen_) {
assert(options.env == env_ ||
static_cast_with_check<CompositeEnvWrapper>(options.env)
->env_target() == env_);
assert(options.stats_dump_period_sec == 0);
assert(options.stats_persist_period_sec == 0);
// We cannot set these before destroying the last DB because they might
// cause a deadlock or similar without the appropriate options set in
// the DB.
env_->time_elapse_only_sleep_ = true;
env_->no_slowdown_ = true;
} else {
// Going back in same test run is not yet supported, so no
// reset in this case.
}
}
Status DBTestBase::TryReopenWithColumnFamilies( Status DBTestBase::TryReopenWithColumnFamilies(
const std::vector<std::string>& cfs, const std::vector<Options>& options) { const std::vector<std::string>& cfs, const std::vector<Options>& options) {
Close(); Close();
@ -616,6 +648,7 @@ Status DBTestBase::TryReopenWithColumnFamilies(
} }
DBOptions db_opts = DBOptions(options[0]); DBOptions db_opts = DBOptions(options[0]);
last_options_ = options[0]; last_options_ = options[0];
MaybeInstallTimeElapseOnlySleep(db_opts);
return DB::Open(db_opts, dbname_, column_families, &handles_, &db_); return DB::Open(db_opts, dbname_, column_families, &handles_, &db_);
} }
@ -659,6 +692,7 @@ void DBTestBase::Destroy(const Options& options, bool delete_cf_paths) {
} }
Status DBTestBase::ReadOnlyReopen(const Options& options) { Status DBTestBase::ReadOnlyReopen(const Options& options) {
MaybeInstallTimeElapseOnlySleep(options);
return DB::OpenForReadOnly(options, dbname_, &db_); return DB::OpenForReadOnly(options, dbname_, &db_);
} }
@ -673,6 +707,7 @@ Status DBTestBase::TryReopen(const Options& options) {
// problem, we manually call destructor of table_facotry which eventually // problem, we manually call destructor of table_facotry which eventually
// clears the block cache. // clears the block cache.
last_options_ = options; last_options_ = options;
MaybeInstallTimeElapseOnlySleep(options);
return DB::Open(options, dbname_, &db_); return DB::Open(options, dbname_, &db_);
} }

@ -206,7 +206,7 @@ class SpecialSkipListFactory : public MemTableRepFactory {
// Special Env used to delay background operations // Special Env used to delay background operations
class SpecialEnv : public EnvWrapper { class SpecialEnv : public EnvWrapper {
public: public:
explicit SpecialEnv(Env* base); explicit SpecialEnv(Env* base, bool time_elapse_only_sleep = false);
Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r, Status NewWritableFile(const std::string& f, std::unique_ptr<WritableFile>* r,
const EnvOptions& soptions) override { const EnvOptions& soptions) override {
@ -529,13 +529,25 @@ class SpecialEnv : public EnvWrapper {
virtual void SleepForMicroseconds(int micros) override { virtual void SleepForMicroseconds(int micros) override {
sleep_counter_.Increment(); sleep_counter_.Increment();
if (no_slowdown_ || time_elapse_only_sleep_) { if (no_slowdown_ || time_elapse_only_sleep_) {
addon_time_.fetch_add(micros); addon_microseconds_.fetch_add(micros);
} }
if (!no_slowdown_) { if (!no_slowdown_) {
target()->SleepForMicroseconds(micros); target()->SleepForMicroseconds(micros);
} }
} }
void MockSleepForMicroseconds(int64_t micros) {
sleep_counter_.Increment();
assert(no_slowdown_);
addon_microseconds_.fetch_add(micros);
}
void MockSleepForSeconds(int64_t seconds) {
sleep_counter_.Increment();
assert(no_slowdown_);
addon_microseconds_.fetch_add(seconds * 1000000);
}
virtual Status GetCurrentTime(int64_t* unix_time) override { virtual Status GetCurrentTime(int64_t* unix_time) override {
Status s; Status s;
if (time_elapse_only_sleep_) { if (time_elapse_only_sleep_) {
@ -544,9 +556,8 @@ class SpecialEnv : public EnvWrapper {
s = target()->GetCurrentTime(unix_time); s = target()->GetCurrentTime(unix_time);
} }
if (s.ok()) { if (s.ok()) {
// FIXME: addon_time_ sometimes used to mean seconds (here) and // mock microseconds elapsed to seconds of time
// sometimes microseconds *unix_time += addon_microseconds_.load() / 1000000;
*unix_time += addon_time_.load();
} }
return s; return s;
} }
@ -558,12 +569,12 @@ class SpecialEnv : public EnvWrapper {
virtual uint64_t NowNanos() override { virtual uint64_t NowNanos() override {
return (time_elapse_only_sleep_ ? 0 : target()->NowNanos()) + return (time_elapse_only_sleep_ ? 0 : target()->NowNanos()) +
addon_time_.load() * 1000; addon_microseconds_.load() * 1000;
} }
virtual uint64_t NowMicros() override { virtual uint64_t NowMicros() override {
return (time_elapse_only_sleep_ ? 0 : target()->NowMicros()) + return (time_elapse_only_sleep_ ? 0 : target()->NowMicros()) +
addon_time_.load(); addon_microseconds_.load();
} }
virtual Status DeleteFile(const std::string& fname) override { virtual Status DeleteFile(const std::string& fname) override {
@ -571,16 +582,7 @@ class SpecialEnv : public EnvWrapper {
return target()->DeleteFile(fname); return target()->DeleteFile(fname);
} }
void SetTimeElapseOnlySleep(Options* options) { void SetMockSleep(bool enabled = true) { no_slowdown_ = enabled; }
time_elapse_only_sleep_ = true;
no_slowdown_ = true;
// Need to disable stats dumping and persisting which also use
// RepeatableThread, which uses InstrumentedCondVar::TimedWaitInternal.
// With time_elapse_only_sleep_, this can hang on some platforms.
// TODO: why? investigate/fix
options->stats_dump_period_sec = 0;
options->stats_persist_period_sec = 0;
}
Status NewDirectory(const std::string& name, Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override { std::unique_ptr<Directory>* result) override {
@ -658,19 +660,23 @@ class SpecialEnv : public EnvWrapper {
std::function<void()>* table_write_callback_; std::function<void()>* table_write_callback_;
std::atomic<int64_t> addon_time_;
std::atomic<int> now_cpu_count_; std::atomic<int> now_cpu_count_;
std::atomic<int> delete_count_; std::atomic<int> delete_count_;
std::atomic<bool> time_elapse_only_sleep_;
bool no_slowdown_;
std::atomic<bool> is_wal_sync_thread_safe_{true}; std::atomic<bool> is_wal_sync_thread_safe_{true};
std::atomic<size_t> compaction_readahead_size_{}; std::atomic<size_t> compaction_readahead_size_{};
private: // accessing these directly is prone to error
friend class DBTestBase;
std::atomic<int64_t> addon_microseconds_{0};
// Do not modify in the env of a running DB (could cause deadlock)
std::atomic<bool> time_elapse_only_sleep_;
bool no_slowdown_;
}; };
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
@ -1130,6 +1136,15 @@ class DBTestBase : public testing::Test {
Tickers ticker_type) { Tickers ticker_type) {
return options.statistics->getAndResetTickerCount(ticker_type); return options.statistics->getAndResetTickerCount(ticker_type);
} }
// Note: reverting this setting within the same test run is not yet
// supported
void SetTimeElapseOnlySleepOnReopen(DBOptions* options);
private: // Prone to error on direct use
void MaybeInstallTimeElapseOnlySleep(const DBOptions& options);
bool time_elapse_only_sleep_on_reopen_ = false;
}; };
} // namespace ROCKSDB_NAMESPACE } // namespace ROCKSDB_NAMESPACE

@ -91,36 +91,6 @@ class KeepFilterFactory : public CompactionFilterFactory {
std::atomic_bool expect_full_compaction_; std::atomic_bool expect_full_compaction_;
std::atomic_bool expect_manual_compaction_; std::atomic_bool expect_manual_compaction_;
}; };
class DelayFilter : public CompactionFilter {
public:
explicit DelayFilter(DBTestBase* d) : db_test(d) {}
bool Filter(int /*level*/, const Slice& /*key*/, const Slice& /*value*/,
std::string* /*new_value*/,
bool* /*value_changed*/) const override {
db_test->env_->addon_time_.fetch_add(1000);
return true;
}
const char* Name() const override { return "DelayFilter"; }
private:
DBTestBase* db_test;
};
class DelayFilterFactory : public CompactionFilterFactory {
public:
explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) override {
return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test));
}
const char* Name() const override { return "DelayFilterFactory"; }
private:
DBTestBase* db_test;
};
} // namespace } // namespace
// Make sure we don't trigger a problem if the trigger condtion is given // Make sure we don't trigger a problem if the trigger condtion is given
@ -2185,9 +2155,11 @@ TEST_F(DBTestUniversalCompaction2, PeriodicCompaction) {
opts.compaction_options_universal.max_size_amplification_percent = 200; opts.compaction_options_universal.max_size_amplification_percent = 200;
opts.periodic_compaction_seconds = 48 * 60 * 60; // 2 days opts.periodic_compaction_seconds = 48 * 60 * 60; // 2 days
opts.num_levels = 5; opts.num_levels = 5;
env_->addon_time_.store(0); env_->SetMockSleep();
Reopen(opts); Reopen(opts);
// NOTE: Presumed unnecessary and removed: resetting mock time in env
int periodic_compactions = 0; int periodic_compactions = 0;
int start_level = -1; int start_level = -1;
int output_level = -1; int output_level = -1;
@ -2210,7 +2182,7 @@ TEST_F(DBTestUniversalCompaction2, PeriodicCompaction) {
ASSERT_EQ(0, periodic_compactions); ASSERT_EQ(0, periodic_compactions);
// Move clock forward so that the flushed file would qualify periodic // Move clock forward so that the flushed file would qualify periodic
// compaction. // compaction.
env_->addon_time_.store(48 * 60 * 60 + 100); env_->MockSleepForSeconds(48 * 60 * 60 + 100);
// Another flush would trigger compaction the oldest file. // Another flush would trigger compaction the oldest file.
ASSERT_OK(Put("foo", "bar2")); ASSERT_OK(Put("foo", "bar2"));
@ -2232,7 +2204,7 @@ TEST_F(DBTestUniversalCompaction2, PeriodicCompaction) {
// After periodic compaction threshold hits, a flush will trigger // After periodic compaction threshold hits, a flush will trigger
// a compaction // a compaction
ASSERT_OK(Put("foo", "bar2")); ASSERT_OK(Put("foo", "bar2"));
env_->addon_time_.fetch_add(48 * 60 * 60 + 100); env_->MockSleepForSeconds(48 * 60 * 60 + 100);
Flush(); Flush();
dbfull()->TEST_WaitForCompact(); dbfull()->TEST_WaitForCompact();
ASSERT_EQ(1, periodic_compactions); ASSERT_EQ(1, periodic_compactions);

@ -375,7 +375,13 @@ Status FlushJob::WriteLevel0Table() {
// It's not clear whether oldest_key_time is always available. In case // It's not clear whether oldest_key_time is always available. In case
// it is not available, use current_time. // it is not available, use current_time.
meta_.oldest_ancester_time = std::min(current_time, oldest_key_time); uint64_t oldest_ancester_time = std::min(current_time, oldest_key_time);
TEST_SYNC_POINT_CALLBACK(
"FlushJob::WriteLevel0Table:oldest_ancester_time",
&oldest_ancester_time);
meta_.oldest_ancester_time = oldest_ancester_time;
meta_.file_creation_time = current_time; meta_.file_creation_time = current_time;
uint64_t creation_time = (cfd_->ioptions()->compaction_style == uint64_t creation_time = (cfd_->ioptions()->compaction_style ==

@ -895,7 +895,7 @@ class BackgroundErrorListener : public EventListener {
// can succeed. // can succeed.
*bg_error = Status::OK(); *bg_error = Status::OK();
env_->drop_writes_.store(false, std::memory_order_release); env_->drop_writes_.store(false, std::memory_order_release);
env_->no_slowdown_ = false; env_->SetMockSleep(false);
} }
++counter_; ++counter_;
} }
@ -921,7 +921,7 @@ TEST_F(EventListenerTest, BackgroundErrorListenerFailedFlushTest) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
env_->drop_writes_.store(true, std::memory_order_release); env_->drop_writes_.store(true, std::memory_order_release);
env_->no_slowdown_ = true; env_->SetMockSleep();
ASSERT_OK(Put("key0", "val")); ASSERT_OK(Put("key0", "val"));
ASSERT_OK(Put("key1", "val")); ASSERT_OK(Put("key1", "val"));
@ -955,7 +955,7 @@ TEST_F(EventListenerTest, BackgroundErrorListenerFailedCompactionTest) {
ASSERT_EQ(2, NumTableFilesAtLevel(0)); ASSERT_EQ(2, NumTableFilesAtLevel(0));
env_->drop_writes_.store(true, std::memory_order_release); env_->drop_writes_.store(true, std::memory_order_release);
env_->no_slowdown_ = true; env_->SetMockSleep();
ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}})); ASSERT_OK(dbfull()->SetOptions({{"disable_auto_compactions", "false"}}));
ASSERT_OK(dbfull()->TEST_WaitForCompact()); ASSERT_OK(dbfull()->TEST_WaitForCompact());
ASSERT_EQ(1, listener->counter()); ASSERT_EQ(1, listener->counter());

@ -30,31 +30,43 @@ class StatsHistoryTest : public DBTestBase {
public: public:
StatsHistoryTest() : DBTestBase("/stats_history_test") {} StatsHistoryTest() : DBTestBase("/stats_history_test") {}
}; };
class SafeMockTimeEnv : public MockTimeEnv {
public:
explicit SafeMockTimeEnv(Env* base) : MockTimeEnv(base) {
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
#if defined(OS_MACOSX) && !defined(NDEBUG)
// This is an alternate way (vs. SpecialEnv) of dealing with the fact
// that on some platforms, pthread_cond_timedwait does not appear to
// release the lock for other threads to operate if the deadline time
// is already passed. (TimedWait calls are currently a bad abstraction
// because the deadline parameter is usually computed from Env time,
// but is interpreted in real clock time.)
SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < this->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = this->RealNowMicros() + 1000;
}
});
#endif // OS_MACOSX && !NDEBUG
SyncPoint::GetInstance()->EnableProcessing();
}
};
#ifndef ROCKSDB_LITE #ifndef ROCKSDB_LITE
TEST_F(StatsHistoryTest, RunStatsDumpPeriodSec) { TEST_F(StatsHistoryTest, RunStatsDumpPeriodSec) {
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.stats_dump_period_sec = 5; options.stats_dump_period_sec = 5;
std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; std::unique_ptr<SafeMockTimeEnv> mock_env(new SafeMockTimeEnv(env_));
mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds mock_env->set_current_time(0); // in seconds
options.env = mock_env.get(); options.env = mock_env.get();
int counter = 0; int counter = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); SyncPoint::GetInstance()->SetCallBack("DBImpl::DumpStats:1",
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); [&](void* /*arg*/) { counter++; });
#if defined(OS_MACOSX) && !defined(NDEBUG)
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < mock_env->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
}
});
#endif // OS_MACOSX && !NDEBUG
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::DumpStats:1", [&](void* /*arg*/) { counter++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Reopen(options); Reopen(options);
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec); ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_dump_period_sec);
dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(5); }); dbfull()->TEST_WaitForDumpStatsRun([&] { mock_env->set_current_time(5); });
@ -75,25 +87,12 @@ TEST_F(StatsHistoryTest, StatsPersistScheduling) {
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.stats_persist_period_sec = 5; options.stats_persist_period_sec = 5;
std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; std::unique_ptr<SafeMockTimeEnv> mock_env(new SafeMockTimeEnv(env_));
mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds mock_env->set_current_time(0); // in seconds
options.env = mock_env.get(); options.env = mock_env.get();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
#if defined(OS_MACOSX) && !defined(NDEBUG)
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < mock_env->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
}
});
#endif // OS_MACOSX && !NDEBUG
int counter = 0; int counter = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:Entry",
"DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); [&](void* /*arg*/) { counter++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Reopen(options); Reopen(options);
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec); ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec);
dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); }); dbfull()->TEST_WaitForPersistStatsRun([&] { mock_env->set_current_time(5); });
@ -111,25 +110,12 @@ TEST_F(StatsHistoryTest, PersistentStatsFreshInstall) {
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.stats_persist_period_sec = 0; options.stats_persist_period_sec = 0;
std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; std::unique_ptr<SafeMockTimeEnv> mock_env(new SafeMockTimeEnv(env_));
mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds mock_env->set_current_time(0); // in seconds
options.env = mock_env.get(); options.env = mock_env.get();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
#if defined(OS_MACOSX) && !defined(NDEBUG)
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < mock_env->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
}
});
#endif // OS_MACOSX && !NDEBUG
int counter = 0; int counter = 0;
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( SyncPoint::GetInstance()->SetCallBack("DBImpl::PersistStats:Entry",
"DBImpl::PersistStats:Entry", [&](void* /*arg*/) { counter++; }); [&](void* /*arg*/) { counter++; });
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Reopen(options); Reopen(options);
ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "5"}})); ASSERT_OK(dbfull()->SetDBOptions({{"stats_persist_period_sec", "5"}}));
ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec); ASSERT_EQ(5u, dbfull()->GetDBOptions().stats_persist_period_sec);
@ -143,24 +129,10 @@ TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) {
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.stats_persist_period_sec = 5; options.stats_persist_period_sec = 5;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); options.statistics = CreateDBStatistics();
std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; std::unique_ptr<SafeMockTimeEnv> mock_env(new SafeMockTimeEnv(env_));
mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds mock_env->set_current_time(0); // in seconds
options.env = mock_env.get(); options.env = mock_env.get();
#if defined(OS_MACOSX) && !defined(NDEBUG)
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < mock_env->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
#endif // OS_MACOSX && !NDEBUG
CreateColumnFamilies({"pikachu"}, options); CreateColumnFamilies({"pikachu"}, options);
ASSERT_OK(Put("foo", "bar")); ASSERT_OK(Put("foo", "bar"));
ReopenWithColumnFamilies({"default", "pikachu"}, options); ReopenWithColumnFamilies({"default", "pikachu"}, options);
@ -198,24 +170,11 @@ TEST_F(StatsHistoryTest, GetStatsHistoryInMemory) {
TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) { TEST_F(StatsHistoryTest, InMemoryStatsHistoryPurging) {
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); options.statistics = CreateDBStatistics();
options.stats_persist_period_sec = 1; options.stats_persist_period_sec = 1;
std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; std::unique_ptr<SafeMockTimeEnv> mock_env(new SafeMockTimeEnv(env_));
mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds mock_env->set_current_time(0); // in seconds
options.env = mock_env.get(); options.env = mock_env.get();
#if defined(OS_MACOSX) && !defined(NDEBUG)
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"InstrumentedCondVar::TimedWaitInternal", [&](void* arg) {
uint64_t time_us = *reinterpret_cast<uint64_t*>(arg);
if (time_us < mock_env->RealNowMicros()) {
*reinterpret_cast<uint64_t*>(arg) = mock_env->RealNowMicros() + 1000;
}
});
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
#endif // OS_MACOSX && !NDEBUG
CreateColumnFamilies({"pikachu"}, options); CreateColumnFamilies({"pikachu"}, options);
ASSERT_OK(Put("foo", "bar")); ASSERT_OK(Put("foo", "bar"));
@ -312,10 +271,9 @@ TEST_F(StatsHistoryTest, GetStatsHistoryFromDisk) {
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.stats_persist_period_sec = 5; options.stats_persist_period_sec = 5;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); options.statistics = CreateDBStatistics();
options.persist_stats_to_disk = true; options.persist_stats_to_disk = true;
std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; std::unique_ptr<SafeMockTimeEnv> mock_env(new SafeMockTimeEnv(env_));
mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds mock_env->set_current_time(0); // in seconds
options.env = mock_env.get(); options.env = mock_env.get();
CreateColumnFamilies({"pikachu"}, options); CreateColumnFamilies({"pikachu"}, options);
@ -393,10 +351,9 @@ TEST_F(StatsHistoryTest, PersitentStatsVerifyValue) {
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.stats_persist_period_sec = 5; options.stats_persist_period_sec = 5;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); options.statistics = CreateDBStatistics();
options.persist_stats_to_disk = true; options.persist_stats_to_disk = true;
std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; std::unique_ptr<SafeMockTimeEnv> mock_env(new SafeMockTimeEnv(env_));
mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
std::map<std::string, uint64_t> stats_map_before; std::map<std::string, uint64_t> stats_map_before;
ASSERT_TRUE(options.statistics->getTickerMap(&stats_map_before)); ASSERT_TRUE(options.statistics->getTickerMap(&stats_map_before));
mock_env->set_current_time(0); // in seconds mock_env->set_current_time(0); // in seconds
@ -472,10 +429,9 @@ TEST_F(StatsHistoryTest, PersistentStatsCreateColumnFamilies) {
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.stats_persist_period_sec = 5; options.stats_persist_period_sec = 5;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); options.statistics = CreateDBStatistics();
options.persist_stats_to_disk = true; options.persist_stats_to_disk = true;
std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; std::unique_ptr<SafeMockTimeEnv> mock_env(new SafeMockTimeEnv(env_));
mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds mock_env->set_current_time(0); // in seconds
options.env = mock_env.get(); options.env = mock_env.get();
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
@ -573,10 +529,9 @@ TEST_F(StatsHistoryTest, ForceManualFlushStatsCF) {
options.create_if_missing = true; options.create_if_missing = true;
options.write_buffer_size = 1024 * 1024 * 10; // 10 Mb options.write_buffer_size = 1024 * 1024 * 10; // 10 Mb
options.stats_persist_period_sec = 5; options.stats_persist_period_sec = 5;
options.statistics = ROCKSDB_NAMESPACE::CreateDBStatistics(); options.statistics = CreateDBStatistics();
options.persist_stats_to_disk = true; options.persist_stats_to_disk = true;
std::unique_ptr<ROCKSDB_NAMESPACE::MockTimeEnv> mock_env; std::unique_ptr<SafeMockTimeEnv> mock_env(new SafeMockTimeEnv(env_));
mock_env.reset(new ROCKSDB_NAMESPACE::MockTimeEnv(env_));
mock_env->set_current_time(0); // in seconds mock_env->set_current_time(0); // in seconds
options.env = mock_env.get(); options.env = mock_env.get();
CreateColumnFamilies({"pikachu"}, options); CreateColumnFamilies({"pikachu"}, options);

@ -9,6 +9,9 @@
namespace ROCKSDB_NAMESPACE { namespace ROCKSDB_NAMESPACE {
// NOTE: SpecialEnv offers most of this functionality, along with hooks
// for safe DB behavior under a mock time environment, so should be used
// instead of MockTimeEnv for DB tests.
class MockTimeEnv : public EnvWrapper { class MockTimeEnv : public EnvWrapper {
public: public:
explicit MockTimeEnv(Env* base) : EnvWrapper(base) {} explicit MockTimeEnv(Env* base) : EnvWrapper(base) {}

@ -180,9 +180,7 @@ TEST_F(RateLimiterTest, AutoTuneIncreaseWhenFull) {
const std::chrono::seconds kTimePerRefill(1); const std::chrono::seconds kTimePerRefill(1);
const int kRefillsPerTune = 100; // needs to match util/rate_limiter.cc const int kRefillsPerTune = 100; // needs to match util/rate_limiter.cc
SpecialEnv special_env(Env::Default()); SpecialEnv special_env(Env::Default(), /*time_elapse_only_sleep*/ true);
special_env.no_slowdown_ = true;
special_env.time_elapse_only_sleep_ = true;
auto stats = CreateDBStatistics(); auto stats = CreateDBStatistics();
std::unique_ptr<RateLimiter> rate_limiter(new GenericRateLimiter( std::unique_ptr<RateLimiter> rate_limiter(new GenericRateLimiter(

@ -44,6 +44,7 @@ class Timer {
~Timer() {} ~Timer() {}
// repeat_every_us == 0 means do not repeat
void Add(std::function<void()> fn, void Add(std::function<void()> fn,
const std::string& fn_name, const std::string& fn_name,
uint64_t start_after_us, uint64_t start_after_us,

@ -17,10 +17,13 @@ class TimerTest : public testing::Test {
std::unique_ptr<MockTimeEnv> mock_env_; std::unique_ptr<MockTimeEnv> mock_env_;
#if defined(OS_MACOSX) && !defined(NDEBUG) #if defined(OS_MACOSX) && !defined(NDEBUG)
// On MacOS, `CondVar.TimedWait()` doesn't use the time from MockTimeEnv, // On some platforms (MacOS) pthread_cond_timedwait does not appear
// instead it still uses the system time. // to release the lock for other threads to operate if the deadline time
// This is just a mitigation that always trigger the CV timeout. It is not // is already passed. This is a problem for tests in general because
// perfect, only works for this test. // TimedWait calls are a bad abstraction: the deadline parameter is
// usually computed from Env time, but is interpreted in real clock time.
// Since this test doesn't even pretend to use clock times, we have
// to mock TimedWait to ensure it yields.
void SetUp() override { void SetUp() override {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks(); ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
@ -116,7 +119,7 @@ TEST_F(TimerTest, MultipleScheduleOnceTest) {
time_counter += kSecond; time_counter += kSecond;
mock_env_->set_current_time(time_counter); mock_env_->set_current_time(time_counter);
test_cv1.TimedWait(time_counter); test_cv1.TimedWait(time_counter);
} }
} }
// Wait for execution to finish // Wait for execution to finish

@ -74,6 +74,18 @@ class BlobDBTest : public testing::Test {
Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(), Status TryOpen(BlobDBOptions bdb_options = BlobDBOptions(),
Options options = Options()) { Options options = Options()) {
options.create_if_missing = true; options.create_if_missing = true;
if (options.env == mock_env_.get()) {
// Need to disable stats dumping and persisting which also use
// RepeatableThread, which uses InstrumentedCondVar::TimedWaitInternal.
// With mocked time, this can hang on some platforms (MacOS)
// because (a) on some platforms, pthread_cond_timedwait does not appear
// to release the lock for other threads to operate if the deadline time
// is already passed, and (b) TimedWait calls are currently a bad
// abstraction because the deadline parameter is usually computed from
// Env time, but is interpreted in real clock time.
options.stats_dump_period_sec = 0;
options.stats_persist_period_sec = 0;
}
return BlobDB::Open(options, bdb_options, dbname_, &blob_db_); return BlobDB::Open(options, bdb_options, dbname_, &blob_db_);
} }

Loading…
Cancel
Save