Cache result of GetLogicalBufferSize in Linux (#6457)

Summary:
In Linux, when reopening DB with many SST files, profiling shows that 100% system cpu time spent for a couple of seconds for `GetLogicalBufferSize`. This slows down MyRocks' recovery time when site is down.

This PR introduces two new APIs:
1. `Env::RegisterDbPaths` and `Env::UnregisterDbPaths` lets `DB` tell the env when it starts or stops using its database directories . The `PosixFileSystem` takes this opportunity to set up a cache from database directories to the corresponding logical block sizes.
2. `LogicalBlockSizeCache` is defined only for OS_LINUX to cache the logical block sizes.

Other modifications:
1. rename `logical buffer size` to `logical block size` to be consistent with Linux terms.
2. declare `GetLogicalBlockSize` in `PosixHelper` to expose it to `PosixFileSystem`.
3. change the functions `IOError` and `IOStatus` in `env/io_posix.h` to have external linkage since they are used in other translation units too.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/6457

Test Plan:
1. A new unit test is added for `LogicalBlockSizeCache` in `env/io_posix_test.cc`.
2. A new integration test is added for `DB` operations related to the cache in `db/db_logical_block_size_cache_test.cc`.

`make check`

Differential Revision: D20131243

Pulled By: cheng-chang

fbshipit-source-id: 3077c50f8065c0bffb544d8f49fb10bba9408d04
main
Cheng Chang 4 years ago committed by Facebook GitHub Bot
parent 331e6199df
commit 2d9efc9ab2
  1. 2
      CMakeLists.txt
  2. 8
      Makefile
  3. 14
      TARGETS
  4. 46
      db/column_family.cc
  5. 5
      db/column_family.h
  6. 22
      db/db_impl/db_impl.cc
  7. 8
      db/db_impl/db_impl_open.cc
  8. 483
      db/db_logical_block_size_cache_test.cc
  9. 2
      db/version_builder.cc
  10. 7
      env/composite_env_wrapper.h
  11. 26
      env/env_chroot.cc
  12. 52
      env/fs_posix.cc
  13. 275
      env/io_posix.cc
  14. 116
      env/io_posix.h
  15. 140
      env/io_posix_test.cc
  16. 91
      env/mock_env.h
  17. 17
      include/rocksdb/env.h
  18. 29
      include/rocksdb/file_system.h
  19. 2
      src.mk

@ -973,6 +973,7 @@ if(WITH_TESTS)
db/db_tailing_iter_test.cc
db/db_test.cc
db/db_test2.cc
db/db_logical_block_size_cache_test.cc
db/db_universal_compaction_test.cc
db/db_wal_test.cc
db/db_write_test.cc
@ -1009,6 +1010,7 @@ if(WITH_TESTS)
db/write_controller_test.cc
env/env_basic_test.cc
env/env_test.cc
env/io_posix_test.cc
env/mock_env_test.cc
file/delete_scheduler_test.cc
logging/auto_roll_logger_test.cc

@ -459,6 +459,7 @@ TESTS = \
env_basic_test \
env_test \
env_logger_test \
io_posix_test \
hash_test \
random_test \
thread_local_test \
@ -468,6 +469,7 @@ TESTS = \
db_wal_test \
db_block_cache_test \
db_test \
db_logical_block_size_cache_test \
db_blob_index_test \
db_iter_test \
db_iter_stress_test \
@ -1313,6 +1315,9 @@ db_test: db/db_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
db_test2: db/db_test2.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_logical_block_size_cache_test: db/db_logical_block_size_cache_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
db_blob_index_test: db/db_blob_index_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
@ -1480,6 +1485,9 @@ env_basic_test: env/env_basic_test.o $(LIBOBJECTS) $(TESTHARNESS)
env_test: env/env_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
io_posix_test: env/io_posix_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
fault_injection_test: db/fault_injection_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

@ -802,6 +802,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"db_logical_block_size_cache_test",
"db/db_logical_block_size_cache_test.cc",
"serial",
[],
[],
],
[
"db_memtable_test",
"db/db_memtable_test.cc",
@ -1096,6 +1103,13 @@ ROCKS_TESTS = [
[],
[],
],
[
"io_posix_test",
"env/io_posix_test.cc",
"serial",
[],
[],
],
[
"iostats_context_test",
"monitoring/iostats_context_test.cc",

@ -471,6 +471,17 @@ void SuperVersionUnrefHandle(void* ptr) {
}
} // anonymous namespace
std::vector<std::string> ColumnFamilyData::GetDbPaths() const {
std::vector<std::string> paths;
paths.reserve(ioptions_.cf_paths.size());
for (const DbPath& db_path : ioptions_.cf_paths) {
paths.emplace_back(db_path.path);
}
return paths;
}
const uint32_t ColumnFamilyData::kDummyColumnFamilyDataId = port::kMaxUint32;
ColumnFamilyData::ColumnFamilyData(
uint32_t id, const std::string& name, Version* _dummy_versions,
Cache* _table_cache, WriteBufferManager* write_buffer_manager,
@ -507,7 +518,23 @@ ColumnFamilyData::ColumnFamilyData(
queued_for_compaction_(false),
prev_compaction_needed_bytes_(0),
allow_2pc_(db_options.allow_2pc),
last_memtable_id_(0) {
last_memtable_id_(0),
db_paths_registered_(false) {
if (id_ != kDummyColumnFamilyDataId) {
// TODO(cc): RegisterDbPaths can be expensive, considering moving it
// outside of this constructor which might be called with db mutex held.
// TODO(cc): considering using ioptions_.fs, currently some tests rely on
// EnvWrapper, that's the main reason why we use env here.
Status s = ioptions_.env->RegisterDbPaths(GetDbPaths());
if (s.ok()) {
db_paths_registered_ = true;
} else {
ROCKS_LOG_ERROR(
ioptions_.info_log,
"Failed to register data paths of column family (id: %d, name: %s)",
id_, name_.c_str());
}
}
Ref();
// Convert user defined table properties collector factories to internal ones.
@ -601,6 +628,18 @@ ColumnFamilyData::~ColumnFamilyData() {
for (MemTable* m : to_delete) {
delete m;
}
if (db_paths_registered_) {
// TODO(cc): considering using ioptions_.fs, currently some tests rely on
// EnvWrapper, that's the main reason why we use env here.
Status s = ioptions_.env->UnregisterDbPaths(GetDbPaths());
if (!s.ok()) {
ROCKS_LOG_ERROR(
ioptions_.info_log,
"Failed to unregister data paths of column family (id: %d, name: %s)",
id_, name_.c_str());
}
}
}
bool ColumnFamilyData::UnrefAndTryDelete() {
@ -1363,8 +1402,9 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
BlockCacheTracer* const block_cache_tracer)
: max_column_family_(0),
dummy_cfd_(new ColumnFamilyData(
0, "", nullptr, nullptr, nullptr, ColumnFamilyOptions(), *db_options,
file_options, nullptr, block_cache_tracer)),
ColumnFamilyData::kDummyColumnFamilyDataId, "", nullptr, nullptr,
nullptr, ColumnFamilyOptions(), *db_options, file_options, nullptr,
block_cache_tracer)),
default_cfd_cache_(nullptr),
db_name_(dbname),
db_options_(db_options),

@ -508,6 +508,7 @@ class ColumnFamilyData {
private:
friend class ColumnFamilySet;
static const uint32_t kDummyColumnFamilyDataId;
ColumnFamilyData(uint32_t id, const std::string& name,
Version* dummy_versions, Cache* table_cache,
WriteBufferManager* write_buffer_manager,
@ -517,6 +518,8 @@ class ColumnFamilyData {
ColumnFamilySet* column_family_set,
BlockCacheTracer* const block_cache_tracer);
std::vector<std::string> GetDbPaths() const;
uint32_t id_;
const std::string name_;
Version* dummy_versions_; // Head of circular doubly-linked list of versions.
@ -593,6 +596,8 @@ class ColumnFamilyData {
// Directories corresponding to cf_paths.
std::vector<std::shared_ptr<FSDirectory>> data_dirs_;
bool db_paths_registered_;
};
// ColumnFamilySet has interesting thread-safety requirements

@ -21,7 +21,6 @@
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
@ -3528,24 +3527,15 @@ Status DestroyDB(const std::string& dbname, const Options& options,
}
}
std::vector<std::string> paths;
for (const auto& path : options.db_paths) {
paths.emplace_back(path.path);
std::set<std::string> paths;
for (const DbPath& db_path : options.db_paths) {
paths.insert(db_path.path);
}
for (const auto& cf : column_families) {
for (const auto& path : cf.options.cf_paths) {
paths.emplace_back(path.path);
for (const ColumnFamilyDescriptor& cf : column_families) {
for (const DbPath& cf_path : cf.options.cf_paths) {
paths.insert(cf_path.path);
}
}
// Remove duplicate paths.
// Note that we compare only the actual paths but not path ids.
// This reason is that same path can appear at different path_ids
// for different column families.
std::sort(paths.begin(), paths.end());
paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
for (const auto& path : paths) {
if (env->GetChildren(path, &filenames).ok()) {
for (const auto& fname : filenames) {

@ -1401,13 +1401,9 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
impl->error_handler_.EnableAutoRecovery();
}
}
if (!s.ok()) {
delete impl;
return s;
if (s.ok()) {
s = impl->CreateArchivalDirectory();
}
s = impl->CreateArchivalDirectory();
if (!s.ok()) {
delete impl;
return s;

@ -0,0 +1,483 @@
// Copyright (c) 2020-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "test_util/testharness.h"
#ifdef OS_LINUX
#include "env/io_posix.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
namespace ROCKSDB_NAMESPACE {
class EnvWithCustomLogicalBlockSizeCache : public EnvWrapper {
public:
EnvWithCustomLogicalBlockSizeCache(Env* env, LogicalBlockSizeCache* cache)
: EnvWrapper(env), cache_(cache) {}
Status RegisterDbPaths(const std::vector<std::string>& paths) override {
return cache_->RefAndCacheLogicalBlockSize(paths);
}
Status UnregisterDbPaths(const std::vector<std::string>& paths) override {
cache_->UnrefAndTryRemoveCachedLogicalBlockSize(paths);
return Status::OK();
}
private:
LogicalBlockSizeCache* cache_;
};
class DBLogicalBlockSizeCacheTest : public testing::Test {
public:
DBLogicalBlockSizeCacheTest()
: dbname_(test::PerThreadDBPath("logical_block_size_cache_test")),
data_path_0_(dbname_ + "/data_path_0"),
data_path_1_(dbname_ + "/data_path_1"),
cf_path_0_(dbname_ + "/cf_path_0"),
cf_path_1_(dbname_ + "/cf_path_1") {
auto get_fd_block_size = [&](int fd) {
return fd;
};
auto get_dir_block_size = [&](const std::string& /*dir*/, size_t* size) {
*size = 1024;
return Status::OK();
};
cache_.reset(new LogicalBlockSizeCache(
get_fd_block_size, get_dir_block_size));
env_.reset(new EnvWithCustomLogicalBlockSizeCache(
Env::Default(), cache_.get()));
}
protected:
std::string dbname_;
std::string data_path_0_;
std::string data_path_1_;
std::string cf_path_0_;
std::string cf_path_1_;
std::unique_ptr<LogicalBlockSizeCache> cache_;
std::unique_ptr<Env> env_;
};
TEST_F(DBLogicalBlockSizeCacheTest, OpenClose) {
// Tests that Open will cache the logical block size for data paths,
// and Close will remove the cached sizes.
Options options;
options.create_if_missing = true;
options.env = env_.get();
options.db_paths = {{data_path_0_, 2048}, {data_path_1_, 2048}};
for (int i = 0; i < 2; i++) {
DB* db;
if (!i) {
printf("Open\n");
ASSERT_OK(DB::Open(options, dbname_, &db));
} else {
printf("OpenForReadOnly\n");
ASSERT_OK(DB::OpenForReadOnly(options, dbname_, &db));
}
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(data_path_0_));
ASSERT_TRUE(cache_->Contains(data_path_1_));
ASSERT_EQ(1, cache_->GetRefCount(data_path_1_));
ASSERT_OK(db->Close());
ASSERT_EQ(0, cache_->Size());
delete db;
}
}
TEST_F(DBLogicalBlockSizeCacheTest, OpenDelete) {
// Tests that Open will cache the logical block size for data paths,
// and delete the db pointer will remove the cached sizes.
Options options;
options.create_if_missing = true;
options.env = env_.get();
for (int i = 0; i < 2; i++) {
DB* db;
if (!i) {
printf("Open\n");
ASSERT_OK(DB::Open(options, dbname_, &db));
} else {
printf("OpenForReadOnly\n");
ASSERT_OK(DB::OpenForReadOnly(options, dbname_, &db));
}
ASSERT_EQ(1, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
delete db;
ASSERT_EQ(0, cache_->Size());
}
}
TEST_F(DBLogicalBlockSizeCacheTest, CreateColumnFamily) {
// Tests that CreateColumnFamily will cache the cf_paths,
// drop the column family handle won't drop the cache,
// drop and then delete the column family handle will drop the cache.
Options options;
options.create_if_missing = true;
options.env = env_.get();
ColumnFamilyOptions cf_options;
cf_options.cf_paths = {{cf_path_0_, 1024}, {cf_path_1_, 2048}};
DB* db;
ASSERT_OK(DB::Open(options, dbname_, &db));
ASSERT_EQ(1, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
ColumnFamilyHandle* cf = nullptr;
ASSERT_OK(db->CreateColumnFamily(cf_options, "cf", &cf));
ASSERT_EQ(3, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
ASSERT_TRUE(cache_->Contains(cf_path_1_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_1_));
// Drop column family does not drop cache.
ASSERT_OK(db->DropColumnFamily(cf));
ASSERT_EQ(3, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
ASSERT_TRUE(cache_->Contains(cf_path_1_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_1_));
// Delete handle will drop cache.
ASSERT_OK(db->DestroyColumnFamilyHandle(cf));
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
delete db;
ASSERT_EQ(0, cache_->Size());
}
TEST_F(DBLogicalBlockSizeCacheTest, CreateColumnFamilies) {
// Tests that CreateColumnFamilies will cache the cf_paths,
// drop the column family handle won't drop the cache,
// drop and then delete the column family handle will drop the cache.
Options options;
options.create_if_missing = true;
options.env = env_.get();
ColumnFamilyOptions cf_options;
cf_options.cf_paths = {{cf_path_0_, 1024}};
DB* db;
ASSERT_OK(DB::Open(options, dbname_, &db));
ASSERT_EQ(1, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
std::vector<ColumnFamilyHandle*> cfs;
ASSERT_OK(db->CreateColumnFamilies(cf_options, {"cf1", "cf2"}, &cfs));
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(2, cache_->GetRefCount(cf_path_0_));
// Drop column family does not drop cache.
for (ColumnFamilyHandle* cf : cfs) {
ASSERT_OK(db->DropColumnFamily(cf));
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(2, cache_->GetRefCount(cf_path_0_));
}
// Delete one handle will not drop cache because another handle is still
// referencing cf_path_0_.
ASSERT_OK(db->DestroyColumnFamilyHandle(cfs[0]));
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
// Delete the last handle will drop cache.
ASSERT_OK(db->DestroyColumnFamilyHandle(cfs[1]));
ASSERT_EQ(1, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
delete db;
ASSERT_EQ(0, cache_->Size());
}
TEST_F(DBLogicalBlockSizeCacheTest, OpenWithColumnFamilies) {
// Tests that Open two column families with the same cf_path will cache the
// cf_path and have 2 references to the cached size,
// drop the column family handle won't drop the cache,
// drop and then delete the column family handle will drop the cache.
Options options;
options.create_if_missing = true;
options.env = env_.get();
ColumnFamilyOptions cf_options;
cf_options.cf_paths = {{cf_path_0_, 1024}};
for (int i = 0; i < 2; i++) {
DB* db;
ColumnFamilyHandle* cf1 = nullptr;
ColumnFamilyHandle* cf2 = nullptr;
ASSERT_OK(DB::Open(options, dbname_, &db));
ASSERT_OK(db->CreateColumnFamily(cf_options, "cf1", &cf1));
ASSERT_OK(db->CreateColumnFamily(cf_options, "cf2", &cf2));
ASSERT_OK(db->DestroyColumnFamilyHandle(cf1));
ASSERT_OK(db->DestroyColumnFamilyHandle(cf2));
delete db;
ASSERT_EQ(0, cache_->Size());
std::vector<ColumnFamilyHandle*> cfs;
if (!i) {
printf("Open\n");
ASSERT_OK(DB::Open(options, dbname_,
{{"cf1", cf_options},
{"cf2", cf_options},
{"default", ColumnFamilyOptions()}},
&cfs, &db));
} else {
printf("OpenForReadOnly\n");
ASSERT_OK(DB::OpenForReadOnly(options, dbname_,
{{"cf1", cf_options},
{"cf2", cf_options},
{"default", ColumnFamilyOptions()}},
&cfs, &db));
}
// Logical block sizes of dbname_ and cf_path_0_ are cached during Open.
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(2, cache_->GetRefCount(cf_path_0_));
// Drop handles won't drop the cache.
ASSERT_OK(db->DropColumnFamily(cfs[0]));
ASSERT_OK(db->DropColumnFamily(cfs[1]));
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(2, cache_->GetRefCount(cf_path_0_));
// Delete 1st handle won't drop the cache for cf_path_0_.
ASSERT_OK(db->DestroyColumnFamilyHandle(cfs[0]));
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
// Delete 2nd handle will drop the cache for cf_path_0_.
ASSERT_OK(db->DestroyColumnFamilyHandle(cfs[1]));
ASSERT_EQ(1, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
// Delete the default handle won't affect the cache because db still refers
// to the default CF.
ASSERT_OK(db->DestroyColumnFamilyHandle(cfs[2]));
ASSERT_EQ(1, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
delete db;
ASSERT_EQ(0, cache_->Size());
}
}
TEST_F(DBLogicalBlockSizeCacheTest, DestroyColumnFamilyHandle) {
// Tests that destroy column family without dropping won't drop the cache,
// because compaction and flush might still need to get logical block size
// when opening new files.
Options options;
options.create_if_missing = true;
options.env = env_.get();
ColumnFamilyOptions cf_options;
cf_options.cf_paths = {{cf_path_0_, 1024}};
DB* db;
ASSERT_OK(DB::Open(options, dbname_, &db));
ASSERT_EQ(1, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
ColumnFamilyHandle* cf = nullptr;
ASSERT_OK(db->CreateColumnFamily(cf_options, "cf", &cf));
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
// Delete handle won't drop cache.
ASSERT_OK(db->DestroyColumnFamilyHandle(cf));
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
delete db;
ASSERT_EQ(0, cache_->Size());
// Open with column families.
std::vector<ColumnFamilyHandle*> cfs;
for (int i = 0; i < 2; i++) {
if (!i) {
printf("Open\n");
ASSERT_OK(DB::Open(
options, dbname_,
{{"cf", cf_options}, {"default", ColumnFamilyOptions()}}, &cfs, &db));
} else {
printf("OpenForReadOnly\n");
ASSERT_OK(DB::OpenForReadOnly(
options, dbname_,
{{"cf", cf_options}, {"default", ColumnFamilyOptions()}}, &cfs, &db));
}
// cf_path_0_ and dbname_ are cached.
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
// Deleting handle won't drop cache.
ASSERT_OK(db->DestroyColumnFamilyHandle(cfs[0]));
ASSERT_OK(db->DestroyColumnFamilyHandle(cfs[1]));
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(dbname_));
ASSERT_EQ(1, cache_->GetRefCount(dbname_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
delete db;
ASSERT_EQ(0, cache_->Size());
}
}
TEST_F(DBLogicalBlockSizeCacheTest, MultiDBWithDifferentPaths) {
// Tests the cache behavior when there are multiple DBs sharing the same env
// with different db_paths and cf_paths.
Options options;
options.create_if_missing = true;
options.env = env_.get();
DB* db0;
ASSERT_OK(DB::Open(options, data_path_0_, &db0));
ASSERT_EQ(1, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
ColumnFamilyOptions cf_options0;
cf_options0.cf_paths = {{cf_path_0_, 1024}};
ColumnFamilyHandle* cf0;
db0->CreateColumnFamily(cf_options0, "cf", &cf0);
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(data_path_0_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
DB* db1;
ASSERT_OK(DB::Open(options, data_path_1_, &db1));
ASSERT_EQ(3, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(data_path_0_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
ASSERT_TRUE(cache_->Contains(data_path_1_));
ASSERT_EQ(1, cache_->GetRefCount(data_path_1_));
ColumnFamilyOptions cf_options1;
cf_options1.cf_paths = {{cf_path_1_, 1024}};
ColumnFamilyHandle* cf1;
db1->CreateColumnFamily(cf_options1, "cf", &cf1);
ASSERT_EQ(4, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(data_path_0_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
ASSERT_TRUE(cache_->Contains(data_path_1_));
ASSERT_EQ(1, cache_->GetRefCount(data_path_1_));
ASSERT_TRUE(cache_->Contains(cf_path_1_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_1_));
db0->DestroyColumnFamilyHandle(cf0);
delete db0;
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_1_));
ASSERT_EQ(1, cache_->GetRefCount(data_path_1_));
ASSERT_TRUE(cache_->Contains(cf_path_1_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_1_));
db1->DestroyColumnFamilyHandle(cf1);
delete db1;
ASSERT_EQ(0, cache_->Size());
}
TEST_F(DBLogicalBlockSizeCacheTest, MultiDBWithSamePaths) {
// Tests the cache behavior when there are multiple DBs sharing the same env
// with the same db_paths and cf_paths.
Options options;
options.create_if_missing = true;
options.env = env_.get();
options.db_paths = {{data_path_0_, 1024}};
ColumnFamilyOptions cf_options;
cf_options.cf_paths = {{cf_path_0_, 1024}};
DB* db0;
ASSERT_OK(DB::Open(options, dbname_ + "/db0", &db0));
ASSERT_EQ(1, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(data_path_0_));
ColumnFamilyHandle* cf0;
db0->CreateColumnFamily(cf_options, "cf", &cf0);
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(data_path_0_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
DB* db1;
ASSERT_OK(DB::Open(options, dbname_ + "/db1", &db1));
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
ASSERT_EQ(2, cache_->GetRefCount(data_path_0_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
ColumnFamilyHandle* cf1;
db1->CreateColumnFamily(cf_options, "cf", &cf1);
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
ASSERT_EQ(2, cache_->GetRefCount(data_path_0_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(2, cache_->GetRefCount(cf_path_0_));
db0->DestroyColumnFamilyHandle(cf0);
delete db0;
ASSERT_EQ(2, cache_->Size());
ASSERT_TRUE(cache_->Contains(data_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(data_path_0_));
ASSERT_TRUE(cache_->Contains(cf_path_0_));
ASSERT_EQ(1, cache_->GetRefCount(cf_path_0_));
db1->DestroyColumnFamilyHandle(cf1);
delete db1;
ASSERT_EQ(0, cache_->Size());
}
} // namespace ROCKSDB_NAMESPACE
#endif // OS_LINUX
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

@ -405,7 +405,7 @@ class VersionBuilder::Rep {
size_t max_load = port::kMaxSizet;
if (!always_load) {
// If it is initial loading and not set to always laoding all the
// If it is initial loading and not set to always loading all the
// files, we only load up to kInitialLoadLimit files, to limit the
// time reopening the DB.
const size_t kInitialLoadLimit = 16;

@ -300,6 +300,13 @@ class CompositeEnvWrapper : public Env {
FileSystem* fs_env_target() const { return fs_env_target_; }
Status RegisterDbPaths(const std::vector<std::string>& paths) override {
return fs_env_target_->RegisterDbPaths(paths);
}
Status UnregisterDbPaths(const std::vector<std::string>& paths) override {
return fs_env_target_->UnregisterDbPaths(paths);
}
// The following text is boilerplate that forwards all methods to target()
Status NewSequentialFile(const std::string& f,
std::unique_ptr<SequentialFile>* r,

26
env/env_chroot.cc vendored

@ -38,6 +38,32 @@ class ChrootEnv : public EnvWrapper {
#endif
}
Status RegisterDbPaths(const std::vector<std::string>& paths) override {
std::vector<std::string> encoded_paths;
encoded_paths.reserve(paths.size());
for (auto& path : paths) {
auto status_and_enc_path = EncodePathWithNewBasename(path);
if (!status_and_enc_path.first.ok()) {
return status_and_enc_path.first;
}
encoded_paths.emplace_back(status_and_enc_path.second);
}
return EnvWrapper::Env::RegisterDbPaths(encoded_paths);
}
Status UnregisterDbPaths(const std::vector<std::string>& paths) override {
std::vector<std::string> encoded_paths;
encoded_paths.reserve(paths.size());
for (auto& path : paths) {
auto status_and_enc_path = EncodePathWithNewBasename(path);
if (!status_and_enc_path.first.ok()) {
return status_and_enc_path.first;
}
encoded_paths.emplace_back(status_and_enc_path.second);
}
return EnvWrapper::Env::UnregisterDbPaths(encoded_paths);
}
Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& options) override {

52
env/fs_posix.cc vendored

@ -183,7 +183,8 @@ class PosixFileSystem : public FileSystem {
errno);
}
}
result->reset(new PosixSequentialFile(fname, file, fd, options));
result->reset(new PosixSequentialFile(
fname, file, fd, GetLogicalBlockSize(fname, fd), options));
return IOStatus::OK();
}
@ -242,12 +243,13 @@ class PosixFileSystem : public FileSystem {
}
#endif
}
result->reset(new PosixRandomAccessFile(fname, fd, options
result->reset(new PosixRandomAccessFile(
fname, fd, GetLogicalBlockSize(fname, fd), options
#if defined(ROCKSDB_IOURING_PRESENT)
,
thread_local_io_urings_.get()
,
thread_local_io_urings_.get()
#endif
));
));
}
return s;
}
@ -328,12 +330,14 @@ class PosixFileSystem : public FileSystem {
}
}
#endif
result->reset(new PosixWritableFile(fname, fd, options));
result->reset(new PosixWritableFile(
fname, fd, GetLogicalBlockSize(fname, fd), options));
} else {
// disable mmap writes
EnvOptions no_mmap_writes_options = options;
no_mmap_writes_options.use_mmap_writes = false;
result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options));
result->reset(new PosixWritableFile(
fname, fd, GetLogicalBlockSize(fname, fd), no_mmap_writes_options));
}
return s;
}
@ -428,12 +432,14 @@ class PosixFileSystem : public FileSystem {
}
}
#endif
result->reset(new PosixWritableFile(fname, fd, options));
result->reset(new PosixWritableFile(
fname, fd, GetLogicalBlockSize(fname, fd), options));
} else {
// disable mmap writes
FileOptions no_mmap_writes_options = options;
no_mmap_writes_options.use_mmap_writes = false;
result->reset(new PosixWritableFile(fname, fd, no_mmap_writes_options));
result->reset(new PosixWritableFile(
fname, fd, GetLogicalBlockSize(fname, fd), no_mmap_writes_options));
}
return s;
}
@ -853,7 +859,15 @@ class PosixFileSystem : public FileSystem {
optimized.fallocate_with_keep_size = true;
return optimized;
}
#ifdef OS_LINUX
Status RegisterDbPaths(const std::vector<std::string>& paths) override {
return logical_block_size_cache_.RefAndCacheLogicalBlockSize(paths);
}
Status UnregisterDbPaths(const std::vector<std::string>& paths) override {
logical_block_size_cache_.UnrefAndTryRemoveCachedLogicalBlockSize(paths);
return Status::OK();
}
#endif
private:
bool checkedDiskForMmap_;
bool forceMmapOff_; // do we override Env options?
@ -899,8 +913,26 @@ class PosixFileSystem : public FileSystem {
// If true, allow non owner read access for db files. Otherwise, non-owner
// has no access to db files.
bool allow_non_owner_access_;
#ifdef OS_LINUX
static LogicalBlockSizeCache logical_block_size_cache_;
#endif
static size_t GetLogicalBlockSize(const std::string& fname, int fd);
};
#ifdef OS_LINUX
LogicalBlockSizeCache PosixFileSystem::logical_block_size_cache_;
#endif
size_t PosixFileSystem::GetLogicalBlockSize(const std::string& fname, int fd) {
#ifdef OS_LINUX
return logical_block_size_cache_.GetLogicalBlockSize(fname, fd);
#else
(void) fname;
return PosixHelper::GetLogicalBlockSizeOfFd(fd);
#endif
}
PosixFileSystem::PosixFileSystem()
: checkedDiskForMmap_(false),
forceMmapOff_(false),

275
env/io_posix.cc vendored

@ -45,6 +45,35 @@
namespace ROCKSDB_NAMESPACE {
std::string IOErrorMsg(const std::string& context,
const std::string& file_name) {
if (file_name.empty()) {
return context;
}
return context + ": " + file_name;
}
// file_name can be left empty if it is not unkown.
IOStatus IOError(const std::string& context, const std::string& file_name,
int err_number) {
switch (err_number) {
case ENOSPC: {
IOStatus s = IOStatus::NoSpace(IOErrorMsg(context, file_name),
strerror(err_number));
s.SetRetryable(true);
return s;
}
case ESTALE:
return IOStatus::IOError(IOStatus::kStaleFile);
case ENOENT:
return IOStatus::PathNotFound(IOErrorMsg(context, file_name),
strerror(err_number));
default:
return IOStatus::IOError(IOErrorMsg(context, file_name),
strerror(err_number));
}
}
// A wrapper for fadvise, if the platform doesn't support fadvise,
// it will simply return 0.
int Fadvise(int fd, off_t offset, size_t len, int advice) {
@ -112,75 +141,6 @@ bool PosixPositionedWrite(int fd, const char* buf, size_t nbyte, off_t offset) {
return true;
}
size_t GetLogicalBufferSize(int __attribute__((__unused__)) fd) {
#ifdef OS_LINUX
struct stat buf;
int result = fstat(fd, &buf);
if (result == -1) {
return kDefaultPageSize;
}
if (major(buf.st_dev) == 0) {
// Unnamed devices (e.g. non-device mounts), reserved as null device number.
// These don't have an entry in /sys/dev/block/. Return a sensible default.
return kDefaultPageSize;
}
// Reading queue/logical_block_size does not require special permissions.
const int kBufferSize = 100;
char path[kBufferSize];
char real_path[PATH_MAX + 1];
snprintf(path, kBufferSize, "/sys/dev/block/%u:%u", major(buf.st_dev),
minor(buf.st_dev));
if (realpath(path, real_path) == nullptr) {
return kDefaultPageSize;
}
std::string device_dir(real_path);
if (!device_dir.empty() && device_dir.back() == '/') {
device_dir.pop_back();
}
// NOTE: sda3 and nvme0n1p1 do not have a `queue/` subdir, only the parent sda
// and nvme0n1 have it.
// $ ls -al '/sys/dev/block/8:3'
// lrwxrwxrwx. 1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 ->
// ../../block/sda/sda3
// $ ls -al '/sys/dev/block/259:4'
// lrwxrwxrwx 1 root root 0 Jan 31 16:04 /sys/dev/block/259:4 ->
// ../../devices/pci0000:17/0000:17:00.0/0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1
size_t parent_end = device_dir.rfind('/', device_dir.length() - 1);
if (parent_end == std::string::npos) {
return kDefaultPageSize;
}
size_t parent_begin = device_dir.rfind('/', parent_end - 1);
if (parent_begin == std::string::npos) {
return kDefaultPageSize;
}
std::string parent =
device_dir.substr(parent_begin + 1, parent_end - parent_begin - 1);
std::string child = device_dir.substr(parent_end + 1, std::string::npos);
if (parent != "block" &&
(child.compare(0, 4, "nvme") || child.find('p') != std::string::npos)) {
device_dir = device_dir.substr(0, parent_end);
}
std::string fname = device_dir + "/queue/logical_block_size";
FILE* fp;
size_t size = 0;
fp = fopen(fname.c_str(), "r");
if (fp != nullptr) {
char* line = nullptr;
size_t len = 0;
if (getline(&line, &len, fp) != -1) {
sscanf(line, "%zu", &size);
}
free(line);
fclose(fp);
}
if (size != 0 && (size & (size - 1)) == 0) {
return size;
}
#endif
return kDefaultPageSize;
}
#ifdef ROCKSDB_RANGESYNC_PRESENT
#if !defined(ZFS_SUPER_MAGIC)
@ -247,12 +207,13 @@ bool IsSectorAligned(const void* ptr, size_t sector_size) {
* PosixSequentialFile
*/
PosixSequentialFile::PosixSequentialFile(const std::string& fname, FILE* file,
int fd, const EnvOptions& options)
int fd, size_t logical_block_size,
const EnvOptions& options)
: filename_(fname),
file_(file),
fd_(fd),
use_direct_io_(options.use_direct_reads),
logical_sector_size_(GetLogicalBufferSize(fd_)) {
logical_sector_size_(logical_block_size) {
assert(!options.use_direct_reads || !options.use_mmap_reads);
}
@ -409,13 +370,178 @@ size_t PosixHelper::GetUniqueIdFromFile(int fd, char* id, size_t max_size) {
return static_cast<size_t>(rid - id);
}
#endif
#ifdef OS_LINUX
std::string RemoveTrailingSlash(const std::string& path) {
std::string p = path;
if (p.size() > 1 && p.back() == '/') {
p.pop_back();
}
return p;
}
Status LogicalBlockSizeCache::RefAndCacheLogicalBlockSize(
const std::vector<std::string>& directories) {
std::vector<std::string> dirs;
dirs.reserve(directories.size());
for (auto& d : directories) {
dirs.emplace_back(RemoveTrailingSlash(d));
}
std::map<std::string, size_t> dir_sizes;
{
ReadLock lock(&cache_mutex_);
for (const auto& dir : dirs) {
if (cache_.find(dir) == cache_.end()) {
dir_sizes.emplace(dir, 0);
}
}
}
Status s;
for (auto& dir_size : dir_sizes) {
s = get_logical_block_size_of_directory_(dir_size.first, &dir_size.second);
if (!s.ok()) {
return s;
}
}
WriteLock lock(&cache_mutex_);
for (const auto& dir : dirs) {
auto& v = cache_[dir];
v.ref++;
auto dir_size = dir_sizes.find(dir);
if (dir_size != dir_sizes.end()) {
v.size = dir_size->second;
}
}
return Status::OK();
}
void LogicalBlockSizeCache::UnrefAndTryRemoveCachedLogicalBlockSize(
const std::vector<std::string>& directories) {
std::vector<std::string> dirs;
dirs.reserve(directories.size());
for (auto& dir : directories) {
dirs.emplace_back(RemoveTrailingSlash(dir));
}
WriteLock lock(&cache_mutex_);
for (const auto& dir : dirs) {
auto it = cache_.find(dir);
if (it != cache_.end() && !(--(it->second.ref))) {
cache_.erase(it);
}
}
}
size_t LogicalBlockSizeCache::GetLogicalBlockSize(const std::string& fname,
int fd) {
std::string dir = fname.substr(0, fname.find_last_of("/"));
if (dir.empty()) {
dir = "/";
}
{
ReadLock lock(&cache_mutex_);
auto it = cache_.find(dir);
if (it != cache_.end()) {
return it->second.size;
}
}
return get_logical_block_size_of_fd_(fd);
}
#endif
Status PosixHelper::GetLogicalBlockSizeOfDirectory(const std::string& directory,
size_t* size) {
int fd = open(directory.c_str(), O_DIRECTORY | O_RDONLY);
if (fd == -1) {
close(fd);
return Status::IOError("Cannot open directory " + directory);
}
*size = PosixHelper::GetLogicalBlockSizeOfFd(fd);
close(fd);
return Status::OK();
}
size_t PosixHelper::GetLogicalBlockSizeOfFd(int fd) {
#ifdef OS_LINUX
struct stat buf;
int result = fstat(fd, &buf);
if (result == -1) {
return kDefaultPageSize;
}
if (major(buf.st_dev) == 0) {
// Unnamed devices (e.g. non-device mounts), reserved as null device number.
// These don't have an entry in /sys/dev/block/. Return a sensible default.
return kDefaultPageSize;
}
// Reading queue/logical_block_size does not require special permissions.
const int kBufferSize = 100;
char path[kBufferSize];
char real_path[PATH_MAX + 1];
snprintf(path, kBufferSize, "/sys/dev/block/%u:%u", major(buf.st_dev),
minor(buf.st_dev));
if (realpath(path, real_path) == nullptr) {
return kDefaultPageSize;
}
std::string device_dir(real_path);
if (!device_dir.empty() && device_dir.back() == '/') {
device_dir.pop_back();
}
// NOTE: sda3 and nvme0n1p1 do not have a `queue/` subdir, only the parent sda
// and nvme0n1 have it.
// $ ls -al '/sys/dev/block/8:3'
// lrwxrwxrwx. 1 root root 0 Jun 26 01:38 /sys/dev/block/8:3 ->
// ../../block/sda/sda3
// $ ls -al '/sys/dev/block/259:4'
// lrwxrwxrwx 1 root root 0 Jan 31 16:04 /sys/dev/block/259:4 ->
// ../../devices/pci0000:17/0000:17:00.0/0000:18:00.0/nvme/nvme0/nvme0n1/nvme0n1p1
size_t parent_end = device_dir.rfind('/', device_dir.length() - 1);
if (parent_end == std::string::npos) {
return kDefaultPageSize;
}
size_t parent_begin = device_dir.rfind('/', parent_end - 1);
if (parent_begin == std::string::npos) {
return kDefaultPageSize;
}
std::string parent =
device_dir.substr(parent_begin + 1, parent_end - parent_begin - 1);
std::string child = device_dir.substr(parent_end + 1, std::string::npos);
if (parent != "block" &&
(child.compare(0, 4, "nvme") || child.find('p') != std::string::npos)) {
device_dir = device_dir.substr(0, parent_end);
}
std::string fname = device_dir + "/queue/logical_block_size";
FILE* fp;
size_t size = 0;
fp = fopen(fname.c_str(), "r");
if (fp != nullptr) {
char* line = nullptr;
size_t len = 0;
if (getline(&line, &len, fp) != -1) {
sscanf(line, "%zu", &size);
}
free(line);
fclose(fp);
}
if (size != 0 && (size & (size - 1)) == 0) {
return size;
}
#endif
(void)fd;
return kDefaultPageSize;
}
/*
* PosixRandomAccessFile
*
* pread() based random-access
*/
PosixRandomAccessFile::PosixRandomAccessFile(
const std::string& fname, int fd, const EnvOptions& options
const std::string& fname, int fd, size_t logical_block_size,
const EnvOptions& options
#if defined(ROCKSDB_IOURING_PRESENT)
,
ThreadLocalPtr* thread_local_io_urings
@ -424,7 +550,7 @@ PosixRandomAccessFile::PosixRandomAccessFile(
: filename_(fname),
fd_(fd),
use_direct_io_(options.use_direct_reads),
logical_sector_size_(GetLogicalBufferSize(fd_))
logical_sector_size_(logical_block_size)
#if defined(ROCKSDB_IOURING_PRESENT)
,
thread_local_io_urings_(thread_local_io_urings)
@ -988,13 +1114,14 @@ IOStatus PosixMmapFile::Allocate(uint64_t offset, uint64_t len,
* Use posix write to write data to a file.
*/
PosixWritableFile::PosixWritableFile(const std::string& fname, int fd,
size_t logical_block_size,
const EnvOptions& options)
: FSWritableFile(options),
filename_(fname),
use_direct_io_(options.use_direct_writes),
fd_(fd),
filesize_(0),
logical_sector_size_(GetLogicalBufferSize(fd_)) {
logical_sector_size_(logical_block_size) {
#ifdef ROCKSDB_FALLOCATE_PRESENT
allow_fallocate_ = options.allow_fallocate;
fallocate_with_keep_size_ = options.fallocate_with_keep_size;

116
env/io_posix.h vendored

@ -14,11 +14,15 @@
#endif
#include <unistd.h>
#include <atomic>
#include <functional>
#include <map>
#include <string>
#include "port/port.h"
#include "rocksdb/env.h"
#include "util/thread_local.h"
#include "rocksdb/file_system.h"
#include "rocksdb/io_status.h"
#include "util/mutexlock.h"
#include "util/thread_local.h"
// For non linux platform, the following macros are used only as place
// holder.
@ -31,40 +35,93 @@
#endif
namespace ROCKSDB_NAMESPACE {
static std::string IOErrorMsg(const std::string& context,
const std::string& file_name) {
if (file_name.empty()) {
return context;
}
return context + ": " + file_name;
}
std::string IOErrorMsg(const std::string& context,
const std::string& file_name);
// file_name can be left empty if it is not unkown.
static IOStatus IOError(const std::string& context,
const std::string& file_name, int err_number) {
switch (err_number) {
case ENOSPC: {
IOStatus s = IOStatus::NoSpace(IOErrorMsg(context, file_name),
strerror(err_number));
s.SetRetryable(true);
return s;
}
case ESTALE:
return IOStatus::IOError(IOStatus::kStaleFile);
case ENOENT:
return IOStatus::PathNotFound(IOErrorMsg(context, file_name),
strerror(err_number));
default:
return IOStatus::IOError(IOErrorMsg(context, file_name),
strerror(err_number));
}
}
IOStatus IOError(const std::string& context, const std::string& file_name,
int err_number);
class PosixHelper {
public:
static size_t GetUniqueIdFromFile(int fd, char* id, size_t max_size);
static size_t GetLogicalBlockSizeOfFd(int fd);
static Status GetLogicalBlockSizeOfDirectory(const std::string& directory,
size_t* size);
};
#ifdef OS_LINUX
// Files under a specific directory have the same logical block size.
// This class caches the logical block size for the specified directories to
// save the CPU cost of computing the size.
// Safe for concurrent access from multiple threads without any external
// synchronization.
class LogicalBlockSizeCache {
public:
LogicalBlockSizeCache(
std::function<size_t(int)> get_logical_block_size_of_fd =
PosixHelper::GetLogicalBlockSizeOfFd,
std::function<Status(const std::string&, size_t*)>
get_logical_block_size_of_directory =
PosixHelper::GetLogicalBlockSizeOfDirectory)
: get_logical_block_size_of_fd_(get_logical_block_size_of_fd),
get_logical_block_size_of_directory_(
get_logical_block_size_of_directory) {}
// Takes the following actions:
// 1. Increases reference count of the directories;
// 2. If the directory's logical block size is not cached,
// compute the buffer size and cache the result.
Status RefAndCacheLogicalBlockSize(
const std::vector<std::string>& directories);
// Takes the following actions:
// 1. Decreases reference count of the directories;
// 2. If the reference count of a directory reaches 0, remove the directory
// from the cache.
void UnrefAndTryRemoveCachedLogicalBlockSize(
const std::vector<std::string>& directories);
// Returns the logical block size for the file.
//
// If the file is under a cached directory, return the cached size.
// Otherwise, the size is computed.
size_t GetLogicalBlockSize(const std::string& fname, int fd);
int GetRefCount(const std::string& dir) {
ReadLock lock(&cache_mutex_);
auto it = cache_.find(dir);
if (it == cache_.end()) {
return 0;
}
return it->second.ref;
}
size_t Size() const { return cache_.size(); }
bool Contains(const std::string& dir) {
ReadLock lock(&cache_mutex_);
return cache_.find(dir) != cache_.end();
}
private:
struct CacheValue {
CacheValue() : size(0), ref(0) {}
// Logical block size of the directory.
size_t size;
// Reference count of the directory.
int ref;
};
std::function<size_t(int)> get_logical_block_size_of_fd_;
std::function<Status(const std::string&, size_t*)>
get_logical_block_size_of_directory_;
std::map<std::string, CacheValue> cache_;
port::RWMutex cache_mutex_;
};
#endif
class PosixSequentialFile : public FSSequentialFile {
private:
std::string filename_;
@ -75,6 +132,7 @@ class PosixSequentialFile : public FSSequentialFile {
public:
PosixSequentialFile(const std::string& fname, FILE* file, int fd,
size_t logical_block_size,
const EnvOptions& options);
virtual ~PosixSequentialFile();
@ -123,6 +181,7 @@ class PosixRandomAccessFile : public FSRandomAccessFile {
public:
PosixRandomAccessFile(const std::string& fname, int fd,
size_t logical_block_size,
const EnvOptions& options
#if defined(ROCKSDB_IOURING_PRESENT)
,
@ -172,6 +231,7 @@ class PosixWritableFile : public FSWritableFile {
public:
explicit PosixWritableFile(const std::string& fname, int fd,
size_t logical_block_size,
const EnvOptions& options);
virtual ~PosixWritableFile();

140
env/io_posix_test.cc vendored

@ -0,0 +1,140 @@
// Copyright (c) 2020-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "test_util/testharness.h"
#ifdef ROCKSDB_LIB_IO_POSIX
#include "env/io_posix.h"
namespace ROCKSDB_NAMESPACE {
#ifdef OS_LINUX
class LogicalBlockSizeCacheTest : public testing::Test {};
// Tests the caching behavior.
TEST_F(LogicalBlockSizeCacheTest, Cache) {
int ncall = 0;
auto get_fd_block_size = [&](int fd) {
ncall++;
return fd;
};
std::map<std::string, int> dir_fds{
{"/", 0},
{"/db", 1},
{"/db1", 2},
{"/db2", 3},
};
auto get_dir_block_size = [&](const std::string& dir, size_t* size) {
ncall++;
*size = dir_fds[dir];
return Status::OK();
};
LogicalBlockSizeCache cache(get_fd_block_size, get_dir_block_size);
ASSERT_EQ(0, ncall);
ASSERT_EQ(0, cache.Size());
ASSERT_EQ(6, cache.GetLogicalBlockSize("/sst", 6));
ASSERT_EQ(1, ncall);
ASSERT_EQ(7, cache.GetLogicalBlockSize("/db/sst1", 7));
ASSERT_EQ(2, ncall);
ASSERT_EQ(8, cache.GetLogicalBlockSize("/db/sst2", 8));
ASSERT_EQ(3, ncall);
ASSERT_OK(cache.RefAndCacheLogicalBlockSize({"/", "/db1/", "/db2"}));
ASSERT_EQ(3, cache.Size());
ASSERT_TRUE(cache.Contains("/"));
ASSERT_TRUE(cache.Contains("/db1"));
ASSERT_TRUE(cache.Contains("/db2"));
ASSERT_EQ(6, ncall);
// Block size for / is cached.
ASSERT_EQ(0, cache.GetLogicalBlockSize("/sst", 6));
ASSERT_EQ(6, ncall);
// No cached size for /db.
ASSERT_EQ(7, cache.GetLogicalBlockSize("/db/sst1", 7));
ASSERT_EQ(7, ncall);
ASSERT_EQ(8, cache.GetLogicalBlockSize("/db/sst2", 8));
ASSERT_EQ(8, ncall);
// Block size for /db1 is cached.
ASSERT_EQ(2, cache.GetLogicalBlockSize("/db1/sst1", 4));
ASSERT_EQ(8, ncall);
ASSERT_EQ(2, cache.GetLogicalBlockSize("/db1/sst2", 5));
ASSERT_EQ(8, ncall);
// Block size for /db2 is cached.
ASSERT_EQ(3, cache.GetLogicalBlockSize("/db2/sst1", 6));
ASSERT_EQ(8, ncall);
ASSERT_EQ(3, cache.GetLogicalBlockSize("/db2/sst2", 7));
ASSERT_EQ(8, ncall);
cache.RefAndCacheLogicalBlockSize({"/db"});
ASSERT_EQ(4, cache.Size());
ASSERT_TRUE(cache.Contains("/"));
ASSERT_TRUE(cache.Contains("/db1"));
ASSERT_TRUE(cache.Contains("/db2"));
ASSERT_TRUE(cache.Contains("/db"));
ASSERT_EQ(9, ncall);
// Block size for /db is cached.
ASSERT_EQ(1, cache.GetLogicalBlockSize("/db/sst1", 7));
ASSERT_EQ(9, ncall);
ASSERT_EQ(1, cache.GetLogicalBlockSize("/db/sst2", 8));
ASSERT_EQ(9, ncall);
}
// Tests the reference counting behavior.
TEST_F(LogicalBlockSizeCacheTest, Ref) {
int ncall = 0;
auto get_fd_block_size = [&](int fd) {
ncall++;
return fd;
};
std::map<std::string, int> dir_fds{
{"/db", 0},
};
auto get_dir_block_size = [&](const std::string& dir, size_t* size) {
ncall++;
*size = dir_fds[dir];
return Status::OK();
};
LogicalBlockSizeCache cache(get_fd_block_size, get_dir_block_size);
ASSERT_EQ(0, ncall);
ASSERT_EQ(1, cache.GetLogicalBlockSize("/db/sst0", 1));
ASSERT_EQ(1, ncall);
cache.RefAndCacheLogicalBlockSize({"/db"});
ASSERT_EQ(2, ncall);
ASSERT_EQ(1, cache.GetRefCount("/db"));
// Block size for /db is cached. Ref count = 1.
ASSERT_EQ(0, cache.GetLogicalBlockSize("/db/sst1", 1));
ASSERT_EQ(2, ncall);
// Ref count = 2, but won't recompute the cached buffer size.
cache.RefAndCacheLogicalBlockSize({"/db"});
ASSERT_EQ(2, cache.GetRefCount("/db"));
ASSERT_EQ(2, ncall);
// Ref count = 1.
cache.UnrefAndTryRemoveCachedLogicalBlockSize({"/db"});
ASSERT_EQ(1, cache.GetRefCount("/db"));
// Block size for /db is still cached.
ASSERT_EQ(0, cache.GetLogicalBlockSize("/db/sst2", 1));
ASSERT_EQ(2, ncall);
// Ref count = 0 and cached buffer size for /db is removed.
cache.UnrefAndTryRemoveCachedLogicalBlockSize({"/db"});
ASSERT_EQ(0, cache.Size());
ASSERT_EQ(1, cache.GetLogicalBlockSize("/db/sst0", 1));
ASSERT_EQ(3, ncall);
}
#endif
} // namespace ROCKSDB_NAMESPACE
#endif
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

91
env/mock_env.h vendored

@ -24,75 +24,80 @@ class MockEnv : public EnvWrapper {
public:
explicit MockEnv(Env* base_env);
virtual ~MockEnv();
~MockEnv() override;
// Partial implementation of the Env interface.
virtual Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& soptions) override;
Status RegisterDbPaths(const std::vector<std::string>& /*paths*/) override {
return Status::OK();
}
virtual Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) override;
Status UnregisterDbPaths(const std::vector<std::string>& /*paths*/) override {
return Status::OK();
}
virtual Status NewRandomRWFile(const std::string& fname,
std::unique_ptr<RandomRWFile>* result,
const EnvOptions& options) override;
Status NewSequentialFile(const std::string& fname,
std::unique_ptr<SequentialFile>* result,
const EnvOptions& soptions) override;
virtual Status ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) override;
Status NewRandomAccessFile(const std::string& fname,
std::unique_ptr<RandomAccessFile>* result,
const EnvOptions& soptions) override;
virtual Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& env_options) override;
Status NewRandomRWFile(const std::string& fname,
std::unique_ptr<RandomRWFile>* result,
const EnvOptions& options) override;
virtual Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override;
Status ReuseWritableFile(const std::string& fname,
const std::string& old_fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& options) override;
virtual Status FileExists(const std::string& fname) override;
Status NewWritableFile(const std::string& fname,
std::unique_ptr<WritableFile>* result,
const EnvOptions& env_options) override;
virtual Status GetChildren(const std::string& dir,
std::vector<std::string>* result) override;
Status NewDirectory(const std::string& name,
std::unique_ptr<Directory>* result) override;
Status FileExists(const std::string& fname) override;
Status GetChildren(const std::string& dir,
std::vector<std::string>* result) override;
void DeleteFileInternal(const std::string& fname);
virtual Status DeleteFile(const std::string& fname) override;
Status DeleteFile(const std::string& fname) override;
virtual Status Truncate(const std::string& fname, size_t size) override;
Status Truncate(const std::string& fname, size_t size) override;
virtual Status CreateDir(const std::string& dirname) override;
Status CreateDir(const std::string& dirname) override;
virtual Status CreateDirIfMissing(const std::string& dirname) override;
Status CreateDirIfMissing(const std::string& dirname) override;
virtual Status DeleteDir(const std::string& dirname) override;
Status DeleteDir(const std::string& dirname) override;
virtual Status GetFileSize(const std::string& fname,
uint64_t* file_size) override;
Status GetFileSize(const std::string& fname, uint64_t* file_size) override;
virtual Status GetFileModificationTime(const std::string& fname,
uint64_t* time) override;
Status GetFileModificationTime(const std::string& fname,
uint64_t* time) override;
virtual Status RenameFile(const std::string& src,
const std::string& target) override;
Status RenameFile(const std::string& src, const std::string& target) override;
virtual Status LinkFile(const std::string& src,
const std::string& target) override;
Status LinkFile(const std::string& src, const std::string& target) override;
virtual Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override;
Status NewLogger(const std::string& fname,
std::shared_ptr<Logger>* result) override;
virtual Status LockFile(const std::string& fname, FileLock** flock) override;
Status LockFile(const std::string& fname, FileLock** flock) override;
virtual Status UnlockFile(FileLock* flock) override;
Status UnlockFile(FileLock* flock) override;
virtual Status GetTestDirectory(std::string* path) override;
Status GetTestDirectory(std::string* path) override;
// Results of these can be affected by FakeSleepForMicroseconds()
virtual Status GetCurrentTime(int64_t* unix_time) override;
virtual uint64_t NowMicros() override;
virtual uint64_t NowNanos() override;
Status GetCurrentTime(int64_t* unix_time) override;
uint64_t NowMicros() override;
uint64_t NowNanos() override;
Status CorruptBuffer(const std::string& fname);

@ -163,6 +163,15 @@ class Env {
// The result of Default() belongs to rocksdb and must never be deleted.
static Env* Default();
// See FileSystem::RegisterDbPaths.
virtual Status RegisterDbPaths(const std::vector<std::string>& /*paths*/) {
return Status::OK();
}
// See FileSystem::UnregisterDbPaths.
virtual Status UnregisterDbPaths(const std::vector<std::string>& /*paths*/) {
return Status::OK();
}
// Create a brand new sequentially-readable file with the specified name.
// On success, stores a pointer to the new file in *result and returns OK.
// On failure stores nullptr in *result and returns non-OK. If the file does
@ -1155,6 +1164,14 @@ class EnvWrapper : public Env {
Env* target() const { return target_; }
// The following text is boilerplate that forwards all methods to target()
Status RegisterDbPaths(const std::vector<std::string>& paths) override {
return target_->RegisterDbPaths(paths);
}
Status UnregisterDbPaths(const std::vector<std::string>& paths) override {
return target_->UnregisterDbPaths(paths);
}
Status NewSequentialFile(const std::string& f,
std::unique_ptr<SequentialFile>* r,
const EnvOptions& options) override {

@ -171,6 +171,35 @@ class FileSystem {
// The result of Default() belongs to rocksdb and must never be deleted.
static std::shared_ptr<FileSystem> Default();
// Handles the event when a new DB or a new ColumnFamily starts using the
// specified data paths.
//
// The data paths might be shared by different DBs or ColumnFamilies,
// so RegisterDbPaths might be called with the same data paths.
// For example, when CreateColumnFamily is called multiple times with the same
// data path, RegisterDbPaths will also be called with the same data path.
//
// If the return status is ok, then the paths must be correspondingly
// called in UnregisterDbPaths;
// otherwise this method should have no side effect, and UnregisterDbPaths
// do not need to be called for the paths.
//
// Different implementations may take different actions.
// By default, it's a no-op and returns Status::OK.
virtual Status RegisterDbPaths(const std::vector<std::string>& /*paths*/) {
return Status::OK();
}
// Handles the event a DB or a ColumnFamily stops using the specified data
// paths.
//
// It should be called corresponding to each successful RegisterDbPaths.
//
// Different implementations may take different actions.
// By default, it's a no-op and returns Status::OK.
virtual Status UnregisterDbPaths(const std::vector<std::string>& /*paths*/) {
return Status::OK();
}
// Create a brand new sequentially-readable file with the specified name.
// On success, stores a pointer to the new file in *result and returns OK.
// On failure stores nullptr in *result and returns non-OK. If the file does

@ -337,6 +337,7 @@ MAIN_SOURCES = \
db/db_tailing_iter_test.cc \
db/db_test.cc \
db/db_test2.cc \
db/db_logical_block_size_cache_test.cc \
db/db_universal_compaction_test.cc \
db/db_wal_test.cc \
db/db_write_test.cc \
@ -383,6 +384,7 @@ MAIN_SOURCES = \
db/write_controller_test.cc \
env/env_basic_test.cc \
env/env_test.cc \
env/io_posix_test.cc \
env/mock_env_test.cc \
logging/auto_roll_logger_test.cc \
logging/env_logger_test.cc \

Loading…
Cancel
Save