diff --git a/CMakeLists.txt b/CMakeLists.txt index 4280b7b8b..0de0e2507 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -973,6 +973,7 @@ if(WITH_TESTS) db/db_tailing_iter_test.cc db/db_test.cc db/db_test2.cc + db/db_logical_block_size_cache_test.cc db/db_universal_compaction_test.cc db/db_wal_test.cc db/db_write_test.cc @@ -1009,6 +1010,7 @@ if(WITH_TESTS) db/write_controller_test.cc env/env_basic_test.cc env/env_test.cc + env/io_posix_test.cc env/mock_env_test.cc file/delete_scheduler_test.cc logging/auto_roll_logger_test.cc diff --git a/Makefile b/Makefile index 6aadf8194..07ecd429d 100644 --- a/Makefile +++ b/Makefile @@ -459,6 +459,7 @@ TESTS = \ env_basic_test \ env_test \ env_logger_test \ + io_posix_test \ hash_test \ random_test \ thread_local_test \ @@ -468,6 +469,7 @@ TESTS = \ db_wal_test \ db_block_cache_test \ db_test \ + db_logical_block_size_cache_test \ db_blob_index_test \ db_iter_test \ db_iter_stress_test \ @@ -1313,6 +1315,9 @@ db_test: db/db_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) db_test2: db/db_test2.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +db_logical_block_size_cache_test: db/db_logical_block_size_cache_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + db_blob_index_test: db/db_blob_index_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) @@ -1480,6 +1485,9 @@ env_basic_test: env/env_basic_test.o $(LIBOBJECTS) $(TESTHARNESS) env_test: env/env_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +io_posix_test: env/io_posix_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + fault_injection_test: db/fault_injection_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/TARGETS b/TARGETS index 91ab98fa6..32c5c1423 100644 --- a/TARGETS +++ b/TARGETS @@ -802,6 +802,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "db_logical_block_size_cache_test", + "db/db_logical_block_size_cache_test.cc", + "serial", + [], + [], + ], [ "db_memtable_test", "db/db_memtable_test.cc", @@ -1096,6 +1103,13 @@ ROCKS_TESTS = [ [], [], ], + [ + "io_posix_test", + "env/io_posix_test.cc", + "serial", + [], + [], + ], [ "iostats_context_test", "monitoring/iostats_context_test.cc", diff --git a/db/column_family.cc b/db/column_family.cc index ee008821a..407e684a1 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -471,6 +471,17 @@ void SuperVersionUnrefHandle(void* ptr) { } } // anonymous namespace +std::vector ColumnFamilyData::GetDbPaths() const { + std::vector paths; + paths.reserve(ioptions_.cf_paths.size()); + for (const DbPath& db_path : ioptions_.cf_paths) { + paths.emplace_back(db_path.path); + } + return paths; +} + +const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId = port::kMaxUint32; + ColumnFamilyData::ColumnFamilyData( uint32_t id, const std::string& name, Version* _dummy_versions, Cache* _table_cache, WriteBufferManager* write_buffer_manager, @@ -507,7 +518,23 @@ ColumnFamilyData::ColumnFamilyData( queued_for_compaction_(false), prev_compaction_needed_bytes_(0), allow_2pc_(db_options.allow_2pc), - last_memtable_id_(0) { + last_memtable_id_(0), + db_paths_registered_(false) { + if (id_ != kDummyColumnFamilyDataId) { + // TODO(cc): RegisterDbPaths can be expensive, considering moving it + // outside of this constructor which might be called with db mutex held. + // TODO(cc): considering using ioptions_.fs, currently some tests rely on + // EnvWrapper, that's the main reason why we use env here. + Status s = ioptions_.env->RegisterDbPaths(GetDbPaths()); + if (s.ok()) { + db_paths_registered_ = true; + } else { + ROCKS_LOG_ERROR( + ioptions_.info_log, + "Failed to register data paths of column family (id: %d, name: %s)", + id_, name_.c_str()); + } + } Ref(); // Convert user defined table properties collector factories to internal ones. @@ -601,6 +628,18 @@ ColumnFamilyData::~ColumnFamilyData() { for (MemTable* m : to_delete) { delete m; } + + if (db_paths_registered_) { + // TODO(cc): considering using ioptions_.fs, currently some tests rely on + // EnvWrapper, that's the main reason why we use env here. + Status s = ioptions_.env->UnregisterDbPaths(GetDbPaths()); + if (!s.ok()) { + ROCKS_LOG_ERROR( + ioptions_.info_log, + "Failed to unregister data paths of column family (id: %d, name: %s)", + id_, name_.c_str()); + } + } } bool ColumnFamilyData::UnrefAndTryDelete() { @@ -1363,8 +1402,9 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, BlockCacheTracer* const block_cache_tracer) : max_column_family_(0), dummy_cfd_(new ColumnFamilyData( - 0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options, - file_options, nullptr, block_cache_tracer)), + ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr, + nullptr, ColumnFamilyOptions(), *db_options, file_options, nullptr, + block_cache_tracer)), default_cfd_cache_(nullptr), db_name_(dbname), db_options_(db_options), diff --git a/db/column_family.h b/db/column_family.h index 04bd5d81e..342f6ba8c 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -508,6 +508,7 @@ class ColumnFamilyData { private: friend class ColumnFamilySet; + static const uint32_t kDummyColumnFamilyDataId; ColumnFamilyData(uint32_t id, const std::string& name, Version* dummy_versions, Cache* table_cache, WriteBufferManager* write_buffer_manager, @@ -517,6 +518,8 @@ class ColumnFamilyData { ColumnFamilySet* column_family_set, BlockCacheTracer* const block_cache_tracer); + std::vector GetDbPaths() const; + uint32_t id_; const std::string name_; Version* dummy_versions_; // Head of circular doubly-linked list of versions. @@ -593,6 +596,8 @@ class ColumnFamilyData { // Directories corresponding to cf_paths. std::vector> data_dirs_; + + bool db_paths_registered_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 8816b9c1d..23b28bbcb 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -21,7 +21,6 @@ #include #include #include -#include #include #include @@ -3528,24 +3527,15 @@ Status DestroyDB(const std::string& dbname, const Options& options, } } - std::vector paths; - - for (const auto& path : options.db_paths) { - paths.emplace_back(path.path); + std::set paths; + for (const DbPath& db_path : options.db_paths) { + paths.insert(db_path.path); } - for (const auto& cf : column_families) { - for (const auto& path : cf.options.cf_paths) { - paths.emplace_back(path.path); + for (const ColumnFamilyDescriptor& cf : column_families) { + for (const DbPath& cf_path : cf.options.cf_paths) { + paths.insert(cf_path.path); } } - - // Remove duplicate paths. - // Note that we compare only the actual paths but not path ids. - // This reason is that same path can appear at different path_ids - // for different column families. - std::sort(paths.begin(), paths.end()); - paths.erase(std::unique(paths.begin(), paths.end()), paths.end()); - for (const auto& path : paths) { if (env->GetChildren(path, &filenames).ok()) { for (const auto& fname : filenames) { diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 55a3a4712..4318e5b75 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1401,13 +1401,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, impl->error_handler_.EnableAutoRecovery(); } } - - if (!s.ok()) { - delete impl; - return s; + if (s.ok()) { + s = impl->CreateArchivalDirectory(); } - - s = impl->CreateArchivalDirectory(); if (!s.ok()) { delete impl; return s; diff --git a/db/db_logical_block_size_cache_test.cc b/db/db_logical_block_size_cache_test.cc new file mode 100644 index 000000000..fd18792e2 --- /dev/null +++ b/db/db_logical_block_size_cache_test.cc @@ -0,0 +1,483 @@ +// Copyright (c) 2020-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "test_util/testharness.h" + +#ifdef OS_LINUX +#include "env/io_posix.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" + +namespace ROCKSDB_NAMESPACE { +class EnvWithCustomLogicalBlockSizeCache : public EnvWrapper { + public: + EnvWithCustomLogicalBlockSizeCache(Env* env, LogicalBlockSizeCache* cache) + : EnvWrapper(env), cache_(cache) {} + + Status RegisterDbPaths(const std::vector& paths) override { + return cache_->RefAndCacheLogicalBlockSize(paths); + } + + Status UnregisterDbPaths(const std::vector& paths) override { + cache_->UnrefAndTryRemoveCachedLogicalBlockSize(paths); + return Status::OK(); + } + + private: + LogicalBlockSizeCache* cache_; +}; + +class DBLogicalBlockSizeCacheTest : public testing::Test { + public: + DBLogicalBlockSizeCacheTest() + : dbname_(test::PerThreadDBPath("logical_block_size_cache_test")), + data_path_0_(dbname_ + "/data_path_0"), + data_path_1_(dbname_ + "/data_path_1"), + cf_path_0_(dbname_ + "/cf_path_0"), + cf_path_1_(dbname_ + "/cf_path_1") { + auto get_fd_block_size = [&](int fd) { + return fd; + }; + auto get_dir_block_size = [&](const std::string& /*dir*/, size_t* size) { + *size = 1024; + return Status::OK(); + }; + cache_.reset(new LogicalBlockSizeCache( + get_fd_block_size, get_dir_block_size)); + env_.reset(new EnvWithCustomLogicalBlockSizeCache( + Env::Default(), cache_.get())); + } + + protected: + std::string dbname_; + std::string data_path_0_; + std::string data_path_1_; + std::string cf_path_0_; + std::string cf_path_1_; + std::unique_ptr cache_; + std::unique_ptr env_; +}; + +TEST_F(DBLogicalBlockSizeCacheTest, OpenClose) { + // Tests that Open will cache the logical block size for data paths, + // and Close will remove the cached sizes. + Options options; + options.create_if_missing = true; + options.env = env_.get(); + options.db_paths = {{data_path_0_, 2048}, {data_path_1_, 2048}}; + + for (int i = 0; i < 2; i++) { + DB* db; + if (!i) { + printf("Open\n"); + ASSERT_OK(DB::Open(options, dbname_, &db)); + } else { + printf("OpenForReadOnly\n"); + ASSERT_OK(DB::OpenForReadOnly(options, dbname_, &db)); + } + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(data_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(data_path_0_)); + ASSERT_TRUE(cache_->Contains(data_path_1_)); + ASSERT_EQ(1, cache_->GetRefCount(data_path_1_)); + ASSERT_OK(db->Close()); + ASSERT_EQ(0, cache_->Size()); + delete db; + } +} + +TEST_F(DBLogicalBlockSizeCacheTest, OpenDelete) { + // Tests that Open will cache the logical block size for data paths, + // and delete the db pointer will remove the cached sizes. + Options options; + options.create_if_missing = true; + options.env = env_.get(); + + for (int i = 0; i < 2; i++) { + DB* db; + if (!i) { + printf("Open\n"); + ASSERT_OK(DB::Open(options, dbname_, &db)); + } else { + printf("OpenForReadOnly\n"); + ASSERT_OK(DB::OpenForReadOnly(options, dbname_, &db)); + } + ASSERT_EQ(1, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + delete db; + ASSERT_EQ(0, cache_->Size()); + } +} + +TEST_F(DBLogicalBlockSizeCacheTest, CreateColumnFamily) { + // Tests that CreateColumnFamily will cache the cf_paths, + // drop the column family handle won't drop the cache, + // drop and then delete the column family handle will drop the cache. + Options options; + options.create_if_missing = true; + options.env = env_.get(); + ColumnFamilyOptions cf_options; + cf_options.cf_paths = {{cf_path_0_, 1024}, {cf_path_1_, 2048}}; + + DB* db; + ASSERT_OK(DB::Open(options, dbname_, &db)); + ASSERT_EQ(1, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + + ColumnFamilyHandle* cf = nullptr; + ASSERT_OK(db->CreateColumnFamily(cf_options, "cf", &cf)); + ASSERT_EQ(3, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_)); + ASSERT_TRUE(cache_->Contains(cf_path_1_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_1_)); + + // Drop column family does not drop cache. + ASSERT_OK(db->DropColumnFamily(cf)); + ASSERT_EQ(3, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_)); + ASSERT_TRUE(cache_->Contains(cf_path_1_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_1_)); + + // Delete handle will drop cache. + ASSERT_OK(db->DestroyColumnFamilyHandle(cf)); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + + delete db; + ASSERT_EQ(0, cache_->Size()); +} + +TEST_F(DBLogicalBlockSizeCacheTest, CreateColumnFamilies) { + // Tests that CreateColumnFamilies will cache the cf_paths, + // drop the column family handle won't drop the cache, + // drop and then delete the column family handle will drop the cache. + Options options; + options.create_if_missing = true; + options.env = env_.get(); + ColumnFamilyOptions cf_options; + cf_options.cf_paths = {{cf_path_0_, 1024}}; + + DB* db; + ASSERT_OK(DB::Open(options, dbname_, &db)); + ASSERT_EQ(1, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + + std::vector cfs; + ASSERT_OK(db->CreateColumnFamilies(cf_options, {"cf1", "cf2"}, &cfs)); + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(2, cache_->GetRefCount(cf_path_0_)); + + // Drop column family does not drop cache. + for (ColumnFamilyHandle* cf : cfs) { + ASSERT_OK(db->DropColumnFamily(cf)); + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(2, cache_->GetRefCount(cf_path_0_)); + } + + // Delete one handle will not drop cache because another handle is still + // referencing cf_path_0_. + ASSERT_OK(db->DestroyColumnFamilyHandle(cfs[0])); + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_)); + + // Delete the last handle will drop cache. + ASSERT_OK(db->DestroyColumnFamilyHandle(cfs[1])); + ASSERT_EQ(1, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + + delete db; + ASSERT_EQ(0, cache_->Size()); +} + +TEST_F(DBLogicalBlockSizeCacheTest, OpenWithColumnFamilies) { + // Tests that Open two column families with the same cf_path will cache the + // cf_path and have 2 references to the cached size, + // drop the column family handle won't drop the cache, + // drop and then delete the column family handle will drop the cache. + Options options; + options.create_if_missing = true; + options.env = env_.get(); + + ColumnFamilyOptions cf_options; + cf_options.cf_paths = {{cf_path_0_, 1024}}; + + for (int i = 0; i < 2; i++) { + DB* db; + ColumnFamilyHandle* cf1 = nullptr; + ColumnFamilyHandle* cf2 = nullptr; + ASSERT_OK(DB::Open(options, dbname_, &db)); + ASSERT_OK(db->CreateColumnFamily(cf_options, "cf1", &cf1)); + ASSERT_OK(db->CreateColumnFamily(cf_options, "cf2", &cf2)); + ASSERT_OK(db->DestroyColumnFamilyHandle(cf1)); + ASSERT_OK(db->DestroyColumnFamilyHandle(cf2)); + delete db; + ASSERT_EQ(0, cache_->Size()); + + std::vector cfs; + if (!i) { + printf("Open\n"); + ASSERT_OK(DB::Open(options, dbname_, + {{"cf1", cf_options}, + {"cf2", cf_options}, + {"default", ColumnFamilyOptions()}}, + &cfs, &db)); + } else { + printf("OpenForReadOnly\n"); + ASSERT_OK(DB::OpenForReadOnly(options, dbname_, + {{"cf1", cf_options}, + {"cf2", cf_options}, + {"default", ColumnFamilyOptions()}}, + &cfs, &db)); + } + + // Logical block sizes of dbname_ and cf_path_0_ are cached during Open. + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(2, cache_->GetRefCount(cf_path_0_)); + + // Drop handles won't drop the cache. + ASSERT_OK(db->DropColumnFamily(cfs[0])); + ASSERT_OK(db->DropColumnFamily(cfs[1])); + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(2, cache_->GetRefCount(cf_path_0_)); + + // Delete 1st handle won't drop the cache for cf_path_0_. + ASSERT_OK(db->DestroyColumnFamilyHandle(cfs[0])); + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_)); + + // Delete 2nd handle will drop the cache for cf_path_0_. + ASSERT_OK(db->DestroyColumnFamilyHandle(cfs[1])); + ASSERT_EQ(1, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + + // Delete the default handle won't affect the cache because db still refers + // to the default CF. + ASSERT_OK(db->DestroyColumnFamilyHandle(cfs[2])); + ASSERT_EQ(1, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + + delete db; + ASSERT_EQ(0, cache_->Size()); + } +} + +TEST_F(DBLogicalBlockSizeCacheTest, DestroyColumnFamilyHandle) { + // Tests that destroy column family without dropping won't drop the cache, + // because compaction and flush might still need to get logical block size + // when opening new files. + Options options; + options.create_if_missing = true; + options.env = env_.get(); + ColumnFamilyOptions cf_options; + cf_options.cf_paths = {{cf_path_0_, 1024}}; + + DB* db; + ASSERT_OK(DB::Open(options, dbname_, &db)); + ASSERT_EQ(1, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + ColumnFamilyHandle* cf = nullptr; + ASSERT_OK(db->CreateColumnFamily(cf_options, "cf", &cf)); + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_)); + + // Delete handle won't drop cache. + ASSERT_OK(db->DestroyColumnFamilyHandle(cf)); + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_)); + + delete db; + ASSERT_EQ(0, cache_->Size()); + + // Open with column families. + std::vector cfs; + for (int i = 0; i < 2; i++) { + if (!i) { + printf("Open\n"); + ASSERT_OK(DB::Open( + options, dbname_, + {{"cf", cf_options}, {"default", ColumnFamilyOptions()}}, &cfs, &db)); + } else { + printf("OpenForReadOnly\n"); + ASSERT_OK(DB::OpenForReadOnly( + options, dbname_, + {{"cf", cf_options}, {"default", ColumnFamilyOptions()}}, &cfs, &db)); + } + // cf_path_0_ and dbname_ are cached. + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_)); + + // Deleting handle won't drop cache. + ASSERT_OK(db->DestroyColumnFamilyHandle(cfs[0])); + ASSERT_OK(db->DestroyColumnFamilyHandle(cfs[1])); + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(dbname_)); + ASSERT_EQ(1, cache_->GetRefCount(dbname_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_)); + + delete db; + ASSERT_EQ(0, cache_->Size()); + } +} + +TEST_F(DBLogicalBlockSizeCacheTest, MultiDBWithDifferentPaths) { + // Tests the cache behavior when there are multiple DBs sharing the same env + // with different db_paths and cf_paths. + Options options; + options.create_if_missing = true; + options.env = env_.get(); + + DB* db0; + ASSERT_OK(DB::Open(options, data_path_0_, &db0)); + ASSERT_EQ(1, cache_->Size()); + ASSERT_TRUE(cache_->Contains(data_path_0_)); + + ColumnFamilyOptions cf_options0; + cf_options0.cf_paths = {{cf_path_0_, 1024}}; + ColumnFamilyHandle* cf0; + db0->CreateColumnFamily(cf_options0, "cf", &cf0); + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(data_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(data_path_0_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_)); + + DB* db1; + ASSERT_OK(DB::Open(options, data_path_1_, &db1)); + ASSERT_EQ(3, cache_->Size()); + ASSERT_TRUE(cache_->Contains(data_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(data_path_0_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_)); + ASSERT_TRUE(cache_->Contains(data_path_1_)); + ASSERT_EQ(1, cache_->GetRefCount(data_path_1_)); + + ColumnFamilyOptions cf_options1; + cf_options1.cf_paths = {{cf_path_1_, 1024}}; + ColumnFamilyHandle* cf1; + db1->CreateColumnFamily(cf_options1, "cf", &cf1); + ASSERT_EQ(4, cache_->Size()); + ASSERT_TRUE(cache_->Contains(data_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(data_path_0_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_)); + ASSERT_TRUE(cache_->Contains(data_path_1_)); + ASSERT_EQ(1, cache_->GetRefCount(data_path_1_)); + ASSERT_TRUE(cache_->Contains(cf_path_1_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_1_)); + + db0->DestroyColumnFamilyHandle(cf0); + delete db0; + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(data_path_1_)); + ASSERT_EQ(1, cache_->GetRefCount(data_path_1_)); + ASSERT_TRUE(cache_->Contains(cf_path_1_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_1_)); + + db1->DestroyColumnFamilyHandle(cf1); + delete db1; + ASSERT_EQ(0, cache_->Size()); +} + +TEST_F(DBLogicalBlockSizeCacheTest, MultiDBWithSamePaths) { + // Tests the cache behavior when there are multiple DBs sharing the same env + // with the same db_paths and cf_paths. + Options options; + options.create_if_missing = true; + options.env = env_.get(); + options.db_paths = {{data_path_0_, 1024}}; + ColumnFamilyOptions cf_options; + cf_options.cf_paths = {{cf_path_0_, 1024}}; + + DB* db0; + ASSERT_OK(DB::Open(options, dbname_ + "/db0", &db0)); + ASSERT_EQ(1, cache_->Size()); + ASSERT_TRUE(cache_->Contains(data_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(data_path_0_)); + + ColumnFamilyHandle* cf0; + db0->CreateColumnFamily(cf_options, "cf", &cf0); + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(data_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(data_path_0_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_)); + + DB* db1; + ASSERT_OK(DB::Open(options, dbname_ + "/db1", &db1)); + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(data_path_0_)); + ASSERT_EQ(2, cache_->GetRefCount(data_path_0_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_)); + + ColumnFamilyHandle* cf1; + db1->CreateColumnFamily(cf_options, "cf", &cf1); + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(data_path_0_)); + ASSERT_EQ(2, cache_->GetRefCount(data_path_0_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(2, cache_->GetRefCount(cf_path_0_)); + + db0->DestroyColumnFamilyHandle(cf0); + delete db0; + ASSERT_EQ(2, cache_->Size()); + ASSERT_TRUE(cache_->Contains(data_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(data_path_0_)); + ASSERT_TRUE(cache_->Contains(cf_path_0_)); + ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_)); + + db1->DestroyColumnFamilyHandle(cf1); + delete db1; + ASSERT_EQ(0, cache_->Size()); +} + +} // namespace ROCKSDB_NAMESPACE +#endif // OS_LINUX + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/db/version_builder.cc b/db/version_builder.cc index 4694218a1..955c42c0d 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -405,7 +405,7 @@ class VersionBuilder::Rep { size_t max_load = port::kMaxSizet; if (!always_load) { - // If it is initial loading and not set to always laoding all the + // If it is initial loading and not set to always loading all the // files, we only load up to kInitialLoadLimit files, to limit the // time reopening the DB. const size_t kInitialLoadLimit = 16; diff --git a/env/composite_env_wrapper.h b/env/composite_env_wrapper.h index fbc0b93fa..24ac9e9de 100644 --- a/env/composite_env_wrapper.h +++ b/env/composite_env_wrapper.h @@ -300,6 +300,13 @@ class CompositeEnvWrapper : public Env { FileSystem* fs_env_target() const { return fs_env_target_; } + Status RegisterDbPaths(const std::vector& paths) override { + return fs_env_target_->RegisterDbPaths(paths); + } + Status UnregisterDbPaths(const std::vector& paths) override { + return fs_env_target_->UnregisterDbPaths(paths); + } + // The following text is boilerplate that forwards all methods to target() Status NewSequentialFile(const std::string& f, std::unique_ptr* r, diff --git a/env/env_chroot.cc b/env/env_chroot.cc index 246437bdd..d0019c37e 100644 --- a/env/env_chroot.cc +++ b/env/env_chroot.cc @@ -38,6 +38,32 @@ class ChrootEnv : public EnvWrapper { #endif } + Status RegisterDbPaths(const std::vector& paths) override { + std::vector encoded_paths; + encoded_paths.reserve(paths.size()); + for (auto& path : paths) { + auto status_and_enc_path = EncodePathWithNewBasename(path); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + encoded_paths.emplace_back(status_and_enc_path.second); + } + return EnvWrapper::Env::RegisterDbPaths(encoded_paths); + } + + Status UnregisterDbPaths(const std::vector& paths) override { + std::vector encoded_paths; + encoded_paths.reserve(paths.size()); + for (auto& path : paths) { + auto status_and_enc_path = EncodePathWithNewBasename(path); + if (!status_and_enc_path.first.ok()) { + return status_and_enc_path.first; + } + encoded_paths.emplace_back(status_and_enc_path.second); + } + return EnvWrapper::Env::UnregisterDbPaths(encoded_paths); + } + Status NewSequentialFile(const std::string& fname, std::unique_ptr* result, const EnvOptions& options) override { diff --git a/env/fs_posix.cc b/env/fs_posix.cc index 14c9d546d..966145575 100644 --- a/env/fs_posix.cc +++ b/env/fs_posix.cc @@ -183,7 +183,8 @@ class PosixFileSystem : public FileSystem { errno); } } - result->reset(new PosixSequentialFile(fname, file, fd, options)); + result->reset(new PosixSequentialFile( + fname, file, fd, GetLogicalBlockSize(fname, fd), options)); return IOStatus::OK(); } @@ -242,12 +243,13 @@ class PosixFileSystem : public FileSystem { } #endif } - result->reset(new PosixRandomAccessFile(fname, fd, options + result->reset(new PosixRandomAccessFile( + fname, fd, GetLogicalBlockSize(fname, fd), options #if defined(ROCKSDB_IOURING_PRESENT) - , - thread_local_io_urings_.get() + , + thread_local_io_urings_.get() #endif - )); + )); } return s; } @@ -328,12 +330,14 @@ class PosixFileSystem : public FileSystem { } } #endif - result->reset(new PosixWritableFile(fname, fd, options)); + result->reset(new PosixWritableFile( + fname, fd, GetLogicalBlockSize(fname, fd), options)); } else { // disable mmap writes EnvOptions no_mmap_writes_options = options; no_mmap_writes_options.use_mmap_writes = false; - result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options)); + result->reset(new PosixWritableFile( + fname, fd, GetLogicalBlockSize(fname, fd), no_mmap_writes_options)); } return s; } @@ -428,12 +432,14 @@ class PosixFileSystem : public FileSystem { } } #endif - result->reset(new PosixWritableFile(fname, fd, options)); + result->reset(new PosixWritableFile( + fname, fd, GetLogicalBlockSize(fname, fd), options)); } else { // disable mmap writes FileOptions no_mmap_writes_options = options; no_mmap_writes_options.use_mmap_writes = false; - result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options)); + result->reset(new PosixWritableFile( + fname, fd, GetLogicalBlockSize(fname, fd), no_mmap_writes_options)); } return s; } @@ -853,7 +859,15 @@ class PosixFileSystem : public FileSystem { optimized.fallocate_with_keep_size = true; return optimized; } - +#ifdef OS_LINUX + Status RegisterDbPaths(const std::vector& paths) override { + return logical_block_size_cache_.RefAndCacheLogicalBlockSize(paths); + } + Status UnregisterDbPaths(const std::vector& paths) override { + logical_block_size_cache_.UnrefAndTryRemoveCachedLogicalBlockSize(paths); + return Status::OK(); + } +#endif private: bool checkedDiskForMmap_; bool forceMmapOff_; // do we override Env options? @@ -899,8 +913,26 @@ class PosixFileSystem : public FileSystem { // If true, allow non owner read access for db files. Otherwise, non-owner // has no access to db files. bool allow_non_owner_access_; + +#ifdef OS_LINUX + static LogicalBlockSizeCache logical_block_size_cache_; +#endif + static size_t GetLogicalBlockSize(const std::string& fname, int fd); }; +#ifdef OS_LINUX +LogicalBlockSizeCache PosixFileSystem::logical_block_size_cache_; +#endif + +size_t PosixFileSystem::GetLogicalBlockSize(const std::string& fname, int fd) { +#ifdef OS_LINUX + return logical_block_size_cache_.GetLogicalBlockSize(fname, fd); +#else + (void) fname; + return PosixHelper::GetLogicalBlockSizeOfFd(fd); +#endif +} + PosixFileSystem::PosixFileSystem() : checkedDiskForMmap_(false), forceMmapOff_(false), diff --git a/env/io_posix.cc b/env/io_posix.cc index a2bbab38d..b44dbd717 100644 --- a/env/io_posix.cc +++ b/env/io_posix.cc @@ -45,6 +45,35 @@ namespace ROCKSDB_NAMESPACE { +std::string IOErrorMsg(const std::string& context, + const std::string& file_name) { + if (file_name.empty()) { + return context; + } + return context + ": " + file_name; +} + +// file_name can be left empty if it is not unkown. +IOStatus IOError(const std::string& context, const std::string& file_name, + int err_number) { + switch (err_number) { + case ENOSPC: { + IOStatus s = IOStatus::NoSpace(IOErrorMsg(context, file_name), + strerror(err_number)); + s.SetRetryable(true); + return s; + } + case ESTALE: + return IOStatus::IOError(IOStatus::kStaleFile); + case ENOENT: + return IOStatus::PathNotFound(IOErrorMsg(context, file_name), + strerror(err_number)); + default: + return IOStatus::IOError(IOErrorMsg(context, file_name), + strerror(err_number)); + } +} + // A wrapper for fadvise, if the platform doesn't support fadvise, // it will simply return 0. int Fadvise(int fd, off_t offset, size_t len, int advice) { @@ -112,75 +141,6 @@ bool PosixPositionedWrite(int fd, const char* buf, size_t nbyte, off_t offset) { return true; } -size_t GetLogicalBufferSize(int __attribute__((__unused__)) fd) { -#ifdef OS_LINUX - struct stat buf; - int result = fstat(fd, &buf); - if (result == -1) { - return kDefaultPageSize; - } - if (major(buf.st_dev) == 0) { - // Unnamed devices (e.g. non-device mounts), reserved as null device number. - // These don't have an entry in /sys/dev/block/. Return a sensible default. - return kDefaultPageSize; - } - - // Reading queue/logical_block_size does not require special permissions. - const int kBufferSize = 100; - char path[kBufferSize]; - char real_path[PATH_MAX + 1]; - snprintf(path, kBufferSize, "/sys/dev/block/%u:%u", major(buf.st_dev), - minor(buf.st_dev)); - if (realpath(path, real_path) == nullptr) { - return kDefaultPageSize; - } - std::string device_dir(real_path); - if (!device_dir.empty() && device_dir.back() == '/') { - device_dir.pop_back(); - } - // NOTE: sda3 and nvme0n1p1 do not have a `queue/` subdir, only the parent sda - // and nvme0n1 have it. - // $ ls -al '/sys/dev/block/8:3' - // lrwxrwxrwx. 1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 -> - // ../../block/sda/sda3 - // $ ls -al '/sys/dev/block/259:4' - // lrwxrwxrwx 1 root root 0 Jan 31 16:04 /sys/dev/block/259:4 -> - // ../../devices/pci0000:17/0000:17:00.0/0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1 - size_t parent_end = device_dir.rfind('/', device_dir.length() - 1); - if (parent_end == std::string::npos) { - return kDefaultPageSize; - } - size_t parent_begin = device_dir.rfind('/', parent_end - 1); - if (parent_begin == std::string::npos) { - return kDefaultPageSize; - } - std::string parent = - device_dir.substr(parent_begin + 1, parent_end - parent_begin - 1); - std::string child = device_dir.substr(parent_end + 1, std::string::npos); - if (parent != "block" && - (child.compare(0, 4, "nvme") || child.find('p') != std::string::npos)) { - device_dir = device_dir.substr(0, parent_end); - } - std::string fname = device_dir + "/queue/logical_block_size"; - FILE* fp; - size_t size = 0; - fp = fopen(fname.c_str(), "r"); - if (fp != nullptr) { - char* line = nullptr; - size_t len = 0; - if (getline(&line, &len, fp) != -1) { - sscanf(line, "%zu", &size); - } - free(line); - fclose(fp); - } - if (size != 0 && (size & (size - 1)) == 0) { - return size; - } -#endif - return kDefaultPageSize; -} - #ifdef ROCKSDB_RANGESYNC_PRESENT #if !defined(ZFS_SUPER_MAGIC) @@ -247,12 +207,13 @@ bool IsSectorAligned(const void* ptr, size_t sector_size) { * PosixSequentialFile */ PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file, - int fd, const EnvOptions& options) + int fd, size_t logical_block_size, + const EnvOptions& options) : filename_(fname), file_(file), fd_(fd), use_direct_io_(options.use_direct_reads), - logical_sector_size_(GetLogicalBufferSize(fd_)) { + logical_sector_size_(logical_block_size) { assert(!options.use_direct_reads || !options.use_mmap_reads); } @@ -409,13 +370,178 @@ size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) { return static_cast(rid - id); } #endif + +#ifdef OS_LINUX +std::string RemoveTrailingSlash(const std::string& path) { + std::string p = path; + if (p.size() > 1 && p.back() == '/') { + p.pop_back(); + } + return p; +} + +Status LogicalBlockSizeCache::RefAndCacheLogicalBlockSize( + const std::vector& directories) { + std::vector dirs; + dirs.reserve(directories.size()); + for (auto& d : directories) { + dirs.emplace_back(RemoveTrailingSlash(d)); + } + + std::map dir_sizes; + { + ReadLock lock(&cache_mutex_); + for (const auto& dir : dirs) { + if (cache_.find(dir) == cache_.end()) { + dir_sizes.emplace(dir, 0); + } + } + } + + Status s; + for (auto& dir_size : dir_sizes) { + s = get_logical_block_size_of_directory_(dir_size.first, &dir_size.second); + if (!s.ok()) { + return s; + } + } + + WriteLock lock(&cache_mutex_); + for (const auto& dir : dirs) { + auto& v = cache_[dir]; + v.ref++; + auto dir_size = dir_sizes.find(dir); + if (dir_size != dir_sizes.end()) { + v.size = dir_size->second; + } + } + return Status::OK(); +} + +void LogicalBlockSizeCache::UnrefAndTryRemoveCachedLogicalBlockSize( + const std::vector& directories) { + std::vector dirs; + dirs.reserve(directories.size()); + for (auto& dir : directories) { + dirs.emplace_back(RemoveTrailingSlash(dir)); + } + + WriteLock lock(&cache_mutex_); + for (const auto& dir : dirs) { + auto it = cache_.find(dir); + if (it != cache_.end() && !(--(it->second.ref))) { + cache_.erase(it); + } + } +} + +size_t LogicalBlockSizeCache::GetLogicalBlockSize(const std::string& fname, + int fd) { + std::string dir = fname.substr(0, fname.find_last_of("/")); + if (dir.empty()) { + dir = "/"; + } + { + ReadLock lock(&cache_mutex_); + auto it = cache_.find(dir); + if (it != cache_.end()) { + return it->second.size; + } + } + return get_logical_block_size_of_fd_(fd); +} +#endif + +Status PosixHelper::GetLogicalBlockSizeOfDirectory(const std::string& directory, + size_t* size) { + int fd = open(directory.c_str(), O_DIRECTORY | O_RDONLY); + if (fd == -1) { + close(fd); + return Status::IOError("Cannot open directory " + directory); + } + *size = PosixHelper::GetLogicalBlockSizeOfFd(fd); + close(fd); + return Status::OK(); +} + +size_t PosixHelper::GetLogicalBlockSizeOfFd(int fd) { +#ifdef OS_LINUX + struct stat buf; + int result = fstat(fd, &buf); + if (result == -1) { + return kDefaultPageSize; + } + if (major(buf.st_dev) == 0) { + // Unnamed devices (e.g. non-device mounts), reserved as null device number. + // These don't have an entry in /sys/dev/block/. Return a sensible default. + return kDefaultPageSize; + } + + // Reading queue/logical_block_size does not require special permissions. + const int kBufferSize = 100; + char path[kBufferSize]; + char real_path[PATH_MAX + 1]; + snprintf(path, kBufferSize, "/sys/dev/block/%u:%u", major(buf.st_dev), + minor(buf.st_dev)); + if (realpath(path, real_path) == nullptr) { + return kDefaultPageSize; + } + std::string device_dir(real_path); + if (!device_dir.empty() && device_dir.back() == '/') { + device_dir.pop_back(); + } + // NOTE: sda3 and nvme0n1p1 do not have a `queue/` subdir, only the parent sda + // and nvme0n1 have it. + // $ ls -al '/sys/dev/block/8:3' + // lrwxrwxrwx. 1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 -> + // ../../block/sda/sda3 + // $ ls -al '/sys/dev/block/259:4' + // lrwxrwxrwx 1 root root 0 Jan 31 16:04 /sys/dev/block/259:4 -> + // ../../devices/pci0000:17/0000:17:00.0/0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1 + size_t parent_end = device_dir.rfind('/', device_dir.length() - 1); + if (parent_end == std::string::npos) { + return kDefaultPageSize; + } + size_t parent_begin = device_dir.rfind('/', parent_end - 1); + if (parent_begin == std::string::npos) { + return kDefaultPageSize; + } + std::string parent = + device_dir.substr(parent_begin + 1, parent_end - parent_begin - 1); + std::string child = device_dir.substr(parent_end + 1, std::string::npos); + if (parent != "block" && + (child.compare(0, 4, "nvme") || child.find('p') != std::string::npos)) { + device_dir = device_dir.substr(0, parent_end); + } + std::string fname = device_dir + "/queue/logical_block_size"; + FILE* fp; + size_t size = 0; + fp = fopen(fname.c_str(), "r"); + if (fp != nullptr) { + char* line = nullptr; + size_t len = 0; + if (getline(&line, &len, fp) != -1) { + sscanf(line, "%zu", &size); + } + free(line); + fclose(fp); + } + if (size != 0 && (size & (size - 1)) == 0) { + return size; + } +#endif + (void)fd; + return kDefaultPageSize; +} + /* * PosixRandomAccessFile * * pread() based random-access */ PosixRandomAccessFile::PosixRandomAccessFile( - const std::string& fname, int fd, const EnvOptions& options + const std::string& fname, int fd, size_t logical_block_size, + const EnvOptions& options #if defined(ROCKSDB_IOURING_PRESENT) , ThreadLocalPtr* thread_local_io_urings @@ -424,7 +550,7 @@ PosixRandomAccessFile::PosixRandomAccessFile( : filename_(fname), fd_(fd), use_direct_io_(options.use_direct_reads), - logical_sector_size_(GetLogicalBufferSize(fd_)) + logical_sector_size_(logical_block_size) #if defined(ROCKSDB_IOURING_PRESENT) , thread_local_io_urings_(thread_local_io_urings) @@ -988,13 +1114,14 @@ IOStatus PosixMmapFile::Allocate(uint64_t offset, uint64_t len, * Use posix write to write data to a file. */ PosixWritableFile::PosixWritableFile(const std::string& fname, int fd, + size_t logical_block_size, const EnvOptions& options) : FSWritableFile(options), filename_(fname), use_direct_io_(options.use_direct_writes), fd_(fd), filesize_(0), - logical_sector_size_(GetLogicalBufferSize(fd_)) { + logical_sector_size_(logical_block_size) { #ifdef ROCKSDB_FALLOCATE_PRESENT allow_fallocate_ = options.allow_fallocate; fallocate_with_keep_size_ = options.fallocate_with_keep_size; diff --git a/env/io_posix.h b/env/io_posix.h index ddc3244e4..61d3cda9d 100644 --- a/env/io_posix.h +++ b/env/io_posix.h @@ -14,11 +14,15 @@ #endif #include #include +#include +#include #include +#include "port/port.h" #include "rocksdb/env.h" -#include "util/thread_local.h" #include "rocksdb/file_system.h" #include "rocksdb/io_status.h" +#include "util/mutexlock.h" +#include "util/thread_local.h" // For non linux platform, the following macros are used only as place // holder. @@ -31,40 +35,93 @@ #endif namespace ROCKSDB_NAMESPACE { -static std::string IOErrorMsg(const std::string& context, - const std::string& file_name) { - if (file_name.empty()) { - return context; - } - return context + ": " + file_name; -} - +std::string IOErrorMsg(const std::string& context, + const std::string& file_name); // file_name can be left empty if it is not unkown. -static IOStatus IOError(const std::string& context, - const std::string& file_name, int err_number) { - switch (err_number) { - case ENOSPC: { - IOStatus s = IOStatus::NoSpace(IOErrorMsg(context, file_name), - strerror(err_number)); - s.SetRetryable(true); - return s; - } - case ESTALE: - return IOStatus::IOError(IOStatus::kStaleFile); - case ENOENT: - return IOStatus::PathNotFound(IOErrorMsg(context, file_name), - strerror(err_number)); - default: - return IOStatus::IOError(IOErrorMsg(context, file_name), - strerror(err_number)); - } -} +IOStatus IOError(const std::string& context, const std::string& file_name, + int err_number); class PosixHelper { public: static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size); + static size_t GetLogicalBlockSizeOfFd(int fd); + static Status GetLogicalBlockSizeOfDirectory(const std::string& directory, + size_t* size); }; +#ifdef OS_LINUX +// Files under a specific directory have the same logical block size. +// This class caches the logical block size for the specified directories to +// save the CPU cost of computing the size. +// Safe for concurrent access from multiple threads without any external +// synchronization. +class LogicalBlockSizeCache { + public: + LogicalBlockSizeCache( + std::function get_logical_block_size_of_fd = + PosixHelper::GetLogicalBlockSizeOfFd, + std::function + get_logical_block_size_of_directory = + PosixHelper::GetLogicalBlockSizeOfDirectory) + : get_logical_block_size_of_fd_(get_logical_block_size_of_fd), + get_logical_block_size_of_directory_( + get_logical_block_size_of_directory) {} + + // Takes the following actions: + // 1. Increases reference count of the directories; + // 2. If the directory's logical block size is not cached, + // compute the buffer size and cache the result. + Status RefAndCacheLogicalBlockSize( + const std::vector& directories); + + // Takes the following actions: + // 1. Decreases reference count of the directories; + // 2. If the reference count of a directory reaches 0, remove the directory + // from the cache. + void UnrefAndTryRemoveCachedLogicalBlockSize( + const std::vector& directories); + + // Returns the logical block size for the file. + // + // If the file is under a cached directory, return the cached size. + // Otherwise, the size is computed. + size_t GetLogicalBlockSize(const std::string& fname, int fd); + + int GetRefCount(const std::string& dir) { + ReadLock lock(&cache_mutex_); + auto it = cache_.find(dir); + if (it == cache_.end()) { + return 0; + } + return it->second.ref; + } + + size_t Size() const { return cache_.size(); } + + bool Contains(const std::string& dir) { + ReadLock lock(&cache_mutex_); + return cache_.find(dir) != cache_.end(); + } + + private: + struct CacheValue { + CacheValue() : size(0), ref(0) {} + + // Logical block size of the directory. + size_t size; + // Reference count of the directory. + int ref; + }; + + std::function get_logical_block_size_of_fd_; + std::function + get_logical_block_size_of_directory_; + + std::map cache_; + port::RWMutex cache_mutex_; +}; +#endif + class PosixSequentialFile : public FSSequentialFile { private: std::string filename_; @@ -75,6 +132,7 @@ class PosixSequentialFile : public FSSequentialFile { public: PosixSequentialFile(const std::string& fname, FILE* file, int fd, + size_t logical_block_size, const EnvOptions& options); virtual ~PosixSequentialFile(); @@ -123,6 +181,7 @@ class PosixRandomAccessFile : public FSRandomAccessFile { public: PosixRandomAccessFile(const std::string& fname, int fd, + size_t logical_block_size, const EnvOptions& options #if defined(ROCKSDB_IOURING_PRESENT) , @@ -172,6 +231,7 @@ class PosixWritableFile : public FSWritableFile { public: explicit PosixWritableFile(const std::string& fname, int fd, + size_t logical_block_size, const EnvOptions& options); virtual ~PosixWritableFile(); diff --git a/env/io_posix_test.cc b/env/io_posix_test.cc new file mode 100644 index 000000000..4aefd7dbc --- /dev/null +++ b/env/io_posix_test.cc @@ -0,0 +1,140 @@ +// Copyright (c) 2020-present, Facebook, Inc. All rights reserved. +// This source code is licensed under both the GPLv2 (found in the +// COPYING file in the root directory) and Apache 2.0 License +// (found in the LICENSE.Apache file in the root directory). + +#include "test_util/testharness.h" + +#ifdef ROCKSDB_LIB_IO_POSIX +#include "env/io_posix.h" + +namespace ROCKSDB_NAMESPACE { + +#ifdef OS_LINUX +class LogicalBlockSizeCacheTest : public testing::Test {}; + +// Tests the caching behavior. +TEST_F(LogicalBlockSizeCacheTest, Cache) { + int ncall = 0; + auto get_fd_block_size = [&](int fd) { + ncall++; + return fd; + }; + std::map dir_fds{ + {"/", 0}, + {"/db", 1}, + {"/db1", 2}, + {"/db2", 3}, + }; + auto get_dir_block_size = [&](const std::string& dir, size_t* size) { + ncall++; + *size = dir_fds[dir]; + return Status::OK(); + }; + LogicalBlockSizeCache cache(get_fd_block_size, get_dir_block_size); + ASSERT_EQ(0, ncall); + ASSERT_EQ(0, cache.Size()); + + ASSERT_EQ(6, cache.GetLogicalBlockSize("/sst", 6)); + ASSERT_EQ(1, ncall); + ASSERT_EQ(7, cache.GetLogicalBlockSize("/db/sst1", 7)); + ASSERT_EQ(2, ncall); + ASSERT_EQ(8, cache.GetLogicalBlockSize("/db/sst2", 8)); + ASSERT_EQ(3, ncall); + + ASSERT_OK(cache.RefAndCacheLogicalBlockSize({"/", "/db1/", "/db2"})); + ASSERT_EQ(3, cache.Size()); + ASSERT_TRUE(cache.Contains("/")); + ASSERT_TRUE(cache.Contains("/db1")); + ASSERT_TRUE(cache.Contains("/db2")); + ASSERT_EQ(6, ncall); + // Block size for / is cached. + ASSERT_EQ(0, cache.GetLogicalBlockSize("/sst", 6)); + ASSERT_EQ(6, ncall); + // No cached size for /db. + ASSERT_EQ(7, cache.GetLogicalBlockSize("/db/sst1", 7)); + ASSERT_EQ(7, ncall); + ASSERT_EQ(8, cache.GetLogicalBlockSize("/db/sst2", 8)); + ASSERT_EQ(8, ncall); + // Block size for /db1 is cached. + ASSERT_EQ(2, cache.GetLogicalBlockSize("/db1/sst1", 4)); + ASSERT_EQ(8, ncall); + ASSERT_EQ(2, cache.GetLogicalBlockSize("/db1/sst2", 5)); + ASSERT_EQ(8, ncall); + // Block size for /db2 is cached. + ASSERT_EQ(3, cache.GetLogicalBlockSize("/db2/sst1", 6)); + ASSERT_EQ(8, ncall); + ASSERT_EQ(3, cache.GetLogicalBlockSize("/db2/sst2", 7)); + ASSERT_EQ(8, ncall); + + cache.RefAndCacheLogicalBlockSize({"/db"}); + ASSERT_EQ(4, cache.Size()); + ASSERT_TRUE(cache.Contains("/")); + ASSERT_TRUE(cache.Contains("/db1")); + ASSERT_TRUE(cache.Contains("/db2")); + ASSERT_TRUE(cache.Contains("/db")); + + ASSERT_EQ(9, ncall); + // Block size for /db is cached. + ASSERT_EQ(1, cache.GetLogicalBlockSize("/db/sst1", 7)); + ASSERT_EQ(9, ncall); + ASSERT_EQ(1, cache.GetLogicalBlockSize("/db/sst2", 8)); + ASSERT_EQ(9, ncall); +} + +// Tests the reference counting behavior. +TEST_F(LogicalBlockSizeCacheTest, Ref) { + int ncall = 0; + auto get_fd_block_size = [&](int fd) { + ncall++; + return fd; + }; + std::map dir_fds{ + {"/db", 0}, + }; + auto get_dir_block_size = [&](const std::string& dir, size_t* size) { + ncall++; + *size = dir_fds[dir]; + return Status::OK(); + }; + LogicalBlockSizeCache cache(get_fd_block_size, get_dir_block_size); + + ASSERT_EQ(0, ncall); + + ASSERT_EQ(1, cache.GetLogicalBlockSize("/db/sst0", 1)); + ASSERT_EQ(1, ncall); + + cache.RefAndCacheLogicalBlockSize({"/db"}); + ASSERT_EQ(2, ncall); + ASSERT_EQ(1, cache.GetRefCount("/db")); + // Block size for /db is cached. Ref count = 1. + ASSERT_EQ(0, cache.GetLogicalBlockSize("/db/sst1", 1)); + ASSERT_EQ(2, ncall); + + // Ref count = 2, but won't recompute the cached buffer size. + cache.RefAndCacheLogicalBlockSize({"/db"}); + ASSERT_EQ(2, cache.GetRefCount("/db")); + ASSERT_EQ(2, ncall); + + // Ref count = 1. + cache.UnrefAndTryRemoveCachedLogicalBlockSize({"/db"}); + ASSERT_EQ(1, cache.GetRefCount("/db")); + // Block size for /db is still cached. + ASSERT_EQ(0, cache.GetLogicalBlockSize("/db/sst2", 1)); + ASSERT_EQ(2, ncall); + + // Ref count = 0 and cached buffer size for /db is removed. + cache.UnrefAndTryRemoveCachedLogicalBlockSize({"/db"}); + ASSERT_EQ(0, cache.Size()); + ASSERT_EQ(1, cache.GetLogicalBlockSize("/db/sst0", 1)); + ASSERT_EQ(3, ncall); +} +#endif + +} // namespace ROCKSDB_NAMESPACE +#endif + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/env/mock_env.h b/env/mock_env.h index 145cae067..5cd9d499d 100644 --- a/env/mock_env.h +++ b/env/mock_env.h @@ -24,75 +24,80 @@ class MockEnv : public EnvWrapper { public: explicit MockEnv(Env* base_env); - virtual ~MockEnv(); + ~MockEnv() override; // Partial implementation of the Env interface. - virtual Status NewSequentialFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& soptions) override; + Status RegisterDbPaths(const std::vector& /*paths*/) override { + return Status::OK(); + } - virtual Status NewRandomAccessFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& soptions) override; + Status UnregisterDbPaths(const std::vector& /*paths*/) override { + return Status::OK(); + } - virtual Status NewRandomRWFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& options) override; + Status NewSequentialFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& soptions) override; - virtual Status ReuseWritableFile(const std::string& fname, - const std::string& old_fname, - std::unique_ptr* result, - const EnvOptions& options) override; + Status NewRandomAccessFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& soptions) override; - virtual Status NewWritableFile(const std::string& fname, - std::unique_ptr* result, - const EnvOptions& env_options) override; + Status NewRandomRWFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& options) override; - virtual Status NewDirectory(const std::string& name, - std::unique_ptr* result) override; + Status ReuseWritableFile(const std::string& fname, + const std::string& old_fname, + std::unique_ptr* result, + const EnvOptions& options) override; - virtual Status FileExists(const std::string& fname) override; + Status NewWritableFile(const std::string& fname, + std::unique_ptr* result, + const EnvOptions& env_options) override; - virtual Status GetChildren(const std::string& dir, - std::vector* result) override; + Status NewDirectory(const std::string& name, + std::unique_ptr* result) override; + + Status FileExists(const std::string& fname) override; + + Status GetChildren(const std::string& dir, + std::vector* result) override; void DeleteFileInternal(const std::string& fname); - virtual Status DeleteFile(const std::string& fname) override; + Status DeleteFile(const std::string& fname) override; - virtual Status Truncate(const std::string& fname, size_t size) override; + Status Truncate(const std::string& fname, size_t size) override; - virtual Status CreateDir(const std::string& dirname) override; + Status CreateDir(const std::string& dirname) override; - virtual Status CreateDirIfMissing(const std::string& dirname) override; + Status CreateDirIfMissing(const std::string& dirname) override; - virtual Status DeleteDir(const std::string& dirname) override; + Status DeleteDir(const std::string& dirname) override; - virtual Status GetFileSize(const std::string& fname, - uint64_t* file_size) override; + Status GetFileSize(const std::string& fname, uint64_t* file_size) override; - virtual Status GetFileModificationTime(const std::string& fname, - uint64_t* time) override; + Status GetFileModificationTime(const std::string& fname, + uint64_t* time) override; - virtual Status RenameFile(const std::string& src, - const std::string& target) override; + Status RenameFile(const std::string& src, const std::string& target) override; - virtual Status LinkFile(const std::string& src, - const std::string& target) override; + Status LinkFile(const std::string& src, const std::string& target) override; - virtual Status NewLogger(const std::string& fname, - std::shared_ptr* result) override; + Status NewLogger(const std::string& fname, + std::shared_ptr* result) override; - virtual Status LockFile(const std::string& fname, FileLock** flock) override; + Status LockFile(const std::string& fname, FileLock** flock) override; - virtual Status UnlockFile(FileLock* flock) override; + Status UnlockFile(FileLock* flock) override; - virtual Status GetTestDirectory(std::string* path) override; + Status GetTestDirectory(std::string* path) override; // Results of these can be affected by FakeSleepForMicroseconds() - virtual Status GetCurrentTime(int64_t* unix_time) override; - virtual uint64_t NowMicros() override; - virtual uint64_t NowNanos() override; + Status GetCurrentTime(int64_t* unix_time) override; + uint64_t NowMicros() override; + uint64_t NowNanos() override; Status CorruptBuffer(const std::string& fname); diff --git a/include/rocksdb/env.h b/include/rocksdb/env.h index 056d8a1c0..a62eaa666 100644 --- a/include/rocksdb/env.h +++ b/include/rocksdb/env.h @@ -163,6 +163,15 @@ class Env { // The result of Default() belongs to rocksdb and must never be deleted. static Env* Default(); + // See FileSystem::RegisterDbPaths. + virtual Status RegisterDbPaths(const std::vector& /*paths*/) { + return Status::OK(); + } + // See FileSystem::UnregisterDbPaths. + virtual Status UnregisterDbPaths(const std::vector& /*paths*/) { + return Status::OK(); + } + // Create a brand new sequentially-readable file with the specified name. // On success, stores a pointer to the new file in *result and returns OK. // On failure stores nullptr in *result and returns non-OK. If the file does @@ -1155,6 +1164,14 @@ class EnvWrapper : public Env { Env* target() const { return target_; } // The following text is boilerplate that forwards all methods to target() + Status RegisterDbPaths(const std::vector& paths) override { + return target_->RegisterDbPaths(paths); + } + + Status UnregisterDbPaths(const std::vector& paths) override { + return target_->UnregisterDbPaths(paths); + } + Status NewSequentialFile(const std::string& f, std::unique_ptr* r, const EnvOptions& options) override { diff --git a/include/rocksdb/file_system.h b/include/rocksdb/file_system.h index c1fd919f3..8a7592d01 100644 --- a/include/rocksdb/file_system.h +++ b/include/rocksdb/file_system.h @@ -171,6 +171,35 @@ class FileSystem { // The result of Default() belongs to rocksdb and must never be deleted. static std::shared_ptr Default(); + // Handles the event when a new DB or a new ColumnFamily starts using the + // specified data paths. + // + // The data paths might be shared by different DBs or ColumnFamilies, + // so RegisterDbPaths might be called with the same data paths. + // For example, when CreateColumnFamily is called multiple times with the same + // data path, RegisterDbPaths will also be called with the same data path. + // + // If the return status is ok, then the paths must be correspondingly + // called in UnregisterDbPaths; + // otherwise this method should have no side effect, and UnregisterDbPaths + // do not need to be called for the paths. + // + // Different implementations may take different actions. + // By default, it's a no-op and returns Status::OK. + virtual Status RegisterDbPaths(const std::vector& /*paths*/) { + return Status::OK(); + } + // Handles the event a DB or a ColumnFamily stops using the specified data + // paths. + // + // It should be called corresponding to each successful RegisterDbPaths. + // + // Different implementations may take different actions. + // By default, it's a no-op and returns Status::OK. + virtual Status UnregisterDbPaths(const std::vector& /*paths*/) { + return Status::OK(); + } + // Create a brand new sequentially-readable file with the specified name. // On success, stores a pointer to the new file in *result and returns OK. // On failure stores nullptr in *result and returns non-OK. If the file does diff --git a/src.mk b/src.mk index a85fe60d2..52412fae6 100644 --- a/src.mk +++ b/src.mk @@ -337,6 +337,7 @@ MAIN_SOURCES = \ db/db_tailing_iter_test.cc \ db/db_test.cc \ db/db_test2.cc \ + db/db_logical_block_size_cache_test.cc \ db/db_universal_compaction_test.cc \ db/db_wal_test.cc \ db/db_write_test.cc \ @@ -383,6 +384,7 @@ MAIN_SOURCES = \ db/write_controller_test.cc \ env/env_basic_test.cc \ env/env_test.cc \ + env/io_posix_test.cc \ env/mock_env_test.cc \ logging/auto_roll_logger_test.cc \ logging/env_logger_test.cc \