change use_direct_writes to use_direct_io_for_flush_and_compaction

Summary:
Replace Options::use_direct_writes with Options::use_direct_io_for_flush_and_compaction
Now if Options::use_direct_io_for_flush_and_compaction = true, we will enable direct io for both reads and writes for flush and compaction job. Whereas Options::use_direct_reads controls user reads like iterator and Get().
Closes https://github.com/facebook/rocksdb/pull/2117

Differential Revision: D4860912

Pulled By: lightmark

fbshipit-source-id: d93575a8a5e780cf7e40797287edc425ee648c19
main
Aaron Gao 7 years ago committed by Facebook Github Bot
parent 13b50358fb
commit 44fa8ece9b
  1. 1
      HISTORY.md
  2. 10
      db/builder.cc
  3. 6
      db/c.cc
  4. 11
      db/compaction_job.cc
  5. 39
      db/db_compaction_test.cc
  6. 33
      db/db_flush_test.cc
  7. 15
      db/db_impl_open.cc
  8. 3
      db/db_test2.cc
  9. 4
      db/flush_job.cc
  10. 2
      db/forward_iterator_bench.cc
  11. 4
      db/repair.cc
  12. 5
      db/table_cache.cc
  13. 5
      db/version_set.cc
  14. 19
      env/env.cc
  15. 5
      include/rocksdb/c.h
  16. 15
      include/rocksdb/env.h
  17. 7
      include/rocksdb/options.h
  18. 42
      java/rocksjni/options.cc
  19. 16
      java/src/main/java/org/rocksdb/DBOptions.java
  20. 17
      java/src/main/java/org/rocksdb/DBOptionsInterface.java
  21. 14
      java/src/main/java/org/rocksdb/Options.java
  22. 6
      java/src/test/java/org/rocksdb/DBOptionsTest.java
  23. 6
      java/src/test/java/org/rocksdb/OptionsTest.java
  24. 9
      options/db_options.cc
  25. 2
      options/db_options.h
  26. 3
      options/options.cc
  27. 3
      options/options_helper.cc
  28. 6
      options/options_helper.h
  29. 2
      options/options_settable_test.cc
  30. 4
      options/options_test.cc
  31. 14
      tools/db_bench_tool.cc
  32. 2
      tools/db_bench_tool_test.cc
  33. 12
      tools/db_stress.cc
  34. 2
      util/testutil.cc

@ -8,6 +8,7 @@
* DB::Get in place of std::string accepts PinnableSlice, which avoids the extra memcpy of value to std::string in most of cases.
* PinnableSlice releases the pinned resources that contain the value when it is destructed or when ::Reset() is called on it.
* The old API that accepts std::string, although discouraged, is still supported.
* Replace Options::use_direct_writes with Options::use_direct_io_for_flush_and_compaction. Read Direct IO wiki for details.
### New Features
* Memtable flush can be avoided during checkpoint creation if total log file size is smaller than a threshold specified by the user.

@ -32,6 +32,7 @@
#include "util/file_reader_writer.h"
#include "util/filename.h"
#include "util/stop_watch.h"
#include "util/sync_point.h"
namespace rocksdb {
@ -103,6 +104,10 @@ Status BuildTable(
unique_ptr<WritableFileWriter> file_writer;
{
unique_ptr<WritableFile> file;
#ifndef NDEBUG
bool use_direct_writes = env_options.use_direct_writes;
TEST_SYNC_POINT_CALLBACK("BuildTable:create_file", &use_direct_writes);
#endif // !NDEBUG
s = NewWritableFile(env, fname, &file, env_options);
if (!s.ok()) {
EventHelpers::LogAndNotifyTableFileCreationFinished(
@ -180,6 +185,11 @@ Status BuildTable(
if (s.ok() && !empty) {
// Verify that the table is usable
// We set for_compaction to false and don't OptimizeForCompactionTableRead
// here because this is a special case after we finish the table building
// No matter whether use_direct_io_for_flush_and_compaction is true,
// we will regrad this verification as user reads since the goal is
// to cache it here for further user reads
std::unique_ptr<InternalIterator> it(table_cache->NewIterator(
ReadOptions(), env_options, internal_comparator, meta->fd,
nullptr /* range_del_agg */, nullptr,

@ -2121,9 +2121,9 @@ void rocksdb_options_set_use_direct_reads(rocksdb_options_t* opt,
opt->rep.use_direct_reads = v;
}
void rocksdb_options_set_use_direct_writes(rocksdb_options_t* opt,
unsigned char v) {
opt->rep.use_direct_writes = v;
void rocksdb_options_set_use_direct_io_for_flush_and_compaction(
rocksdb_options_t* opt, unsigned char v) {
opt->rep.use_direct_io_for_flush_and_compaction = v;
}
void rocksdb_options_set_allow_mmap_reads(

@ -1072,6 +1072,11 @@ Status CompactionJob::FinishCompactionOutputFile(
TableProperties tp;
if (s.ok() && current_entries > 0) {
// Verify that the table is usable
// We set for_compaction to false and don't OptimizeForCompactionTableRead
// here because this is a special case after we finish the table building
// No matter whether use_direct_io_for_flush_and_compaction is true,
// we will regrad this verification as user reads since the goal is
// to cache it here for further user reads
InternalIterator* iter = cfd->table_cache()->NewIterator(
ReadOptions(), env_options_, cfd->internal_comparator(), meta->fd,
nullptr /* range_del_agg */, nullptr,
@ -1198,7 +1203,11 @@ Status CompactionJob::OpenCompactionOutputFile(
#endif // !ROCKSDB_LITE
// Make the output file
unique_ptr<WritableFile> writable_file;
Status s = NewWritableFile(env_, fname, &writable_file, env_options_);
EnvOptions opt_env_opts =
env_->OptimizeForCompactionTableWrite(env_options_, db_options_);
TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
&opt_env_opts.use_direct_writes);
Status s = NewWritableFile(env_, fname, &writable_file, opt_env_opts);
if (!s.ok()) {
ROCKS_LOG_ERROR(
db_options_.info_log,

@ -40,6 +40,12 @@ class DBCompactionTestWithParam
bool exclusive_manual_compaction_;
};
class DBCompactionDirectIOTest : public DBCompactionTest,
public ::testing::WithParamInterface<bool> {
public:
DBCompactionDirectIOTest() : DBCompactionTest() {}
};
namespace {
class FlushedFileCollector : public EventListener {
@ -2552,6 +2558,39 @@ INSTANTIATE_TEST_CASE_P(DBCompactionTestWithParam, DBCompactionTestWithParam,
std::make_tuple(4, true),
std::make_tuple(4, false)));
TEST_P(DBCompactionDirectIOTest, DirectIO) {
Options options = CurrentOptions();
Destroy(options);
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.use_direct_io_for_flush_and_compaction = GetParam();
options.env = new MockEnv(Env::Default());
Reopen(options);
SyncPoint::GetInstance()->SetCallBack(
"TableCache::NewIterator:for_compaction", [&](void* arg) {
bool* use_direct_reads = static_cast<bool*>(arg);
ASSERT_EQ(*use_direct_reads,
options.use_direct_io_for_flush_and_compaction);
});
SyncPoint::GetInstance()->SetCallBack(
"CompactionJob::OpenCompactionOutputFile", [&](void* arg) {
bool* use_direct_writes = static_cast<bool*>(arg);
ASSERT_EQ(*use_direct_writes,
options.use_direct_io_for_flush_and_compaction);
});
SyncPoint::GetInstance()->EnableProcessing();
CreateAndReopenWithCF({"pikachu"}, options);
MakeTables(3, "p", "q", 1);
ASSERT_EQ("1,1,1", FilesPerLevel(1));
Compact(1, "p1", "p9");
ASSERT_EQ("0,0,1", FilesPerLevel(1));
Destroy(options);
delete options.env;
}
INSTANTIATE_TEST_CASE_P(DBCompactionDirectIOTest, DBCompactionDirectIOTest,
testing::Bool());
class CompactionPriTest : public DBTestBase,
public testing::WithParamInterface<uint32_t> {
public:

@ -19,6 +19,12 @@ class DBFlushTest : public DBTestBase {
DBFlushTest() : DBTestBase("/db_flush_test") {}
};
class DBFlushDirectIOTest : public DBFlushTest,
public ::testing::WithParamInterface<bool> {
public:
DBFlushDirectIOTest() : DBFlushTest() {}
};
// We had issue when two background threads trying to flush at the same time,
// only one of them get committed. The test verifies the issue is fixed.
TEST_F(DBFlushTest, FlushWhileWritingManifest) {
@ -83,6 +89,33 @@ TEST_F(DBFlushTest, SyncFail) {
Destroy(options);
}
TEST_P(DBFlushDirectIOTest, DirectIO) {
Options options;
options.create_if_missing = true;
options.disable_auto_compactions = true;
options.max_background_flushes = 2;
options.use_direct_io_for_flush_and_compaction = GetParam();
options.env = new MockEnv(Env::Default());
SyncPoint::GetInstance()->SetCallBack(
"BuildTable:create_file", [&](void* arg) {
bool* use_direct_writes = static_cast<bool*>(arg);
ASSERT_EQ(*use_direct_writes,
options.use_direct_io_for_flush_and_compaction);
});
SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
ASSERT_OK(Put("foo", "v"));
FlushOptions flush_options;
flush_options.wait = true;
ASSERT_OK(dbfull()->Flush(flush_options));
Destroy(options);
delete options.env;
}
INSTANTIATE_TEST_CASE_P(DBFlushDirectIOTest, DBFlushDirectIOTest,
testing::Bool());
} // namespace rocksdb
int main(int argc, char** argv) {

@ -101,7 +101,8 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src) {
result.compaction_readahead_size = 1024 * 1024 * 2;
}
if (result.compaction_readahead_size > 0) {
if (result.compaction_readahead_size > 0 ||
result.use_direct_io_for_flush_and_compaction) {
result.new_table_reader_for_compaction_inputs = true;
}
@ -165,10 +166,12 @@ static Status ValidateOptions(
"then direct I/O reads (use_direct_reads) must be disabled. ");
}
if (db_options.allow_mmap_writes && db_options.use_direct_writes) {
if (db_options.allow_mmap_writes &&
db_options.use_direct_io_for_flush_and_compaction) {
return Status::NotSupported(
"If memory mapped writes (allow_mmap_writes) are enabled "
"then direct I/O writes (use_direct_writes) must be disabled. ");
"then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
"be disabled. ");
}
if (db_options.keep_log_file_num == 0) {
@ -823,9 +826,11 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
std::vector<SequenceNumber> snapshot_seqs =
snapshots_.GetAll(&earliest_write_conflict_snapshot);
EnvOptions optimized_env_options =
env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_);
s = BuildTable(
dbname_, env_, *cfd->ioptions(), mutable_cf_options, env_options_,
cfd->table_cache(), iter.get(),
dbname_, env_, *cfd->ioptions(), mutable_cf_options,
optimized_env_options, cfd->table_cache(), iter.get(),
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
&meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),

@ -1995,7 +1995,8 @@ TEST_F(DBTest2, DirectIO) {
return;
}
Options options = CurrentOptions();
options.use_direct_reads = options.use_direct_writes = true;
options.use_direct_reads = options.use_direct_io_for_flush_and_compaction =
true;
options.allow_mmap_reads = options.allow_mmap_writes = false;
DestroyAndReopen(options);

@ -294,9 +294,11 @@ Status FlushJob::WriteLevel0Table() {
TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table:output_compression",
&output_compression_);
EnvOptions optimized_env_options =
db_options_.env->OptimizeForCompactionTableWrite(env_options_, db_options_);
s = BuildTable(
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
env_options_, cfd_->table_cache(), iter.get(),
optimized_env_options, cfd_->table_cache(), iter.get(),
std::move(range_del_iter), &meta_, cfd_->internal_comparator(),
cfd_->int_tbl_prop_collector_factories(), cfd_->GetID(),
cfd_->GetName(), existing_snapshots_,

@ -331,7 +331,7 @@ int main(int argc, char** argv) {
options.compaction_style = rocksdb::CompactionStyle::kCompactionStyleNone;
options.level0_slowdown_writes_trigger = 99999;
options.level0_stop_writes_trigger = 99999;
options.use_direct_writes = true;
options.use_direct_io_for_flush_and_compaction = true;
options.write_buffer_size = FLAGS_memtable_size;
rocksdb::BlockBasedTableOptions table_options;
table_options.block_cache = rocksdb::NewLRUCache(FLAGS_block_cache_size);

@ -377,9 +377,11 @@ class Repairer {
ro.total_order_seek = true;
Arena arena;
ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
EnvOptions optimized_env_options =
env_->OptimizeForCompactionTableWrite(env_options_, immutable_db_options_);
status = BuildTable(
dbname_, env_, *cfd->ioptions(), *cfd->GetLatestMutableCFOptions(),
env_options_, table_cache_, iter.get(),
optimized_env_options, table_cache_, iter.get(),
std::unique_ptr<InternalIterator>(mem->NewRangeTombstoneIterator(ro)),
&meta, cfd->internal_comparator(),
cfd->int_tbl_prop_collector_factories(), cfd->GetID(), cfd->GetName(),

@ -184,6 +184,11 @@ InternalIterator* TableCache::NewIterator(
}
size_t readahead = 0;
if (for_compaction) {
#ifndef NDEBUG
bool use_direct_reads_for_compaction = env_options.use_direct_reads;
TEST_SYNC_POINT_CALLBACK("TableCache::NewIterator:for_compaction",
&use_direct_reads_for_compaction);
#endif // !NDEBUG
if (ioptions_.new_table_reader_for_compaction_inputs) {
readahead = ioptions_.compaction_readahead_size;
create_new_table_reader = true;

@ -2218,7 +2218,8 @@ VersionSet::VersionSet(const std::string& dbname,
current_version_number_(0),
manifest_file_size_(0),
env_options_(storage_options),
env_options_compactions_(env_options_) {}
env_options_compactions_(
env_->OptimizeForCompactionTableRead(env_options_, *db_options_)) {}
void CloseTables(void* ptr, size_t) {
TableReader* table_reader = reinterpret_cast<TableReader*>(ptr);
@ -3477,7 +3478,7 @@ InternalIterator* VersionSet::MakeInputIterator(
// Create concatenating iterator for the files from this level
list[num++] = NewTwoLevelIterator(
new LevelFileIteratorState(
cfd->table_cache(), read_options, env_options_,
cfd->table_cache(), read_options, env_options_compactions_,
cfd->internal_comparator(),
nullptr /* no per level latency histogram */,
true /* for_compaction */, false /* prefix enabled */,

19
env/env.cc vendored

@ -10,9 +10,9 @@
#include "rocksdb/env.h"
#include <thread>
#include "options/db_options.h"
#include "port/port.h"
#include "port/sys_time.h"
#include "rocksdb/options.h"
#include "util/arena.h"
#include "util/autovector.h"
@ -316,7 +316,6 @@ void AssignEnvOptions(EnvOptions* env_options, const DBOptions& options) {
env_options->use_mmap_reads = options.allow_mmap_reads;
env_options->use_mmap_writes = options.allow_mmap_writes;
env_options->use_direct_reads = options.use_direct_reads;
env_options->use_direct_writes = options.use_direct_writes;
env_options->set_fd_cloexec = options.is_fd_close_on_exec;
env_options->bytes_per_sync = options.bytes_per_sync;
env_options->compaction_readahead_size = options.compaction_readahead_size;
@ -341,6 +340,22 @@ EnvOptions Env::OptimizeForManifestWrite(const EnvOptions& env_options) const {
return env_options;
}
EnvOptions Env::OptimizeForCompactionTableWrite(
const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
EnvOptions optimized_env_options(env_options);
optimized_env_options.use_direct_writes =
db_options.use_direct_io_for_flush_and_compaction;
return optimized_env_options;
}
EnvOptions Env::OptimizeForCompactionTableRead(
const EnvOptions& env_options, const ImmutableDBOptions& db_options) const {
EnvOptions optimized_env_options(env_options);
optimized_env_options.use_direct_reads =
db_options.use_direct_io_for_flush_and_compaction;
return optimized_env_options;
}
EnvOptions::EnvOptions(const DBOptions& options) {
AssignEnvOptions(this, options);
}

@ -804,8 +804,9 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_allow_mmap_writes(
rocksdb_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_direct_reads(
rocksdb_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_use_direct_writes(
rocksdb_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_use_direct_io_for_flush_and_compaction(rocksdb_options_t*,
unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_is_fd_close_on_exec(
rocksdb_options_t*, unsigned char);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_skip_log_error_on_recovery(

@ -44,6 +44,7 @@ class WritableFile;
class RandomRWFile;
class Directory;
struct DBOptions;
struct ImmutableDBOptions;
class RateLimiter;
class ThreadStatusUpdater;
struct ThreadStatus;
@ -375,6 +376,20 @@ class Env {
virtual EnvOptions OptimizeForManifestWrite(
const EnvOptions& env_options) const;
// OptimizeForCompactionTableWrite will create a new EnvOptions object that is a copy
// of the EnvOptions in the parameters, but is optimized for writing table
// files. Default implementation returns the copy of the same object.
virtual EnvOptions OptimizeForCompactionTableWrite(
const EnvOptions& env_options,
const ImmutableDBOptions& db_options) const;
// OptimizeForCompactionTableWrite will create a new EnvOptions object that is a copy
// of the EnvOptions in the parameters, but is optimized for reading table
// files. Default implementation returns the copy of the same object.
virtual EnvOptions OptimizeForCompactionTableRead(
const EnvOptions& env_options,
const ImmutableDBOptions& db_options) const;
// Returns the status of all threads that belong to the current Env.
virtual Status GetThreadList(std::vector<ThreadStatus>* thread_list) {
return Status::NotSupported("Not supported.");

@ -581,15 +581,16 @@ struct DBOptions {
// bufferized. The hardware buffer of the devices may however still
// be used. Memory mapped files are not impacted by these parameters.
// Use O_DIRECT for reading file
// Use O_DIRECT for user reads
// Default: false
// Not supported in ROCKSDB_LITE mode!
bool use_direct_reads = false;
// Use O_DIRECT for writing file
// Use O_DIRECT for both reads and writes in background flush and compactions
// When true, we also force new_table_reader_for_compaction_inputs to true.
// Default: false
// Not supported in ROCKSDB_LITE mode!
bool use_direct_writes = false;
bool use_direct_io_for_flush_and_compaction = false;
// If false, fallocate() calls are bypassed
bool allow_fallocate = true;

@ -1052,24 +1052,26 @@ void Java_org_rocksdb_Options_setUseDirectReads(JNIEnv* env, jobject jobj,
/*
* Class: org_rocksdb_Options
* Method: useDirectWrites
* Method: useDirectIoForFlushAndCompaction
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_Options_useDirectWrites(JNIEnv* env, jobject jobj,
jlong jhandle) {
return reinterpret_cast<rocksdb::Options*>(jhandle)->use_direct_writes;
jboolean Java_org_rocksdb_Options_useDirectIoForFlushAndCompaction(
JNIEnv* env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::Options*>(jhandle)
->use_direct_io_for_flush_and_compaction;
}
/*
* Class: org_rocksdb_Options
* Method: setUseDirectReads
* Method: setUseDirectIoForFlushAndCompaction
* Signature: (JZ)V
*/
void Java_org_rocksdb_Options_setUseDirectWrites(JNIEnv* env, jobject jobj,
jlong jhandle,
jboolean use_direct_writes) {
reinterpret_cast<rocksdb::Options*>(jhandle)->use_direct_writes =
static_cast<bool>(use_direct_writes);
void Java_org_rocksdb_Options_setUseDirectIoForFlushAndCompaction(
JNIEnv* env, jobject jobj, jlong jhandle,
jboolean use_direct_io_for_flush_and_compaction) {
reinterpret_cast<rocksdb::Options*>(jhandle)
->use_direct_io_for_flush_and_compaction =
static_cast<bool>(use_direct_io_for_flush_and_compaction);
}
/*
@ -4920,12 +4922,13 @@ void Java_org_rocksdb_DBOptions_setUseDirectReads(JNIEnv* env, jobject jobj,
/*
* Class: org_rocksdb_DBOptions
* Method: useDirectWrites
* Method: useDirectIoForFlushAndCompaction
* Signature: (J)Z
*/
jboolean Java_org_rocksdb_DBOptions_useDirectWrites(JNIEnv* env, jobject jobj,
jlong jhandle) {
return reinterpret_cast<rocksdb::DBOptions*>(jhandle)->use_direct_writes;
jboolean Java_org_rocksdb_DBOptions_useDirectIoForFlushAndCompaction(
JNIEnv* env, jobject jobj, jlong jhandle) {
return reinterpret_cast<rocksdb::DBOptions*>(jhandle)
->use_direct_io_for_flush_and_compaction;
}
/*
@ -4933,11 +4936,12 @@ jboolean Java_org_rocksdb_DBOptions_useDirectWrites(JNIEnv* env, jobject jobj,
* Method: setUseDirectReads
* Signature: (JZ)V
*/
void Java_org_rocksdb_DBOptions_setUseDirectWrites(JNIEnv* env, jobject jobj,
jlong jhandle,
jboolean use_direct_writes) {
reinterpret_cast<rocksdb::DBOptions*>(jhandle)->use_direct_writes =
static_cast<bool>(use_direct_writes);
void Java_org_rocksdb_DBOptions_setUseDirectIoForFlushAndCompaction(
JNIEnv* env, jobject jobj, jlong jhandle,
jboolean use_direct_io_for_flush_and_compaction) {
reinterpret_cast<rocksdb::DBOptions*>(jhandle)
->use_direct_io_for_flush_and_compaction =
static_cast<bool>(use_direct_io_for_flush_and_compaction);
}
/*

@ -531,17 +531,18 @@ public class DBOptions
}
@Override
public DBOptions setUseDirectWrites(
final boolean useDirectWrites) {
public DBOptions setUseDirectIoForFlushAndCompaction(
final boolean useDirectIoForFlushAndCompaction) {
assert(isOwningHandle());
setUseDirectWrites(nativeHandle_, useDirectWrites);
setUseDirectIoForFlushAndCompaction(nativeHandle_,
useDirectIoForFlushAndCompaction);
return this;
}
@Override
public boolean useDirectWrites() {
public boolean useDirectIoForFlushAndCompaction() {
assert(isOwningHandle());
return useDirectWrites(nativeHandle_);
return useDirectIoForFlushAndCompaction(nativeHandle_);
}
@Override
@ -1025,8 +1026,9 @@ public class DBOptions
private native long manifestPreallocationSize(long handle);
private native void setUseDirectReads(long handle, boolean useDirectReads);
private native boolean useDirectReads(long handle);
private native void setUseDirectWrites(long handle, boolean useDirectWrites);
private native boolean useDirectWrites(long handle);
private native void setUseDirectIoForFlushAndCompaction(
long handle, boolean useDirectIoForFlushAndCompaction);
private native boolean useDirectIoForFlushAndCompaction(long handle);
private native void setAllowFAllocate(final long handle,
final boolean allowFAllocate);
private native boolean allowFAllocate(final long handle);

@ -804,21 +804,24 @@ public interface DBOptionsInterface<T extends DBOptionsInterface> {
boolean useDirectReads();
/**
* Enable the OS to use direct I/O for writing sst tables.
* Enable the OS to use direct reads and writes in flush and
* compaction
* Default: false
*
* @param useDirectWrites if true, then direct write is enabled
* @param useDirectIoForFlushAndCompaction if true, then direct
* I/O will be enabled for background flush and compactions
* @return the instance of the current object.
*/
T setUseDirectWrites(boolean useDirectWrites);
T setUseDirectIoForFlushAndCompaction(boolean useDirectIoForFlushAndCompaction);
/**
* Enable the OS to use direct I/O for writing sst tables.
* Default: false
* Enable the OS to use direct reads and writes in flush and
* compaction
*
* @return if true, then direct writes are enabled
* @return if true, then direct I/O is enabled for flush and
* compaction
*/
boolean useDirectWrites();
boolean useDirectIoForFlushAndCompaction();
/**
* Whether fallocate calls are allowed

@ -593,16 +593,17 @@ public class Options extends RocksObject
}
@Override
public Options setUseDirectWrites(final boolean useDirectWrites) {
public Options setUseDirectIoForFlushAndCompaction(
final boolean useDirectIoForFlushAndCompaction) {
assert(isOwningHandle());
setUseDirectWrites(nativeHandle_, useDirectWrites);
setUseDirectIoForFlushAndCompaction(nativeHandle_, useDirectIoForFlushAndCompaction);
return this;
}
@Override
public boolean useDirectWrites() {
public boolean useDirectIoForFlushAndCompaction() {
assert(isOwningHandle());
return useDirectWrites(nativeHandle_);
return useDirectIoForFlushAndCompaction(nativeHandle_);
}
@Override
@ -1621,8 +1622,9 @@ public class Options extends RocksObject
private native long manifestPreallocationSize(long handle);
private native void setUseDirectReads(long handle, boolean useDirectReads);
private native boolean useDirectReads(long handle);
private native void setUseDirectWrites(long handle, boolean useDirectWrites);
private native boolean useDirectWrites(long handle);
private native void setUseDirectIoForFlushAndCompaction(
long handle, boolean useDirectIoForFlushAndCompaction);
private native boolean useDirectIoForFlushAndCompaction(long handle);
private native void setAllowFAllocate(final long handle,
final boolean allowFAllocate);
private native boolean allowFAllocate(final long handle);

@ -331,11 +331,11 @@ public class DBOptionsTest {
}
@Test
public void useDirectWrites() {
public void useDirectIoForFlushAndCompaction() {
try(final DBOptions opt = new DBOptions()) {
final boolean boolValue = rand.nextBoolean();
opt.setUseDirectWrites(boolValue);
assertThat(opt.useDirectWrites()).isEqualTo(boolValue);
opt.setUseDirectIoForFlushAndCompaction(boolValue);
assertThat(opt.useDirectIoForFlushAndCompaction()).isEqualTo(boolValue);
}
}

@ -553,11 +553,11 @@ public class OptionsTest {
}
@Test
public void useDirectWrites() {
public void useDirectIoForFlushAndCompaction() {
try(final Options opt = new Options()) {
final boolean boolValue = rand.nextBoolean();
opt.setUseDirectWrites(boolValue);
assertThat(opt.useDirectWrites()).isEqualTo(boolValue);
opt.setUseDirectIoForFlushAndCompaction(boolValue);
assertThat(opt.useDirectIoForFlushAndCompaction()).isEqualTo(boolValue);
}
}

@ -53,7 +53,8 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
allow_mmap_reads(options.allow_mmap_reads),
allow_mmap_writes(options.allow_mmap_writes),
use_direct_reads(options.use_direct_reads),
use_direct_writes(options.use_direct_writes),
use_direct_io_for_flush_and_compaction(
options.use_direct_io_for_flush_and_compaction),
allow_fallocate(options.allow_fallocate),
is_fd_close_on_exec(options.is_fd_close_on_exec),
advise_random_on_open(options.advise_random_on_open),
@ -127,8 +128,10 @@ void ImmutableDBOptions::Dump(Logger* log) const {
allow_mmap_writes);
ROCKS_LOG_HEADER(log, " Options.use_direct_reads: %d",
use_direct_reads);
ROCKS_LOG_HEADER(log, " Options.use_direct_writes: %d",
use_direct_writes);
ROCKS_LOG_HEADER(log,
" "
"Options.use_direct_io_for_flush_and_compaction: %d",
use_direct_io_for_flush_and_compaction);
ROCKS_LOG_HEADER(log, " Options.create_missing_column_families: %d",
create_missing_column_families);
ROCKS_LOG_HEADER(log, " Options.db_log_dir: %s",

@ -48,7 +48,7 @@ struct ImmutableDBOptions {
bool allow_mmap_reads;
bool allow_mmap_writes;
bool use_direct_reads;
bool use_direct_writes;
bool use_direct_io_for_flush_and_compaction;
bool allow_fallocate;
bool is_fd_close_on_exec;
bool advise_random_on_open;

@ -153,7 +153,8 @@ DBOptions::DBOptions(const Options& options)
allow_mmap_reads(options.allow_mmap_reads),
allow_mmap_writes(options.allow_mmap_writes),
use_direct_reads(options.use_direct_reads),
use_direct_writes(options.use_direct_writes),
use_direct_io_for_flush_and_compaction(
options.use_direct_io_for_flush_and_compaction),
allow_fallocate(options.allow_fallocate),
is_fd_close_on_exec(options.is_fd_close_on_exec),
skip_log_error_on_recovery(options.skip_log_error_on_recovery),

@ -70,7 +70,8 @@ DBOptions BuildDBOptions(const ImmutableDBOptions& immutable_db_options,
options.allow_mmap_reads = immutable_db_options.allow_mmap_reads;
options.allow_mmap_writes = immutable_db_options.allow_mmap_writes;
options.use_direct_reads = immutable_db_options.use_direct_reads;
options.use_direct_writes = immutable_db_options.use_direct_writes;
options.use_direct_io_for_flush_and_compaction =
immutable_db_options.use_direct_io_for_flush_and_compaction;
options.allow_fallocate = immutable_db_options.allow_fallocate;
options.is_fd_close_on_exec = immutable_db_options.is_fd_close_on_exec;
options.stats_dump_period_sec = mutable_db_options.stats_dump_period_sec;

@ -163,8 +163,10 @@ static std::unordered_map<std::string, OptionTypeInfo> db_options_type_info = {
{offsetof(struct DBOptions, use_direct_reads), OptionType::kBoolean,
OptionVerificationType::kNormal, false, 0}},
{"use_direct_writes",
{offsetof(struct DBOptions, use_direct_writes), OptionType::kBoolean,
OptionVerificationType::kNormal, false, 0}},
{0, OptionType::kBoolean, OptionVerificationType::kDeprecated, false, 0}},
{"use_direct_io_for_flush_and_compaction",
{offsetof(struct DBOptions, use_direct_io_for_flush_and_compaction),
OptionType::kBoolean, OptionVerificationType::kNormal, false, 0}},
{"allow_2pc",
{offsetof(struct DBOptions, allow_2pc), OptionType::kBoolean,
OptionVerificationType::kNormal, false, 0}},

@ -274,7 +274,7 @@ TEST_F(OptionsSettableTest, DBOptionsAllFieldsSettable) {
"allow_fallocate=true;"
"allow_mmap_reads=false;"
"use_direct_reads=false;"
"use_direct_writes=false;"
"use_direct_io_for_flush_and_compaction=false;"
"max_log_file_size=4607;"
"random_access_max_buffer_size=1048576;"
"advise_random_on_open=true;"

@ -120,7 +120,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
{"allow_mmap_reads", "true"},
{"allow_mmap_writes", "false"},
{"use_direct_reads", "false"},
{"use_direct_writes", "false"},
{"use_direct_io_for_flush_and_compaction", "false"},
{"is_fd_close_on_exec", "true"},
{"skip_log_error_on_recovery", "false"},
{"stats_dump_period_sec", "46"},
@ -234,7 +234,7 @@ TEST_F(OptionsTest, GetOptionsFromMapTest) {
ASSERT_EQ(new_db_opt.allow_mmap_reads, true);
ASSERT_EQ(new_db_opt.allow_mmap_writes, false);
ASSERT_EQ(new_db_opt.use_direct_reads, false);
ASSERT_EQ(new_db_opt.use_direct_writes, false);
ASSERT_EQ(new_db_opt.use_direct_io_for_flush_and_compaction, false);
ASSERT_EQ(new_db_opt.is_fd_close_on_exec, true);
ASSERT_EQ(new_db_opt.skip_log_error_on_recovery, false);
ASSERT_EQ(new_db_opt.stats_dump_period_sec, 46U);

@ -800,17 +800,18 @@ DEFINE_uint64(wal_size_limit_MB, 0, "Set the size limit for the WAL Files"
" in MB.");
DEFINE_uint64(max_total_wal_size, 0, "Set total max WAL size");
DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads,
DEFINE_bool(mmap_read, rocksdb::Options().allow_mmap_reads,
"Allow reads to occur via mmap-ing files");
DEFINE_bool(mmap_write, rocksdb::EnvOptions().use_mmap_writes,
DEFINE_bool(mmap_write, rocksdb::Options().allow_mmap_writes,
"Allow writes to occur via mmap-ing files");
DEFINE_bool(use_direct_reads, rocksdb::EnvOptions().use_direct_reads,
DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads,
"Use O_DIRECT for reading data");
DEFINE_bool(use_direct_writes, rocksdb::EnvOptions().use_direct_writes,
"Use O_DIRECT for writing data");
DEFINE_bool(use_direct_io_for_flush_and_compaction,
rocksdb::Options().use_direct_io_for_flush_and_compaction,
"Use O_DIRECT for background flush and compaction I/O");
DEFINE_bool(advise_random_on_open, rocksdb::Options().advise_random_on_open,
"Advise random access on table file open");
@ -2813,7 +2814,8 @@ void VerifyDBFromDB(std::string& truth_db_name) {
options.allow_mmap_reads = FLAGS_mmap_read;
options.allow_mmap_writes = FLAGS_mmap_write;
options.use_direct_reads = FLAGS_use_direct_reads;
options.use_direct_writes = FLAGS_use_direct_writes;
options.use_direct_io_for_flush_and_compaction =
FLAGS_use_direct_io_for_flush_and_compaction;
#ifndef ROCKSDB_LITE
options.compaction_options_fifo = CompactionOptionsFIFO(
FLAGS_fifo_compaction_max_table_files_size_mb * 1024 * 1024);

@ -211,7 +211,7 @@ const std::string options_file_content = R"OPTIONS_FILE(
allow_mmap_reads=false
allow_mmap_writes=false
use_direct_reads=false
use_direct_writes=false
use_direct_io_for_flush_and_compaction=false
stats_dump_period_sec=600
allow_fallocate=true
max_log_file_size=83886080

@ -269,16 +269,17 @@ DEFINE_string(db, "", "Use the db with the following name.");
DEFINE_bool(verify_checksum, false,
"Verify checksum for every block read from storage");
DEFINE_bool(mmap_read, rocksdb::EnvOptions().use_mmap_reads,
DEFINE_bool(mmap_read, rocksdb::Options().allow_mmap_reads,
"Allow reads to occur via mmap-ing files");
DEFINE_bool(mmap_write, rocksdb::EnvOptions().use_mmap_writes,
DEFINE_bool(mmap_write, rocksdb::Options().allow_mmap_writes,
"Allow writes to occur via mmap-ing files");
DEFINE_bool(use_direct_reads, rocksdb::EnvOptions().use_direct_reads,
DEFINE_bool(use_direct_reads, rocksdb::Options().use_direct_reads,
"Use O_DIRECT for reading data");
DEFINE_bool(use_direct_writes, rocksdb::EnvOptions().use_direct_writes,
DEFINE_bool(use_direct_io_for_flush_and_compaction,
rocksdb::Options().use_direct_io_for_flush_and_compaction,
"Use O_DIRECT for writing data");
// Database statistics
@ -2156,7 +2157,8 @@ class StressTest {
options_.allow_mmap_reads = FLAGS_mmap_read;
options_.allow_mmap_writes = FLAGS_mmap_write;
options_.use_direct_reads = FLAGS_use_direct_reads;
options_.use_direct_writes = FLAGS_use_direct_writes;
options_.use_direct_io_for_flush_and_compaction =
FLAGS_use_direct_io_for_flush_and_compaction;
options_.target_file_size_base = FLAGS_target_file_size_base;
options_.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
options_.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;

@ -242,7 +242,7 @@ void RandomInitDBOptions(DBOptions* db_opt, Random* rnd) {
db_opt->allow_mmap_reads = rnd->Uniform(2);
db_opt->allow_mmap_writes = rnd->Uniform(2);
db_opt->use_direct_reads = rnd->Uniform(2);
db_opt->use_direct_writes = rnd->Uniform(2);
db_opt->use_direct_io_for_flush_and_compaction = rnd->Uniform(2);
db_opt->create_if_missing = rnd->Uniform(2);
db_opt->create_missing_column_families = rnd->Uniform(2);
db_opt->enable_thread_tracking = rnd->Uniform(2);

Loading…
Cancel
Save