diff --git a/TARGETS b/TARGETS index ea89b657f..d6b14caa2 100644 --- a/TARGETS +++ b/TARGETS @@ -780,6 +780,7 @@ cpp_library( "test_util/testutil.cc", "tools/block_cache_analyzer/block_cache_trace_analyzer.cc", "tools/db_bench_tool.cc", + "tools/simulated_hybrid_file_system.cc", "tools/trace_analyzer_tool.cc", ], auto_headers = AutoHeaders.RECURSIVE_GLOB, diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index e0f034aeb..36a70ce2e 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -1685,9 +1685,16 @@ Status CompactionJob::OpenCompactionOutputFile( TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile", &syncpoint_arg); #endif + + // Pass temperature of botommost files to FileSystem. + FileOptions fo_copy = file_options_; + if (bottommost_level_) { + fo_copy.temperature = + sub_compact->compaction->mutable_cf_options()->bottommost_temperature; + } + Status s; - IOStatus io_s = - NewWritableFile(fs_.get(), fname, &writable_file, file_options_); + IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy); s = io_s; if (sub_compact->io_status.ok()) { sub_compact->io_status = io_s; diff --git a/include/rocksdb/advanced_options.h b/include/rocksdb/advanced_options.h index 1454e8d45..84a3061b0 100644 --- a/include/rocksdb/advanced_options.h +++ b/include/rocksdb/advanced_options.h @@ -186,6 +186,16 @@ struct CompressionOptions { max_dict_buffer_bytes(_max_dict_buffer_bytes) {} }; +// Temperature of a file. Used to pass to FileSystem for a different +// placement and/or coding. +enum class Temperature : uint8_t { + kHot, + kWarm, + kCold, + kTotal, + kUnknown = kTotal, +}; + enum UpdateStatus { // Return status For inplace update callback UPDATE_FAILED = 0, // Nothing to update UPDATED_INPLACE = 1, // Value updated inplace @@ -758,6 +768,13 @@ struct AdvancedColumnFamilyOptions { // data is left uncompressed (unless compression is also requested). uint64_t sample_for_compression = 0; + // EXPERIMENTAL + // The feature is still in development and is incomplete. + // If this option is set, when creating bottommost files, pass this + // temperature to FileSystem used. Should be no-op for default FileSystem + // and users need to plug in their own FileSystem to take advantage of it. + Temperature bottommost_temperature = Temperature::kUnknown; + // When set, large values (blobs) are written to separate blob files, and // only pointers to them are stored in SST files. This can reduce write // amplification for large-value use cases at the cost of introducing a level diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index daa15a4ce..c45dc5d9e 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -100,6 +100,13 @@ struct FileOptions : EnvOptions { // to be issued for the file open/creation IOOptions io_options; + // EXPERIMENTAL + // The feature is in development and is subject to change. + // When creating a new file, set the temperature of the file so that + // underlying file systems can put it with appropriate storage media and/or + // coding. + Temperature temperature = Temperature::kUnknown; + // The checksum type that is used to calculate the checksum value for // handoff during file writes. ChecksumType handoff_checksum_type; @@ -115,6 +122,7 @@ struct FileOptions : EnvOptions { FileOptions(const FileOptions& opts) : EnvOptions(opts), io_options(opts.io_options), + temperature(opts.temperature), handoff_checksum_type(opts.handoff_checksum_type) {} FileOptions& operator=(const FileOptions& opts) = default; diff --git a/options/cf_options.h b/options/cf_options.h index 92e5f3136..8330a4380 100644 --- a/options/cf_options.h +++ b/options/cf_options.h @@ -186,6 +186,7 @@ struct MutableCFOptions { bottommost_compression(options.bottommost_compression), compression_opts(options.compression_opts), bottommost_compression_opts(options.bottommost_compression_opts), + bottommost_temperature(options.bottommost_temperature), sample_for_compression( options.sample_for_compression) { // TODO: is 0 fine here? RefreshDerivedOptions(options.num_levels, options.compaction_style); @@ -227,6 +228,7 @@ struct MutableCFOptions { report_bg_io_stats(false), compression(Snappy_Supported() ? kSnappyCompression : kNoCompression), bottommost_compression(kDisableCompressionOption), + bottommost_temperature(Temperature::kUnknown), sample_for_compression(0) {} explicit MutableCFOptions(const Options& options); @@ -294,6 +296,9 @@ struct MutableCFOptions { CompressionType bottommost_compression; CompressionOptions compression_opts; CompressionOptions bottommost_compression_opts; + // TODO this experimental option isn't made configurable + // through strings yet. + Temperature bottommost_temperature; uint64_t sample_for_compression; diff --git a/options/options_settable_test.cc b/options/options_settable_test.cc index a9f46cd92..3ecdf952e 100644 --- a/options/options_settable_test.cc +++ b/options/options_settable_test.cc @@ -441,6 +441,7 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) { options->max_mem_compaction_level = 0; options->compaction_filter = nullptr; options->sst_partitioner_factory = nullptr; + options->bottommost_temperature = Temperature::kUnknown; char* new_options_ptr = new char[sizeof(ColumnFamilyOptions)]; ColumnFamilyOptions* new_options = diff --git a/src.mk b/src.mk index a0c2e76e6..7077ccc4e 100644 --- a/src.mk +++ b/src.mk @@ -318,6 +318,7 @@ MOCK_LIB_SOURCES = \ BENCH_LIB_SOURCES = \ tools/db_bench_tool.cc \ + tools/simulated_hybrid_file_system.cc \ STRESS_LIB_SOURCES = \ db_stress_tool/batched_ops_stress.cc \ diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc index 0c064856d..145add864 100644 --- a/tools/db_bench_tool.cc +++ b/tools/db_bench_tool.cc @@ -65,6 +65,7 @@ #include "rocksdb/write_batch.h" #include "test_util/testutil.h" #include "test_util/transaction_test_util.h" +#include "tools/simulated_hybrid_file_system.h" #include "util/cast_util.h" #include "util/compression.h" #include "util/crc32c.h" @@ -1032,6 +1033,10 @@ DEFINE_string(fs_uri, "", DEFINE_string(hdfs, "", "Name of hdfs environment. Mutually exclusive with" " --env_uri and --fs_uri"); +DEFINE_string(simulate_hybrid_fs_file, "", + "File for Store Metadata for Simulate hybrid FS. Empty means " + "disable the feature. Now, if it is set, " + "bottommost_temperature is set to kWarm."); static std::shared_ptr env_guard; @@ -4050,6 +4055,9 @@ class Benchmark { options.level0_slowdown_writes_trigger = FLAGS_level0_slowdown_writes_trigger; options.compression = FLAGS_compression_type_e; + if (FLAGS_simulate_hybrid_fs_file != "") { + options.bottommost_temperature = Temperature::kWarm; + } options.sample_for_compression = FLAGS_sample_for_compression; options.WAL_ttl_seconds = FLAGS_wal_ttl_seconds; options.WAL_size_limit_MB = FLAGS_wal_size_limit_MB; @@ -7672,6 +7680,9 @@ int db_bench_tool(int argc, char** argv) { exit(1); } FLAGS_env = GetCompositeEnv(fs); + } else if (FLAGS_simulate_hybrid_fs_file != "") { + FLAGS_env = GetCompositeEnv(std::make_shared( + FileSystem::Default(), FLAGS_simulate_hybrid_fs_file)); } #endif // ROCKSDB_LITE if (FLAGS_use_existing_keys && !FLAGS_use_existing_db) { diff --git a/tools/simulated_hybrid_file_system.cc b/tools/simulated_hybrid_file_system.cc new file mode 100644 index 000000000..771b2e0db --- /dev/null +++ b/tools/simulated_hybrid_file_system.cc @@ -0,0 +1,145 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#ifndef ROCKSDB_LITE + +#include "tools/simulated_hybrid_file_system.h" + +#include +#include + +#include "rocksdb/rate_limiter.h" + +namespace ROCKSDB_NAMESPACE { + +const int kLatencyAddedPerRequestUs = 15000; +const int64_t kRequestPerSec = 100; +const int64_t kDummyBytesPerRequest = 1024 * 1024; + +// The metadata file format: each line is a full filename of a file which is +// warm +SimulatedHybridFileSystem::SimulatedHybridFileSystem( + const std::shared_ptr& base, + const std::string& metadata_file_name) + : FileSystemWrapper(base), + // Limit to 100 requests per second. + rate_limiter_(NewGenericRateLimiter( + kDummyBytesPerRequest * kRequestPerSec /* rate_bytes_per_sec */, + 1000 /* refill_period_us */)), + metadata_file_name_(metadata_file_name), + name_("SimulatedHybridFileSystem: " + std::string(target()->Name())) { + IOStatus s = base->FileExists(metadata_file_name, IOOptions(), nullptr); + if (s.IsNotFound()) { + return; + } + std::string metadata; + s = ReadFileToString(base.get(), metadata_file_name, &metadata); + if (!s.ok()) { + fprintf(stderr, "Error reading from file %s: %s", + metadata_file_name.c_str(), s.ToString().c_str()); + // Exit rather than assert as this file system is built to run with + // benchmarks, which usually run on release mode. + std::exit(1); + } + std::istringstream input; + input.str(metadata); + std::string line; + while (std::getline(input, line)) { + fprintf(stderr, "Warm file %s\n", line.c_str()); + warm_file_set_.insert(line); + } +} + +// Need to write out the metadata file to file. See comment of +// SimulatedHybridFileSystem::SimulatedHybridFileSystem() for format of the +// file. +SimulatedHybridFileSystem::~SimulatedHybridFileSystem() { + std::string metadata; + for (const auto& f : warm_file_set_) { + metadata += f; + metadata += "\n"; + } + IOStatus s = WriteStringToFile(target(), metadata, metadata_file_name_, true); + if (!s.ok()) { + fprintf(stderr, "Error writing to file %s: %s", metadata_file_name_.c_str(), + s.ToString().c_str()); + } +} + +IOStatus SimulatedHybridFileSystem::NewRandomAccessFile( + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* result, IODebugContext* dbg) { + Temperature temperature = Temperature::kUnknown; + { + const std::lock_guard lock(mutex_); + if (warm_file_set_.find(fname) != warm_file_set_.end()) { + temperature = Temperature::kWarm; + } + } + IOStatus s = target()->NewRandomAccessFile(fname, file_opts, result, dbg); + result->reset( + new SimulatedHybridRaf(result->release(), rate_limiter_, temperature)); + return s; +} + +IOStatus SimulatedHybridFileSystem::NewWritableFile( + const std::string& fname, const FileOptions& file_opts, + std::unique_ptr* result, IODebugContext* dbg) { + if (file_opts.temperature == Temperature::kWarm) { + const std::lock_guard lock(mutex_); + fprintf(stderr, "warm file %s\n", fname.c_str()); + warm_file_set_.insert(fname); + } + return target()->NewWritableFile(fname, file_opts, result, dbg); +} + +IOStatus SimulatedHybridFileSystem::DeleteFile(const std::string& fname, + const IOOptions& options, + IODebugContext* dbg) { + { + const std::lock_guard lock(mutex_); + warm_file_set_.erase(fname); + } + return target()->DeleteFile(fname, options, dbg); +} + +IOStatus SimulatedHybridRaf::Read(uint64_t offset, size_t n, + const IOOptions& options, Slice* result, + char* scratch, IODebugContext* dbg) const { + if (temperature_ == Temperature::kWarm) { + Env::Default()->SleepForMicroseconds(kLatencyAddedPerRequestUs); + rate_limiter_->Request(kDummyBytesPerRequest, Env::IOPriority::IO_LOW, + nullptr); + } + return target()->Read(offset, n, options, result, scratch, dbg); +} + +IOStatus SimulatedHybridRaf::MultiRead(FSReadRequest* reqs, size_t num_reqs, + const IOOptions& options, + IODebugContext* dbg) { + if (temperature_ == Temperature::kWarm) { + Env::Default()->SleepForMicroseconds(kLatencyAddedPerRequestUs * + static_cast(num_reqs)); + rate_limiter_->Request( + static_cast(num_reqs) * kDummyBytesPerRequest, + Env::IOPriority::IO_LOW, nullptr); + } + return target()->MultiRead(reqs, num_reqs, options, dbg); +} + +IOStatus SimulatedHybridRaf::Prefetch(uint64_t offset, size_t n, + const IOOptions& options, + IODebugContext* dbg) { + if (temperature_ == Temperature::kWarm) { + rate_limiter_->Request(kDummyBytesPerRequest, Env::IOPriority::IO_LOW, + nullptr); + Env::Default()->SleepForMicroseconds(kLatencyAddedPerRequestUs); + } + return target()->Prefetch(offset, n, options, dbg); +} + +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE diff --git a/tools/simulated_hybrid_file_system.h b/tools/simulated_hybrid_file_system.h new file mode 100644 index 000000000..40e68575a --- /dev/null +++ b/tools/simulated_hybrid_file_system.h @@ -0,0 +1,89 @@ +// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#pragma once + +#ifndef ROCKSDB_LITE + +#include + +#include "rocksdb/file_system.h" + +namespace ROCKSDB_NAMESPACE { + +// A FileSystem simulates hybrid file system by ingesting latency and limit +// IOPs. +// This class is only used for development purpose and should not be used +// in production. +// Right now we ingest 15ms latency and allow 100 requests per second when +// the file is for warm temperature. +// When the object is destroyed, the list of warm files are written to a +// file, which can be used to reopen a FileSystem and still recover the +// list. This is to allow the information to preserve between db_bench +// runs. +class SimulatedHybridFileSystem : public FileSystemWrapper { + public: + // metadata_file_name stores metadata of the files, so that it can be + // loaded after process restarts. If the file doesn't exist, create + // one. The file is written when the class is destroyed. + explicit SimulatedHybridFileSystem(const std::shared_ptr& base, + const std::string& metadata_file_name); + + ~SimulatedHybridFileSystem() override; + + public: + IOStatus NewRandomAccessFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + IOStatus NewWritableFile(const std::string& fname, + const FileOptions& file_opts, + std::unique_ptr* result, + IODebugContext* dbg) override; + IOStatus DeleteFile(const std::string& fname, const IOOptions& options, + IODebugContext* dbg) override; + + const char* Name() const override { return name_.c_str(); } + + private: + // Limit 100 requests per second. Rate limiter is designed to byte but + // we use it as fixed bytes is one request. + std::shared_ptr rate_limiter_; + std::mutex mutex_; + std::unordered_set warm_file_set_; + std::string metadata_file_name_; + std::string name_; +}; + +// Simulated random access file that can control IOPs and latency to simulate +// specific storage media +class SimulatedHybridRaf : public FSRandomAccessFileWrapper { + public: + SimulatedHybridRaf(FSRandomAccessFile* t, + std::shared_ptr rate_limiter, + Temperature temperature) + : FSRandomAccessFileWrapper(t), + rate_limiter_(rate_limiter), + temperature_(temperature) {} + + ~SimulatedHybridRaf() override {} + + IOStatus Read(uint64_t offset, size_t n, const IOOptions& options, + Slice* result, char* scratch, + IODebugContext* dbg) const override; + + IOStatus MultiRead(FSReadRequest* reqs, size_t num_reqs, + const IOOptions& options, IODebugContext* dbg) override; + + IOStatus Prefetch(uint64_t offset, size_t n, const IOOptions& options, + IODebugContext* dbg) override; + + private: + std::shared_ptr rate_limiter_; + Temperature temperature_; +}; +} // namespace ROCKSDB_NAMESPACE + +#endif // ROCKSDB_LITE