Use SystemClock* instead of std::shared_ptr<SystemClock> in lower level routines (#8033)

Summary:
For performance reasons, the lower-level routines were changed to use a SystemClock* instead of a std::shared_ptr<SystemClock>.  The shared_ptr incurs some performance degradation on certain hardware classes.

For most of the system, there is no risk of the raw pointer being deleted or becoming invalid, because the owning shared_ptr is stored elsewhere.  For example, the ImmutableDBOptions stores the Env, which has a std::shared_ptr<SystemClock> in it.  The SystemClock* within the ImmutableDBOptions is essentially a "shortcut" for accessing this constant resource.

There were a few classes (PeriodicWorkScheduler?) where the "shortcut" property did not hold.  In those cases, the std::shared_ptr<SystemClock> was retained.

Using db_bench readrandom with perf_level=3 on my EC2 box, this change performed as well as or better than 6.17:

6.17: readrandom   :      28.046 micros/op 854902 ops/sec;   61.3 MB/s (355999 of 355999 found)
6.18: readrandom   :      32.615 micros/op 735306 ops/sec;   52.7 MB/s (290999 of 290999 found)
PR: readrandom   :      27.500 micros/op 871909 ops/sec;   62.5 MB/s (367999 of 367999 found)

(Note that the 6.18 timings were taken prior to the revert of the SystemClock change.)

Pull Request resolved: https://github.com/facebook/rocksdb/pull/8033

Reviewed By: pdillinger

Differential Revision: D27014563

Pulled By: mrambacher

fbshipit-source-id: ad0459eba03182e454391b5926bf5cdd45657b67
main
mrambacher 4 years ago committed by Facebook GitHub Bot
parent b8f40f7f7b
commit 3dff28cf9b
  1. 25
      db/blob/blob_file_builder.cc
  2. 5
      db/blob/blob_file_builder.h
  3. 81
      db/blob/blob_file_builder_test.cc
  4. 8
      db/blob/blob_file_cache_test.cc
  5. 3
      db/blob/blob_file_reader.cc
  6. 16
      db/blob/blob_file_reader_test.cc
  7. 4
      db/blob/blob_log_sequential_reader.cc
  8. 5
      db/blob/blob_log_sequential_reader.h
  9. 6
      db/blob/blob_log_writer.cc
  10. 5
      db/blob/blob_log_writer.h
  11. 14
      db/builder.cc
  12. 4
      db/column_family.cc
  13. 2
      db/compaction/compaction_iterator.cc
  14. 2
      db/compaction/compaction_iterator.h
  15. 21
      db/compaction/compaction_job.cc
  16. 1
      db/compaction/compaction_job.h
  17. 2
      db/compaction/compaction_picker_fifo.cc
  18. 57
      db/db_impl/db_impl.cc
  19. 2
      db/db_impl/db_impl.h
  20. 7
      db/db_impl/db_impl_compaction_flush.cc
  21. 2
      db/db_impl/db_impl_files.cc
  22. 20
      db/db_impl/db_impl_open.cc
  23. 4
      db/db_impl/db_impl_secondary.cc
  24. 24
      db/db_impl/db_impl_write.cc
  25. 2
      db/db_iter.cc
  26. 2
      db/db_iter.h
  27. 2
      db/error_handler.cc
  28. 10
      db/external_sst_file_ingestion_job.h
  29. 2
      db/flush_job.cc
  30. 2
      db/flush_job.h
  31. 7
      db/import_column_family_job.h
  32. 8
      db/internal_stats.h
  33. 4
      db/memtable.cc
  34. 2
      db/memtable.h
  35. 14
      db/merge_helper.cc
  36. 14
      db/merge_helper.h
  37. 20
      db/perf_context_test.cc
  38. 4
      db/periodic_work_scheduler.cc
  39. 7
      db/prefix_test.cc
  40. 6
      db/range_del_aggregator_bench.cc
  41. 3
      db/repair.cc
  42. 12
      db/table_cache.cc
  43. 12
      db/version_set.cc
  44. 4
      db/version_set.h
  45. 2
      db/version_set_test.cc
  46. 5
      db/wal_manager.cc
  47. 1
      db/wal_manager_test.cc
  48. 3
      db/write_batch.cc
  49. 6
      db/write_controller.cc
  50. 5
      db/write_controller.h
  51. 48
      db/write_controller_test.cc
  52. 21
      db_stress_tool/db_stress_driver.cc
  53. 8
      db_stress_tool/db_stress_stat.h
  54. 34
      db_stress_tool/db_stress_test_base.cc
  55. 2
      db_stress_tool/db_stress_test_base.h
  56. 3
      env/env_encryption.cc
  57. 2
      env/env_test.cc
  58. 20
      env/file_system_tracer.h
  59. 4
      env/fs_posix.cc
  60. 46
      env/mock_env.cc
  61. 6
      env/mock_env.h
  62. 5
      file/delete_scheduler.cc
  63. 4
      file/delete_scheduler.h
  64. 5
      file/file_util.h
  65. 6
      file/filename.cc
  66. 3
      file/filename.h
  67. 4
      file/random_access_file_reader.cc
  68. 4
      file/random_access_file_reader.h
  69. 2
      file/random_access_file_reader_test.cc
  70. 5
      file/sst_file_manager_impl.cc
  71. 5
      file/writable_file_writer.h
  72. 6
      include/rocksdb/metadata.h
  73. 12
      logging/env_logger.h
  74. 2
      memtable/memtablerep_bench.cc
  75. 5
      monitoring/instrumented_mutex.cc
  76. 10
      monitoring/instrumented_mutex.h
  77. 2
      monitoring/iostats_context_imp.h
  78. 4
      monitoring/perf_context_imp.h
  79. 1
      options/cf_options.cc
  80. 2
      options/cf_options.h
  81. 6
      options/db_options.cc
  82. 2
      options/db_options.h
  83. 2
      port/win/env_win.cc
  84. 3
      port/win/win_logger.cc
  85. 5
      port/win/win_logger.h
  86. 4
      table/block_based/block_based_table_builder.cc
  87. 11
      table/block_based/block_based_table_reader.cc
  88. 3
      table/block_based/block_based_table_reader.h
  89. 4
      table/block_based/block_based_table_reader_test.cc
  90. 4
      table/block_fetcher_test.cc
  91. 5
      table/format.cc
  92. 9
      table/get_context.cc
  93. 8
      table/get_context.h
  94. 5
      table/sst_file_writer.cc
  95. 5
      table/table_reader_bench.cc
  96. 3
      table/table_test.cc
  97. 8
      tools/block_cache_analyzer/block_cache_trace_analyzer.cc
  98. 8
      tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc
  99. 36
      tools/db_bench_tool.cc
  100. 10
      trace_replay/block_cache_tracer.cc
  101. Some files were not shown because too many files have changed in this diff Show More

@ -27,7 +27,7 @@
namespace ROCKSDB_NAMESPACE {
BlobFileBuilder::BlobFileBuilder(
VersionSet* versions, Env* env, FileSystem* fs,
VersionSet* versions, FileSystem* fs,
const ImmutableCFOptions* immutable_cf_options,
const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
int job_id, uint32_t column_family_id,
@ -36,14 +36,14 @@ BlobFileBuilder::BlobFileBuilder(
const std::shared_ptr<IOTracer>& io_tracer,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions)
: BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, env,
fs, immutable_cf_options, mutable_cf_options,
file_options, job_id, column_family_id,
column_family_name, io_priority, write_hint, io_tracer,
blob_file_paths, blob_file_additions) {}
: BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs,
immutable_cf_options, mutable_cf_options, file_options,
job_id, column_family_id, column_family_name, io_priority,
write_hint, io_tracer, blob_file_paths,
blob_file_additions) {}
BlobFileBuilder::BlobFileBuilder(
std::function<uint64_t()> file_number_generator, Env* env, FileSystem* fs,
std::function<uint64_t()> file_number_generator, FileSystem* fs,
const ImmutableCFOptions* immutable_cf_options,
const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
int job_id, uint32_t column_family_id,
@ -70,7 +70,6 @@ BlobFileBuilder::BlobFileBuilder(
blob_count_(0),
blob_bytes_(0) {
assert(file_number_generator_);
assert(env);
assert(fs_);
assert(immutable_cf_options_);
assert(file_options_);
@ -78,7 +77,6 @@ BlobFileBuilder::BlobFileBuilder(
assert(blob_file_paths_->empty());
assert(blob_file_additions_);
assert(blob_file_additions_->empty());
clock_ = env->GetSystemClock();
}
BlobFileBuilder::~BlobFileBuilder() = default;
@ -185,16 +183,17 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
FileTypeSet tmp_set = immutable_cf_options_->checksum_handoff_file_types;
Statistics* const statistics = immutable_cf_options_->statistics;
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(file), blob_file_paths_->back(), *file_options_, clock_,
io_tracer_, statistics, immutable_cf_options_->listeners,
std::move(file), blob_file_paths_->back(), *file_options_,
immutable_cf_options_->clock, io_tracer_, statistics,
immutable_cf_options_->listeners,
immutable_cf_options_->file_checksum_gen_factory,
tmp_set.Contains(FileType::kBlobFile)));
constexpr bool do_flush = false;
std::unique_ptr<BlobLogWriter> blob_log_writer(new BlobLogWriter(
std::move(file_writer), clock_, statistics, blob_file_number,
immutable_cf_options_->use_fsync, do_flush));
std::move(file_writer), immutable_cf_options_->clock, statistics,
blob_file_number, immutable_cf_options_->use_fsync, do_flush));
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;

@ -30,7 +30,7 @@ class IOTracer;
class BlobFileBuilder {
public:
BlobFileBuilder(VersionSet* versions, Env* env, FileSystem* fs,
BlobFileBuilder(VersionSet* versions, FileSystem* fs,
const ImmutableCFOptions* immutable_cf_options,
const MutableCFOptions* mutable_cf_options,
const FileOptions* file_options, int job_id,
@ -42,7 +42,7 @@ class BlobFileBuilder {
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions);
BlobFileBuilder(std::function<uint64_t()> file_number_generator, Env* env,
BlobFileBuilder(std::function<uint64_t()> file_number_generator,
FileSystem* fs,
const ImmutableCFOptions* immutable_cf_options,
const MutableCFOptions* mutable_cf_options,
@ -74,7 +74,6 @@ class BlobFileBuilder {
std::function<uint64_t()> file_number_generator_;
FileSystem* fs_;
std::shared_ptr<SystemClock> clock_;
const ImmutableCFOptions* immutable_cf_options_;
uint64_t min_blob_size_;
uint64_t blob_file_size_;

@ -41,7 +41,7 @@ class BlobFileBuilderTest : public testing::Test {
protected:
BlobFileBuilderTest() : mock_env_(Env::Default()) {
fs_ = mock_env_.GetFileSystem().get();
clock_ = mock_env_.GetSystemClock();
clock_ = mock_env_.GetSystemClock().get();
}
void VerifyBlobFile(uint64_t blob_file_number,
@ -110,7 +110,7 @@ class BlobFileBuilderTest : public testing::Test {
MockEnv mock_env_;
FileSystem* fs_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
FileOptions file_options_;
};
@ -127,6 +127,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) {
"BlobFileBuilderTest_BuildAndCheckOneFile"),
0);
options.enable_blob_files = true;
options.env = &mock_env_;
ImmutableCFOptions immutable_cf_options(options);
MutableCFOptions mutable_cf_options(options);
@ -140,11 +141,11 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(
TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint, nullptr /*IOTracer*/,
&blob_file_paths, &blob_file_additions);
BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id,
column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, &blob_file_paths,
&blob_file_additions);
std::vector<std::pair<std::string, std::string>> expected_key_value_pairs(
number_of_blobs);
@ -210,6 +211,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) {
0);
options.enable_blob_files = true;
options.blob_file_size = value_size;
options.env = &mock_env_;
ImmutableCFOptions immutable_cf_options(options);
MutableCFOptions mutable_cf_options(options);
@ -223,11 +225,11 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(
TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint, nullptr /*IOTracer*/,
&blob_file_paths, &blob_file_additions);
BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id,
column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, &blob_file_paths,
&blob_file_additions);
std::vector<std::pair<std::string, std::string>> expected_key_value_pairs(
number_of_blobs);
@ -295,6 +297,7 @@ TEST_F(BlobFileBuilderTest, InlinedValues) {
0);
options.enable_blob_files = true;
options.min_blob_size = 1024;
options.env = &mock_env_;
ImmutableCFOptions immutable_cf_options(options);
MutableCFOptions mutable_cf_options(options);
@ -308,11 +311,11 @@ TEST_F(BlobFileBuilderTest, InlinedValues) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(
TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint, nullptr /*IOTracer*/,
&blob_file_paths, &blob_file_additions);
BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id,
column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, &blob_file_paths,
&blob_file_additions);
for (size_t i = 0; i < number_of_blobs; ++i) {
const std::string key = std::to_string(i);
@ -347,6 +350,7 @@ TEST_F(BlobFileBuilderTest, Compression) {
test::PerThreadDBPath(&mock_env_, "BlobFileBuilderTest_Compression"), 0);
options.enable_blob_files = true;
options.blob_compression_type = kSnappyCompression;
options.env = &mock_env_;
ImmutableCFOptions immutable_cf_options(options);
MutableCFOptions mutable_cf_options(options);
@ -360,11 +364,11 @@ TEST_F(BlobFileBuilderTest, Compression) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(
TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint, nullptr /*IOTracer*/,
&blob_file_paths, &blob_file_additions);
BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id,
column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, &blob_file_paths,
&blob_file_additions);
const std::string key("1");
const std::string uncompressed_value(value_size, 'x');
@ -429,7 +433,7 @@ TEST_F(BlobFileBuilderTest, CompressionError) {
0);
options.enable_blob_files = true;
options.blob_compression_type = kSnappyCompression;
options.env = &mock_env_;
ImmutableCFOptions immutable_cf_options(options);
MutableCFOptions mutable_cf_options(options);
@ -442,11 +446,11 @@ TEST_F(BlobFileBuilderTest, CompressionError) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(
TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint, nullptr /*IOTracer*/,
&blob_file_paths, &blob_file_additions);
BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id,
column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, &blob_file_paths,
&blob_file_additions);
SyncPoint::GetInstance()->SetCallBack("CompressData:TamperWithReturnValue",
[](void* arg) {
@ -506,6 +510,7 @@ TEST_F(BlobFileBuilderTest, Checksum) {
options.enable_blob_files = true;
options.file_checksum_gen_factory =
std::make_shared<DummyFileChecksumGenFactory>();
options.env = &mock_env_;
ImmutableCFOptions immutable_cf_options(options);
MutableCFOptions mutable_cf_options(options);
@ -519,11 +524,11 @@ TEST_F(BlobFileBuilderTest, Checksum) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(
TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint, nullptr /*IOTracer*/,
&blob_file_paths, &blob_file_additions);
BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id,
column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, &blob_file_paths,
&blob_file_additions);
const std::string key("1");
const std::string value("deadbeef");
@ -615,11 +620,11 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
BlobFileBuilder builder(
TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id, column_family_id,
column_family_name, io_priority, write_hint, nullptr /*IOTracer*/,
&blob_file_paths, &blob_file_additions);
BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
&mutable_cf_options, &file_options_, job_id,
column_family_id, column_family_name, io_priority,
write_hint, nullptr /*IOTracer*/, &blob_file_paths,
&blob_file_additions);
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
Status* const s = static_cast<Status*>(arg);

@ -42,15 +42,15 @@ void WriteBlobFile(uint32_t column_family_id,
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), blob_file_path, FileOptions(),
immutable_cf_options.env->GetSystemClock()));
immutable_cf_options.clock));
constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
constexpr bool do_flush = false;
BlobLogWriter blob_log_writer(
std::move(file_writer), immutable_cf_options.env->GetSystemClock(),
statistics, blob_file_number, use_fsync, do_flush);
BlobLogWriter blob_log_writer(std::move(file_writer),
immutable_cf_options.clock, statistics,
blob_file_number, use_fsync, do_flush);
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;

@ -118,8 +118,7 @@ Status BlobFileReader::OpenFile(
}
file_reader->reset(new RandomAccessFileReader(
std::move(file), blob_file_path,
immutable_cf_options.env->GetSystemClock(), io_tracer,
std::move(file), blob_file_path, immutable_cf_options.clock, io_tracer,
immutable_cf_options.statistics, BLOB_DB_BLOB_FILE_READ_MICROS,
blob_file_read_hist, immutable_cf_options.rate_limiter,
immutable_cf_options.listeners));

@ -50,15 +50,15 @@ void WriteBlobFile(const ImmutableCFOptions& immutable_cf_options,
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), blob_file_path, FileOptions(),
immutable_cf_options.env->GetSystemClock()));
immutable_cf_options.clock));
constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
constexpr bool do_flush = false;
BlobLogWriter blob_log_writer(
std::move(file_writer), immutable_cf_options.env->GetSystemClock(),
statistics, blob_file_number, use_fsync, do_flush);
BlobLogWriter blob_log_writer(std::move(file_writer),
immutable_cf_options.clock, statistics,
blob_file_number, use_fsync, do_flush);
BlobLogHeader header(column_family_id, compression_type, has_ttl,
expiration_range_header);
@ -280,15 +280,15 @@ TEST_F(BlobFileReaderTest, Malformed) {
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), blob_file_path, FileOptions(),
immutable_cf_options.env->GetSystemClock()));
immutable_cf_options.clock));
constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
constexpr bool do_flush = false;
BlobLogWriter blob_log_writer(
std::move(file_writer), immutable_cf_options.env->GetSystemClock(),
statistics, blob_file_number, use_fsync, do_flush);
BlobLogWriter blob_log_writer(std::move(file_writer),
immutable_cf_options.clock, statistics,
blob_file_number, use_fsync, do_flush);
BlobLogHeader header(column_family_id, kNoCompression, has_ttl,
expiration_range);

@ -13,8 +13,8 @@
namespace ROCKSDB_NAMESPACE {
BlobLogSequentialReader::BlobLogSequentialReader(
std::unique_ptr<RandomAccessFileReader>&& file_reader,
const std::shared_ptr<SystemClock>& clock, Statistics* statistics)
std::unique_ptr<RandomAccessFileReader>&& file_reader, SystemClock* clock,
Statistics* statistics)
: file_(std::move(file_reader)),
clock_(clock),
statistics_(statistics),

@ -36,8 +36,7 @@ class BlobLogSequentialReader {
// Create a reader that will return log records from "*file_reader".
BlobLogSequentialReader(std::unique_ptr<RandomAccessFileReader>&& file_reader,
const std::shared_ptr<SystemClock>& clock,
Statistics* statistics);
SystemClock* clock, Statistics* statistics);
// No copying allowed
BlobLogSequentialReader(const BlobLogSequentialReader&) = delete;
@ -65,7 +64,7 @@ class BlobLogSequentialReader {
Status ReadSlice(uint64_t size, Slice* slice, char* buf);
const std::unique_ptr<RandomAccessFileReader> file_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
Statistics* statistics_;

@ -19,9 +19,9 @@
namespace ROCKSDB_NAMESPACE {
BlobLogWriter::BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest,
const std::shared_ptr<SystemClock>& clock,
Statistics* statistics, uint64_t log_number,
bool use_fs, bool do_flush, uint64_t boffset)
SystemClock* clock, Statistics* statistics,
uint64_t log_number, bool use_fs, bool do_flush,
uint64_t boffset)
: dest_(std::move(dest)),
clock_(clock),
statistics_(statistics),

@ -31,8 +31,7 @@ class BlobLogWriter {
// Create a writer that will append data to "*dest".
// "*dest" must be initially empty.
// "*dest" must remain live while this BlobLogWriter is in use.
BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest,
const std::shared_ptr<SystemClock>& clock,
BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest, SystemClock* clock,
Statistics* statistics, uint64_t log_number, bool use_fsync,
bool do_flush, uint64_t boffset = 0);
// No copying allowed
@ -69,7 +68,7 @@ class BlobLogWriter {
private:
std::unique_ptr<WritableFileWriter> dest_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
Statistics* statistics_;
uint64_t log_number_;
uint64_t block_offset_; // Current offset in block

@ -125,7 +125,6 @@ Status BuildTable(
assert(env);
FileSystem* fs = db_options.fs.get();
assert(fs);
const auto& clock = env->GetSystemClock();
TableProperties tp;
if (iter->Valid() || !range_del_agg->IsEmpty()) {
@ -154,7 +153,7 @@ Status BuildTable(
file->SetIOPriority(io_priority);
file->SetWriteLifeTimeHint(write_hint);
file_writer.reset(new WritableFileWriter(
std::move(file), fname, file_options, clock, io_tracer,
std::move(file), fname, file_options, ioptions.clock, io_tracer,
ioptions.statistics, ioptions.listeners,
ioptions.file_checksum_gen_factory,
tmp_set.Contains(FileType::kTableFile)));
@ -176,11 +175,10 @@ Status BuildTable(
std::unique_ptr<BlobFileBuilder> blob_file_builder(
(mutable_cf_options.enable_blob_files && blob_file_additions)
? new BlobFileBuilder(versions, env, fs, &ioptions,
&mutable_cf_options, &file_options, job_id,
column_family_id, column_family_name,
io_priority, write_hint, io_tracer,
&blob_file_paths, blob_file_additions)
? new BlobFileBuilder(
versions, fs, &ioptions, &mutable_cf_options, &file_options,
job_id, column_family_id, column_family_name, io_priority,
write_hint, io_tracer, &blob_file_paths, blob_file_additions)
: nullptr);
CompactionIterator c_iter(
@ -258,7 +256,7 @@ Status BuildTable(
// Finish and check for file errors
TEST_SYNC_POINT("BuildTable:BeforeSyncTable");
if (s.ok() && !empty) {
StopWatch sw(clock, ioptions.statistics, TABLE_SYNC_MICROS);
StopWatch sw(ioptions.clock, ioptions.statistics, TABLE_SYNC_MICROS);
*io_status = file_writer->Sync(ioptions.use_fsync);
}
TEST_SYNC_POINT("BuildTable:BeforeCloseTableFile");

@ -557,8 +557,8 @@ ColumnFamilyData::ColumnFamilyData(
// if _dummy_versions is nullptr, then this is a dummy column family.
if (_dummy_versions != nullptr) {
internal_stats_.reset(new InternalStats(
ioptions_.num_levels, db_options.env->GetSystemClock(), this));
internal_stats_.reset(
new InternalStats(ioptions_.num_levels, ioptions_.clock, this));
table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
block_cache_tracer, io_tracer));
blob_file_cache_.reset(

@ -80,7 +80,7 @@ CompactionIterator::CompactionIterator(
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
snapshot_checker_(snapshot_checker),
env_(env),
clock_(env_->GetSystemClock()),
clock_(env_->GetSystemClock().get()),
report_detailed_time_(report_detailed_time),
expect_valid_internal_key_(expect_valid_internal_key),
range_del_agg_(range_del_agg),

@ -248,7 +248,7 @@ class CompactionIterator {
const SequenceNumber earliest_write_conflict_snapshot_;
const SnapshotChecker* const snapshot_checker_;
Env* env_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
bool report_detailed_time_;
bool expect_valid_internal_key_;
CompactionRangeDelAggregator* range_del_agg_;

@ -321,7 +321,6 @@ CompactionJob::CompactionJob(
db_options_(db_options),
file_options_(file_options),
env_(db_options.env),
clock_(env_->GetSystemClock()),
io_tracer_(io_tracer),
fs_(db_options.fs, io_tracer),
file_options_for_read_(
@ -421,7 +420,7 @@ void CompactionJob::Prepare() {
if (c->ShouldFormSubcompactions()) {
{
StopWatch sw(clock_, stats_, SUBCOMPACTION_SETUP_TIME);
StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
GenSubcompactionBoundaries();
}
assert(sizes_.size() == boundaries_.size() + 1);
@ -587,7 +586,7 @@ Status CompactionJob::Run() {
const size_t num_threads = compact_->sub_compact_states.size();
assert(num_threads > 0);
const uint64_t start_micros = clock_->NowMicros();
const uint64_t start_micros = db_options_.clock->NowMicros();
// Launch a thread for each of subcompactions 1...num_threads-1
std::vector<port::Thread> thread_pool;
@ -606,7 +605,7 @@ Status CompactionJob::Run() {
thread.join();
}
compaction_stats_.micros = clock_->NowMicros() - start_micros;
compaction_stats_.micros = db_options_.clock->NowMicros() - start_micros;
compaction_stats_.cpu_micros = 0;
for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
compaction_stats_.cpu_micros +=
@ -902,7 +901,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact);
assert(sub_compact->compaction);
uint64_t prev_cpu_micros = clock_->CPUNanos() / 1000;
uint64_t prev_cpu_micros = db_options_.clock->CPUNanos() / 1000;
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
@ -980,7 +979,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
std::unique_ptr<BlobFileBuilder> blob_file_builder(
mutable_cf_options->enable_blob_files
? new BlobFileBuilder(
versions_, env_, fs_.get(),
versions_, fs_.get(),
sub_compact->compaction->immutable_cf_options(),
mutable_cf_options, &file_options_, job_id_, cfd->GetID(),
cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_,
@ -1196,7 +1195,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}
sub_compact->compaction_job_stats.cpu_micros =
clock_->CPUNanos() / 1000 - prev_cpu_micros;
db_options_.clock->CPUNanos() / 1000 - prev_cpu_micros;
if (measure_io_stats_) {
sub_compact->compaction_job_stats.file_write_nanos +=
@ -1475,7 +1474,7 @@ Status CompactionJob::FinishCompactionOutputFile(
// Finish and check for file errors
if (s.ok()) {
StopWatch sw(clock_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
StopWatch sw(db_options_.clock, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
io_s = sub_compact->outfile->Sync(db_options_.use_fsync);
}
if (s.ok() && io_s.ok()) {
@ -1714,7 +1713,7 @@ Status CompactionJob::OpenCompactionOutputFile(
// Try to figure out the output file's oldest ancestor time.
int64_t temp_current_time = 0;
auto get_time_status = env_->GetCurrentTime(&temp_current_time);
auto get_time_status = db_options_.clock->GetCurrentTime(&temp_current_time);
// Safe to proceed even if GetCurrentTime fails. So, log and proceed.
if (!get_time_status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
@ -1751,8 +1750,8 @@ Status CompactionJob::OpenCompactionOutputFile(
const auto& listeners =
sub_compact->compaction->immutable_cf_options()->listeners;
sub_compact->outfile.reset(new WritableFileWriter(
std::move(writable_file), fname, file_options_, clock_, io_tracer_,
db_options_.statistics.get(), listeners,
std::move(writable_file), fname, file_options_, db_options_.clock,
io_tracer_, db_options_.statistics.get(), listeners,
db_options_.file_checksum_gen_factory.get(),
tmp_set.Contains(FileType::kTableFile)));

@ -160,7 +160,6 @@ class CompactionJob {
const FileOptions file_options_;
Env* env_;
std::shared_ptr<SystemClock> clock_;
std::shared_ptr<IOTracer> io_tracer_;
FileSystemPtr fs_;
// env_option optimized for compaction table reads

@ -45,7 +45,7 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
uint64_t total_size = GetTotalFilesSize(level_files);
int64_t _current_time;
auto status = ioptions_.env->GetCurrentTime(&_current_time);
auto status = ioptions_.clock->GetCurrentTime(&_current_time);
if (!status.ok()) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] FIFO compaction: Couldn't get current time: %s. "

@ -151,13 +151,12 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
own_info_log_(options.info_log == nullptr),
initial_db_options_(SanitizeOptions(dbname, options)),
env_(initial_db_options_.env),
clock_(initial_db_options_.env->GetSystemClock()),
io_tracer_(std::make_shared<IOTracer>()),
immutable_db_options_(initial_db_options_),
fs_(immutable_db_options_.fs, io_tracer_),
mutable_db_options_(initial_db_options_),
stats_(immutable_db_options_.statistics.get()),
mutex_(stats_, clock_, DB_MUTEX_WAIT_MICROS,
mutex_(stats_, immutable_db_options_.clock, DB_MUTEX_WAIT_MICROS,
immutable_db_options_.use_adaptive_mutex),
default_cf_handle_(nullptr),
max_total_in_memory_state_(0),
@ -192,7 +191,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
bg_purge_scheduled_(0),
disable_delete_obsolete_files_(0),
pending_purge_obsolete_files_(0),
delete_obsolete_files_last_run_(clock_->NowMicros()),
delete_obsolete_files_last_run_(immutable_db_options_.clock->NowMicros()),
last_stats_dump_time_microsec_(0),
next_job_id_(1),
has_unpersisted_data_(false),
@ -753,7 +752,8 @@ void DBImpl::PersistStats() {
return;
}
TEST_SYNC_POINT("DBImpl::PersistStats:StartRunning");
uint64_t now_seconds = clock_->NowMicros() / kMicrosInSecond;
uint64_t now_seconds =
immutable_db_options_.clock->NowMicros() / kMicrosInSecond;
Statistics* statistics = immutable_db_options_.statistics.get();
if (!statistics) {
@ -1654,8 +1654,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
}
#endif // NDEBUG
PERF_CPU_TIMER_GUARD(get_cpu_nanos, clock_);
StopWatch sw(clock_, stats_, DB_GET);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time);
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
@ -1845,8 +1845,8 @@ std::vector<Status> DBImpl::MultiGet(
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values,
std::vector<std::string>* timestamps) {
PERF_CPU_TIMER_GUARD(get_cpu_nanos, clock_);
StopWatch sw(clock_, stats_, DB_MULTIGET);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
PERF_TIMER_GUARD(get_snapshot_time);
#ifndef NDEBUG
@ -1975,9 +1975,8 @@ std::vector<Status> DBImpl::MultiGet(
break;
}
}
if (read_options.deadline.count() &&
clock_->NowMicros() >
immutable_db_options_.clock->NowMicros() >
static_cast<uint64_t>(read_options.deadline.count())) {
break;
}
@ -1986,7 +1985,7 @@ std::vector<Status> DBImpl::MultiGet(
if (keys_read < num_keys) {
// The only reason to break out of the loop is when the deadline is
// exceeded
assert(clock_->NowMicros() >
assert(immutable_db_options_.clock->NowMicros() >
static_cast<uint64_t>(read_options.deadline.count()));
for (++keys_read; keys_read < num_keys; ++keys_read) {
stat_list[keys_read] = Status::TimedOut();
@ -2426,8 +2425,8 @@ Status DBImpl::MultiGetImpl(
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
SuperVersion* super_version, SequenceNumber snapshot,
ReadCallback* callback) {
PERF_CPU_TIMER_GUARD(get_cpu_nanos, clock_);
StopWatch sw(clock_, stats_, DB_MULTIGET);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
// For each of the given keys, apply the entire "get" process as follows:
// First look in the memtable, then in the immutable memtable (if any).
@ -2438,7 +2437,7 @@ Status DBImpl::MultiGetImpl(
uint64_t curr_value_size = 0;
while (keys_left) {
if (read_options.deadline.count() &&
clock_->NowMicros() >
immutable_db_options_.clock->NowMicros() >
static_cast<uint64_t>(read_options.deadline.count())) {
s = Status::TimedOut();
break;
@ -2991,7 +2990,8 @@ const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
bool lock) {
int64_t unix_time = 0;
env_->GetCurrentTime(&unix_time).PermitUncheckedError(); // Ignore error
immutable_db_options_.clock->GetCurrentTime(&unix_time)
.PermitUncheckedError(); // Ignore error
SnapshotImpl* s = new SnapshotImpl;
if (lock) {
@ -3136,12 +3136,16 @@ FileSystem* DBImpl::GetFileSystem() const {
return immutable_db_options_.fs.get();
}
// Accessor for the clock used by this DB instance: returns the SystemClock
// pointer stored in immutable_db_options_.clock.
SystemClock* DBImpl::GetSystemClock() const {
  SystemClock* const db_clock = immutable_db_options_.clock;
  return db_clock;
}
#ifndef ROCKSDB_LITE
Status DBImpl::StartIOTrace(Env* env, const TraceOptions& trace_options,
Status DBImpl::StartIOTrace(Env* /*env*/, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
assert(trace_writer != nullptr);
return io_tracer_->StartIOTrace(env->GetSystemClock(), trace_options,
return io_tracer_->StartIOTrace(GetSystemClock(), trace_options,
std::move(trace_writer));
}
@ -4422,9 +4426,9 @@ Status DBImpl::IngestExternalFiles(
std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
for (const auto& arg : args) {
auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
ingestion_jobs.emplace_back(
clock_, versions_.get(), cfd, immutable_db_options_, file_options_,
&snapshots_, arg.options, &directories_, &event_logger_, io_tracer_);
ingestion_jobs.emplace_back(versions_.get(), cfd, immutable_db_options_,
file_options_, &snapshots_, arg.options,
&directories_, &event_logger_, io_tracer_);
}
// TODO(yanqin) maybe make jobs run in parallel
@ -4691,9 +4695,9 @@ Status DBImpl::CreateColumnFamilyWithImport(
// Import sst files from metadata.
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(*handle);
auto cfd = cfh->cfd();
ImportColumnFamilyJob import_job(clock_, versions_.get(), cfd,
immutable_db_options_, file_options_,
import_options, metadata.files, io_tracer_);
ImportColumnFamilyJob import_job(versions_.get(), cfd, immutable_db_options_,
file_options_, import_options,
metadata.files, io_tracer_);
SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
VersionEdit dummy_edit;
@ -4969,7 +4973,8 @@ void DBImpl::WaitForIngestFile() {
Status DBImpl::StartTrace(const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
InstrumentedMutexLock lock(&trace_mutex_);
tracer_.reset(new Tracer(clock_, trace_options, std::move(trace_writer)));
tracer_.reset(new Tracer(immutable_db_options_.clock, trace_options,
std::move(trace_writer)));
return Status::OK();
}
@ -4988,8 +4993,8 @@ Status DBImpl::EndTrace() {
Status DBImpl::StartBlockCacheTrace(
const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
return block_cache_tracer_.StartTrace(env_, trace_options,
std::move(trace_writer));
return block_cache_tracer_.StartTrace(immutable_db_options_.clock,
trace_options, std::move(trace_writer));
}
Status DBImpl::EndBlockCacheTrace() {

@ -486,6 +486,7 @@ class DBImpl : public DB {
#endif // ROCKSDB_LITE
// ---- End of implementations of the DB interface ----
SystemClock* GetSystemClock() const;
struct GetImplOptions {
ColumnFamilyHandle* column_family = nullptr;
@ -1057,7 +1058,6 @@ class DBImpl : public DB {
bool own_info_log_;
const DBOptions initial_db_options_;
Env* const env_;
std::shared_ptr<SystemClock> clock_;
std::shared_ptr<IOTracer> io_tracer_;
const ImmutableDBOptions immutable_db_options_;
FileSystemPtr fs_;

@ -2564,7 +2564,7 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
s.ToString().c_str(), error_cnt);
log_buffer.FlushBufferToLog();
LogFlush(immutable_db_options_.info_log);
clock_->SleepForMicroseconds(1000000);
immutable_db_options_.clock->SleepForMicroseconds(1000000);
mutex_.Lock();
}
@ -2637,7 +2637,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
if (s.IsBusy()) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
clock_->SleepForMicroseconds(10000); // prevent hot loop
immutable_db_options_.clock->SleepForMicroseconds(
10000); // prevent hot loop
mutex_.Lock();
} else if (!s.ok() && !s.IsShutdownInProgress() &&
!s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
@ -2655,7 +2656,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
"Accumulated background error counts: %" PRIu64,
s.ToString().c_str(), error_cnt);
LogFlush(immutable_db_options_.info_log);
clock_->SleepForMicroseconds(1000000);
immutable_db_options_.clock->SleepForMicroseconds(1000000);
mutex_.Lock();
} else if (s.IsManualCompactionPaused()) {
ManualCompactionState* m = prepicked_compaction->manual_compaction_state;

@ -120,7 +120,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
mutable_db_options_.delete_obsolete_files_period_micros == 0) {
doing_the_full_scan = true;
} else {
const uint64_t now_micros = clock_->NowMicros();
const uint64_t now_micros = immutable_db_options_.clock->NowMicros();
if ((delete_obsolete_files_last_run_ +
mutable_db_options_.delete_obsolete_files_period_micros) <
now_micros) {

@ -293,15 +293,15 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
file->SetPreallocationBlockSize(
immutable_db_options_.manifest_preallocation_size);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(file), manifest, file_options, clock_, io_tracer_,
nullptr /* stats */, immutable_db_options_.listeners, nullptr,
tmp_set.Contains(FileType::kDescriptorFile)));
std::move(file), manifest, file_options, immutable_db_options_.clock,
io_tracer_, nullptr /* stats */, immutable_db_options_.listeners,
nullptr, tmp_set.Contains(FileType::kDescriptorFile)));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
if (s.ok()) {
s = SyncManifest(clock_, &immutable_db_options_, log.file());
s = SyncManifest(&immutable_db_options_, log.file());
}
}
if (s.ok()) {
@ -1297,7 +1297,7 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit) {
mutex_.AssertHeld();
const uint64_t start_micros = clock_->NowMicros();
const uint64_t start_micros = immutable_db_options_.clock->NowMicros();
FileMetaData meta;
std::vector<BlobFileAddition> blob_file_additions;
@ -1325,7 +1325,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
int64_t _current_time = 0;
env_->GetCurrentTime(&_current_time)
immutable_db_options_.clock->GetCurrentTime(&_current_time)
.PermitUncheckedError(); // ignore error
const uint64_t current_time = static_cast<uint64_t>(_current_time);
meta.oldest_ancester_time = current_time;
@ -1399,7 +1399,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
}
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
stats.micros = clock_->NowMicros() - start_micros;
stats.micros = immutable_db_options_.clock->NowMicros() - start_micros;
if (has_output) {
stats.bytes_written = meta.fd.GetFileSize();
@ -1492,9 +1492,9 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
const auto& listeners = immutable_db_options_.listeners;
FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
std::move(lfile), log_fname, opt_file_options, clock_, io_tracer_,
nullptr /* stats */, listeners, nullptr,
tmp_set.Contains(FileType::kWalFile)));
std::move(lfile), log_fname, opt_file_options,
immutable_db_options_.clock, io_tracer_, nullptr /* stats */, listeners,
nullptr, tmp_set.Contains(FileType::kWalFile)));
*new_log = new log::Writer(std::move(file_writer), log_file_num,
immutable_db_options_.recycle_log_file_num > 0,
immutable_db_options_.manual_wal_flush);

@ -327,8 +327,8 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val) {
assert(pinnable_val != nullptr);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, clock_);
StopWatch sw(clock_, stats_, DB_GET);
PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time);
auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);

@ -160,7 +160,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
RecordTick(stats_, WRITE_WITH_WAL);
}
StopWatch write_sw(clock_, immutable_db_options_.statistics.get(), DB_WRITE);
StopWatch write_sw(immutable_db_options_.clock,
immutable_db_options_.statistics.get(), DB_WRITE);
write_thread_.JoinBatchGroup(&w);
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
@ -465,7 +466,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(clock_, immutable_db_options_.statistics.get(), DB_WRITE);
StopWatch write_sw(immutable_db_options_.clock,
immutable_db_options_.statistics.get(), DB_WRITE);
WriteContext write_context;
@ -621,7 +623,8 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
SequenceNumber seq,
const size_t sub_batch_cnt) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
StopWatch write_sw(clock_, immutable_db_options_.statistics.get(), DB_WRITE);
StopWatch write_sw(immutable_db_options_.clock,
immutable_db_options_.statistics.get(), DB_WRITE);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
false /*disable_memtable*/);
@ -676,7 +679,8 @@ Status DBImpl::WriteImplWALOnly(
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable, sub_batch_cnt, pre_release_callback);
RecordTick(stats_, WRITE_WITH_WAL);
StopWatch write_sw(clock_, immutable_db_options_.statistics.get(), DB_WRITE);
StopWatch write_sw(immutable_db_options_.clock,
immutable_db_options_.statistics.get(), DB_WRITE);
write_thread->JoinBatchGroup(&w);
assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER);
@ -1093,7 +1097,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
}
if (io_s.ok() && need_log_sync) {
StopWatch sw(clock_, stats_, WAL_FILE_SYNC_MICROS);
StopWatch sw(immutable_db_options_.clock, stats_, WAL_FILE_SYNC_MICROS);
// It's safe to access logs_ with unlocked mutex_ here because:
// - we've set getting_synced=true for all logs,
// so other threads won't pop from logs_ while we're here,
@ -1457,8 +1461,10 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
uint64_t time_delayed = 0;
bool delayed = false;
{
StopWatch sw(clock_, stats_, WRITE_STALL, &time_delayed);
uint64_t delay = write_controller_.GetDelay(clock_, num_bytes);
StopWatch sw(immutable_db_options_.clock, stats_, WRITE_STALL,
&time_delayed);
uint64_t delay =
write_controller_.GetDelay(immutable_db_options_.clock, num_bytes);
if (delay > 0) {
if (write_options.no_slowdown) {
return Status::Incomplete("Write stall");
@ -1475,14 +1481,14 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
const uint64_t kDelayInterval = 1000;
uint64_t stall_end = sw.start_time() + delay;
while (write_controller_.NeedsDelay()) {
if (clock_->NowMicros() >= stall_end) {
if (immutable_db_options_.clock->NowMicros() >= stall_end) {
// We already delayed this write `delay` microseconds
break;
}
delayed = true;
// Sleep for 0.001 seconds
clock_->SleepForMicroseconds(kDelayInterval);
immutable_db_options_.clock->SleepForMicroseconds(kDelayInterval);
}
mutex_.Lock();
write_thread_.EndWriteStall();

@ -45,7 +45,7 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
ColumnFamilyData* cfd, bool expose_blob_index)
: prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
env_(_env),
clock_(_env->GetSystemClock()),
clock_(_env->GetSystemClock().get()),
logger_(cf_options.info_log),
user_comparator_(cmp),
merge_operator_(cf_options.merge_operator),

@ -300,7 +300,7 @@ class DBIter final : public Iterator {
const SliceTransform* prefix_extractor_;
Env* const env_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
Logger* logger_;
UserComparatorWrapper user_comparator_;
const MergeOperator* const merge_operator_;

@ -670,7 +670,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() {
// a period of time and redo auto resume if it is allowed.
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeWait0");
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeWait1");
int64_t wait_until = db_->clock_->NowMicros() + wait_interval;
int64_t wait_until = db_options_.clock->NowMicros() + wait_interval;
cv_.TimedWait(wait_until);
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:AfterWait0");
} else {

@ -74,13 +74,13 @@ struct IngestedFileInfo {
class ExternalSstFileIngestionJob {
public:
ExternalSstFileIngestionJob(
const std::shared_ptr<SystemClock>& clock, VersionSet* versions,
ColumnFamilyData* cfd, const ImmutableDBOptions& db_options,
const EnvOptions& env_options, SnapshotList* db_snapshots,
VersionSet* versions, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options, const EnvOptions& env_options,
SnapshotList* db_snapshots,
const IngestExternalFileOptions& ingestion_options,
Directories* directories, EventLogger* event_logger,
const std::shared_ptr<IOTracer>& io_tracer)
: clock_(clock),
: clock_(db_options.clock),
fs_(db_options.fs, io_tracer),
versions_(versions),
cfd_(cfd),
@ -170,7 +170,7 @@ class ExternalSstFileIngestionJob {
template <typename TWritableFile>
Status SyncIngestedFile(TWritableFile* file);
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
FileSystemPtr fs_;
VersionSet* versions_;
ColumnFamilyData* cfd_;

@ -127,7 +127,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
pick_memtable_called(false),
thread_pri_(thread_pri),
io_tracer_(io_tracer),
clock_(db_options_.env->GetSystemClock()),
clock_(db_options_.clock),
full_history_ts_low_(std::move(full_history_ts_low)) {
// Update the thread status to indicate flush.
ReportStartedFlush();

@ -162,7 +162,7 @@ class FlushJob {
IOStatus io_status_;
const std::shared_ptr<IOTracer> io_tracer_;
const std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
const std::string full_history_ts_low_;
};

@ -21,14 +21,13 @@ class SystemClock;
// to ExternalSstFileIngestionJob.
class ImportColumnFamilyJob {
public:
ImportColumnFamilyJob(const std::shared_ptr<SystemClock>& clock,
VersionSet* versions, ColumnFamilyData* cfd,
ImportColumnFamilyJob(VersionSet* versions, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const EnvOptions& env_options,
const ImportColumnFamilyOptions& import_options,
const std::vector<LiveFileMetaData>& metadata,
const std::shared_ptr<IOTracer>& io_tracer)
: clock_(clock),
: clock_(db_options.clock),
versions_(versions),
cfd_(cfd),
db_options_(db_options),
@ -61,7 +60,7 @@ class ImportColumnFamilyJob {
IngestedFileInfo* file_to_import,
SuperVersion* sv);
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
VersionSet* versions_;
ColumnFamilyData* cfd_;
const ImmutableDBOptions& db_options_;

@ -125,8 +125,7 @@ class InternalStats {
kIntStatsNumMax,
};
InternalStats(int num_levels, const std::shared_ptr<SystemClock>& clock,
ColumnFamilyData* cfd)
InternalStats(int num_levels, SystemClock* clock, ColumnFamilyData* cfd)
: db_stats_{},
cf_stats_value_{},
cf_stats_count_{},
@ -638,7 +637,7 @@ class InternalStats {
uint64_t bg_error_count_;
const int number_levels_;
const std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
ColumnFamilyData* cfd_;
uint64_t started_at_;
};
@ -677,8 +676,7 @@ class InternalStats {
kIntStatsNumMax,
};
InternalStats(int /*num_levels*/,
const std::shared_ptr<SystemClock>& /*clock*/,
InternalStats(int /*num_levels*/, SystemClock* /*clock*/,
ColumnFamilyData* /*cfd*/) {}
struct CompactionStats {

@ -104,7 +104,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
: 0),
prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
flush_state_(FLUSH_NOT_REQUESTED),
clock_(ioptions.env->GetSystemClock()),
clock_(ioptions.clock),
insert_with_hint_prefix_extractor_(
ioptions.memtable_insert_with_hint_prefix_extractor),
oldest_key_time_(std::numeric_limits<uint64_t>::max()),
@ -684,7 +684,7 @@ struct Saver {
Statistics* statistics;
bool inplace_update_support;
bool do_merge;
std::shared_ptr<SystemClock> clock;
SystemClock* clock;
ReadCallback* callback_;
bool* is_blob_index;

@ -512,7 +512,7 @@ class MemTable {
std::atomic<FlushStateEnum> flush_state_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
// Extract sequential insert prefixes.
const SliceTransform* insert_with_hint_prefix_extractor_;

@ -29,7 +29,7 @@ MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
Statistics* stats,
const std::atomic<bool>* shutting_down)
: env_(env),
clock_(env->GetSystemClock()),
clock_(env->GetSystemClock().get()),
user_comparator_(user_comparator),
user_merge_operator_(user_merge_operator),
compaction_filter_(compaction_filter),
@ -50,11 +50,13 @@ MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
}
}
Status MergeHelper::TimedFullMerge(
const MergeOperator* merge_operator, const Slice& key, const Slice* value,
const std::vector<Slice>& operands, std::string* result, Logger* logger,
Statistics* statistics, const std::shared_ptr<SystemClock>& clock,
Slice* result_operand, bool update_num_ops_stats) {
Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
const Slice& key, const Slice* value,
const std::vector<Slice>& operands,
std::string* result, Logger* logger,
Statistics* statistics, SystemClock* clock,
Slice* result_operand,
bool update_num_ops_stats) {
assert(merge_operator != nullptr);
if (operands.size() == 0) {

@ -45,11 +45,13 @@ class MergeHelper {
// Returns one of the following statuses:
// - OK: Entries were successfully merged.
// - Corruption: Merge operator reported unsuccessful merge.
static Status TimedFullMerge(
const MergeOperator* merge_operator, const Slice& key, const Slice* value,
const std::vector<Slice>& operands, std::string* result, Logger* logger,
Statistics* statistics, const std::shared_ptr<SystemClock>& clock,
Slice* result_operand = nullptr, bool update_num_ops_stats = false);
static Status TimedFullMerge(const MergeOperator* merge_operator,
const Slice& key, const Slice* value,
const std::vector<Slice>& operands,
std::string* result, Logger* logger,
Statistics* statistics, SystemClock* clock,
Slice* result_operand = nullptr,
bool update_num_ops_stats = false);
// Merge entries until we hit
// - a corrupted key
@ -139,7 +141,7 @@ class MergeHelper {
private:
Env* env_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
const Comparator* user_comparator_;
const MergeOperator* user_merge_operator_;
const CompactionFilter* compaction_filter_;

@ -93,7 +93,7 @@ TEST_F(PerfContextTest, SeekIntoDeletion) {
std::string value;
get_perf_context()->Reset();
StopWatchNano timer(SystemClock::Default());
StopWatchNano timer(SystemClock::Default().get());
timer.Start();
auto status = db->Get(read_options, key, &value);
auto elapsed_nanos = timer.ElapsedNanos();
@ -112,7 +112,7 @@ TEST_F(PerfContextTest, SeekIntoDeletion) {
std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
get_perf_context()->Reset();
StopWatchNano timer(SystemClock::Default(), true);
StopWatchNano timer(SystemClock::Default().get(), true);
iter->SeekToFirst();
hist_seek_to_first.Add(get_perf_context()->user_key_comparison_count);
auto elapsed_nanos = timer.ElapsedNanos();
@ -133,7 +133,7 @@ TEST_F(PerfContextTest, SeekIntoDeletion) {
std::string key = "k" + ToString(i);
get_perf_context()->Reset();
StopWatchNano timer(SystemClock::Default(), true);
StopWatchNano timer(SystemClock::Default().get(), true);
iter->Seek(key);
auto elapsed_nanos = timer.ElapsedNanos();
hist_seek.Add(get_perf_context()->user_key_comparison_count);
@ -147,7 +147,7 @@ TEST_F(PerfContextTest, SeekIntoDeletion) {
get_perf_context()->Reset();
ASSERT_TRUE(iter->Valid());
StopWatchNano timer2(SystemClock::Default(), true);
StopWatchNano timer2(SystemClock::Default().get(), true);
iter->Next();
auto elapsed_nanos2 = timer2.ElapsedNanos();
if (FLAGS_verbose) {
@ -166,7 +166,7 @@ TEST_F(PerfContextTest, StopWatchNanoOverhead) {
const int kTotalIterations = 1000000;
std::vector<uint64_t> timings(kTotalIterations);
StopWatchNano timer(SystemClock::Default(), true);
StopWatchNano timer(SystemClock::Default().get(), true);
for (auto& timing : timings) {
timing = timer.ElapsedNanos(true /* reset */);
}
@ -187,7 +187,7 @@ TEST_F(PerfContextTest, StopWatchOverhead) {
uint64_t elapsed = 0;
std::vector<uint64_t> timings(kTotalIterations);
StopWatch timer(SystemClock::Default(), nullptr, 0, &elapsed);
StopWatch timer(SystemClock::Default().get(), nullptr, 0, &elapsed);
for (auto& timing : timings) {
timing = elapsed;
}
@ -541,7 +541,7 @@ TEST_F(PerfContextTest, SeekKeyComparison) {
HistogramImpl hist_time_diff;
SetPerfLevel(kEnableTime);
StopWatchNano timer(SystemClock::Default());
StopWatchNano timer(SystemClock::Default().get());
for (const int i : keys) {
std::string key = "k" + ToString(i);
std::string value = "v" + ToString(i);
@ -594,7 +594,8 @@ TEST_F(PerfContextTest, DBMutexLockCounter) {
for (PerfLevel perf_level_test :
{PerfLevel::kEnableTimeExceptForMutex, PerfLevel::kEnableTime}) {
for (int c = 0; c < 2; ++c) {
InstrumentedMutex mutex(nullptr, SystemClock::Default(), stats_code[c]);
InstrumentedMutex mutex(nullptr, SystemClock::Default().get(),
stats_code[c]);
mutex.Lock();
ROCKSDB_NAMESPACE::port::Thread child_thread([&] {
SetPerfLevel(perf_level_test);
@ -621,7 +622,8 @@ TEST_F(PerfContextTest, FalseDBMutexWait) {
SetPerfLevel(kEnableTime);
int stats_code[] = {0, static_cast<int>(DB_MUTEX_WAIT_MICROS)};
for (int c = 0; c < 2; ++c) {
InstrumentedMutex mutex(nullptr, SystemClock::Default(), stats_code[c]);
InstrumentedMutex mutex(nullptr, SystemClock::Default().get(),
stats_code[c]);
InstrumentedCondVar lock(&mutex);
get_perf_context()->Reset();
mutex.Lock();

@ -13,7 +13,7 @@ namespace ROCKSDB_NAMESPACE {
PeriodicWorkScheduler::PeriodicWorkScheduler(
const std::shared_ptr<SystemClock>& clock) {
timer = std::unique_ptr<Timer>(new Timer(clock));
timer = std::unique_ptr<Timer>(new Timer(clock.get()));
}
void PeriodicWorkScheduler::Register(DBImpl* dbi,
@ -87,7 +87,7 @@ PeriodicWorkTestScheduler* PeriodicWorkTestScheduler::Default(
MutexLock timer_mu_guard(&scheduler.timer_mu_);
scheduler.timer->Shutdown();
}
scheduler.timer.reset(new Timer(clock));
scheduler.timer.reset(new Timer(clock.get()));
}
}
return &scheduler;

@ -598,7 +598,6 @@ TEST_F(PrefixTest, DynamicPrefixIterator) {
HistogramImpl hist_put_time;
HistogramImpl hist_put_comparison;
// Insert x random prefixes, each with y continuous elements.
for (auto prefix : prefixes) {
for (uint64_t sorted = 0; sorted < FLAGS_items_per_prefix; sorted++) {
@ -609,7 +608,7 @@ TEST_F(PrefixTest, DynamicPrefixIterator) {
std::string value(FLAGS_value_size, 0);
get_perf_context()->Reset();
StopWatchNano timer(SystemClock::Default(), true);
StopWatchNano timer(SystemClock::Default().get(), true);
ASSERT_OK(db->Put(write_options, key, value));
hist_put_time.Add(timer.ElapsedNanos());
hist_put_comparison.Add(get_perf_context()->user_key_comparison_count);
@ -632,7 +631,7 @@ TEST_F(PrefixTest, DynamicPrefixIterator) {
std::string value = "v" + ToString(0);
get_perf_context()->Reset();
StopWatchNano timer(SystemClock::Default(), true);
StopWatchNano timer(SystemClock::Default().get(), true);
auto key_prefix = options.prefix_extractor->Transform(key);
uint64_t total_keys = 0;
for (iter->Seek(key);
@ -666,7 +665,7 @@ TEST_F(PrefixTest, DynamicPrefixIterator) {
Slice key = TestKeyToSlice(s, test_key);
get_perf_context()->Reset();
StopWatchNano timer(SystemClock::Default(), true);
StopWatchNano timer(SystemClock::Default().get(), true);
iter->Seek(key);
hist_no_seek_time.Add(timer.ElapsedNanos());
hist_no_seek_comparison.Add(get_perf_context()->user_key_comparison_count);

@ -171,6 +171,8 @@ int main(int argc, char** argv) {
ParseCommandLineFlags(&argc, &argv, true);
Stats stats;
ROCKSDB_NAMESPACE::SystemClock* clock =
ROCKSDB_NAMESPACE::SystemClock::Default().get();
ROCKSDB_NAMESPACE::Random64 rnd(FLAGS_seed);
std::default_random_engine random_gen(FLAGS_seed);
std::normal_distribution<double> normal_dist(FLAGS_tombstone_width_mean,
@ -219,7 +221,7 @@ int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::kMaxSequenceNumber));
ROCKSDB_NAMESPACE::StopWatchNano stop_watch_add_tombstones(
ROCKSDB_NAMESPACE::SystemClock::Default(), true /* auto_start */);
clock, true /* auto_start */);
range_del_agg.AddTombstones(std::move(fragmented_range_del_iter));
stats.time_add_tombstones += stop_watch_add_tombstones.ElapsedNanos();
}
@ -236,7 +238,7 @@ int main(int argc, char** argv) {
parsed_key.user_key = key_string;
ROCKSDB_NAMESPACE::StopWatchNano stop_watch_should_delete(
ROCKSDB_NAMESPACE::SystemClock::Default(), true /* auto_start */);
clock, true /* auto_start */);
range_del_agg.ShouldDelete(parsed_key, mode);
uint64_t call_time = stop_watch_should_delete.ElapsedNanos();

@ -425,7 +425,8 @@ class Repairer {
Arena arena;
ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
int64_t _current_time = 0;
status = env_->GetCurrentTime(&_current_time); // ignore error
immutable_db_options_.clock->GetCurrentTime(&_current_time)
.PermitUncheckedError(); // ignore error
const uint64_t current_time = static_cast<uint64_t>(_current_time);
SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();

@ -106,15 +106,14 @@ Status TableCache::GetTableReader(
TableFileName(ioptions_.cf_paths, fd.GetNumber(), fd.GetPathId());
std::unique_ptr<FSRandomAccessFile> file;
FileOptions fopts = file_options;
const auto& clock = ioptions_.env->GetSystemClock();
Status s = PrepareIOFromReadOptions(ro, clock, fopts.io_options);
Status s = PrepareIOFromReadOptions(ro, ioptions_.clock, fopts.io_options);
if (s.ok()) {
s = ioptions_.fs->NewRandomAccessFile(fname, fopts, &file, nullptr);
}
RecordTick(ioptions_.statistics, NO_FILE_OPENS);
if (s.IsPathNotFound()) {
fname = Rocks2LevelTableFileName(fname);
s = PrepareIOFromReadOptions(ro, clock, fopts.io_options);
s = PrepareIOFromReadOptions(ro, ioptions_.clock, fopts.io_options);
if (s.ok()) {
s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file,
nullptr);
@ -126,10 +125,10 @@ Status TableCache::GetTableReader(
if (!sequential_mode && ioptions_.advise_random_on_open) {
file->Hint(FSRandomAccessFile::kRandom);
}
StopWatch sw(clock, ioptions_.statistics, TABLE_OPEN_IO_MICROS);
StopWatch sw(ioptions_.clock, ioptions_.statistics, TABLE_OPEN_IO_MICROS);
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(
std::move(file), fname, clock, io_tracer_,
std::move(file), fname, ioptions_.clock, io_tracer_,
record_read_stats ? ioptions_.statistics : nullptr, SST_READ_MICROS,
file_read_hist, ioptions_.rate_limiter, ioptions_.listeners));
s = ioptions_.table_factory->NewTableReader(
@ -162,8 +161,7 @@ Status TableCache::FindTable(const ReadOptions& ro,
HistogramImpl* file_read_hist, bool skip_filters,
int level, bool prefetch_index_and_filter_in_cache,
size_t max_file_size_for_l0_meta_pin) {
PERF_TIMER_GUARD_WITH_CLOCK(find_table_nanos,
ioptions_.env->GetSystemClock());
PERF_TIMER_GUARD_WITH_CLOCK(find_table_nanos, ioptions_.clock);
uint64_t number = fd.GetNumber();
Slice key = GetSliceForFileNumber(&number);
*handle = cache_->Lookup(key);

@ -1762,7 +1762,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
const std::shared_ptr<IOTracer>& io_tracer,
uint64_t version_number)
: env_(vset->env_),
clock_(env_->GetSystemClock()),
clock_(vset->clock_),
cfd_(column_family_data),
info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
db_statistics_((cfd_ == nullptr) ? nullptr
@ -2534,7 +2534,7 @@ uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
uint32_t ttl_expired_files_count = 0;
int64_t _current_time;
auto status = ioptions.env->GetCurrentTime(&_current_time);
auto status = ioptions.clock->GetCurrentTime(&_current_time);
if (status.ok()) {
const uint64_t current_time = static_cast<uint64_t>(_current_time);
for (FileMetaData* f : files) {
@ -2703,7 +2703,7 @@ void VersionStorageInfo::ComputeExpiredTtlFiles(
expired_ttl_files_.clear();
int64_t _current_time;
auto status = ioptions.env->GetCurrentTime(&_current_time);
auto status = ioptions.clock->GetCurrentTime(&_current_time);
if (!status.ok()) {
return;
}
@ -2730,7 +2730,7 @@ void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
files_marked_for_periodic_compaction_.clear();
int64_t temp_current_time;
auto status = ioptions.env->GetCurrentTime(&temp_current_time);
auto status = ioptions.clock->GetCurrentTime(&temp_current_time);
if (!status.ok()) {
return;
}
@ -3789,7 +3789,7 @@ VersionSet::VersionSet(const std::string& dbname,
table_cache_(table_cache),
env_(_db_options->env),
fs_(_db_options->fs, io_tracer),
clock_(env_->GetSystemClock()),
clock_(_db_options->clock),
dbname_(dbname),
db_options_(_db_options),
next_file_number_(2),
@ -4176,7 +4176,7 @@ Status VersionSet::ProcessManifestWrites(
}
}
if (s.ok()) {
io_s = SyncManifest(clock_, db_options_, descriptor_log_->file());
io_s = SyncManifest(db_options_, descriptor_log_->file());
TEST_SYNC_POINT_CALLBACK(
"VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
}

@ -794,7 +794,7 @@ class Version {
private:
Env* env_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
friend class ReactiveVersionSet;
friend class VersionSet;
@ -1345,7 +1345,7 @@ class VersionSet {
Cache* table_cache_;
Env* const env_;
FileSystemPtr const fs_;
const std::shared_ptr<SystemClock> clock_;
SystemClock* const clock_;
const std::string dbname_;
std::string db_id_;
const ImmutableDBOptions* const db_options_;

@ -2773,7 +2773,7 @@ class VersionSetTestMissingFiles : public VersionSetTestBase,
Status s = fs_->NewWritableFile(fname, FileOptions(), &file, nullptr);
ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> fwriter(new WritableFileWriter(
std::move(file), fname, FileOptions(), env_->GetSystemClock()));
std::move(file), fname, FileOptions(), env_->GetSystemClock().get()));
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;

@ -140,8 +140,8 @@ void WalManager::PurgeObsoleteWALFiles() {
return;
}
int64_t current_time;
Status s = env_->GetCurrentTime(&current_time);
int64_t current_time = 0;
Status s = db_options_.clock->GetCurrentTime(&current_time);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log, "Can't get current time: %s",
s.ToString().c_str());
@ -171,7 +171,6 @@ void WalManager::PurgeObsoleteWALFiles() {
size_t log_files_num = 0;
uint64_t log_file_size = 0;
for (auto& f : files) {
uint64_t number;
FileType type;

@ -49,6 +49,7 @@ class WalManagerTest : public testing::Test {
db_options_.wal_dir = dbname_;
db_options_.env = env_.get();
db_options_.fs = env_->GetFileSystem();
db_options_.clock = env_->GetSystemClock().get();
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),

@ -2041,7 +2041,8 @@ class MemTableInserter : public WriteBatch::Handler {
std::string new_value;
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator, key, &get_value_slice, {value}, &new_value,
moptions->info_log, moptions->statistics, SystemClock::Default());
moptions->info_log, moptions->statistics,
SystemClock::Default().get());
if (!merge_status.ok()) {
// Failed to merge!

@ -43,8 +43,7 @@ bool WriteController::IsStopped() const {
// If it turns out to be a performance issue, we can redesign the thread
// synchronization model here.
// The function trusts that the caller will sleep for the micros returned.
uint64_t WriteController::GetDelay(const std::shared_ptr<SystemClock>& clock,
uint64_t num_bytes) {
uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
if (total_stopped_.load(std::memory_order_relaxed) > 0) {
return 0;
}
@ -108,8 +107,7 @@ uint64_t WriteController::GetDelay(const std::shared_ptr<SystemClock>& clock,
return sleep_amount;
}
uint64_t WriteController::NowMicrosMonotonic(
const std::shared_ptr<SystemClock>& clock) {
uint64_t WriteController::NowMicrosMonotonic(SystemClock* clock) {
return clock->NowNanos() / std::milli::den;
}

@ -57,8 +57,7 @@ class WriteController {
// return how many microseconds the caller needs to sleep after the call
// num_bytes: the number of bytes to put into the DB.
// Prerequisite: DB mutex held.
uint64_t GetDelay(const std::shared_ptr<SystemClock>& clock,
uint64_t num_bytes);
uint64_t GetDelay(SystemClock* clock, uint64_t num_bytes);
void set_delayed_write_rate(uint64_t write_rate) {
// avoid divide 0
if (write_rate == 0) {
@ -86,7 +85,7 @@ class WriteController {
RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); }
private:
uint64_t NowMicrosMonotonic(const std::shared_ptr<SystemClock>& clock);
uint64_t NowMicrosMonotonic(SystemClock* clock);
friend class WriteControllerToken;
friend class StopWriteToken;

@ -32,21 +32,21 @@ TEST_F(WriteControllerTest, ChangeDelayRateTest) {
auto delay_token_0 =
controller.GetDelayToken(controller.delayed_write_rate());
ASSERT_EQ(static_cast<uint64_t>(2000000),
controller.GetDelay(clock_, 20000000u));
controller.GetDelay(clock_.get(), 20000000u));
auto delay_token_1 = controller.GetDelayToken(2000000u);
ASSERT_EQ(static_cast<uint64_t>(10000000),
controller.GetDelay(clock_, 20000000u));
controller.GetDelay(clock_.get(), 20000000u));
auto delay_token_2 = controller.GetDelayToken(1000000u);
ASSERT_EQ(static_cast<uint64_t>(20000000),
controller.GetDelay(clock_, 20000000u));
controller.GetDelay(clock_.get(), 20000000u));
auto delay_token_3 = controller.GetDelayToken(20000000u);
ASSERT_EQ(static_cast<uint64_t>(1000000),
controller.GetDelay(clock_, 20000000u));
controller.GetDelay(clock_.get(), 20000000u));
// This is more than max rate. Max delayed rate will be used.
auto delay_token_4 =
controller.GetDelayToken(controller.delayed_write_rate() * 3);
ASSERT_EQ(static_cast<uint64_t>(500000),
controller.GetDelay(clock_, 20000000u));
controller.GetDelay(clock_.get(), 20000000u));
}
TEST_F(WriteControllerTest, SanityTest) {
@ -62,69 +62,77 @@ TEST_F(WriteControllerTest, SanityTest) {
auto delay_token_1 = controller.GetDelayToken(10000000u);
ASSERT_EQ(static_cast<uint64_t>(2000000),
controller.GetDelay(clock_, 20000000u));
controller.GetDelay(clock_.get(), 20000000u));
clock_->now_micros_ += 1999900u; // sleep debt 1000
auto delay_token_2 = controller.GetDelayToken(10000000u);
// Rate reset after changing the token.
ASSERT_EQ(static_cast<uint64_t>(2000000),
controller.GetDelay(clock_, 20000000u));
controller.GetDelay(clock_.get(), 20000000u));
clock_->now_micros_ += 1999900u; // sleep debt 1000
// One refill: 10240 bytes allowed, 1000 used, 9240 left
ASSERT_EQ(static_cast<uint64_t>(1124), controller.GetDelay(clock_, 1000u));
ASSERT_EQ(static_cast<uint64_t>(1124),
controller.GetDelay(clock_.get(), 1000u));
clock_->now_micros_ += 1124u; // sleep debt 0
delay_token_2.reset();
// 1000 used, 8240 left
ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_, 1000u));
ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_.get(), 1000u));
clock_->now_micros_ += 100u; // sleep credit 100
// 1000 used, 7240 left
ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_, 1000u));
ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_.get(), 1000u));
clock_->now_micros_ += 100u; // sleep credit 200
// One refill: 10240 filled, sleep credit generates 2000. 8000 used
// 7240 + 10240 + 2000 - 8000 = 11480 left
ASSERT_EQ(static_cast<uint64_t>(1024u), controller.GetDelay(clock_, 8000u));
ASSERT_EQ(static_cast<uint64_t>(1024u),
controller.GetDelay(clock_.get(), 8000u));
clock_->now_micros_ += 200u; // sleep debt 824
// 1000 used, 10480 left.
ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_, 1000u));
ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_.get(), 1000u));
clock_->now_micros_ += 200u; // sleep debt 624
// Out of bound sleep, still 10480 left
ASSERT_EQ(static_cast<uint64_t>(3000624u),
controller.GetDelay(clock_, 30000000u));
controller.GetDelay(clock_.get(), 30000000u));
clock_->now_micros_ += 3000724u; // sleep credit 100
// 6000 used, 4480 left.
ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_, 6000u));
ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_.get(), 6000u));
clock_->now_micros_ += 200u; // sleep credit 300
// One refill, credit 4480 balance + 3000 credit + 10240 refill
// Use 8000, 9720 left
ASSERT_EQ(static_cast<uint64_t>(1024u), controller.GetDelay(clock_, 8000u));
ASSERT_EQ(static_cast<uint64_t>(1024u),
controller.GetDelay(clock_.get(), 8000u));
clock_->now_micros_ += 3024u; // sleep credit 2000
// 1720 left
ASSERT_EQ(static_cast<uint64_t>(0u), controller.GetDelay(clock_, 8000u));
ASSERT_EQ(static_cast<uint64_t>(0u),
controller.GetDelay(clock_.get(), 8000u));
// 1720 balance + 20000 credit = 20170 left
// Use 8000, 12170 left
ASSERT_EQ(static_cast<uint64_t>(0u), controller.GetDelay(clock_, 8000u));
ASSERT_EQ(static_cast<uint64_t>(0u),
controller.GetDelay(clock_.get(), 8000u));
// 4170 left
ASSERT_EQ(static_cast<uint64_t>(0u), controller.GetDelay(clock_, 8000u));
ASSERT_EQ(static_cast<uint64_t>(0u),
controller.GetDelay(clock_.get(), 8000u));
// Need a refill
ASSERT_EQ(static_cast<uint64_t>(1024u), controller.GetDelay(clock_, 9000u));
ASSERT_EQ(static_cast<uint64_t>(1024u),
controller.GetDelay(clock_.get(), 9000u));
delay_token_1.reset();
ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_, 30000000u));
ASSERT_EQ(static_cast<uint64_t>(0),
controller.GetDelay(clock_.get(), 30000000u));
delay_token_1.reset();
ASSERT_FALSE(controller.IsStopped());
}

@ -57,6 +57,7 @@ void ThreadBody(void* v) {
}
bool RunStressTest(StressTest* stress) {
SystemClock* clock = db_stress_env->GetSystemClock().get();
stress->InitDb();
SharedState shared(db_stress_env, stress);
stress->FinishInitDb(&shared);
@ -69,9 +70,9 @@ bool RunStressTest(StressTest* stress) {
uint32_t n = shared.GetNumThreads();
uint64_t now = db_stress_env->NowMicros();
uint64_t now = clock->NowMicros();
fprintf(stdout, "%s Initializing worker threads\n",
db_stress_env->TimeToString(now / 1000000).c_str());
clock->TimeToString(now / 1000000).c_str());
std::vector<ThreadState*> threads(n);
for (uint32_t i = 0; i < n; i++) {
threads[i] = new ThreadState(i, &shared);
@ -104,9 +105,9 @@ bool RunStressTest(StressTest* stress) {
}
}
now = db_stress_env->NowMicros();
now = clock->NowMicros();
fprintf(stdout, "%s Starting database operations\n",
db_stress_env->TimeToString(now / 1000000).c_str());
clock->TimeToString(now / 1000000).c_str());
shared.SetStart();
shared.GetCondVar()->SignalAll();
@ -114,16 +115,16 @@ bool RunStressTest(StressTest* stress) {
shared.GetCondVar()->Wait();
}
now = db_stress_env->NowMicros();
now = clock->NowMicros();
if (FLAGS_test_batches_snapshots) {
fprintf(stdout, "%s Limited verification already done during gets\n",
db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
clock->TimeToString((uint64_t)now / 1000000).c_str());
} else if (FLAGS_skip_verifydb) {
fprintf(stdout, "%s Verification skipped\n",
db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
clock->TimeToString((uint64_t)now / 1000000).c_str());
} else {
fprintf(stdout, "%s Starting verification\n",
db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
clock->TimeToString((uint64_t)now / 1000000).c_str());
}
shared.SetStartVerify();
@ -142,11 +143,11 @@ bool RunStressTest(StressTest* stress) {
delete threads[i];
threads[i] = nullptr;
}
now = db_stress_env->NowMicros();
now = clock->NowMicros();
if (!FLAGS_skip_verifydb && !FLAGS_test_batches_snapshots &&
!shared.HasVerificationFailedYet()) {
fprintf(stdout, "%s Verification successful\n",
db_stress_env->TimeToString(now / 1000000).c_str());
clock->TimeToString(now / 1000000).c_str());
}
stress->PrintStatistics();

@ -11,9 +11,9 @@
#include "monitoring/histogram.h"
#include "port/port.h"
#include "rocksdb/env.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/statistics.h"
#include "rocksdb/system_clock.h"
#include "util/gflags_compat.h"
#include "util/random.h"
@ -73,7 +73,7 @@ class Stats {
seconds_ = 0;
num_compact_files_succeed_ = 0;
num_compact_files_failed_ = 0;
start_ = Env::Default()->NowMicros();
start_ = SystemClock::Default()->NowMicros();
last_op_finish_ = start_;
finish_ = start_;
}
@ -102,13 +102,13 @@ class Stats {
}
void Stop() {
finish_ = Env::Default()->NowMicros();
finish_ = SystemClock::Default()->NowMicros();
seconds_ = (finish_ - start_) * 1e-6;
}
void FinishedSingleOp() {
if (FLAGS_histogram) {
auto now = Env::Default()->NowMicros();
auto now = SystemClock::Default()->NowMicros();
auto micros = now - last_op_finish_;
hist_.Add(micros);
if (micros > 20000) {

@ -35,6 +35,7 @@ StressTest::StressTest()
#ifndef ROCKSDB_LITE
txn_db_(nullptr),
#endif
clock_(db_stress_env->GetSystemClock().get()),
new_column_family_name_(1),
num_times_reopened_(0),
db_preload_finished_(false),
@ -226,9 +227,9 @@ bool StressTest::BuildOptionsTable() {
}
void StressTest::InitDb() {
uint64_t now = db_stress_env->NowMicros();
uint64_t now = clock_->NowMicros();
fprintf(stdout, "%s Initializing db_stress\n",
db_stress_env->TimeToString(now / 1000000).c_str());
clock_->TimeToString(now / 1000000).c_str());
PrintEnv();
Open();
BuildOptionsTable();
@ -236,9 +237,9 @@ void StressTest::InitDb() {
void StressTest::FinishInitDb(SharedState* shared) {
if (FLAGS_read_only) {
uint64_t now = db_stress_env->NowMicros();
uint64_t now = clock_->NowMicros();
fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
db_stress_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
clock_->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
}
if (FLAGS_enable_compaction_filter) {
@ -255,10 +256,9 @@ void StressTest::FinishInitDb(SharedState* shared) {
bool StressTest::VerifySecondaries() {
#ifndef ROCKSDB_LITE
if (FLAGS_test_secondary) {
uint64_t now = db_stress_env->NowMicros();
fprintf(
stdout, "%s Start to verify secondaries against primary\n",
db_stress_env->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
uint64_t now = clock_->NowMicros();
fprintf(stdout, "%s Start to verify secondaries against primary\n",
clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
}
for (size_t k = 0; k != secondaries_.size(); ++k) {
Status s = secondaries_[k]->TryCatchUpWithPrimary();
@ -300,10 +300,9 @@ bool StressTest::VerifySecondaries() {
}
}
if (FLAGS_test_secondary) {
uint64_t now = db_stress_env->NowMicros();
fprintf(
stdout, "%s Verification of secondaries succeeded\n",
db_stress_env->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
uint64_t now = clock_->NowMicros();
fprintf(stdout, "%s Verification of secondaries succeeded\n",
clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
}
#endif // ROCKSDB_LITE
return true;
@ -462,9 +461,9 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
#endif
db_preload_finished_.store(true);
auto now = db_stress_env->NowMicros();
auto now = clock_->NowMicros();
fprintf(stdout, "%s Reopening database in read-only\n",
db_stress_env->TimeToString(now / 1000000).c_str());
clock_->TimeToString(now / 1000000).c_str());
// Reopen as read-only, can ignore all options related to updates
Open();
} else {
@ -1724,7 +1723,7 @@ Status StressTest::TestPauseBackground(ThreadState* thread) {
// 1 chance in 625 of pausing full 16s.)
int pwr2_micros =
std::min(thread->rand.Uniform(25), thread->rand.Uniform(25));
db_stress_env->SleepForMicroseconds(1 << pwr2_micros);
clock_->SleepForMicroseconds(1 << pwr2_micros);
return db_->ContinueBackgroundWork();
}
@ -2487,10 +2486,9 @@ void StressTest::Reopen(ThreadState* thread) {
secondaries_.clear();
num_times_reopened_++;
auto now = db_stress_env->NowMicros();
auto now = clock_->NowMicros();
fprintf(stdout, "%s Reopening database for the %dth time\n",
db_stress_env->TimeToString(now / 1000000).c_str(),
num_times_reopened_);
clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_);
Open();
}
} // namespace ROCKSDB_NAMESPACE

@ -13,6 +13,7 @@
#include "db_stress_tool/db_stress_shared_state.h"
namespace ROCKSDB_NAMESPACE {
class SystemClock;
class Transaction;
class TransactionDB;
@ -218,6 +219,7 @@ class StressTest {
TransactionDB* txn_db_;
#endif
Options options_;
SystemClock* clock_;
std::vector<ColumnFamilyHandle*> column_families_;
std::vector<std::string> column_family_names_;
std::atomic<int> new_column_family_name_;

@ -15,6 +15,7 @@
#include "env/env_encryption_ctr.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/convenience.h"
#include "rocksdb/system_clock.h"
#include "util/aligned_buffer.h"
#include "util/coding.h"
#include "util/random.h"
@ -1063,7 +1064,7 @@ Status CTREncryptionProvider::CreateNewPrefix(const std::string& /*fname*/,
return Status::InvalidArgument("Encryption Cipher is missing");
}
// Create & seed rnd.
Random rnd((uint32_t)Env::Default()->NowMicros());
Random rnd((uint32_t)SystemClock::Default()->NowMicros());
// Fill entire prefix block with random values.
for (size_t i = 0; i < prefixLength; i++) {
prefix[i] = rnd.Uniform(256) & 0xFF;

2
env/env_test.cc vendored

@ -2215,7 +2215,7 @@ TEST_F(EnvTest, IsDirectory) {
std::unique_ptr<WritableFileWriter> fwriter;
fwriter.reset(new WritableFileWriter(std::move(wfile), test_file_path,
FileOptions(),
SystemClock::Default()));
SystemClock::Default().get()));
constexpr char buf[] = "test";
s = fwriter->Append(buf);
ASSERT_OK(s);

@ -23,7 +23,7 @@ class FileSystemTracingWrapper : public FileSystemWrapper {
const std::shared_ptr<IOTracer>& io_tracer)
: FileSystemWrapper(t),
io_tracer_(io_tracer),
clock_(SystemClock::Default()) {}
clock_(SystemClock::Default().get()) {}
~FileSystemTracingWrapper() override {}
@ -86,7 +86,7 @@ class FileSystemTracingWrapper : public FileSystemWrapper {
private:
std::shared_ptr<IOTracer> io_tracer_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
};
// The FileSystemPtr is a wrapper class that takes pointer to storage systems
@ -138,7 +138,7 @@ class FSSequentialFileTracingWrapper : public FSSequentialFileWrapper {
const std::string& file_name)
: FSSequentialFileWrapper(t),
io_tracer_(io_tracer),
clock_(SystemClock::Default()),
clock_(SystemClock::Default().get()),
file_name_(file_name) {}
~FSSequentialFileTracingWrapper() override {}
@ -154,7 +154,7 @@ class FSSequentialFileTracingWrapper : public FSSequentialFileWrapper {
private:
std::shared_ptr<IOTracer> io_tracer_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
std::string file_name_;
};
@ -210,7 +210,7 @@ class FSRandomAccessFileTracingWrapper : public FSRandomAccessFileWrapper {
const std::string& file_name)
: FSRandomAccessFileWrapper(t),
io_tracer_(io_tracer),
clock_(SystemClock::Default()),
clock_(SystemClock::Default().get()),
file_name_(file_name) {}
~FSRandomAccessFileTracingWrapper() override {}
@ -229,7 +229,7 @@ class FSRandomAccessFileTracingWrapper : public FSRandomAccessFileWrapper {
private:
std::shared_ptr<IOTracer> io_tracer_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
// Stores file name instead of full path.
std::string file_name_;
};
@ -285,7 +285,7 @@ class FSWritableFileTracingWrapper : public FSWritableFileWrapper {
const std::string& file_name)
: FSWritableFileWrapper(t),
io_tracer_(io_tracer),
clock_(SystemClock::Default()),
clock_(SystemClock::Default().get()),
file_name_(file_name) {}
~FSWritableFileTracingWrapper() override {}
@ -319,7 +319,7 @@ class FSWritableFileTracingWrapper : public FSWritableFileWrapper {
private:
std::shared_ptr<IOTracer> io_tracer_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
// Stores file name instead of full path.
std::string file_name_;
};
@ -382,7 +382,7 @@ class FSRandomRWFileTracingWrapper : public FSRandomRWFileWrapper {
const std::string& file_name)
: FSRandomRWFileWrapper(t),
io_tracer_(io_tracer),
clock_(SystemClock::Default()),
clock_(SystemClock::Default().get()),
file_name_(file_name) {}
~FSRandomRWFileTracingWrapper() override {}
@ -404,7 +404,7 @@ class FSRandomRWFileTracingWrapper : public FSRandomRWFileWrapper {
private:
std::shared_ptr<IOTracer> io_tracer_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
// Stores file name instead of full path.
std::string file_name_;
};

4
env/fs_posix.cc vendored

@ -774,7 +774,9 @@ class PosixFileSystem : public FileSystem {
LockHoldingInfo lhi;
int64_t current_time = 0;
// Ignore status code as the time is only used for error message.
Env::Default()->GetCurrentTime(&current_time).PermitUncheckedError();
SystemClock::Default()
->GetCurrentTime(&current_time)
.PermitUncheckedError();
lhi.acquire_time = current_time;
lhi.acquiring_thread = Env::Default()->GetThreadID();

46
env/mock_env.cc vendored

@ -1033,26 +1033,43 @@ Status MockFileSystem::CorruptBuffer(const std::string& fname) {
iter->second->CorruptBuffer();
return Status::OK();
}
namespace {
class MockSystemClock : public SystemClockWrapper {
public:
explicit MockSystemClock(const std::shared_ptr<SystemClock>& c)
: SystemClockWrapper(c), fake_sleep_micros_(0) {}
MockEnv::MockEnv(Env* base_env)
: CompositeEnvWrapper(base_env, std::make_shared<MockFileSystem>(this)),
fake_sleep_micros_(0) {}
void FakeSleepForMicroseconds(int64_t micros) {
fake_sleep_micros_.fetch_add(micros);
}
Status MockEnv::GetCurrentTime(int64_t* unix_time) {
auto s = CompositeEnvWrapper::GetCurrentTime(unix_time);
const char* Name() const override { return "MockSystemClock"; }
Status GetCurrentTime(int64_t* unix_time) override {
auto s = SystemClockWrapper::GetCurrentTime(unix_time);
if (s.ok()) {
*unix_time += fake_sleep_micros_.load() / (1000 * 1000);
auto fake_time = fake_sleep_micros_.load() / (1000 * 1000);
*unix_time += fake_time;
}
return s;
}
}
uint64_t MockEnv::NowMicros() {
return CompositeEnvWrapper::NowMicros() + fake_sleep_micros_.load();
}
uint64_t NowMicros() override {
return SystemClockWrapper::NowMicros() + fake_sleep_micros_.load();
}
uint64_t MockEnv::NowNanos() {
return CompositeEnvWrapper::NowNanos() + fake_sleep_micros_.load() * 1000;
}
uint64_t NowNanos() override {
return SystemClockWrapper::NowNanos() + fake_sleep_micros_.load() * 1000;
}
private:
std::atomic<int64_t> fake_sleep_micros_;
};
} // namespace
MockEnv::MockEnv(Env* base_env)
: CompositeEnvWrapper(
base_env, std::make_shared<MockFileSystem>(this),
std::make_shared<MockSystemClock>(base_env->GetSystemClock())) {}
Status MockEnv::CorruptBuffer(const std::string& fname) {
auto mock = static_cast_with_check<MockFileSystem>(GetFileSystem().get());
@ -1060,7 +1077,8 @@ Status MockEnv::CorruptBuffer(const std::string& fname) {
}
void MockEnv::FakeSleepForMicroseconds(int64_t micros) {
fake_sleep_micros_.fetch_add(micros);
auto mock = static_cast_with_check<MockSystemClock>(GetSystemClock().get());
mock->FakeSleepForMicroseconds(micros);
}
#ifndef ROCKSDB_LITE

6
env/mock_env.h vendored

@ -23,11 +23,6 @@ class MockEnv : public CompositeEnvWrapper {
public:
explicit MockEnv(Env* base_env);
// Results of these can be affected by FakeSleepForMicroseconds()
Status GetCurrentTime(int64_t* unix_time) override;
uint64_t NowMicros() override;
uint64_t NowNanos() override;
Status CorruptBuffer(const std::string& fname);
// Doesn't really sleep, just affects output of GetCurrentTime(), NowMicros()
@ -35,7 +30,6 @@ class MockEnv : public CompositeEnvWrapper {
void FakeSleepForMicroseconds(int64_t micros);
private:
std::atomic<int64_t> fake_sleep_micros_;
};
} // namespace ROCKSDB_NAMESPACE

@ -22,9 +22,8 @@
namespace ROCKSDB_NAMESPACE {
DeleteScheduler::DeleteScheduler(const std::shared_ptr<SystemClock>& clock,
FileSystem* fs, int64_t rate_bytes_per_sec,
Logger* info_log,
DeleteScheduler::DeleteScheduler(SystemClock* clock, FileSystem* fs,
int64_t rate_bytes_per_sec, Logger* info_log,
SstFileManagerImpl* sst_file_manager,
double max_trash_db_ratio,
uint64_t bytes_max_delete_chunk)

@ -34,7 +34,7 @@ class SystemClock;
// case DeleteScheduler will delete files immediately.
class DeleteScheduler {
public:
DeleteScheduler(const std::shared_ptr<SystemClock>& clock, FileSystem* fs,
DeleteScheduler(SystemClock* clock, FileSystem* fs,
int64_t rate_bytes_per_sec, Logger* info_log,
SstFileManagerImpl* sst_file_manager,
double max_trash_db_ratio, uint64_t bytes_max_delete_chunk);
@ -101,7 +101,7 @@ class DeleteScheduler {
void MaybeCreateBackgroundThread();
const std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
FileSystem* fs_;
// total size of trash files

@ -68,9 +68,8 @@ inline IOStatus GenerateOneFileChecksum(
allow_mmap_reads, io_tracer);
}
inline IOStatus PrepareIOFromReadOptions(
const ReadOptions& ro, const std::shared_ptr<SystemClock>& clock,
IOOptions& opts) {
inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro,
SystemClock* clock, IOOptions& opts) {
if (ro.deadline.count()) {
std::chrono::microseconds now =
std::chrono::microseconds(clock->NowMicros());

@ -419,11 +419,11 @@ Status SetIdentityFile(Env* env, const std::string& dbname,
return s;
}
IOStatus SyncManifest(const std::shared_ptr<SystemClock>& clock,
const ImmutableDBOptions* db_options,
IOStatus SyncManifest(const ImmutableDBOptions* db_options,
WritableFileWriter* file) {
TEST_KILL_RANDOM("SyncManifest:0", rocksdb_kill_odds * REDUCE_ODDS2);
StopWatch sw(clock, db_options->statistics.get(), MANIFEST_FILE_SYNC_MICROS);
StopWatch sw(db_options->clock, db_options->statistics.get(),
MANIFEST_FILE_SYNC_MICROS);
return file->Sync(db_options->use_fsync);
}

@ -167,8 +167,7 @@ extern Status SetIdentityFile(Env* env, const std::string& dbname,
const std::string& db_id = {});
// Sync manifest file `file`.
extern IOStatus SyncManifest(const std::shared_ptr<SystemClock>& clock,
const ImmutableDBOptions* db_options,
extern IOStatus SyncManifest(const ImmutableDBOptions* db_options,
WritableFileWriter* file);
// Return list of file names of info logs in `file_names`.

@ -326,10 +326,10 @@ Status RandomAccessFileReader::MultiRead(const IOOptions& opts,
IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
IOOptions& opts) {
if (clock_.get() != nullptr) {
if (clock_ != nullptr) {
return PrepareIOFromReadOptions(ro, clock_, opts);
} else {
return PrepareIOFromReadOptions(ro, SystemClock::Default(), opts);
return PrepareIOFromReadOptions(ro, SystemClock::Default().get(), opts);
}
}
} // namespace ROCKSDB_NAMESPACE

@ -67,7 +67,7 @@ class RandomAccessFileReader {
FSRandomAccessFilePtr file_;
std::string file_name_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
Statistics* stats_;
uint32_t hist_type_;
HistogramImpl* file_read_hist_;
@ -77,7 +77,7 @@ class RandomAccessFileReader {
public:
explicit RandomAccessFileReader(
std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
const std::shared_ptr<SystemClock>& clock = nullptr,
SystemClock* clock = nullptr,
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
Statistics* stats = nullptr, uint32_t hist_type = 0,
HistogramImpl* file_read_hist = nullptr,

@ -43,7 +43,7 @@ class RandomAccessFileReaderTest : public testing::Test {
std::unique_ptr<FSRandomAccessFile> f;
ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr));
(*reader).reset(new RandomAccessFileReader(std::move(f), fpath,
env_->GetSystemClock()));
env_->GetSystemClock().get()));
}
void AssertResult(const std::string& content,

@ -31,8 +31,9 @@ SstFileManagerImpl::SstFileManagerImpl(
compaction_buffer_size_(0),
cur_compactions_reserved_size_(0),
max_allowed_space_(0),
delete_scheduler_(clock_, fs_.get(), rate_bytes_per_sec, logger.get(),
this, max_trash_db_ratio, bytes_max_delete_chunk),
delete_scheduler_(clock_.get(), fs_.get(), rate_bytes_per_sec,
logger.get(), this, max_trash_db_ratio,
bytes_max_delete_chunk),
cv_(&mu_),
closing_(false),
bg_thread_(nullptr),

@ -123,7 +123,7 @@ class WritableFileWriter {
std::string file_name_;
FSWritableFilePtr writable_file_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
AlignedBuffer buf_;
size_t max_buffer_size_;
// Actually written data size can be used for truncate
@ -148,8 +148,7 @@ class WritableFileWriter {
public:
WritableFileWriter(
std::unique_ptr<FSWritableFile>&& file, const std::string& _file_name,
const FileOptions& options,
const std::shared_ptr<SystemClock>& clock = nullptr,
const FileOptions& options, SystemClock* clock = nullptr,
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
Statistics* stats = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {},

@ -117,13 +117,13 @@ struct SstFileMetaData {
// An SST file may be generated by compactions whose input files may
// in turn be generated by earlier compactions. The creation time of the
// oldest SST file that is the compaction ancestor of this file.
// The timestamp is provided Env::GetCurrentTime().
// The timestamp is provided by SystemClock::GetCurrentTime().
// 0 if the information is not available.
//
// Note: for TTL blob files, it contains the start of the expiration range.
uint64_t oldest_ancester_time;
// Timestamp when the SST file is created, provided by Env::GetCurrentTime().
// 0 if the information is not available.
// Timestamp when the SST file is created, provided by
// SystemClock::GetCurrentTime(). 0 if the information is not available.
uint64_t file_creation_time;
// The checksum of a SST file, the value is decided by the file content and

@ -31,9 +31,10 @@ class EnvLogger : public Logger {
const std::string& fname, const EnvOptions& options, Env* env,
InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL)
: Logger(log_level),
file_(std::move(writable_file), fname, options, env->GetSystemClock()),
last_flush_micros_(0),
env_(env),
clock_(env_->GetSystemClock().get()),
file_(std::move(writable_file), fname, options, clock_),
last_flush_micros_(0),
flush_pending_(false) {}
~EnvLogger() {
@ -50,7 +51,7 @@ class EnvLogger : public Logger {
flush_pending_ = false;
file_.Flush().PermitUncheckedError();
}
last_flush_micros_ = env_->NowMicros();
last_flush_micros_ = clock_->NowMicros();
}
void Flush() override {
@ -136,7 +137,7 @@ class EnvLogger : public Logger {
// We will ignore any error returned by Append().
file_.Append(Slice(base, p - base)).PermitUncheckedError();
flush_pending_ = true;
const uint64_t now_micros = env_->NowMicros();
const uint64_t now_micros = clock_->NowMicros();
if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
FlushLocked();
}
@ -154,11 +155,12 @@ class EnvLogger : public Logger {
}
private:
Env* env_;
SystemClock* clock_;
WritableFileWriter file_;
mutable port::Mutex mutex_; // Mutex to protect the shared variables below.
const static uint64_t flush_every_seconds_ = 5;
std::atomic_uint_fast64_t last_flush_micros_;
Env* env_;
std::atomic<bool> flush_pending_;
};

@ -418,7 +418,7 @@ class Benchmark {
uint64_t bytes_written = 0;
uint64_t bytes_read = 0;
uint64_t read_hits = 0;
StopWatchNano timer(SystemClock::Default(), true);
StopWatchNano timer(SystemClock::Default().get(), true);
RunThreads(&threads, &bytes_written, &bytes_read, true, &read_hits);
auto elapsed_time = static_cast<double>(timer.ElapsedNanos() / 1000);
std::cout << "Elapsed time: " << static_cast<int>(elapsed_time) << " us"

@ -13,9 +13,8 @@
namespace ROCKSDB_NAMESPACE {
namespace {
#ifndef NPERF_CONTEXT
Statistics* stats_for_report(const std::shared_ptr<SystemClock>& clock,
Statistics* stats) {
if (clock.get() != nullptr && stats != nullptr &&
Statistics* stats_for_report(SystemClock* clock, Statistics* stats) {
if (clock != nullptr && stats != nullptr &&
stats->get_stats_level() > kExceptTimeForMutex) {
return stats;
} else {

@ -22,12 +22,10 @@ class InstrumentedMutex {
explicit InstrumentedMutex(bool adaptive = false)
: mutex_(adaptive), stats_(nullptr), clock_(nullptr), stats_code_(0) {}
explicit InstrumentedMutex(const std::shared_ptr<SystemClock>& clock,
bool adaptive = false)
explicit InstrumentedMutex(SystemClock* clock, bool adaptive = false)
: mutex_(adaptive), stats_(nullptr), clock_(clock), stats_code_(0) {}
InstrumentedMutex(Statistics* stats,
const std::shared_ptr<SystemClock>& clock, int stats_code,
InstrumentedMutex(Statistics* stats, SystemClock* clock, int stats_code,
bool adaptive = false)
: mutex_(adaptive),
stats_(stats),
@ -49,7 +47,7 @@ class InstrumentedMutex {
friend class InstrumentedCondVar;
port::Mutex mutex_;
Statistics* stats_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
int stats_code_;
};
@ -96,7 +94,7 @@ class InstrumentedCondVar {
bool TimedWaitInternal(uint64_t abs_time_us);
port::CondVar cond_;
Statistics* stats_;
const std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
int stats_code_;
};

@ -40,7 +40,7 @@ extern __thread IOStatsContext iostats_context;
// Declare and set start time of the timer
#define IOSTATS_CPU_TIMER_GUARD(metric, clock) \
PerfStepTimer iostats_step_timer_##metric( \
&(iostats_context.metric), clock.get(), true, \
&(iostats_context.metric), clock, true, \
PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \
iostats_step_timer_##metric.Start();

@ -47,13 +47,13 @@ extern thread_local PerfContext perf_context;
// Declare and set start time of the timer
#define PERF_TIMER_GUARD_WITH_CLOCK(metric, clock) \
PerfStepTimer perf_step_timer_##metric(&(perf_context.metric), clock.get()); \
PerfStepTimer perf_step_timer_##metric(&(perf_context.metric), clock); \
perf_step_timer_##metric.Start();
// Declare and set start time of the timer
#define PERF_CPU_TIMER_GUARD(metric, clock) \
PerfStepTimer perf_step_timer_##metric( \
&(perf_context.metric), clock.get(), true, \
&(perf_context.metric), clock, true, \
PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \
perf_step_timer_##metric.Start();

@ -818,6 +818,7 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
info_log_level(db_options.info_log_level),
env(db_options.env),
fs(db_options.fs.get()),
clock(db_options.clock),
allow_mmap_reads(db_options.allow_mmap_reads),
allow_mmap_writes(db_options.allow_mmap_writes),
db_paths(db_options.db_paths),

@ -64,6 +64,8 @@ struct ImmutableCFOptions {
FileSystem* fs;
SystemClock* clock;
// Allow the OS to mmap file for reading sst tables. Default: false
bool allow_mmap_reads;

@ -17,6 +17,7 @@
#include "rocksdb/file_system.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/sst_file_manager.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/utilities/options_type.h"
#include "rocksdb/wal_filter.h"
#include "util/string_util.h"
@ -582,6 +583,11 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
allow_data_in_errors(options.allow_data_in_errors),
db_host_id(options.db_host_id),
checksum_handoff_file_types(options.checksum_handoff_file_types) {
if (env != nullptr) {
clock = env->GetSystemClock().get();
} else {
clock = SystemClock::Default().get();
}
}
void ImmutableDBOptions::Dump(Logger* log) const {

@ -11,6 +11,7 @@
#include "rocksdb/options.h"
namespace ROCKSDB_NAMESPACE {
class SystemClock;
struct ImmutableDBOptions {
static const char* kName() { return "ImmutableDBOptions"; }
@ -26,6 +27,7 @@ struct ImmutableDBOptions {
bool track_and_verify_wals_in_manifest;
Env* env;
std::shared_ptr<FileSystem> fs;
SystemClock* clock;
std::shared_ptr<RateLimiter> rate_limiter;
std::shared_ptr<SstFileManager> sst_file_manager;
std::shared_ptr<Logger> info_log;

@ -1059,7 +1059,7 @@ IOStatus WinFileSystem::NewLogger(const std::string& fname,
// Set creation, last access and last write time to the same value
SetFileTime(hFile, &ft, &ft, &ft);
}
result->reset(new WinLogger(&WinEnvThreads::gettid, clock_, hFile));
result->reset(new WinLogger(&WinEnvThreads::gettid, clock_.get(), hFile));
}
return s;
}

@ -32,8 +32,7 @@ namespace ROCKSDB_NAMESPACE {
namespace port {
WinLogger::WinLogger(uint64_t (*gettid)(),
const std::shared_ptr<SystemClock>& clock, HANDLE file,
WinLogger::WinLogger(uint64_t (*gettid)(), SystemClock* clock, HANDLE file,
const InfoLogLevel log_level)
: Logger(log_level),
file_(file),

@ -26,8 +26,7 @@ class SystemClock;
namespace port {
class WinLogger : public ROCKSDB_NAMESPACE::Logger {
public:
WinLogger(uint64_t (*gettid)(), const std::shared_ptr<SystemClock>& clock,
HANDLE file,
WinLogger(uint64_t (*gettid)(), SystemClock* clock, HANDLE file,
const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL);
virtual ~WinLogger();
@ -54,7 +53,7 @@ protected:
uint64_t (*gettid_)(); // Return the thread id for the current thread
std::atomic_size_t log_size_;
std::atomic_uint_fast64_t last_flush_micros_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
bool flush_pending_;
Status CloseInternal();

@ -1083,7 +1083,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
bool abort_compression = false;
StopWatchNano timer(
r->ioptions.env->GetSystemClock(),
r->ioptions.clock,
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) {
@ -1177,7 +1177,7 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
Rep* r = rep_;
Status s = Status::OK();
IOStatus io_s = IOStatus::OK();
StopWatch sw(r->ioptions.env->GetSystemClock(), r->ioptions.statistics,
StopWatch sw(r->ioptions.clock, r->ioptions.statistics,
WRITE_RAW_BLOCK_MICROS);
handle->set_offset(r->get_offset());
handle->set_size(block_contents.size());

@ -1501,7 +1501,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
CompressionType raw_block_comp_type;
BlockContents raw_block_contents;
if (!contents) {
StopWatch sw(rep_->clock, statistics, READ_BLOCK_GET_MICROS);
StopWatch sw(rep_->ioptions.clock, statistics, READ_BLOCK_GET_MICROS);
BlockFetcher block_fetcher(
rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle,
&raw_block_contents, rep_->ioptions, do_uncompress,
@ -1590,7 +1590,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
// Avoid making copy of block_key and cf_name when constructing the access
// record.
BlockCacheTraceRecord access_record(
rep_->clock->NowMicros(),
rep_->ioptions.clock->NowMicros(),
/*block_key=*/"", trace_block_type,
/*block_size=*/usage, rep_->cf_id_for_tracing(),
/*cf_name=*/"", rep_->level_for_tracing(),
@ -1935,7 +1935,8 @@ Status BlockBasedTable::RetrieveBlock(
std::unique_ptr<TBlocklike> block;
{
StopWatch sw(rep_->clock, rep_->ioptions.statistics, READ_BLOCK_GET_MICROS);
StopWatch sw(rep_->ioptions.clock, rep_->ioptions.statistics,
READ_BLOCK_GET_MICROS);
s = ReadBlockFromFile(
rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle, &block,
rep_->ioptions, do_uncompress, maybe_compressed, block_type,
@ -2427,7 +2428,7 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
referenced_key = key;
}
BlockCacheTraceRecord access_record(
rep_->clock->NowMicros(),
rep_->ioptions.clock->NowMicros(),
/*block_key=*/"", lookup_data_block_context.block_type,
lookup_data_block_context.block_size, rep_->cf_id_for_tracing(),
/*cf_name=*/"", rep_->level_for_tracing(),
@ -2763,7 +2764,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
referenced_key = key;
}
BlockCacheTraceRecord access_record(
rep_->clock->NowMicros(),
rep_->ioptions.clock->NowMicros(),
/*block_key=*/"", lookup_data_block_context.block_type,
lookup_data_block_context.block_size, rep_->cf_id_for_tracing(),
/*cf_name=*/"", rep_->level_for_tracing(),

@ -32,7 +32,6 @@ class Footer;
class InternalKeyComparator;
class Iterator;
class FSRandomAccessFile;
class SystemClock;
class TableCache;
class TableReader;
class WritableFile;
@ -521,7 +520,6 @@ struct BlockBasedTable::Rep {
file_size(_file_size),
level(_level),
immortal_table(_immortal_table) {
clock = ioptions.env->GetSystemClock();
}
~Rep() { status.PermitUncheckedError(); }
const ImmutableCFOptions& ioptions;
@ -529,7 +527,6 @@ struct BlockBasedTable::Rep {
const BlockBasedTableOptions table_options;
const FilterPolicy* const filter_policy;
const InternalKeyComparator& internal_comparator;
std::shared_ptr<SystemClock> clock;
Status status;
std::unique_ptr<RandomAccessFileReader> file;
char cache_key_prefix[kMaxCacheKeyPrefixSize];

@ -135,8 +135,8 @@ class BlockBasedTableReaderTest
std::string path = Path(filename);
std::unique_ptr<FSRandomAccessFile> f;
ASSERT_OK(fs_->NewRandomAccessFile(path, opt, &f, nullptr));
reader->reset(
new RandomAccessFileReader(std::move(f), path, env_->GetSystemClock()));
reader->reset(new RandomAccessFileReader(std::move(f), path,
env_->GetSystemClock().get()));
}
std::string ToInternalKey(const std::string& key) {

@ -267,8 +267,8 @@ class BlockFetcherTest : public testing::Test {
std::string path = Path(filename);
std::unique_ptr<FSRandomAccessFile> f;
ASSERT_OK(fs_->NewRandomAccessFile(path, opt, &f, nullptr));
reader->reset(
new RandomAccessFileReader(std::move(f), path, env_->GetSystemClock()));
reader->reset(new RandomAccessFileReader(std::move(f), path,
env_->GetSystemClock().get()));
}
void NewTableReader(const ImmutableCFOptions& ioptions,

@ -353,9 +353,8 @@ Status UncompressBlockContentsForCompressionType(
assert(uncompression_info.type() != kNoCompression &&
"Invalid compression type");
StopWatchNano timer(
ioptions.env->GetSystemClock(),
ShouldReportDetailedTime(ioptions.env, ioptions.statistics));
StopWatchNano timer(ioptions.clock, ShouldReportDetailedTime(
ioptions.env, ioptions.statistics));
size_t uncompressed_size = 0;
CacheAllocationPtr ubuf =
UncompressData(uncompression_info, data, n, &uncompressed_size,

@ -44,10 +44,9 @@ GetContext::GetContext(
Statistics* statistics, GetState init_state, const Slice& user_key,
PinnableSlice* pinnable_val, std::string* timestamp, bool* value_found,
MergeContext* merge_context, bool do_merge,
SequenceNumber* _max_covering_tombstone_seq,
const std::shared_ptr<SystemClock>& clock, SequenceNumber* seq,
PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
bool* is_blob_index, uint64_t tracing_get_id)
SequenceNumber* _max_covering_tombstone_seq, SystemClock* clock,
SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr,
ReadCallback* callback, bool* is_blob_index, uint64_t tracing_get_id)
: ucmp_(ucmp),
merge_operator_(merge_operator),
logger_(logger),
@ -78,7 +77,7 @@ GetContext::GetContext(
Statistics* statistics, GetState init_state, const Slice& user_key,
PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context,
bool do_merge, SequenceNumber* _max_covering_tombstone_seq,
const std::shared_ptr<SystemClock>& clock, SequenceNumber* seq,
SystemClock* clock, SequenceNumber* seq,
PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
bool* is_blob_index, uint64_t tracing_get_id)
: GetContext(ucmp, merge_operator, logger, statistics, init_state, user_key,

@ -99,8 +99,7 @@ class GetContext {
Logger* logger, Statistics* statistics, GetState init_state,
const Slice& user_key, PinnableSlice* value, bool* value_found,
MergeContext* merge_context, bool do_merge,
SequenceNumber* max_covering_tombstone_seq,
const std::shared_ptr<SystemClock>& clock,
SequenceNumber* max_covering_tombstone_seq, SystemClock* clock,
SequenceNumber* seq = nullptr,
PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
@ -110,8 +109,7 @@ class GetContext {
const Slice& user_key, PinnableSlice* value,
std::string* timestamp, bool* value_found,
MergeContext* merge_context, bool do_merge,
SequenceNumber* max_covering_tombstone_seq,
const std::shared_ptr<SystemClock>& clock,
SequenceNumber* max_covering_tombstone_seq, SystemClock* clock,
SequenceNumber* seq = nullptr,
PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
@ -185,7 +183,7 @@ class GetContext {
bool* value_found_; // Is value set correctly? Used by KeyMayExist
MergeContext* merge_context_;
SequenceNumber* max_covering_tombstone_seq_;
std::shared_ptr<SystemClock> clock_;
SystemClock* clock_;
// If a key is found, seq_ will be set to the SequenceNumber of most recent
// write to the key or kMaxSequenceNumber if unknown
SequenceNumber* seq_;

@ -259,9 +259,8 @@ Status SstFileWriter::Open(const std::string& file_path) {
0 /* file_creation_time */, "SST Writer" /* db_id */, db_session_id);
FileTypeSet tmp_set = r->ioptions.checksum_handoff_file_types;
r->file_writer.reset(new WritableFileWriter(
std::move(sst_file), file_path, r->env_options,
r->ioptions.env->GetSystemClock(), nullptr /* io_tracer */,
nullptr /* stats */, r->ioptions.listeners,
std::move(sst_file), file_path, r->env_options, r->ioptions.clock,
nullptr /* io_tracer */, nullptr /* stats */, r->ioptions.listeners,
r->ioptions.file_checksum_gen_factory,
tmp_set.Contains(FileType::kTableFile)));

@ -51,8 +51,7 @@ static std::string MakeKey(int i, int j, bool through_db) {
return key.Encode().ToString();
}
uint64_t Now(const std::shared_ptr<SystemClock>& clock,
bool measured_by_nanosecond) {
uint64_t Now(SystemClock* clock, bool measured_by_nanosecond) {
return measured_by_nanosecond ? clock->NowNanos() : clock->NowMicros();
}
} // namespace
@ -83,7 +82,7 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
std::string dbname = test::PerThreadDBPath("rocksdb_table_reader_bench_db");
WriteOptions wo;
Env* env = Env::Default();
const auto& clock = env->GetSystemClock();
auto* clock = env->GetSystemClock().get();
TableBuilder* tb = nullptr;
DB* db = nullptr;
Status s;

@ -1140,7 +1140,8 @@ class BlockBasedTableTest
&trace_writer));
// Always return Status::OK().
assert(c->block_cache_tracer_
.StartTrace(env_, trace_opt, std::move(trace_writer))
.StartTrace(env_->GetSystemClock().get(), trace_opt,
std::move(trace_writer))
.ok());
{
std::string user_key = "k01";

@ -19,6 +19,7 @@
#include <sstream>
#include "monitoring/histogram.h"
#include "rocksdb/system_clock.h"
#include "util/gflags_compat.h"
#include "util/string_util.h"
@ -1519,6 +1520,7 @@ Status BlockCacheTraceAnalyzer::RecordAccess(
}
Status BlockCacheTraceAnalyzer::Analyze() {
SystemClock* clock = env_->GetSystemClock().get();
std::unique_ptr<BlockCacheTraceReader> reader;
Status s = Status::OK();
if (is_human_readable_trace_file_) {
@ -1542,7 +1544,7 @@ Status BlockCacheTraceAnalyzer::Analyze() {
return s;
}
}
uint64_t start = env_->NowMicros();
uint64_t start = clock->NowMicros();
uint64_t time_interval = 0;
while (s.ok()) {
BlockCacheTraceRecord access;
@ -1568,7 +1570,7 @@ Status BlockCacheTraceAnalyzer::Analyze() {
cache_simulator_->Access(access);
}
access_sequence_number_++;
uint64_t now = env_->NowMicros();
uint64_t now = clock->NowMicros();
uint64_t duration = (now - start) / kMicrosInSecond;
if (duration > 10 * time_interval) {
uint64_t trace_duration =
@ -1582,7 +1584,7 @@ Status BlockCacheTraceAnalyzer::Analyze() {
time_interval++;
}
}
uint64_t now = env_->NowMicros();
uint64_t now = clock->NowMicros();
uint64_t duration = (now - start) / kMicrosInSecond;
uint64_t trace_duration =
trace_end_timestamp_in_seconds_ - trace_start_timestamp_in_seconds_;

@ -225,7 +225,9 @@ TEST_F(BlockCacheTracerTest, BlockCacheAnalyzer) {
std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
BlockCacheTraceWriter writer(env_, trace_opt, std::move(trace_writer));
const auto& clock = env_->GetSystemClock();
BlockCacheTraceWriter writer(clock.get(), trace_opt,
std::move(trace_writer));
ASSERT_OK(writer.WriteHeader());
WriteBlockAccess(&writer, 0, TraceType::kBlockTraceDataBlock, 50);
ASSERT_OK(env_->FileExists(trace_file_path_));
@ -610,9 +612,11 @@ TEST_F(BlockCacheTracerTest, MixedBlocks) {
// kSSTStoringEvenKeys.
TraceOptions trace_opt;
std::unique_ptr<TraceWriter> trace_writer;
const auto& clock = env_->GetSystemClock();
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
BlockCacheTraceWriter writer(env_, trace_opt, std::move(trace_writer));
BlockCacheTraceWriter writer(clock.get(), trace_opt,
std::move(trace_writer));
ASSERT_OK(writer.WriteHeader());
// Write blocks of different types.
WriteBlockAccess(&writer, 0, TraceType::kBlockTraceUncompressionDictBlock,

@ -1854,7 +1854,8 @@ class ReporterAgent {
private:
std::string Header() const { return "secs_elapsed,interval_qps"; }
void SleepAndReport() {
auto time_started = env_->NowMicros();
auto* clock = env_->GetSystemClock().get();
auto time_started = clock->NowMicros();
while (true) {
{
std::unique_lock<std::mutex> lk(mutex_);
@ -1869,7 +1870,7 @@ class ReporterAgent {
auto total_ops_done_snapshot = total_ops_done_.load();
// round the seconds elapsed
auto secs_elapsed =
(env_->NowMicros() - time_started + kMicrosInSecond / 2) /
(clock->NowMicros() - time_started + kMicrosInSecond / 2) /
kMicrosInSecond;
std::string report = ToString(secs_elapsed) + "," +
ToString(total_ops_done_snapshot - last_report_) +
@ -1932,6 +1933,7 @@ static std::unordered_map<OperationType, std::string, std::hash<unsigned char>>
class CombinedStats;
class Stats {
private:
SystemClock* clock_;
int id_;
uint64_t start_ = 0;
uint64_t sine_interval_;
@ -1951,7 +1953,7 @@ class Stats {
friend class CombinedStats;
public:
Stats() { Start(-1); }
Stats() : clock_(FLAGS_env->GetSystemClock().get()) { Start(-1); }
void SetReporterAgent(ReporterAgent* reporter_agent) {
reporter_agent_ = reporter_agent;
@ -1966,8 +1968,8 @@ class Stats {
last_report_done_ = 0;
bytes_ = 0;
seconds_ = 0;
start_ = FLAGS_env->NowMicros();
sine_interval_ = FLAGS_env->NowMicros();
start_ = clock_->NowMicros();
sine_interval_ = clock_->NowMicros();
finish_ = start_;
last_report_finish_ = start_;
message_.clear();
@ -1999,7 +2001,7 @@ class Stats {
}
void Stop() {
finish_ = FLAGS_env->NowMicros();
finish_ = clock_->NowMicros();
seconds_ = (finish_ - start_) * 1e-6;
}
@ -2019,7 +2021,7 @@ class Stats {
"ElapsedTime", "Stage", "State", "OperationProperties");
int64_t current_time = 0;
FLAGS_env->GetCurrentTime(&current_time);
clock_->GetCurrentTime(&current_time).PermitUncheckedError();
for (auto ts : thread_list) {
fprintf(stderr, "%18" PRIu64 " %10s %12s %20s %13s %45s %12s",
ts.thread_id,
@ -2040,9 +2042,7 @@ class Stats {
}
}
void ResetSineInterval() {
sine_interval_ = FLAGS_env->NowMicros();
}
void ResetSineInterval() { sine_interval_ = clock_->NowMicros(); }
uint64_t GetSineInterval() {
return sine_interval_;
@ -2054,7 +2054,7 @@ class Stats {
void ResetLastOpTime() {
// Set to now to avoid latency from calls to SleepForMicroseconds
last_op_finish_ = FLAGS_env->NowMicros();
last_op_finish_ = clock_->NowMicros();
}
void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops,
@ -2063,7 +2063,7 @@ class Stats {
reporter_agent_->ReportFinishedOps(num_ops);
}
if (FLAGS_histogram) {
uint64_t now = FLAGS_env->NowMicros();
uint64_t now = clock_->NowMicros();
uint64_t micros = now - last_op_finish_;
if (hist_.find(op_type) == hist_.end())
@ -2092,7 +2092,7 @@ class Stats {
else next_report_ += 100000;
fprintf(stderr, "... finished %" PRIu64 " ops%30s\r", done_, "");
} else {
uint64_t now = FLAGS_env->NowMicros();
uint64_t now = clock_->NowMicros();
int64_t usecs_since_last = now - last_report_finish_;
// Determine whether to print status where interval is either
@ -2104,15 +2104,13 @@ class Stats {
next_report_ += FLAGS_stats_interval;
} else {
fprintf(stderr,
"%s ... thread %d: (%" PRIu64 ",%" PRIu64 ") ops and "
"%s ... thread %d: (%" PRIu64 ",%" PRIu64
") ops and "
"(%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n",
FLAGS_env->TimeToString(now/1000000).c_str(),
id_,
clock_->TimeToString(now / 1000000).c_str(), id_,
done_ - last_report_done_, done_,
(done_ - last_report_done_) /
(usecs_since_last / 1000000.0),
(done_ - last_report_done_) / (usecs_since_last / 1000000.0),
done_ / ((now - start_) / 1000000.0),
(now - last_report_finish_) / 1000000.0,
(now - start_) / 1000000.0);

@ -99,9 +99,9 @@ uint64_t BlockCacheTraceHelper::GetBlockOffsetInFile(
}
BlockCacheTraceWriter::BlockCacheTraceWriter(
Env* env, const TraceOptions& trace_options,
SystemClock* clock, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer)
: env_(env),
: clock_(clock),
trace_options_(trace_options),
trace_writer_(std::move(trace_writer)) {}
@ -142,7 +142,7 @@ Status BlockCacheTraceWriter::WriteBlockAccess(
Status BlockCacheTraceWriter::WriteHeader() {
Trace trace;
trace.ts = env_->NowMicros();
trace.ts = clock_->NowMicros();
trace.type = TraceType::kTraceBegin;
PutLengthPrefixedSlice(&trace.payload, kTraceMagic);
PutFixed32(&trace.payload, kMajorVersion);
@ -444,7 +444,7 @@ BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); }
BlockCacheTracer::~BlockCacheTracer() { EndTrace(); }
Status BlockCacheTracer::StartTrace(
Env* env, const TraceOptions& trace_options,
SystemClock* clock, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
if (writer_.load()) {
@ -453,7 +453,7 @@ Status BlockCacheTracer::StartTrace(
get_id_counter_.store(1);
trace_options_ = trace_options;
writer_.store(
new BlockCacheTraceWriter(env, trace_options, std::move(trace_writer)));
new BlockCacheTraceWriter(clock, trace_options, std::move(trace_writer)));
return writer_.load()->WriteHeader();
}

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save