From 44fa8ece9bfac08780d51c7157bf2f5152343be3 Mon Sep 17 00:00:00 2001 From: Aaron Gao Date: Thu, 13 Apr 2017 13:07:33 -0700 Subject: [PATCH] change use_direct_writes to use_direct_io_for_flush_and_compaction Summary: Replace Options::use_direct_writes with Options::use_direct_io_for_flush_and_compaction Now if Options::use_direct_io_for_flush_and_compaction = true, we will enable direct io for both reads and writes for flush and compaction job. Whereas Options::use_direct_reads controls user reads like iterator and Get(). Closes https://github.com/facebook/rocksdb/pull/2117 Differential Revision: D4860912 Pulled By: lightmark fbshipit-source-id: d93575a8a5e780cf7e40797287edc425ee648c19 --- HISTORY.md | 1 + db/builder.cc | 10 +++++ db/c.cc | 6 +-- db/compaction_job.cc | 11 ++++- db/db_compaction_test.cc | 39 +++++++++++++++++ db/db_flush_test.cc | 33 +++++++++++++++ db/db_impl_open.cc | 15 ++++--- db/db_test2.cc | 3 +- db/flush_job.cc | 4 +- db/forward_iterator_bench.cc | 2 +- db/repair.cc | 4 +- db/table_cache.cc | 5 +++ db/version_set.cc | 5 ++- env/env.cc | 19 ++++++++- include/rocksdb/c.h | 5 ++- include/rocksdb/env.h | 15 +++++++ include/rocksdb/options.h | 7 ++-- java/rocksjni/options.cc | 42 ++++++++++--------- java/src/main/java/org/rocksdb/DBOptions.java | 16 +++---- .../java/org/rocksdb/DBOptionsInterface.java | 17 ++++---- java/src/main/java/org/rocksdb/Options.java | 14 ++++--- .../test/java/org/rocksdb/DBOptionsTest.java | 6 +-- .../test/java/org/rocksdb/OptionsTest.java | 6 +-- options/db_options.cc | 9 ++-- options/db_options.h | 2 +- options/options.cc | 3 +- options/options_helper.cc | 3 +- options/options_helper.h | 6 ++- options/options_settable_test.cc | 2 +- options/options_test.cc | 4 +- tools/db_bench_tool.cc | 14 ++++--- tools/db_bench_tool_test.cc | 2 +- tools/db_stress.cc | 12 +++--- util/testutil.cc | 2 +- 34 files changed, 253 insertions(+), 91 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 3849cba65..8ba8face6 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * DB::Get in place of std::string accepts PinnableSlice, which avoids the extra memcpy of value to std::string in most of cases. * PinnableSlice releases the pinned resources that contain the value when it is destructed or when ::Reset() is called on it. * The old API that accepts std::string, although discouraged, is still supported. +* Replace Options::use_direct_writes with Options::use_direct_io_for_flush_and_compaction. Read Direct IO wiki for details. ### New Features * Memtable flush can be avoided during checkpoint creation if total log file size is smaller than a threshold specified by the user. diff --git a/db/builder.cc b/db/builder.cc index ba233b43a..65ffd449a 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -32,6 +32,7 @@ #include "util/file_reader_writer.h" #include "util/filename.h" #include "util/stop_watch.h" +#include "util/sync_point.h" namespace rocksdb { @@ -103,6 +104,10 @@ Status BuildTable( unique_ptr file_writer; { unique_ptr file; +#ifndef NDEBUG + bool use_direct_writes = env_options.use_direct_writes; + TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes); +#endif // !NDEBUG s = NewWritableFile(env, fname, &file, env_options); if (!s.ok()) { EventHelpers::LogAndNotifyTableFileCreationFinished( @@ -180,6 +185,11 @@ Status BuildTable( if (s.ok() && !empty) { // Verify that the table is usable + // We set for_compaction to false and don't OptimizeForCompactionTableRead + // here because this is a special case after we finish the table building + // No matter whether use_direct_io_for_flush_and_compaction is true, + // we will regrad this verification as user reads since the goal is + // to cache it here for further user reads std::unique_ptr it(table_cache->NewIterator( ReadOptions(), env_options, internal_comparator, meta->fd, nullptr /* range_del_agg */, nullptr, diff --git a/db/c.cc b/db/c.cc index cbba46451..231d8871f 100644 --- a/db/c.cc +++ b/db/c.cc @@ -2121,9 +2121,9 @@ void rocksdb_options_set_use_direct_reads(rocksdb_options_t* opt, opt->rep.use_direct_reads = v; } -void rocksdb_options_set_use_direct_writes(rocksdb_options_t* opt, - unsigned char v) { - opt->rep.use_direct_writes = v; +void rocksdb_options_set_use_direct_io_for_flush_and_compaction( + rocksdb_options_t* opt, unsigned char v) { + opt->rep.use_direct_io_for_flush_and_compaction = v; } void rocksdb_options_set_allow_mmap_reads( diff --git a/db/compaction_job.cc b/db/compaction_job.cc index b427d3e40..bcf1155d5 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -1072,6 +1072,11 @@ Status CompactionJob::FinishCompactionOutputFile( TableProperties tp; if (s.ok() && current_entries > 0) { // Verify that the table is usable + // We set for_compaction to false and don't OptimizeForCompactionTableRead + // here because this is a special case after we finish the table building + // No matter whether use_direct_io_for_flush_and_compaction is true, + // we will regrad this verification as user reads since the goal is + // to cache it here for further user reads InternalIterator* iter = cfd->table_cache()->NewIterator( ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd, nullptr /* range_del_agg */, nullptr, @@ -1198,7 +1203,11 @@ Status CompactionJob::OpenCompactionOutputFile( #endif // !ROCKSDB_LITE // Make the output file unique_ptr writable_file; - Status s = NewWritableFile(env_, fname, &writable_file, env_options_); + EnvOptions opt_env_opts = + env_->OptimizeForCompactionTableWrite(env_options_, db_options_); + TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile", + &opt_env_opts.use_direct_writes); + Status s = NewWritableFile(env_, fname, &writable_file, opt_env_opts); if (!s.ok()) { ROCKS_LOG_ERROR( db_options_.info_log, diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 28fbe8fa3..f51a8a3ad 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -40,6 +40,12 @@ class DBCompactionTestWithParam bool exclusive_manual_compaction_; }; +class DBCompactionDirectIOTest : public DBCompactionTest, + public ::testing::WithParamInterface { + public: + DBCompactionDirectIOTest() : DBCompactionTest() {} +}; + namespace { class FlushedFileCollector : public EventListener { @@ -2552,6 +2558,39 @@ INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam, std::make_tuple(4, true), std::make_tuple(4, false))); +TEST_P(DBCompactionDirectIOTest, DirectIO) { + Options options = CurrentOptions(); + Destroy(options); + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.use_direct_io_for_flush_and_compaction = GetParam(); + options.env = new MockEnv(Env::Default()); + Reopen(options); + SyncPoint::GetInstance()->SetCallBack( + "TableCache::NewIterator:for_compaction", [&](void* arg) { + bool* use_direct_reads = static_cast(arg); + ASSERT_EQ(*use_direct_reads, + options.use_direct_io_for_flush_and_compaction); + }); + SyncPoint::GetInstance()->SetCallBack( + "CompactionJob::OpenCompactionOutputFile", [&](void* arg) { + bool* use_direct_writes = static_cast(arg); + ASSERT_EQ(*use_direct_writes, + options.use_direct_io_for_flush_and_compaction); + }); + SyncPoint::GetInstance()->EnableProcessing(); + CreateAndReopenWithCF({"pikachu"}, options); + MakeTables(3, "p", "q", 1); + ASSERT_EQ("1,1,1", FilesPerLevel(1)); + Compact(1, "p1", "p9"); + ASSERT_EQ("0,0,1", FilesPerLevel(1)); + Destroy(options); + delete options.env; +} + +INSTANTIATE_TEST_CASE_P(DBCompactionDirectIOTest, DBCompactionDirectIOTest, + testing::Bool()); + class CompactionPriTest : public DBTestBase, public testing::WithParamInterface { public: diff --git a/db/db_flush_test.cc b/db/db_flush_test.cc index f57663baa..b9ebc8e00 100644 --- a/db/db_flush_test.cc +++ b/db/db_flush_test.cc @@ -19,6 +19,12 @@ class DBFlushTest : public DBTestBase { DBFlushTest() : DBTestBase("/db_flush_test") {} }; +class DBFlushDirectIOTest : public DBFlushTest, + public ::testing::WithParamInterface { + public: + DBFlushDirectIOTest() : DBFlushTest() {} +}; + // We had issue when two background threads trying to flush at the same time, // only one of them get committed. The test verifies the issue is fixed. TEST_F(DBFlushTest, FlushWhileWritingManifest) { @@ -83,6 +89,33 @@ TEST_F(DBFlushTest, SyncFail) { Destroy(options); } +TEST_P(DBFlushDirectIOTest, DirectIO) { + Options options; + options.create_if_missing = true; + options.disable_auto_compactions = true; + options.max_background_flushes = 2; + options.use_direct_io_for_flush_and_compaction = GetParam(); + options.env = new MockEnv(Env::Default()); + SyncPoint::GetInstance()->SetCallBack( + "BuildTable:create_file", [&](void* arg) { + bool* use_direct_writes = static_cast(arg); + ASSERT_EQ(*use_direct_writes, + options.use_direct_io_for_flush_and_compaction); + }); + + SyncPoint::GetInstance()->EnableProcessing(); + Reopen(options); + ASSERT_OK(Put("foo", "v")); + FlushOptions flush_options; + flush_options.wait = true; + ASSERT_OK(dbfull()->Flush(flush_options)); + Destroy(options); + delete options.env; +} + +INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest, + testing::Bool()); + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl_open.cc b/db/db_impl_open.cc index d043d678f..9e7d3a02d 100644 --- a/db/db_impl_open.cc +++ b/db/db_impl_open.cc @@ -101,7 +101,8 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) { result.compaction_readahead_size = 1024 * 1024 * 2; } - if (result.compaction_readahead_size > 0) { + if (result.compaction_readahead_size > 0 || + result.use_direct_io_for_flush_and_compaction) { result.new_table_reader_for_compaction_inputs = true; } @@ -165,10 +166,12 @@ static Status ValidateOptions( "then direct I/O reads (use_direct_reads) must be disabled. "); } - if (db_options.allow_mmap_writes && db_options.use_direct_writes) { + if (db_options.allow_mmap_writes && + db_options.use_direct_io_for_flush_and_compaction) { return Status::NotSupported( "If memory mapped writes (allow_mmap_writes) are enabled " - "then direct I/O writes (use_direct_writes) must be disabled. "); + "then direct I/O writes (use_direct_io_for_flush_and_compaction) must " + "be disabled. "); } if (db_options.keep_log_file_num == 0) { @@ -823,9 +826,11 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd, std::vector snapshot_seqs = snapshots_.GetAll(&earliest_write_conflict_snapshot); + EnvOptions optimized_env_options = + env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_); s = BuildTable( - dbname_, env_, *cfd->ioptions(), mutable_cf_options, env_options_, - cfd->table_cache(), iter.get(), + dbname_, env_, *cfd->ioptions(), mutable_cf_options, + optimized_env_options, cfd->table_cache(), iter.get(), std::unique_ptr(mem->NewRangeTombstoneIterator(ro)), &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), diff --git a/db/db_test2.cc b/db/db_test2.cc index f673d24b6..51543fd8a 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -1995,7 +1995,8 @@ TEST_F(DBTest2, DirectIO) { return; } Options options = CurrentOptions(); - options.use_direct_reads = options.use_direct_writes = true; + options.use_direct_reads = options.use_direct_io_for_flush_and_compaction = + true; options.allow_mmap_reads = options.allow_mmap_writes = false; DestroyAndReopen(options); diff --git a/db/flush_job.cc b/db/flush_job.cc index 6dc5915ff..5e4e210a2 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -294,9 +294,11 @@ Status FlushJob::WriteLevel0Table() { TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression", &output_compression_); + EnvOptions optimized_env_options = + db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_); s = BuildTable( dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_, - env_options_, cfd_->table_cache(), iter.get(), + optimized_env_options, cfd_->table_cache(), iter.get(), std::move(range_del_iter), &meta_, cfd_->internal_comparator(), cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(), cfd_->GetName(), existing_snapshots_, diff --git a/db/forward_iterator_bench.cc b/db/forward_iterator_bench.cc index bdd402539..e86fa8afc 100644 --- a/db/forward_iterator_bench.cc +++ b/db/forward_iterator_bench.cc @@ -331,7 +331,7 @@ int main(int argc, char** argv) { options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleNone; options.level0_slowdown_writes_trigger = 99999; options.level0_stop_writes_trigger = 99999; - options.use_direct_writes = true; + options.use_direct_io_for_flush_and_compaction = true; options.write_buffer_size = FLAGS_memtable_size; rocksdb::BlockBasedTableOptions table_options; table_options.block_cache = rocksdb::NewLRUCache(FLAGS_block_cache_size); diff --git a/db/repair.cc b/db/repair.cc index 4cd6aa03f..5563449a9 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -377,9 +377,11 @@ class Repairer { ro.total_order_seek = true; Arena arena; ScopedArenaIterator iter(mem->NewIterator(ro, &arena)); + EnvOptions optimized_env_options = + env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_); status = BuildTable( dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(), - env_options_, table_cache_, iter.get(), + optimized_env_options, table_cache_, iter.get(), std::unique_ptr(mem->NewRangeTombstoneIterator(ro)), &meta, cfd->internal_comparator(), cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(), diff --git a/db/table_cache.cc b/db/table_cache.cc index c1caa8b0e..9c1a63b36 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -184,6 +184,11 @@ InternalIterator* TableCache::NewIterator( } size_t readahead = 0; if (for_compaction) { +#ifndef NDEBUG + bool use_direct_reads_for_compaction = env_options.use_direct_reads; + TEST_SYNC_POINT_CALLBACK("TableCache::NewIterator:for_compaction", + &use_direct_reads_for_compaction); +#endif // !NDEBUG if (ioptions_.new_table_reader_for_compaction_inputs) { readahead = ioptions_.compaction_readahead_size; create_new_table_reader = true; diff --git a/db/version_set.cc b/db/version_set.cc index 96254a94d..9696f8107 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -2218,7 +2218,8 @@ VersionSet::VersionSet(const std::string& dbname, current_version_number_(0), manifest_file_size_(0), env_options_(storage_options), - env_options_compactions_(env_options_) {} + env_options_compactions_( + env_->OptimizeForCompactionTableRead(env_options_, *db_options_)) {} void CloseTables(void* ptr, size_t) { TableReader* table_reader = reinterpret_cast(ptr); @@ -3477,7 +3478,7 @@ InternalIterator* VersionSet::MakeInputIterator( // Create concatenating iterator for the files from this level list[num++] = NewTwoLevelIterator( new LevelFileIteratorState( - cfd->table_cache(), read_options, env_options_, + cfd->table_cache(), read_options, env_options_compactions_, cfd->internal_comparator(), nullptr /* no per level latency histogram */, true /* for_compaction */, false /* prefix enabled */, diff --git a/env/env.cc b/env/env.cc index 7a57f5e86..ff3b908f1 100644 --- a/env/env.cc +++ b/env/env.cc @@ -10,9 +10,9 @@ #include "rocksdb/env.h" #include +#include "options/db_options.h" #include "port/port.h" #include "port/sys_time.h" - #include "rocksdb/options.h" #include "util/arena.h" #include "util/autovector.h" @@ -316,7 +316,6 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) { env_options->use_mmap_reads = options.allow_mmap_reads; env_options->use_mmap_writes = options.allow_mmap_writes; env_options->use_direct_reads = options.use_direct_reads; - env_options->use_direct_writes = options.use_direct_writes; env_options->set_fd_cloexec = options.is_fd_close_on_exec; env_options->bytes_per_sync = options.bytes_per_sync; env_options->compaction_readahead_size = options.compaction_readahead_size; @@ -341,6 +340,22 @@ EnvOptions Env::OptimizeForManifestWrite(const EnvOptions& env_options) const { return env_options; } +EnvOptions Env::OptimizeForCompactionTableWrite( + const EnvOptions& env_options, const ImmutableDBOptions& db_options) const { + EnvOptions optimized_env_options(env_options); + optimized_env_options.use_direct_writes = + db_options.use_direct_io_for_flush_and_compaction; + return optimized_env_options; +} + +EnvOptions Env::OptimizeForCompactionTableRead( + const EnvOptions& env_options, const ImmutableDBOptions& db_options) const { + EnvOptions optimized_env_options(env_options); + optimized_env_options.use_direct_reads = + db_options.use_direct_io_for_flush_and_compaction; + return optimized_env_options; +} + EnvOptions::EnvOptions(const DBOptions& options) { AssignEnvOptions(this, options); } diff --git a/include/rocksdb/c.h b/include/rocksdb/c.h index 3b087f305..7f528b7e7 100644 --- a/include/rocksdb/c.h +++ b/include/rocksdb/c.h @@ -804,8 +804,9 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_allow_mmap_writes( rocksdb_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_direct_reads( rocksdb_options_t*, unsigned char); -extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_direct_writes( - rocksdb_options_t*, unsigned char); +extern ROCKSDB_LIBRARY_API void +rocksdb_options_set_use_direct_io_for_flush_and_compaction(rocksdb_options_t*, + unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_is_fd_close_on_exec( rocksdb_options_t*, unsigned char); extern ROCKSDB_LIBRARY_API void rocksdb_options_set_skip_log_error_on_recovery( diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 7337a5fc6..c74f4bb28 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -44,6 +44,7 @@ class WritableFile; class RandomRWFile; class Directory; struct DBOptions; +struct ImmutableDBOptions; class RateLimiter; class ThreadStatusUpdater; struct ThreadStatus; @@ -375,6 +376,20 @@ class Env { virtual EnvOptions OptimizeForManifestWrite( const EnvOptions& env_options) const; + // OptimizeForCompactionTableWrite will create a new EnvOptions object that is a copy + // of the EnvOptions in the parameters, but is optimized for writing table + // files. Default implementation returns the copy of the same object. + virtual EnvOptions OptimizeForCompactionTableWrite( + const EnvOptions& env_options, + const ImmutableDBOptions& db_options) const; + + // OptimizeForCompactionTableWrite will create a new EnvOptions object that is a copy + // of the EnvOptions in the parameters, but is optimized for reading table + // files. Default implementation returns the copy of the same object. + virtual EnvOptions OptimizeForCompactionTableRead( + const EnvOptions& env_options, + const ImmutableDBOptions& db_options) const; + // Returns the status of all threads that belong to the current Env. virtual Status GetThreadList(std::vector* thread_list) { return Status::NotSupported("Not supported."); diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index b0ee950a6..eaa201c66 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -581,15 +581,16 @@ struct DBOptions { // bufferized. The hardware buffer of the devices may however still // be used. Memory mapped files are not impacted by these parameters. - // Use O_DIRECT for reading file + // Use O_DIRECT for user reads // Default: false // Not supported in ROCKSDB_LITE mode! bool use_direct_reads = false; - // Use O_DIRECT for writing file + // Use O_DIRECT for both reads and writes in background flush and compactions + // When true, we also force new_table_reader_for_compaction_inputs to true. // Default: false // Not supported in ROCKSDB_LITE mode! - bool use_direct_writes = false; + bool use_direct_io_for_flush_and_compaction = false; // If false, fallocate() calls are bypassed bool allow_fallocate = true; diff --git a/java/rocksjni/options.cc b/java/rocksjni/options.cc index 4999c3f2f..744b9e6f6 100644 --- a/java/rocksjni/options.cc +++ b/java/rocksjni/options.cc @@ -1052,24 +1052,26 @@ void Java_org_rocksdb_Options_setUseDirectReads(JNIEnv* env, jobject jobj, /* * Class: org_rocksdb_Options - * Method: useDirectWrites + * Method: useDirectIoForFlushAndCompaction * Signature: (J)Z */ -jboolean Java_org_rocksdb_Options_useDirectWrites(JNIEnv* env, jobject jobj, - jlong jhandle) { - return reinterpret_cast(jhandle)->use_direct_writes; +jboolean Java_org_rocksdb_Options_useDirectIoForFlushAndCompaction( + JNIEnv* env, jobject jobj, jlong jhandle) { + return reinterpret_cast(jhandle) + ->use_direct_io_for_flush_and_compaction; } /* * Class: org_rocksdb_Options - * Method: setUseDirectReads + * Method: setUseDirectIoForFlushAndCompaction * Signature: (JZ)V */ -void Java_org_rocksdb_Options_setUseDirectWrites(JNIEnv* env, jobject jobj, - jlong jhandle, - jboolean use_direct_writes) { - reinterpret_cast(jhandle)->use_direct_writes = - static_cast(use_direct_writes); +void Java_org_rocksdb_Options_setUseDirectIoForFlushAndCompaction( + JNIEnv* env, jobject jobj, jlong jhandle, + jboolean use_direct_io_for_flush_and_compaction) { + reinterpret_cast(jhandle) + ->use_direct_io_for_flush_and_compaction = + static_cast(use_direct_io_for_flush_and_compaction); } /* @@ -4920,12 +4922,13 @@ void Java_org_rocksdb_DBOptions_setUseDirectReads(JNIEnv* env, jobject jobj, /* * Class: org_rocksdb_DBOptions - * Method: useDirectWrites + * Method: useDirectIoForFlushAndCompaction * Signature: (J)Z */ -jboolean Java_org_rocksdb_DBOptions_useDirectWrites(JNIEnv* env, jobject jobj, - jlong jhandle) { - return reinterpret_cast(jhandle)->use_direct_writes; +jboolean Java_org_rocksdb_DBOptions_useDirectIoForFlushAndCompaction( + JNIEnv* env, jobject jobj, jlong jhandle) { + return reinterpret_cast(jhandle) + ->use_direct_io_for_flush_and_compaction; } /* @@ -4933,11 +4936,12 @@ jboolean Java_org_rocksdb_DBOptions_useDirectWrites(JNIEnv* env, jobject jobj, * Method: setUseDirectReads * Signature: (JZ)V */ -void Java_org_rocksdb_DBOptions_setUseDirectWrites(JNIEnv* env, jobject jobj, - jlong jhandle, - jboolean use_direct_writes) { - reinterpret_cast(jhandle)->use_direct_writes = - static_cast(use_direct_writes); +void Java_org_rocksdb_DBOptions_setUseDirectIoForFlushAndCompaction( + JNIEnv* env, jobject jobj, jlong jhandle, + jboolean use_direct_io_for_flush_and_compaction) { + reinterpret_cast(jhandle) + ->use_direct_io_for_flush_and_compaction = + static_cast(use_direct_io_for_flush_and_compaction); } /* diff --git a/java/src/main/java/org/rocksdb/DBOptions.java b/java/src/main/java/org/rocksdb/DBOptions.java index 776d22309..c29ca9cb8 100644 --- a/java/src/main/java/org/rocksdb/DBOptions.java +++ b/java/src/main/java/org/rocksdb/DBOptions.java @@ -531,17 +531,18 @@ public class DBOptions } @Override - public DBOptions setUseDirectWrites( - final boolean useDirectWrites) { + public DBOptions setUseDirectIoForFlushAndCompaction( + final boolean useDirectIoForFlushAndCompaction) { assert(isOwningHandle()); - setUseDirectWrites(nativeHandle_, useDirectWrites); + setUseDirectIoForFlushAndCompaction(nativeHandle_, + useDirectIoForFlushAndCompaction); return this; } @Override - public boolean useDirectWrites() { + public boolean useDirectIoForFlushAndCompaction() { assert(isOwningHandle()); - return useDirectWrites(nativeHandle_); + return useDirectIoForFlushAndCompaction(nativeHandle_); } @Override @@ -1025,8 +1026,9 @@ public class DBOptions private native long manifestPreallocationSize(long handle); private native void setUseDirectReads(long handle, boolean useDirectReads); private native boolean useDirectReads(long handle); - private native void setUseDirectWrites(long handle, boolean useDirectWrites); - private native boolean useDirectWrites(long handle); + private native void setUseDirectIoForFlushAndCompaction( + long handle, boolean useDirectIoForFlushAndCompaction); + private native boolean useDirectIoForFlushAndCompaction(long handle); private native void setAllowFAllocate(final long handle, final boolean allowFAllocate); private native boolean allowFAllocate(final long handle); diff --git a/java/src/main/java/org/rocksdb/DBOptionsInterface.java b/java/src/main/java/org/rocksdb/DBOptionsInterface.java index 2e0e295ed..72dd59340 100644 --- a/java/src/main/java/org/rocksdb/DBOptionsInterface.java +++ b/java/src/main/java/org/rocksdb/DBOptionsInterface.java @@ -804,21 +804,24 @@ public interface DBOptionsInterface { boolean useDirectReads(); /** - * Enable the OS to use direct I/O for writing sst tables. + * Enable the OS to use direct reads and writes in flush and + * compaction * Default: false * - * @param useDirectWrites if true, then direct write is enabled + * @param useDirectIoForFlushAndCompaction if true, then direct + * I/O will be enabled for background flush and compactions * @return the instance of the current object. */ - T setUseDirectWrites(boolean useDirectWrites); + T setUseDirectIoForFlushAndCompaction(boolean useDirectIoForFlushAndCompaction); /** - * Enable the OS to use direct I/O for writing sst tables. - * Default: false + * Enable the OS to use direct reads and writes in flush and + * compaction * - * @return if true, then direct writes are enabled + * @return if true, then direct I/O is enabled for flush and + * compaction */ - boolean useDirectWrites(); + boolean useDirectIoForFlushAndCompaction(); /** * Whether fallocate calls are allowed diff --git a/java/src/main/java/org/rocksdb/Options.java b/java/src/main/java/org/rocksdb/Options.java index fce34c2e1..f11ad392a 100644 --- a/java/src/main/java/org/rocksdb/Options.java +++ b/java/src/main/java/org/rocksdb/Options.java @@ -593,16 +593,17 @@ public class Options extends RocksObject } @Override - public Options setUseDirectWrites(final boolean useDirectWrites) { + public Options setUseDirectIoForFlushAndCompaction( + final boolean useDirectIoForFlushAndCompaction) { assert(isOwningHandle()); - setUseDirectWrites(nativeHandle_, useDirectWrites); + setUseDirectIoForFlushAndCompaction(nativeHandle_, useDirectIoForFlushAndCompaction); return this; } @Override - public boolean useDirectWrites() { + public boolean useDirectIoForFlushAndCompaction() { assert(isOwningHandle()); - return useDirectWrites(nativeHandle_); + return useDirectIoForFlushAndCompaction(nativeHandle_); } @Override @@ -1621,8 +1622,9 @@ public class Options extends RocksObject private native long manifestPreallocationSize(long handle); private native void setUseDirectReads(long handle, boolean useDirectReads); private native boolean useDirectReads(long handle); - private native void setUseDirectWrites(long handle, boolean useDirectWrites); - private native boolean useDirectWrites(long handle); + private native void setUseDirectIoForFlushAndCompaction( + long handle, boolean useDirectIoForFlushAndCompaction); + private native boolean useDirectIoForFlushAndCompaction(long handle); private native void setAllowFAllocate(final long handle, final boolean allowFAllocate); private native boolean allowFAllocate(final long handle); diff --git a/java/src/test/java/org/rocksdb/DBOptionsTest.java b/java/src/test/java/org/rocksdb/DBOptionsTest.java index be2083243..2bbaf1465 100644 --- a/java/src/test/java/org/rocksdb/DBOptionsTest.java +++ b/java/src/test/java/org/rocksdb/DBOptionsTest.java @@ -331,11 +331,11 @@ public class DBOptionsTest { } @Test - public void useDirectWrites() { + public void useDirectIoForFlushAndCompaction() { try(final DBOptions opt = new DBOptions()) { final boolean boolValue = rand.nextBoolean(); - opt.setUseDirectWrites(boolValue); - assertThat(opt.useDirectWrites()).isEqualTo(boolValue); + opt.setUseDirectIoForFlushAndCompaction(boolValue); + assertThat(opt.useDirectIoForFlushAndCompaction()).isEqualTo(boolValue); } } diff --git a/java/src/test/java/org/rocksdb/OptionsTest.java b/java/src/test/java/org/rocksdb/OptionsTest.java index 67f3a9f00..8b46ea6b7 100644 --- a/java/src/test/java/org/rocksdb/OptionsTest.java +++ b/java/src/test/java/org/rocksdb/OptionsTest.java @@ -553,11 +553,11 @@ public class OptionsTest { } @Test - public void useDirectWrites() { + public void useDirectIoForFlushAndCompaction() { try(final Options opt = new Options()) { final boolean boolValue = rand.nextBoolean(); - opt.setUseDirectWrites(boolValue); - assertThat(opt.useDirectWrites()).isEqualTo(boolValue); + opt.setUseDirectIoForFlushAndCompaction(boolValue); + assertThat(opt.useDirectIoForFlushAndCompaction()).isEqualTo(boolValue); } } diff --git a/options/db_options.cc b/options/db_options.cc index f1d1fad3d..25c8cf358 100644 --- a/options/db_options.cc +++ b/options/db_options.cc @@ -53,7 +53,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options) allow_mmap_reads(options.allow_mmap_reads), allow_mmap_writes(options.allow_mmap_writes), use_direct_reads(options.use_direct_reads), - use_direct_writes(options.use_direct_writes), + use_direct_io_for_flush_and_compaction( + options.use_direct_io_for_flush_and_compaction), allow_fallocate(options.allow_fallocate), is_fd_close_on_exec(options.is_fd_close_on_exec), advise_random_on_open(options.advise_random_on_open), @@ -127,8 +128,10 @@ void ImmutableDBOptions::Dump(Logger* log) const { allow_mmap_writes); ROCKS_LOG_HEADER(log, " Options.use_direct_reads: %d", use_direct_reads); - ROCKS_LOG_HEADER(log, " Options.use_direct_writes: %d", - use_direct_writes); + ROCKS_LOG_HEADER(log, + " " + "Options.use_direct_io_for_flush_and_compaction: %d", + use_direct_io_for_flush_and_compaction); ROCKS_LOG_HEADER(log, " Options.create_missing_column_families: %d", create_missing_column_families); ROCKS_LOG_HEADER(log, " Options.db_log_dir: %s", diff --git a/options/db_options.h b/options/db_options.h index 8b0135aca..bea73560a 100644 --- a/options/db_options.h +++ b/options/db_options.h @@ -48,7 +48,7 @@ struct ImmutableDBOptions { bool allow_mmap_reads; bool allow_mmap_writes; bool use_direct_reads; - bool use_direct_writes; + bool use_direct_io_for_flush_and_compaction; bool allow_fallocate; bool is_fd_close_on_exec; bool advise_random_on_open; diff --git a/options/options.cc b/options/options.cc index 2b35c043f..6fe52aa43 100644 --- a/options/options.cc +++ b/options/options.cc @@ -153,7 +153,8 @@ DBOptions::DBOptions(const Options& options) allow_mmap_reads(options.allow_mmap_reads), allow_mmap_writes(options.allow_mmap_writes), use_direct_reads(options.use_direct_reads), - use_direct_writes(options.use_direct_writes), + use_direct_io_for_flush_and_compaction( + options.use_direct_io_for_flush_and_compaction), allow_fallocate(options.allow_fallocate), is_fd_close_on_exec(options.is_fd_close_on_exec), skip_log_error_on_recovery(options.skip_log_error_on_recovery), diff --git a/options/options_helper.cc b/options/options_helper.cc index 15ee46c92..675eeb9fb 100644 --- a/options/options_helper.cc +++ b/options/options_helper.cc @@ -70,7 +70,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options, options.allow_mmap_reads = immutable_db_options.allow_mmap_reads; options.allow_mmap_writes = immutable_db_options.allow_mmap_writes; options.use_direct_reads = immutable_db_options.use_direct_reads; - options.use_direct_writes = immutable_db_options.use_direct_writes; + options.use_direct_io_for_flush_and_compaction = + immutable_db_options.use_direct_io_for_flush_and_compaction; options.allow_fallocate = immutable_db_options.allow_fallocate; options.is_fd_close_on_exec = immutable_db_options.is_fd_close_on_exec; options.stats_dump_period_sec = mutable_db_options.stats_dump_period_sec; diff --git a/options/options_helper.h b/options/options_helper.h index 23d6dc4e2..207e5edee 100644 --- a/options/options_helper.h +++ b/options/options_helper.h @@ -163,8 +163,10 @@ static std::unordered_map db_options_type_info = { {offsetof(struct DBOptions, use_direct_reads), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, {"use_direct_writes", - {offsetof(struct DBOptions, use_direct_writes), OptionType::kBoolean, - OptionVerificationType::kNormal, false, 0}}, + {0, OptionType::kBoolean, OptionVerificationType::kDeprecated, false, 0}}, + {"use_direct_io_for_flush_and_compaction", + {offsetof(struct DBOptions, use_direct_io_for_flush_and_compaction), + OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, {"allow_2pc", {offsetof(struct DBOptions, allow_2pc), OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}}, diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index 218a7bb24..04060834f 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -274,7 +274,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) { "allow_fallocate=true;" "allow_mmap_reads=false;" "use_direct_reads=false;" - "use_direct_writes=false;" + "use_direct_io_for_flush_and_compaction=false;" "max_log_file_size=4607;" "random_access_max_buffer_size=1048576;" "advise_random_on_open=true;" diff --git a/options/options_test.cc b/options/options_test.cc index 73acc2c1a..fc339af91 100644 --- a/options/options_test.cc +++ b/options/options_test.cc @@ -120,7 +120,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { {"allow_mmap_reads", "true"}, {"allow_mmap_writes", "false"}, {"use_direct_reads", "false"}, - {"use_direct_writes", "false"}, + {"use_direct_io_for_flush_and_compaction", "false"}, {"is_fd_close_on_exec", "true"}, {"skip_log_error_on_recovery", "false"}, {"stats_dump_period_sec", "46"}, @@ -234,7 +234,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) { ASSERT_EQ(new_db_opt.allow_mmap_reads, true); ASSERT_EQ(new_db_opt.allow_mmap_writes, false); ASSERT_EQ(new_db_opt.use_direct_reads, false); - ASSERT_EQ(new_db_opt.use_direct_writes, false); + ASSERT_EQ(new_db_opt.use_direct_io_for_flush_and_compaction, false); ASSERT_EQ(new_db_opt.is_fd_close_on_exec, true); ASSERT_EQ(new_db_opt.skip_log_error_on_recovery, false); ASSERT_EQ(new_db_opt.stats_dump_period_sec, 46U); diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 04d679a68..a52738df3 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -800,17 +800,18 @@ DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files" " in MB."); DEFINE_uint64(max_total_wal_size, 0, "Set total max WAL size"); -DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads, +DEFINE_bool(mmap_read, rocksdb::Options().allow_mmap_reads, "Allow reads to occur via mmap-ing files"); -DEFINE_bool(mmap_write, rocksdb::EnvOptions().use_mmap_writes, +DEFINE_bool(mmap_write, rocksdb::Options().allow_mmap_writes, "Allow writes to occur via mmap-ing files"); -DEFINE_bool(use_direct_reads, rocksdb::EnvOptions().use_direct_reads, +DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads, "Use O_DIRECT for reading data"); -DEFINE_bool(use_direct_writes, rocksdb::EnvOptions().use_direct_writes, - "Use O_DIRECT for writing data"); +DEFINE_bool(use_direct_io_for_flush_and_compaction, + rocksdb::Options().use_direct_io_for_flush_and_compaction, + "Use O_DIRECT for background flush and compaction I/O"); DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open, "Advise random access on table file open"); @@ -2813,7 +2814,8 @@ void VerifyDBFromDB(std::string& truth_db_name) { options.allow_mmap_reads = FLAGS_mmap_read; options.allow_mmap_writes = FLAGS_mmap_write; options.use_direct_reads = FLAGS_use_direct_reads; - options.use_direct_writes = FLAGS_use_direct_writes; + options.use_direct_io_for_flush_and_compaction = + FLAGS_use_direct_io_for_flush_and_compaction; #ifndef ROCKSDB_LITE options.compaction_options_fifo = CompactionOptionsFIFO( FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024); diff --git a/tools/db_bench_tool_test.cc b/tools/db_bench_tool_test.cc index 87de28290..d229d025e 100644 --- a/tools/db_bench_tool_test.cc +++ b/tools/db_bench_tool_test.cc @@ -211,7 +211,7 @@ const std::string options_file_content = R"OPTIONS_FILE( allow_mmap_reads=false allow_mmap_writes=false use_direct_reads=false - use_direct_writes=false + use_direct_io_for_flush_and_compaction=false stats_dump_period_sec=600 allow_fallocate=true max_log_file_size=83886080 diff --git a/tools/db_stress.cc b/tools/db_stress.cc index 4e8ea7e4a..41a697f48 100644 --- a/tools/db_stress.cc +++ b/tools/db_stress.cc @@ -269,16 +269,17 @@ DEFINE_string(db, "", "Use the db with the following name."); DEFINE_bool(verify_checksum, false, "Verify checksum for every block read from storage"); -DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads, +DEFINE_bool(mmap_read, rocksdb::Options().allow_mmap_reads, "Allow reads to occur via mmap-ing files"); -DEFINE_bool(mmap_write, rocksdb::EnvOptions().use_mmap_writes, +DEFINE_bool(mmap_write, rocksdb::Options().allow_mmap_writes, "Allow writes to occur via mmap-ing files"); -DEFINE_bool(use_direct_reads, rocksdb::EnvOptions().use_direct_reads, +DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads, "Use O_DIRECT for reading data"); -DEFINE_bool(use_direct_writes, rocksdb::EnvOptions().use_direct_writes, +DEFINE_bool(use_direct_io_for_flush_and_compaction, + rocksdb::Options().use_direct_io_for_flush_and_compaction, "Use O_DIRECT for writing data"); // Database statistics @@ -2156,7 +2157,8 @@ class StressTest { options_.allow_mmap_reads = FLAGS_mmap_read; options_.allow_mmap_writes = FLAGS_mmap_write; options_.use_direct_reads = FLAGS_use_direct_reads; - options_.use_direct_writes = FLAGS_use_direct_writes; + options_.use_direct_io_for_flush_and_compaction = + FLAGS_use_direct_io_for_flush_and_compaction; options_.target_file_size_base = FLAGS_target_file_size_base; options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier; options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base; diff --git a/util/testutil.cc b/util/testutil.cc index fef81f406..230f9d73c 100644 --- a/util/testutil.cc +++ b/util/testutil.cc @@ -242,7 +242,7 @@ void RandomInitDBOptions(DBOptions* db_opt, Random* rnd) { db_opt->allow_mmap_reads = rnd->Uniform(2); db_opt->allow_mmap_writes = rnd->Uniform(2); db_opt->use_direct_reads = rnd->Uniform(2); - db_opt->use_direct_writes = rnd->Uniform(2); + db_opt->use_direct_io_for_flush_and_compaction = rnd->Uniform(2); db_opt->create_if_missing = rnd->Uniform(2); db_opt->create_missing_column_families = rnd->Uniform(2); db_opt->enable_thread_tracking = rnd->Uniform(2);