diff --git a/HISTORY.md b/HISTORY.md index 5f6c9addd..bbe2d0203 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -6,6 +6,7 @@ * Add a new ticker stat rocksdb.number.multiget.keys.found to count number of keys successfully read in MultiGet calls * Touch-up to write-related counters in PerfContext. New counters added: write_scheduling_flushes_compactions_time, write_thread_wait_nanos. Counters whose behavior was fixed or modified: write_memtable_time, write_pre_and_post_process_time, write_delay_time. * Posix Env's NewRandomRWFile() will fail if the file doesn't exist. +* Now, `DBOptions::use_direct_io_for_flush_and_compaction` only applies to background writes, and `DBOptions::use_direct_reads` applies to both user reads and background reads. This conforms with Linux's `open(2)` manpage, which advises against simultaneously reading a file in buffered and direct modes, due to possibly undefined behavior and degraded performance. ### New Features * Introduce TTL for level compaction so that all files older than ttl go through the compaction process to get rid of old data. 
diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 19e2d21bd..3cc02ecbe 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -3679,7 +3679,7 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) { "TableCache::NewIterator:for_compaction", [&](void* arg) { bool* use_direct_reads = static_cast<bool*>(arg); ASSERT_EQ(*use_direct_reads, - options.use_direct_io_for_flush_and_compaction); + options.use_direct_reads); }); SyncPoint::GetInstance()->SetCallBack( "CompactionJob::OpenCompactionOutputFile", [&](void* arg) { @@ -3698,7 +3698,7 @@ TEST_P(DBCompactionDirectIOTest, DirectIO) { MakeTables(3, "p", "q", 1); ASSERT_EQ("1,1,1", FilesPerLevel(1)); Compact(1, "p1", "p9"); - ASSERT_FALSE(readahead ^ options.use_direct_io_for_flush_and_compaction); + ASSERT_EQ(readahead, options.use_direct_reads); ASSERT_EQ("0,0,1", FilesPerLevel(1)); Destroy(options); delete options.env; diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index 37d222dd6..0cf2b0b8f 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -106,14 +106,14 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max()); } - if (result.use_direct_io_for_flush_and_compaction && + if (result.use_direct_reads && result.compaction_readahead_size == 0) { TEST_SYNC_POINT_CALLBACK("SanitizeOptions:direct_io", nullptr); result.compaction_readahead_size = 1024 * 1024 * 2; } if (result.compaction_readahead_size > 0 || - result.use_direct_io_for_flush_and_compaction) { + result.use_direct_reads) { result.new_table_reader_for_compaction_inputs = true; } diff --git a/db/db_test2.cc b/db/db_test2.cc index 2cdefb56a..1550cf551 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2296,7 +2296,8 @@ TEST_F(DBTest2, RateLimitedCompactionReads) { kBytesPerKey) /* rate_bytes_per_sec */, 10 * 1000 /* refill_period_us */, 10 /* fairness */, RateLimiter::Mode::kReadsOnly)); - options.use_direct_io_for_flush_and_compaction = 
use_direct_io; + options.use_direct_reads = options.use_direct_io_for_flush_and_compaction = + use_direct_io; BlockBasedTableOptions bbto; bbto.block_size = 16384; bbto.no_block_cache = true; @@ -2318,7 +2319,7 @@ TEST_F(DBTest2, RateLimitedCompactionReads) { // chose 1MB as the upper bound on the total bytes read. size_t rate_limited_bytes = options.rate_limiter->GetTotalBytesThrough(Env::IO_LOW); - // Include the explict prefetch of the footer in direct I/O case. + // Include the explicit prefetch of the footer in direct I/O case. size_t direct_io_extra = use_direct_io ? 512 * 1024 : 0; ASSERT_GE(rate_limited_bytes, static_cast(kNumKeysPerFile * kBytesPerKey * kNumL0Files + diff --git a/env/env.cc b/env/env.cc index 66e293337..9b7f5e40d 100644 --- a/env/env.cc +++ b/env/env.cc @@ -404,8 +404,7 @@ EnvOptions Env::OptimizeForCompactionTableWrite( EnvOptions Env::OptimizeForCompactionTableRead( const EnvOptions& env_options, const ImmutableDBOptions& db_options) const { EnvOptions optimized_env_options(env_options); - optimized_env_options.use_direct_reads = - db_options.use_direct_io_for_flush_and_compaction; + optimized_env_options.use_direct_reads = db_options.use_direct_reads; return optimized_env_options; } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 6d44e2435..0ba4685d1 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -604,13 +604,13 @@ struct DBOptions { // buffered. The hardware buffer of the devices may however still // be used. Memory mapped files are not impacted by these parameters. - // Use O_DIRECT for user reads + // Use O_DIRECT for user and compaction reads. + // When true, we also force new_table_reader_for_compaction_inputs to true. // Default: false // Not supported in ROCKSDB_LITE mode! bool use_direct_reads = false; - // Use O_DIRECT for both reads and writes in background flush and compactions - // When true, we also force new_table_reader_for_compaction_inputs to true. 
+ // Use O_DIRECT for writes in background flush and compactions. // Default: false // Not supported in ROCKSDB_LITE mode! bool use_direct_io_for_flush_and_compaction = false; diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 4f967a8ef..2aeadf344 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -920,7 +920,7 @@ DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads, DEFINE_bool(use_direct_io_for_flush_and_compaction, rocksdb::Options().use_direct_io_for_flush_and_compaction, - "Use O_DIRECT for background flush and compaction I/O"); + "Use O_DIRECT for background flush and compaction writes"); DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open, "Advise random access on table file open"); diff --git a/util/file_reader_writer.cc b/util/file_reader_writer.cc index aa8ca30b0..36ca7c407 100644 --- a/util/file_reader_writer.cc +++ b/util/file_reader_writer.cc @@ -88,7 +88,7 @@ Status RandomAccessFileReader::Read(uint64_t offset, size_t n, Slice* result, buf.AllocateNewBuffer(read_size); while (buf.CurrentSize() < read_size) { size_t allowed; - if (rate_limiter_ != nullptr) { + if (for_compaction_ && rate_limiter_ != nullptr) { allowed = rate_limiter_->RequestToken( buf.Capacity() - buf.CurrentSize(), buf.Alignment(), Env::IOPriority::IO_LOW, stats_, RateLimiter::OpType::kRead);