diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 76bc69567..d2c7eaf7d 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -500,7 +500,8 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, unique_ptr file_reader; { unique_ptr file; - status = env_->NewSequentialFile(fname, &file, env_options_); + status = env_->NewSequentialFile(fname, &file, + env_->OptimizeForLogRead(env_options_)); if (!status.ok()) { MaybeIgnoreError(&status); if (!status.ok()) { diff --git a/db/db_iterator_test.cc b/db/db_iterator_test.cc index f31ca21dd..b65cca50d 100644 --- a/db/db_iterator_test.cc +++ b/db/db_iterator_test.cc @@ -1007,6 +1007,7 @@ TEST_F(DBIteratorTest, DBIteratorBoundTest) { TEST_F(DBIteratorTest, DBIteratorBoundOptimizationTest) { int upper_bound_hits = 0; + Options options = CurrentOptions(); rocksdb::SyncPoint::GetInstance()->SetCallBack( "BlockBasedTable::BlockEntryIteratorState::KeyReachedUpperBound", [&upper_bound_hits](void* arg) { @@ -1014,7 +1015,6 @@ TEST_F(DBIteratorTest, DBIteratorBoundOptimizationTest) { upper_bound_hits += (*static_cast(arg) ? 1 : 0); }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); - Options options = CurrentOptions(); options.env = env_; options.create_if_missing = true; options.prefix_extractor = nullptr; diff --git a/db/db_test_util.cc b/db/db_test_util.cc index 5cfe5e815..a697522e6 100644 --- a/db/db_test_util.cc +++ b/db/db_test_util.cc @@ -265,6 +265,14 @@ Options DBTestBase::CurrentOptions( Options options = defaultOptions; BlockBasedTableOptions table_options; bool set_block_based_table_factory = true; +#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && \ + !defined(OS_AIX) + rocksdb::SyncPoint::GetInstance()->ClearCallBack( + "NewRandomAccessFile:O_DIRECT"); + rocksdb::SyncPoint::GetInstance()->ClearCallBack( + "NewWritableFile:O_DIRECT"); +#endif + switch (option_config_) { #ifndef ROCKSDB_LITE case kHashSkipList: @@ -429,6 +437,26 @@ Options DBTestBase::CurrentOptions( options.enable_write_thread_adaptive_yield = true; break; } + case kDirectIO: { + options.use_direct_reads = true; + options.use_direct_io_for_flush_and_compaction = true; + options.compaction_readahead_size = 2 * 1024 * 1024; +#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && \ + !defined(OS_AIX) + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "NewWritableFile:O_DIRECT", [&](void* arg) { + int* val = static_cast(arg); + *val &= ~O_DIRECT; + }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "NewRandomAccessFile:O_DIRECT", [&](void* arg) { + int* val = static_cast(arg); + *val &= ~O_DIRECT; + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); +#endif + break; + } default: break; diff --git a/db/db_test_util.h b/db/db_test_util.h index 03cb4f245..699f87106 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -620,12 +620,13 @@ class DBTestBase : public testing::Test { kRowCache = 27, kRecycleLogFiles = 28, kConcurrentSkipList = 29, - kEnd = 30, - kLevelSubcompactions = 31, - kUniversalSubcompactions = 32, - kBlockBasedTableWithIndexRestartInterval = 33, - kBlockBasedTableWithPartitionedIndex = 34, - kPartitionedFilterWithNewTableReaderForCompactions = 35, + kDirectIO = 30, + kEnd = 31, + kLevelSubcompactions = 32, + kUniversalSubcompactions = 33, + kBlockBasedTableWithIndexRestartInterval = 34, + kBlockBasedTableWithPartitionedIndex = 35, + kPartitionedFilterWithNewTableReaderForCompactions = 36, }; int option_config_; diff --git a/db/repair.cc b/db/repair.cc index 158839835..1f9e344e1 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -316,7 +316,8 @@ class Repairer { // Open the log file std::string logname = LogFileName(dbname_, log); unique_ptr lfile; - Status status = env_->NewSequentialFile(logname, &lfile, env_options_); + Status status = env_->NewSequentialFile( + logname, &lfile, env_->OptimizeForLogRead(env_options_)); if (!status.ok()) { return status; } diff --git a/db/transaction_log_impl.cc b/db/transaction_log_impl.cc index aadd47eb8..5753a65f4 100644 --- a/db/transaction_log_impl.cc +++ b/db/transaction_log_impl.cc @@ -47,17 +47,18 @@ Status TransactionLogIteratorImpl::OpenLogFile( Env* env = options_->env; unique_ptr file; Status s; + EnvOptions optimized_env_options = env->OptimizeForLogRead(soptions_); if (logFile->Type() == kArchivedLogFile) { std::string fname = ArchivedLogFileName(dir_, logFile->LogNumber()); - s = env->NewSequentialFile(fname, &file, soptions_); + s = env->NewSequentialFile(fname, &file, optimized_env_options); } else { std::string fname = LogFileName(dir_, logFile->LogNumber()); - s = env->NewSequentialFile(fname, &file, soptions_); + s = env->NewSequentialFile(fname, &file, optimized_env_options); if (!s.ok()) { // If cannot open file in DB directory. // Try the archive dir, as it could have moved in the meanwhile. fname = ArchivedLogFileName(dir_, logFile->LogNumber()); - s = env->NewSequentialFile(fname, &file, soptions_); + s = env->NewSequentialFile(fname, &file, optimized_env_options); } } if (s.ok()) { diff --git a/db/version_set.cc b/db/version_set.cc index 031198d9c..0032c19a0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2637,7 +2637,7 @@ Status VersionSet::Recover( { unique_ptr manifest_file; s = env_->NewSequentialFile(manifest_filename, &manifest_file, - env_options_); + env_->OptimizeForManifestRead(env_options_)); if (!s.ok()) { return s; } @@ -3064,7 +3064,8 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname, Status s; { unique_ptr file; - s = options.env->NewSequentialFile(dscname, &file, env_options_); + s = options.env->NewSequentialFile( + dscname, &file, env_->OptimizeForManifestRead(env_options_)); if (!s.ok()) { return s; } diff --git a/db/wal_manager.cc b/db/wal_manager.cc index c80ffacc1..1ccf153e8 100644 --- a/db/wal_manager.cc +++ b/db/wal_manager.cc @@ -433,7 +433,8 @@ Status WalManager::ReadFirstLine(const std::string& fname, }; std::unique_ptr file; - Status status = env_->NewSequentialFile(fname, &file, env_options_); + Status status = env_->NewSequentialFile( + fname, &file, env_->OptimizeForLogRead(env_options_)); unique_ptr file_reader( new SequentialFileReader(std::move(file))); diff --git a/env/env.cc b/env/env.cc index 55a1ede80..8e1d78bfe 100644 --- a/env/env.cc +++ b/env/env.cc @@ -342,6 +342,18 @@ EnvOptions Env::OptimizeForManifestWrite(const EnvOptions& env_options) const { return env_options; } +EnvOptions Env::OptimizeForLogRead(const EnvOptions& env_options) const { + EnvOptions optimized_env_options(env_options); + optimized_env_options.use_direct_reads = false; + return optimized_env_options; +} + +EnvOptions Env::OptimizeForManifestRead(const EnvOptions& env_options) const { + EnvOptions optimized_env_options(env_options); + optimized_env_options.use_direct_reads = false; + return optimized_env_options; +} + EnvOptions Env::OptimizeForCompactionTableWrite( const EnvOptions& env_options, const ImmutableDBOptions& db_options) const { EnvOptions optimized_env_options(env_options); diff --git a/env/env_test.cc b/env/env_test.cc index afd3f4170..4f9028192 100644 --- a/env/env_test.cc +++ b/env/env_test.cc @@ -1233,7 +1233,7 @@ TEST_P(EnvPosixTestWithParam, ConsistentChildrenAttributes) { ASSERT_EQ(size, 4096 * i); ASSERT_EQ(size, file_attrs_iter->size_bytes); } - rocksdb::SyncPoint::GetInstance()->ClearTrace(); + rocksdb::SyncPoint::GetInstance()->ClearTrace(); } // Test that all WritableFileWrapper forwards all calls to WritableFile. diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 053765911..670a7e546 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -379,6 +379,16 @@ class Env { // Generates a unique id that can be used to identify a db virtual std::string GenerateUniqueId(); + // OptimizeForLogWrite will create a new EnvOptions object that is a copy of + // the EnvOptions in the parameters, but is optimized for reading log files. + virtual EnvOptions OptimizeForLogRead(const EnvOptions& env_options) const; + + // OptimizeForManifestRead will create a new EnvOptions object that is a copy + // of the EnvOptions in the parameters, but is optimized for reading manifest + // files. + virtual EnvOptions OptimizeForManifestRead( + const EnvOptions& env_options) const; + // OptimizeForLogWrite will create a new EnvOptions object that is a copy of // the EnvOptions in the parameters, but is optimized for writing log files. // Default implementation returns the copy of the same object. @@ -390,16 +400,16 @@ class Env { virtual EnvOptions OptimizeForManifestWrite( const EnvOptions& env_options) const; - // OptimizeForCompactionTableWrite will create a new EnvOptions object that is a copy - // of the EnvOptions in the parameters, but is optimized for writing table - // files. Default implementation returns the copy of the same object. + // OptimizeForCompactionTableWrite will create a new EnvOptions object that is + // a copy of the EnvOptions in the parameters, but is optimized for writing + // table files. virtual EnvOptions OptimizeForCompactionTableWrite( const EnvOptions& env_options, const ImmutableDBOptions& db_options) const; - // OptimizeForCompactionTableWrite will create a new EnvOptions object that is a copy - // of the EnvOptions in the parameters, but is optimized for reading table - // files. Default implementation returns the copy of the same object. + // OptimizeForCompactionTableWrite will create a new EnvOptions object that + // is a copy of the EnvOptions in the parameters, but is optimized for reading + // table files. virtual EnvOptions OptimizeForCompactionTableRead( const EnvOptions& env_options, const ImmutableDBOptions& db_options) const; diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 6aaa20f49..f14c15e01 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -31,6 +31,7 @@ int main() { #else #define __STDC_FORMAT_MACROS +#include #include #include #include @@ -60,7 +61,12 @@ int main() { #include "util/mutexlock.h" #include "util/random.h" #include "util/string_util.h" +// SyncPoint is not supported in Released Windows Mode. +#if !(defined NDEBUG) || !defined(OS_WIN) +#include "util/sync_point.h" +#endif // !(defined NDEBUG) || !defined(OS_WIN) #include "util/testutil.h" + #include "utilities/merge_operators.h" using GFLAGS::ParseCommandLineFlags; @@ -1165,6 +1171,8 @@ class StressTest { ToString(FLAGS_max_bytes_for_level_multiplier), "1", "2", }}, {"max_sequential_skip_in_iterations", {"4", "8", "12"}}, + {"use_direct_reads", {"false", "true"}}, + {"use_direct_io_for_flush_and_compaction", {"false", "true"}}, }; options_table_ = std::move(options_tbl); @@ -2352,6 +2360,20 @@ int main(int argc, char** argv) { SetUsageMessage(std::string("\nUSAGE:\n") + std::string(argv[0]) + " [OPTIONS]..."); ParseCommandLineFlags(&argc, &argv, true); +#if !defined(OS_MACOSX) && !defined(OS_WIN) && !defined(OS_SOLARIS) && \ + !defined(OS_AIX) + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "NewWritableFile:O_DIRECT", [&](void* arg) { + int* val = static_cast(arg); + *val &= ~O_DIRECT; + }); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "NewRandomAccessFile:O_DIRECT", [&](void* arg) { + int* val = static_cast(arg); + *val &= ~O_DIRECT; + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); +#endif if (FLAGS_statistics) { dbstats = rocksdb::CreateDBStatistics(); diff --git a/util/sync_point.cc b/util/sync_point.cc index 0e909db45..65c346a64 100644 --- a/util/sync_point.cc +++ b/util/sync_point.cc @@ -91,6 +91,14 @@ void SyncPoint::SetCallBack(const std::string point, callbacks_[point] = callback; } +void SyncPoint::ClearCallBack(const std::string point) { + std::unique_lock lock(mutex_); + while (num_callbacks_running_ > 0) { + cv_.wait(lock); + } + callbacks_.erase(point); +} + void SyncPoint::ClearAllCallBacks() { std::unique_lock lock(mutex_); while (num_callbacks_running_ > 0) { diff --git a/util/sync_point.h b/util/sync_point.h index a55cf13b2..9d0e8b1f9 100644 --- a/util/sync_point.h +++ b/util/sync_point.h @@ -85,6 +85,10 @@ class SyncPoint { // Set up a call back function in sync point. void SetCallBack(const std::string point, std::function callback); + + // Clear callback function by point + void ClearCallBack(const std::string point); + // Clear all call back functions. void ClearAllCallBacks(); diff --git a/utilities/backupable/backupable_db.cc b/utilities/backupable/backupable_db.cc index aec6206bd..233b34388 100644 --- a/utilities/backupable/backupable_db.cc +++ b/utilities/backupable/backupable_db.cc @@ -1154,7 +1154,7 @@ Status BackupEngineImpl::CopyOrCreateFile( unique_ptr src_file; EnvOptions env_options; env_options.use_mmap_writes = false; - // TODO:(gzh) maybe use direct writes here if possible + // TODO:(gzh) maybe use direct reads/writes here if possible if (size != nullptr) { *size = 0; }