Support for Column family specific paths.

Summary:
In this change, an option to set different paths for different column families is added.
This option is set via cf_paths setting of ColumnFamilyOptions. This option will work in a similar fashion to db_paths setting. Cf_paths is a vector of Dbpath values which contains a pair of the absolute path and target size. Multiple levels in a Column family can go to different paths if cf_paths has more than one path.
To maintain backward compatibility, if cf_paths is not specified for a column family, db_paths setting will be used. Note that, if db_paths setting is also not specified, RocksDB already has code to use db_name as the only path.

Changes :
1) A new member "cf_paths" is added to ImmutableCfOptions. This is set, based on cf_paths setting of ColumnFamilyOptions and db_paths setting of ImmutableDbOptions.  This member is used to identify the path information whenever files are accessed.
2) Validation checks are added for cf_paths setting based on existing checks for db_paths setting.
3) DestroyDB, PurgeObsoleteFiles etc. are edited to support multiple cf_paths.
4) Unit tests are added appropriately.
Closes https://github.com/facebook/rocksdb/pull/3102

Differential Revision: D6951697

Pulled By: ajkr

fbshipit-source-id: 60d2262862b0a8fd6605b09ccb0da32bb331787d
main
Phani Shekhar Mantripragada 7 years ago committed by Facebook Github Bot
parent 67182678a5
commit 446b32cfc3
  1. 2
      db/builder.cc
  2. 65
      db/column_family.cc
  3. 10
      db/column_family.h
  4. 72
      db/column_family_test.cc
  5. 20
      db/compaction_job.cc
  6. 10
      db/compaction_picker.cc
  7. 2
      db/compaction_picker_test.cc
  8. 6
      db/compaction_picker_universal.cc
  9. 119
      db/db_compaction_test.cc
  10. 66
      db/db_impl.cc
  11. 22
      db/db_impl.h
  12. 39
      db/db_impl_compaction_flush.cc
  13. 66
      db/db_impl_files.cc
  14. 62
      db/db_impl_open.cc
  15. 2
      db/db_sst_test.cc
  16. 2
      db/db_test2.cc
  17. 18
      db/db_test_util.cc
  18. 5
      db/db_test_util.h
  19. 140
      db/db_universal_compaction_test.cc
  20. 3
      db/external_sst_file_ingestion_job.cc
  21. 11
      db/job_context.h
  22. 8
      db/repair.cc
  23. 2
      db/table_cache.cc
  24. 50
      db/version_set.cc
  25. 34
      db/version_set.h
  26. 4
      include/rocksdb/db.h
  27. 15
      include/rocksdb/options.h
  28. 3
      options/cf_options.cc
  29. 2
      options/cf_options.h
  30. 1
      options/options_helper.cc
  31. 2
      options/options_settable_test.cc
  32. 16
      tools/db_stress.cc
  33. 2
      util/file_util.cc
  34. 2
      util/file_util.h

@ -92,7 +92,7 @@ Status BuildTable(
return s;
}
std::string fname = TableFileName(ioptions.db_paths, meta->fd.GetNumber(),
std::string fname = TableFileName(ioptions.cf_paths, meta->fd.GetNumber(),
meta->fd.GetPathId());
#ifndef ROCKSDB_LITE
EventHelpers::NotifyTableFileCreationStarted(

@ -34,6 +34,7 @@
#include "table/merging_iterator.h"
#include "util/autovector.h"
#include "util/compression.h"
#include "util/sst_file_manager_impl.h"
namespace rocksdb {
@ -159,6 +160,28 @@ Status CheckConcurrentWritesSupported(const ColumnFamilyOptions& cf_options) {
return Status::OK();
}
Status CheckCFPathsSupported(const DBOptions& db_options,
const ColumnFamilyOptions& cf_options) {
// More than one cf_paths are supported only in universal
// and level compaction styles. This function also checks the case
// in which cf_paths is not specified, which results in db_paths
// being used.
if ((cf_options.compaction_style != kCompactionStyleUniversal) &&
(cf_options.compaction_style != kCompactionStyleLevel)) {
if (cf_options.cf_paths.size() > 1) {
return Status::NotSupported(
"More than one CF paths are only supported in "
"universal and level compaction styles. ");
} else if (cf_options.cf_paths.empty() &&
db_options.db_paths.size() > 1) {
return Status::NotSupported(
"More than one DB paths are only supported in "
"universal and level compaction styles. ");
}
}
return Status::OK();
}
ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
const ColumnFamilyOptions& src) {
ColumnFamilyOptions result = src;
@ -277,9 +300,24 @@ ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
result.hard_pending_compaction_bytes_limit;
}
#ifndef ROCKSDB_LITE
// When the DB is stopped, it's possible that there are some .trash files that
// were not deleted yet, when we open the DB we will find these .trash files
// and schedule them to be deleted (or delete immediately if SstFileManager
// was not used)
auto sfm = static_cast<SstFileManagerImpl*>(db_options.sst_file_manager.get());
for (size_t i = 0; i < result.cf_paths.size(); i++) {
DeleteScheduler::CleanupDirectory(db_options.env, sfm, result.cf_paths[i].path);
}
#endif
if (result.cf_paths.empty()) {
result.cf_paths = db_options.db_paths;
}
if (result.level_compaction_dynamic_level_bytes) {
if (result.compaction_style != kCompactionStyleLevel ||
db_options.db_paths.size() > 1U) {
result.cf_paths.size() > 1U) {
// 1. level_compaction_dynamic_level_bytes only makes sense for
// level-based compaction.
// 2. we don't yet know how to make both of this feature and multiple
@ -1138,6 +1176,31 @@ Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) {
static_cast<int>(Env::WLTH_MEDIUM));
}
Status ColumnFamilyData::AddDirectories() {
Status s;
assert(data_dirs_.empty());
for (auto& p : ioptions_.cf_paths) {
std::unique_ptr<Directory> path_directory;
s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory);
if (!s.ok()) {
return s;
}
assert(path_directory != nullptr);
data_dirs_.emplace_back(path_directory.release());
}
assert(data_dirs_.size() == ioptions_.cf_paths.size());
return s;
}
Directory* ColumnFamilyData::GetDataDir(size_t path_id) const {
if (data_dirs_.empty()) {
return nullptr;
}
assert(path_id < data_dirs_.size());
return data_dirs_[path_id].get();
}
ColumnFamilySet::ColumnFamilySet(const std::string& dbname,
const ImmutableDBOptions* db_options,
const EnvOptions& env_options,

@ -139,6 +139,9 @@ extern Status CheckCompressionSupported(const ColumnFamilyOptions& cf_options);
extern Status CheckConcurrentWritesSupported(
const ColumnFamilyOptions& cf_options);
extern Status CheckCFPathsSupported(const DBOptions& db_options,
const ColumnFamilyOptions& cf_options);
extern ColumnFamilyOptions SanitizeOptions(const ImmutableDBOptions& db_options,
const ColumnFamilyOptions& src);
// Wrap user defined table proproties collector factories `from cf_options`
@ -376,6 +379,10 @@ class ColumnFamilyData {
Env::WriteLifeTimeHint CalculateSSTWriteHint(int level);
Status AddDirectories();
Directory* GetDataDir(size_t path_id) const;
private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
@ -459,6 +466,9 @@ class ColumnFamilyData {
// Memtable id to track flush.
std::atomic<uint64_t> last_memtable_id_;
// Directories corresponding to cf_paths.
std::vector<std::unique_ptr<Directory>> data_dirs_;
};
// ColumnFamilySet has interesting thread-safety requirements

@ -69,9 +69,15 @@ class ColumnFamilyTest : public testing::Test {
}
~ColumnFamilyTest() {
std::vector<ColumnFamilyDescriptor> column_families;
for (auto h : handles_) {
ColumnFamilyDescriptor cfdescriptor;
h->GetDescriptor(&cfdescriptor);
column_families.push_back(cfdescriptor);
}
Close();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
Destroy();
Destroy(column_families);
delete env_;
}
@ -236,9 +242,11 @@ class ColumnFamilyTest : public testing::Test {
#endif // !ROCKSDB_LITE
}
void Destroy() {
void Destroy(const std::vector<ColumnFamilyDescriptor>& column_families =
std::vector<ColumnFamilyDescriptor>()) {
Close();
ASSERT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_)));
ASSERT_OK(DestroyDB(dbname_, Options(db_options_, column_family_options_),
column_families));
}
void CreateColumnFamilies(
@ -483,6 +491,12 @@ class ColumnFamilyTest : public testing::Test {
ASSERT_OK(destfile->Close());
}
int GetSstFileCount(std::string path) {
std::vector<std::string> files;
DBTestBase::GetSstFiles(env_, path, &files);
return static_cast<int>(files.size());
}
std::vector<ColumnFamilyHandle*> handles_;
std::vector<std::string> names_;
std::set<std::string> keys_;
@ -3129,6 +3143,58 @@ TEST_F(ColumnFamilyTest, DISABLED_LogTruncationTest) {
// cleanup
env_->DeleteDir(backup_logs);
}
TEST_F(ColumnFamilyTest, DefaultCfPathsTest) {
Open();
// Leave cf_paths for one column families to be empty.
// Files should be generated according to db_paths for that
// column family.
ColumnFamilyOptions cf_opt1, cf_opt2;
cf_opt1.cf_paths.emplace_back(dbname_ + "_one_1",
std::numeric_limits<uint64_t>::max());
CreateColumnFamilies({"one", "two"}, {cf_opt1, cf_opt2});
Reopen({ColumnFamilyOptions(), cf_opt1, cf_opt2});
// Fill Column family 1.
PutRandomData(1, 100, 100);
Flush(1);
ASSERT_EQ(1, GetSstFileCount(cf_opt1.cf_paths[0].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
// Fill column family 2
PutRandomData(2, 100, 100);
Flush(2);
// SST from Column family 2 should be generated in
// db_paths which is dbname_ in this case.
ASSERT_EQ(1, GetSstFileCount(dbname_));
}
TEST_F(ColumnFamilyTest, MultipleCFPathsTest) {
Open();
// Configure Column family specific paths.
ColumnFamilyOptions cf_opt1, cf_opt2;
cf_opt1.cf_paths.emplace_back(dbname_ + "_one_1",
std::numeric_limits<uint64_t>::max());
cf_opt2.cf_paths.emplace_back(dbname_ + "_two_1",
std::numeric_limits<uint64_t>::max());
CreateColumnFamilies({"one", "two"}, {cf_opt1, cf_opt2});
Reopen({ColumnFamilyOptions(), cf_opt1, cf_opt2});
PutRandomData(1, 100, 100);
Flush(1);
// Check that files are generated in appropriate paths.
ASSERT_EQ(1, GetSstFileCount(cf_opt1.cf_paths[0].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
PutRandomData(2, 100, 100);
Flush(2);
ASSERT_EQ(1, GetSstFileCount(cf_opt2.cf_paths[0].path));
ASSERT_EQ(0, GetSstFileCount(dbname_));
}
} // namespace rocksdb
int main(int argc, char** argv) {

@ -566,7 +566,9 @@ Status CompactionJob::Run() {
TablePropertiesCollection tp;
for (const auto& state : compact_->sub_compact_states) {
for (const auto& output : state.outputs) {
auto fn = TableFileName(db_options_.db_paths, output.meta.fd.GetNumber(),
auto fn = TableFileName(
state.compaction->immutable_cf_options()->cf_paths,
output.meta.fd.GetNumber(),
output.meta.fd.GetPathId());
tp[fn] = output.table_properties;
}
@ -1112,7 +1114,9 @@ Status CompactionJob::FinishCompactionOutputFile(
// This happens when the output level is bottom level, at the same time
// the sub_compact output nothing.
std::string fname = TableFileName(
db_options_.db_paths, meta->fd.GetNumber(), meta->fd.GetPathId());
sub_compact->compaction->immutable_cf_options()->cf_paths,
meta->fd.GetNumber(),
meta->fd.GetPathId());
env_->DeleteFile(fname);
// Also need to remove the file from outputs, or it will be added to the
@ -1165,7 +1169,9 @@ Status CompactionJob::FinishCompactionOutputFile(
std::string fname;
FileDescriptor output_fd;
if (meta != nullptr) {
fname = TableFileName(db_options_.db_paths, meta->fd.GetNumber(),
fname = TableFileName(
sub_compact->compaction->immutable_cf_options()->cf_paths,
meta->fd.GetNumber(),
meta->fd.GetPathId());
output_fd = meta->fd;
} else {
@ -1180,7 +1186,9 @@ Status CompactionJob::FinishCompactionOutputFile(
auto sfm =
static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
auto fn = TableFileName(cfd->ioptions()->db_paths, meta->fd.GetNumber(),
auto fn = TableFileName(
sub_compact->compaction->immutable_cf_options()->cf_paths,
meta->fd.GetNumber(),
meta->fd.GetPathId());
sfm->OnAddFile(fn);
if (sfm->IsMaxAllowedSpaceReached()) {
@ -1266,7 +1274,9 @@ Status CompactionJob::OpenCompactionOutputFile(
assert(sub_compact->builder == nullptr);
// no need to lock because VersionSet::next_file_number_ is atomic
uint64_t file_number = versions_->NewFileNumber();
std::string fname = TableFileName(db_options_.db_paths, file_number,
std::string fname = TableFileName(
sub_compact->compaction->immutable_cf_options()->cf_paths,
file_number,
sub_compact->compaction->output_path_id());
// Fire events.
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();

@ -616,7 +616,7 @@ Compaction* CompactionPicker::CompactRange(
}
}
}
assert(output_path_id < static_cast<uint32_t>(ioptions_.db_paths.size()));
assert(output_path_id < static_cast<uint32_t>(ioptions_.cf_paths.size()));
if (ExpandInputsToCleanCut(cf_name, vstorage, &inputs) == false) {
// manual compaction is now multi-threaded, so it can
@ -1336,10 +1336,10 @@ uint32_t LevelCompactionBuilder::GetPathId(
const ImmutableCFOptions& ioptions,
const MutableCFOptions& mutable_cf_options, int level) {
uint32_t p = 0;
assert(!ioptions.db_paths.empty());
assert(!ioptions.cf_paths.empty());
// size remaining in the most recent path
uint64_t current_path_size = ioptions.db_paths[0].target_size;
uint64_t current_path_size = ioptions.cf_paths[0].target_size;
uint64_t level_size;
int cur_level = 0;
@ -1349,7 +1349,7 @@ uint32_t LevelCompactionBuilder::GetPathId(
level_size = mutable_cf_options.max_bytes_for_level_base;
// Last path is the fallback
while (p < ioptions.db_paths.size() - 1) {
while (p < ioptions.cf_paths.size() - 1) {
if (level_size <= current_path_size) {
if (cur_level == level) {
// Does desired level fit in this path?
@ -1376,7 +1376,7 @@ uint32_t LevelCompactionBuilder::GetPathId(
}
}
p++;
current_path_size = ioptions.db_paths[p].target_size;
current_path_size = ioptions.cf_paths[p].target_size;
}
return p;
}

@ -59,7 +59,7 @@ class CompactionPickerTest : public testing::Test {
vstorage_(nullptr) {
fifo_options_.max_table_files_size = 1;
mutable_cf_options_.RefreshDerivedOptions(ioptions_);
ioptions_.db_paths.emplace_back("dummy",
ioptions_.cf_paths.emplace_back("dummy",
std::numeric_limits<uint64_t>::max());
}

@ -406,9 +406,9 @@ uint32_t UniversalCompactionPicker::GetPathId(
file_size *
(100 - mutable_cf_options.compaction_options_universal.size_ratio) / 100;
uint32_t p = 0;
assert(!ioptions.db_paths.empty());
for (; p < ioptions.db_paths.size() - 1; p++) {
uint64_t target_size = ioptions.db_paths[p].target_size;
assert(!ioptions.cf_paths.empty());
for (; p < ioptions.cf_paths.size() - 1; p++) {
uint64_t target_size = ioptions.cf_paths[p].target_size;
if (target_size > file_size &&
accumulated_size + (target_size - file_size) > future_size) {
return p;

@ -1983,6 +1983,125 @@ TEST_P(DBCompactionTestWithParam, LevelCompactionPathUse) {
Destroy(options);
}
TEST_P(DBCompactionTestWithParam, LevelCompactionCFPathUse) {
Options options = CurrentOptions();
options.db_paths.emplace_back(dbname_, 500 * 1024);
options.db_paths.emplace_back(dbname_ + "_2", 4 * 1024 * 1024);
options.db_paths.emplace_back(dbname_ + "_3", 1024 * 1024 * 1024);
options.memtable_factory.reset(
new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
options.compaction_style = kCompactionStyleLevel;
options.write_buffer_size = 110 << 10; // 110KB
options.arena_block_size = 4 << 10;
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 4;
options.max_bytes_for_level_base = 400 * 1024;
options.max_subcompactions = max_subcompactions_;
std::vector<Options> option_vector;
option_vector.emplace_back(options);
ColumnFamilyOptions cf_opt1(options), cf_opt2(options);
// Configure CF1 specific paths.
cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 500 * 1024);
cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 4 * 1024 * 1024);
cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 1024 * 1024 * 1024);
option_vector.emplace_back(DBOptions(options), cf_opt1);
CreateColumnFamilies({"one"},option_vector[1]);
// Configura CF2 specific paths.
cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 500 * 1024);
cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 4 * 1024 * 1024);
cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 1024 * 1024 * 1024);
option_vector.emplace_back(DBOptions(options), cf_opt2);
CreateColumnFamilies({"two"},option_vector[2]);
ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
Random rnd(301);
int key_idx = 0;
int key_idx1 = 0;
int key_idx2 = 0;
auto generate_file = [&]() {
GenerateNewFile(0, &rnd, &key_idx);
GenerateNewFile(1, &rnd, &key_idx1);
GenerateNewFile(2, &rnd, &key_idx2);
};
auto check_sstfilecount = [&](int path_id, int expected) {
ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path));
ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path));
ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path));
};
auto check_filesperlevel = [&](const std::string& expected) {
ASSERT_EQ(expected, FilesPerLevel(0));
ASSERT_EQ(expected, FilesPerLevel(1));
ASSERT_EQ(expected, FilesPerLevel(2));
};
auto check_getvalues = [&]() {
for (int i = 0; i < key_idx; i++) {
auto v = Get(0, Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 990);
}
for (int i = 0; i < key_idx1; i++) {
auto v = Get(1, Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 990);
}
for (int i = 0; i < key_idx2; i++) {
auto v = Get(2, Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 990);
}
};
// Check that default column family uses db_paths.
// And Column family "one" uses cf_paths.
// First three 110KB files are not going to second path.
// After that, (100K, 200K)
for (int num = 0; num < 3; num++) {
generate_file();
}
// Another 110KB triggers a compaction to 400K file to fill up first path
generate_file();
check_sstfilecount(1, 3);
// (1, 4)
generate_file();
check_filesperlevel("1,4");
check_sstfilecount(1, 4);
check_sstfilecount(0, 1);
// (1, 4, 1)
generate_file();
check_filesperlevel("1,4,1");
check_sstfilecount(2, 1);
check_sstfilecount(1, 4);
check_sstfilecount(0, 1);
// (1, 4, 2)
generate_file();
check_filesperlevel("1,4,2");
check_sstfilecount(2, 2);
check_sstfilecount(1, 4);
check_sstfilecount(0, 1);
check_getvalues();
ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
check_getvalues();
Destroy(options, true);
}
TEST_P(DBCompactionTestWithParam, ConvertCompactionStyle) {
Random rnd(301);
int max_key_level_insert = 200;

@ -514,7 +514,16 @@ void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) {
}
}
Directory* DBImpl::Directories::GetDataDir(size_t path_id) {
Directory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const {
assert(cfd);
Directory* ret_dir = cfd->GetDataDir(path_id);
if (ret_dir == nullptr) {
return directories_.GetDataDir(path_id);
}
return ret_dir;
}
Directory* DBImpl::Directories::GetDataDir(size_t path_id) const {
assert(path_id < data_dirs_.size());
Directory* ret_dir = data_dirs_[path_id].get();
if (ret_dir == nullptr) {
@ -860,12 +869,11 @@ void DBImpl::BackgroundCallPurge() {
auto fname = purge_file->fname;
auto type = purge_file->type;
auto number = purge_file->number;
auto path_id = purge_file->path_id;
auto job_id = purge_file->job_id;
purge_queue_.pop_front();
mutex_.Unlock();
DeleteObsoleteFileImpl(job_id, fname, type, number, path_id);
DeleteObsoleteFileImpl(job_id, fname, type, number);
mutex_.Lock();
} else {
assert(!logs_to_free_queue_.empty());
@ -1306,6 +1314,17 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
if (s.ok() && immutable_db_options_.allow_concurrent_memtable_write) {
s = CheckConcurrentWritesSupported(cf_options);
}
if (s.ok()) {
s = CheckCFPathsSupported(initial_db_options_, cf_options);
}
if (s.ok()) {
for (auto& cf_path : cf_options.cf_paths) {
s = env_->CreateDirIfMissing(cf_path.path);
if (!s.ok()) {
break;
}
}
}
if (!s.ok()) {
return s;
}
@ -1337,6 +1356,12 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options,
&cf_options);
write_thread_.ExitUnbatched(&w);
}
if (s.ok()) {
auto* cfd =
versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name);
assert(cfd != nullptr);
s = cfd->AddDirectories();
}
if (s.ok()) {
single_column_family_mode_ = false;
auto* cfd =
@ -2391,7 +2416,8 @@ Status DB::ListColumnFamilies(const DBOptions& db_options,
Snapshot::~Snapshot() {
}
Status DestroyDB(const std::string& dbname, const Options& options) {
Status DestroyDB(const std::string& dbname, const Options& options,
const std::vector<ColumnFamilyDescriptor>& column_families) {
ImmutableDBOptions soptions(SanitizeOptions(dbname, options));
Env* env = soptions.env;
std::vector<std::string> filenames;
@ -2417,7 +2443,7 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
if (type == kMetaDatabase) {
del = DestroyDB(path_to_delete, options);
} else if (type == kTableFile) {
del = DeleteSSTFile(&soptions, path_to_delete, 0);
del = DeleteSSTFile(&soptions, path_to_delete);
} else {
del = env->DeleteFile(path_to_delete);
}
@ -2427,15 +2453,32 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
}
}
std::vector<std::string> paths;
for (size_t path_id = 0; path_id < options.db_paths.size(); path_id++) {
const auto& db_path = options.db_paths[path_id];
env->GetChildren(db_path.path, &filenames);
paths.emplace_back(options.db_paths[path_id].path);
}
for (auto& cf : column_families) {
for (size_t path_id = 0; path_id < cf.options.cf_paths.size();
path_id++) {
paths.emplace_back(cf.options.cf_paths[path_id].path);
}
}
// Remove duplicate paths.
// Note that we compare only the actual paths but not path ids.
// This reason is that same path can appear at different path_ids
// for different column families.
std::sort(paths.begin(), paths.end());
paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
for (auto& path : paths) {
env->GetChildren(path, &filenames);
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type) &&
type == kTableFile) { // Lock file will be deleted at end
std::string table_path = db_path.path + "/" + filenames[i];
Status del = DeleteSSTFile(&soptions, table_path,
static_cast<uint32_t>(path_id));
std::string table_path = path + "/" + filenames[i];
Status del = DeleteSSTFile(&soptions, table_path);
if (result.ok() && !del.ok()) {
result = del;
}
@ -2921,11 +2964,12 @@ Status DBImpl::VerifyChecksum() {
}
for (auto& sv : sv_list) {
VersionStorageInfo* vstorage = sv->current->storage_info();
ColumnFamilyData* cfd = sv->current->cfd();
for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) {
for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok();
j++) {
const auto& fd = vstorage->LevelFilesBrief(i).files[j].fd;
std::string fname = TableFileName(immutable_db_options_.db_paths,
std::string fname = TableFileName(cfd->ioptions()->cf_paths,
fd.GetNumber(), fd.GetPathId());
s = rocksdb::VerifySstFileChecksum(options, env_options, fname);
}

@ -478,7 +478,7 @@ class DBImpl : public DB {
// It is not necessary to hold the mutex when invoking this method.
// If FindObsoleteFiles() was run, we need to also run
// PurgeObsoleteFiles(), even if disable_delete_obsolete_files_ is true
void PurgeObsoleteFiles(const JobContext& background_contet,
void PurgeObsoleteFiles(JobContext& background_contet,
bool schedule_only = false);
void SchedulePurge();
@ -636,6 +636,9 @@ class DBImpl : public DB {
virtual Status Close() override;
static Status CreateAndNewDirectory(Env* env, const std::string& dirname,
std::unique_ptr<Directory>* directory);
protected:
Env* const env_;
const std::string dbname_;
@ -789,7 +792,7 @@ class DBImpl : public DB {
void DeleteObsoleteFiles();
// Delete obsolete files and log status and information of file deletion
void DeleteObsoleteFileImpl(int job_id, const std::string& fname,
FileType type, uint64_t number, uint32_t path_id);
FileType type, uint64_t number);
// Background process needs to call
// auto x = CaptureCurrentFileNumberInPendingOutputs()
@ -911,7 +914,7 @@ class DBImpl : public DB {
void SchedulePendingFlush(ColumnFamilyData* cfd, FlushReason flush_reason);
void SchedulePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, FileType type, uint64_t number,
uint32_t path_id, int job_id);
int job_id);
static void BGWorkCompaction(void* arg);
// Runs a pre-chosen universal compaction involving bottom level in a
// separate, bottom-pri thread pool.
@ -960,6 +963,8 @@ class DBImpl : public DB {
uint64_t GetMaxTotalWalSize() const;
Directory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const;
Status CloseHelper();
// table_cache_ provides its own synchronization
@ -1097,7 +1102,7 @@ class DBImpl : public DB {
const std::string& wal_dir,
const std::vector<DbPath>& data_paths);
Directory* GetDataDir(size_t path_id);
Directory* GetDataDir(size_t path_id) const;
Directory* GetWalDir() {
if (wal_dir_) {
@ -1112,9 +1117,6 @@ class DBImpl : public DB {
std::unique_ptr<Directory> db_dir_;
std::vector<std::unique_ptr<Directory>> data_dirs_;
std::unique_ptr<Directory> wal_dir_;
Status CreateAndNewDirectory(Env* env, const std::string& dirname,
std::unique_ptr<Directory>* directory) const;
};
Directories directories_;
@ -1158,11 +1160,9 @@ class DBImpl : public DB {
std::string fname;
FileType type;
uint64_t number;
uint32_t path_id;
int job_id;
PurgeFileInfo(std::string fn, FileType t, uint64_t num, uint32_t pid,
int jid)
: fname(fn), type(t), number(num), path_id(pid), job_id(jid) {}
PurgeFileInfo(std::string fn, FileType t, uint64_t num, int jid)
: fname(fn), type(t), number(num), job_id(jid) {}
};
// flush_queue_ and compaction_queue_ hold column families that we need to

@ -122,7 +122,7 @@ Status DBImpl::FlushMemTableToOutputFile(
env_options_for_compaction_, versions_.get(), &mutex_, &shutting_down_,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
job_context, log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(0U),
GetDataDir(cfd, 0U),
GetCompressionFlush(*cfd->ioptions(), mutable_cf_options), stats_,
&event_logger_, mutable_cf_options.report_bg_io_stats);
@ -195,7 +195,7 @@ Status DBImpl::FlushMemTableToOutputFile(
if (sfm) {
// Notify sst_file_manager that a new file was added
std::string file_path = MakeTableFileName(
immutable_db_options_.db_paths[0].path, file_meta.fd.GetNumber());
cfd->ioptions()->cf_paths[0].path, file_meta.fd.GetNumber());
sfm->OnAddFile(file_path);
if (sfm->IsMaxAllowedSpaceReached() && bg_error_.ok()) {
Status new_bg_error = Status::NoSpace("Max allowed space was reached");
@ -240,7 +240,7 @@ void DBImpl::NotifyOnFlushBegin(ColumnFamilyData* cfd, FileMetaData* file_meta,
info.cf_name = cfd->GetName();
// TODO(yhchiang): make db_paths dynamic in case flush does not
// go to L0 in the future.
info.file_path = MakeTableFileName(immutable_db_options_.db_paths[0].path,
info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
file_meta->fd.GetNumber());
info.thread_id = env_->GetThreadID();
info.job_id = job_id;
@ -285,7 +285,7 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
info.cf_name = cfd->GetName();
// TODO(yhchiang): make db_paths dynamic in case flush does not
// go to L0 in the future.
info.file_path = MakeTableFileName(immutable_db_options_.db_paths[0].path,
info.file_path = MakeTableFileName(cfd->ioptions()->cf_paths[0].path,
file_meta->fd.GetNumber());
info.thread_id = env_->GetThreadID();
info.job_id = job_id;
@ -308,12 +308,13 @@ void DBImpl::NotifyOnFlushCompleted(ColumnFamilyData* cfd,
Status DBImpl::CompactRange(const CompactRangeOptions& options,
ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) {
if (options.target_path_id >= immutable_db_options_.db_paths.size()) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
if (options.target_path_id >= cfd->ioptions()->cf_paths.size()) {
return Status::InvalidArgument("Invalid target path ID");
}
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd();
bool exclusive = options.exclusive_manual_compaction;
bool flush_needed = true;
@ -579,7 +580,7 @@ Status DBImpl::CompactFilesImpl(
version->GetColumnFamilyMetaData(&cf_meta);
if (output_path_id < 0) {
if (immutable_db_options_.db_paths.size() == 1U) {
if (cfd->ioptions()->cf_paths.size() == 1U) {
output_path_id = 0;
} else {
return Status::NotSupported(
@ -651,8 +652,9 @@ Status DBImpl::CompactFilesImpl(
job_context->job_id, c.get(), immutable_db_options_,
env_options_for_compaction_, versions_.get(), &shutting_down_,
preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, &mutex_, &bg_error_,
snapshot_seqs, earliest_write_conflict_snapshot, snapshot_checker,
GetDataDir(c->column_family_data(), c->output_path_id()),
stats_, &mutex_, &bg_error_, snapshot_seqs,
earliest_write_conflict_snapshot, snapshot_checker,
table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,
@ -729,7 +731,7 @@ Status DBImpl::CompactFilesImpl(
if (output_file_names != nullptr) {
for (const auto newf : c->edit()->GetNewFiles()) {
(*output_file_names).push_back(TableFileName(
immutable_db_options_.db_paths, newf.second.fd.GetNumber(),
c->immutable_cf_options()->cf_paths, newf.second.fd.GetNumber(),
newf.second.fd.GetPathId()) );
}
}
@ -805,7 +807,7 @@ void DBImpl::NotifyOnCompactionCompleted(
info.compression = c->output_compression();
for (size_t i = 0; i < c->num_input_levels(); ++i) {
for (const auto fmd : *c->inputs(i)) {
auto fn = TableFileName(immutable_db_options_.db_paths,
auto fn = TableFileName(c->immutable_cf_options()->cf_paths,
fmd->fd.GetNumber(), fmd->fd.GetPathId());
info.input_files.push_back(fn);
if (info.table_properties.count(fn) == 0) {
@ -818,7 +820,8 @@ void DBImpl::NotifyOnCompactionCompleted(
}
}
for (const auto newf : c->edit()->GetNewFiles()) {
info.output_files.push_back(TableFileName(immutable_db_options_.db_paths,
info.output_files.push_back(TableFileName(
c->immutable_cf_options()->cf_paths,
newf.second.fd.GetNumber(),
newf.second.fd.GetPathId()));
}
@ -1292,10 +1295,9 @@ void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
}
void DBImpl::SchedulePendingPurge(std::string fname, FileType type,
uint64_t number, uint32_t path_id,
int job_id) {
uint64_t number, int job_id) {
mutex_.AssertHeld();
PurgeFileInfo file_info(fname, type, number, path_id, job_id);
PurgeFileInfo file_info(fname, type, number, job_id);
purge_queue_.push_back(std::move(file_info));
}
@ -1876,8 +1878,9 @@ Status DBImpl::BackgroundCompaction(bool* made_progress,
job_context->job_id, c.get(), immutable_db_options_,
env_options_for_compaction_, versions_.get(), &shutting_down_,
preserve_deletes_seqnum_.load(), log_buffer, directories_.GetDbDir(),
directories_.GetDataDir(c->output_path_id()), stats_, &mutex_,
&bg_error_, snapshot_seqs, earliest_write_conflict_snapshot,
GetDataDir(c->column_family_data(), c->output_path_id()),
stats_, &mutex_, &bg_error_,
snapshot_seqs, earliest_write_conflict_snapshot,
snapshot_checker, table_cache_, &event_logger_,
c->mutable_cf_options()->paranoid_file_checks,
c->mutable_cf_options()->report_bg_io_stats, dbname_,

@ -191,8 +191,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// Mark the elements in job_context->sst_delete_files as grabbedForPurge
// so that other threads calling FindObsoleteFiles with full_scan=true
// will not add these files to candidate list for purge.
for (const auto sst_to_del : job_context->sst_delete_files) {
MarkAsGrabbedForPurge(sst_to_del->fd.GetNumber());
for (const auto& sst_to_del : job_context->sst_delete_files) {
MarkAsGrabbedForPurge(sst_to_del.metadata->fd.GetNumber());
}
// store the current filenum, lognum, etc
@ -207,13 +207,29 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
if (doing_the_full_scan) {
InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
dbname_);
std::vector<std::string> paths;
for (size_t path_id = 0; path_id < immutable_db_options_.db_paths.size();
path_id++) {
paths.emplace_back(immutable_db_options_.db_paths[path_id].path);
}
// Note that if cf_paths is not specified in the ColumnFamilyOptions
// of a particular column family, we use db_paths as the cf_paths
// setting. Hence, there can be multiple duplicates of files from db_paths
// in the following code. The duplicate are removed while identifying
// unique files in PurgeObsoleteFiles.
for (auto cfd : *versions_->GetColumnFamilySet()) {
for (size_t path_id = 0; path_id < cfd->ioptions()->cf_paths.size();
path_id++) {
paths.emplace_back(cfd->ioptions()->cf_paths[path_id].path);
}
}
for (auto& path : paths) {
// set of all files in the directory. We'll exclude files that are still
// alive in the subsequent processings.
std::vector<std::string> files;
env_->GetChildren(immutable_db_options_.db_paths[path_id].path,
&files); // Ignore errors
env_->GetChildren(path, &files); // Ignore errors
for (const std::string& file : files) {
uint64_t number;
FileType type;
@ -231,7 +247,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
// TODO(icanadi) clean up this mess to avoid having one-off "/" prefixes
job_context->full_scan_candidate_files.emplace_back(
"/" + file, static_cast<uint32_t>(path_id));
"/" + file, path);
}
}
@ -241,7 +257,8 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
env_->GetChildren(immutable_db_options_.wal_dir,
&log_files); // Ignore errors
for (const std::string& log_file : log_files) {
job_context->full_scan_candidate_files.emplace_back(log_file, 0);
job_context->full_scan_candidate_files.emplace_back(log_file,
immutable_db_options_.wal_dir);
}
}
// Add info log files in db_log_dir
@ -250,8 +267,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
std::vector<std::string> info_log_files;
// Ignore errors
env_->GetChildren(immutable_db_options_.db_log_dir, &info_log_files);
for (std::string log_file : info_log_files) {
job_context->full_scan_candidate_files.emplace_back(log_file, 0);
for (std::string& log_file : info_log_files) {
job_context->full_scan_candidate_files.emplace_back(log_file,
immutable_db_options_.db_log_dir);
}
}
}
@ -326,7 +344,7 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
} else if (first.file_name < second.file_name) {
return false;
} else {
return (first.path_id > second.path_id);
return (first.file_path > second.file_path);
}
}
}; // namespace
@ -335,12 +353,11 @@ bool CompareCandidateFile(const JobContext::CandidateFileInfo& first,
// Note: All WAL files must be deleted through this function (unelss they are
// archived) to ensure that maniefest is updated properly.
void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
FileType type, uint64_t number,
uint32_t path_id) {
FileType type, uint64_t number) {
Status file_deletion_status;
if (type == kTableFile) {
file_deletion_status =
DeleteSSTFile(&immutable_db_options_, fname, path_id);
DeleteSSTFile(&immutable_db_options_, fname);
} else {
if (type == kLogFile) {
// Before deleting the file, mark file as deleted in the manifest
@ -385,7 +402,7 @@ void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
// belong to live files are possibly removed. Also, removes all the
// files in sst_delete_files and log_delete_files.
// It is not necessary to hold the mutex when invoking this method.
void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin");
// we'd better have sth to delete
assert(state.HaveSomethingToDelete());
@ -408,23 +425,23 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
state.log_delete_files.size() + state.manifest_delete_files.size());
// We may ignore the dbname when generating the file names.
const char* kDumbDbName = "";
for (auto file : state.sst_delete_files) {
for (auto& file : state.sst_delete_files) {
candidate_files.emplace_back(
MakeTableFileName(kDumbDbName, file->fd.GetNumber()),
file->fd.GetPathId());
if (file->table_reader_handle) {
table_cache_->Release(file->table_reader_handle);
MakeTableFileName(kDumbDbName, file.metadata->fd.GetNumber()), file.path);
if (file.metadata->table_reader_handle) {
table_cache_->Release(file.metadata->table_reader_handle);
}
delete file;
file.DeleteMetadata();
}
for (auto file_num : state.log_delete_files) {
if (file_num > 0) {
candidate_files.emplace_back(LogFileName(kDumbDbName, file_num), 0);
candidate_files.emplace_back(LogFileName(kDumbDbName, file_num),
immutable_db_options_.wal_dir);
}
}
for (const auto& filename : state.manifest_delete_files) {
candidate_files.emplace_back(filename, 0);
candidate_files.emplace_back(filename, dbname_);
}
// dedup state.candidate_files so we don't try to delete the same
@ -450,7 +467,6 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
std::unordered_set<uint64_t> files_to_del;
for (const auto& candidate_file : candidate_files) {
std::string to_delete = candidate_file.file_name;
uint32_t path_id = candidate_file.path_id;
uint64_t number;
FileType type;
// Ignore file if we cannot recognize it.
@ -517,7 +533,7 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
if (type == kTableFile) {
// evict from cache
TableCache::Evict(table_cache_.get(), number);
fname = TableFileName(immutable_db_options_.db_paths, number, path_id);
fname = MakeTableFileName(candidate_file.file_path, number);
} else {
fname = ((type == kLogFile) ? immutable_db_options_.wal_dir : dbname_) +
"/" + to_delete;
@ -534,9 +550,9 @@ void DBImpl::PurgeObsoleteFiles(const JobContext& state, bool schedule_only) {
Status file_deletion_status;
if (schedule_only) {
InstrumentedMutexLock guard_lock(&mutex_);
SchedulePendingPurge(fname, type, number, path_id, state.job_id);
SchedulePendingPurge(fname, type, number, state.job_id);
} else {
DeleteObsoleteFileImpl(state.job_id, fname, type, number, path_id);
DeleteObsoleteFileImpl(state.job_id, fname, type, number);
}
}

@ -163,17 +163,13 @@ static Status ValidateOptions(
if (s.ok() && db_options.allow_concurrent_memtable_write) {
s = CheckConcurrentWritesSupported(cfd.options);
}
if (s.ok()) {
s = CheckCFPathsSupported(db_options, cfd.options);
}
if (!s.ok()) {
return s;
}
if (db_options.db_paths.size() > 1) {
if ((cfd.options.compaction_style != kCompactionStyleUniversal) &&
(cfd.options.compaction_style != kCompactionStyleLevel)) {
return Status::NotSupported(
"More than one DB paths are only supported in "
"universal and level compaction styles. ");
}
}
if (cfd.options.ttl > 0 || cfd.options.compaction_options_fifo.ttl > 0) {
if (db_options.max_open_files != -1) {
return Status::NotSupported(
@ -253,9 +249,9 @@ Status DBImpl::NewDB() {
return s;
}
Status DBImpl::Directories::CreateAndNewDirectory(
Status DBImpl::CreateAndNewDirectory(
Env* env, const std::string& dirname,
std::unique_ptr<Directory>* directory) const {
std::unique_ptr<Directory>* directory) {
// We call CreateDirIfMissing() as the directory may already exist (if we
// are reopening a DB), when this happens we don't want creating the
// directory to cause an error. However, we need to check if creating the
@ -273,12 +269,12 @@ Status DBImpl::Directories::CreateAndNewDirectory(
Status DBImpl::Directories::SetDirectories(
Env* env, const std::string& dbname, const std::string& wal_dir,
const std::vector<DbPath>& data_paths) {
Status s = CreateAndNewDirectory(env, dbname, &db_dir_);
Status s = DBImpl::CreateAndNewDirectory(env, dbname, &db_dir_);
if (!s.ok()) {
return s;
}
if (!wal_dir.empty() && dbname != wal_dir) {
s = CreateAndNewDirectory(env, wal_dir, &wal_dir_);
s = DBImpl::CreateAndNewDirectory(env, wal_dir, &wal_dir_);
if (!s.ok()) {
return s;
}
@ -291,7 +287,7 @@ Status DBImpl::Directories::SetDirectories(
data_dirs_.emplace_back(nullptr);
} else {
std::unique_ptr<Directory> path_directory;
s = CreateAndNewDirectory(env, db_path, &path_directory);
s = DBImpl::CreateAndNewDirectory(env, db_path, &path_directory);
if (!s.ok()) {
return s;
}
@ -384,6 +380,14 @@ Status DBImpl::Recover(
if (immutable_db_options_.paranoid_checks && s.ok()) {
s = CheckConsistency();
}
if (s.ok() && !read_only) {
for (auto cfd : *versions_->GetColumnFamilySet()) {
s = cfd->AddDirectories();
if (!s.ok()) {
return s;
}
}
}
if (s.ok()) {
SequenceNumber next_sequence(kMaxSequenceNumber);
default_cf_handle_ = new ColumnFamilyHandleImpl(
@ -1030,8 +1034,17 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch);
s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.wal_dir);
if (s.ok()) {
for (auto db_path : impl->immutable_db_options_.db_paths) {
s = impl->env_->CreateDirIfMissing(db_path.path);
std::vector<std::string> paths;
for (auto& db_path : impl->immutable_db_options_.db_paths) {
paths.emplace_back(db_path.path);
}
for (auto& cf : column_families) {
for (auto& cf_path : cf.options.cf_paths) {
paths.emplace_back(cf_path.path);
}
}
for (auto& path : paths) {
s = impl->env_->CreateDirIfMissing(path);
if (!s.ok()) {
break;
}
@ -1174,20 +1187,31 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
impl->immutable_db_options_.sst_file_manager.get());
if (s.ok() && sfm) {
// Notify SstFileManager about all sst files that already exist in
// db_paths[0] when the DB is opened.
auto& db_path = impl->immutable_db_options_.db_paths[0];
// db_paths[0] and cf_paths[0] when the DB is opened.
std::vector<std::string> paths;
paths.emplace_back(impl->immutable_db_options_.db_paths[0].path);
for (auto& cf : column_families) {
if (!cf.options.cf_paths.empty()) {
paths.emplace_back(cf.options.cf_paths[0].path);
}
}
// Remove duplicate paths.
std::sort(paths.begin(), paths.end());
paths.erase(std::unique(paths.begin(), paths.end()), paths.end());
for (auto& path : paths) {
std::vector<std::string> existing_files;
impl->immutable_db_options_.env->GetChildren(db_path.path, &existing_files);
impl->immutable_db_options_.env->GetChildren(path, &existing_files);
for (auto& file_name : existing_files) {
uint64_t file_number;
FileType file_type;
std::string file_path = db_path.path + "/" + file_name;
std::string file_path = path + "/" + file_name;
if (ParseFileName(file_name, &file_number, &file_type) &&
file_type == kTableFile) {
sfm->OnAddFile(file_path);
}
}
}
}
#endif // !ROCKSDB_LITE
if (s.ok()) {

@ -103,7 +103,7 @@ TEST_F(DBSSTTest, SSTsWithLdbSuffixHandling) {
ASSERT_GT(num_files, 0);
std::vector<std::string> filenames;
GetSstFiles(dbname_, &filenames);
GetSstFiles(env_, dbname_, &filenames);
int num_ldb_files = 0;
for (size_t i = 0; i < filenames.size(); ++i) {
if (i & 1) {

@ -1112,7 +1112,7 @@ TEST_F(DBTest2, PresetCompressionDict) {
size_t out_bytes = 0;
std::vector<std::string> files;
GetSstFiles(dbname_, &files);
GetSstFiles(env_, dbname_, &files);
for (const auto& file : files) {
uint64_t curr_bytes;
env_->GetFileSize(dbname_ + "/" + file, &curr_bytes);

@ -575,9 +575,17 @@ void DBTestBase::DestroyAndReopen(const Options& options) {
ASSERT_OK(TryReopen(options));
}
void DBTestBase::Destroy(const Options& options) {
void DBTestBase::Destroy(const Options& options, bool delete_cf_paths) {
std::vector<ColumnFamilyDescriptor> column_families;
if (delete_cf_paths) {
for (size_t i = 0; i < handles_.size(); ++i) {
ColumnFamilyDescriptor cfdescriptor;
handles_[i]->GetDescriptor(&cfdescriptor);
column_families.push_back(cfdescriptor);
}
}
Close();
ASSERT_OK(DestroyDB(dbname_, options));
ASSERT_OK(DestroyDB(dbname_, options, column_families));
}
Status DBTestBase::ReadOnlyReopen(const Options& options) {
@ -1017,9 +1025,9 @@ std::string DBTestBase::DumpSSTableList() {
return property;
}
void DBTestBase::GetSstFiles(std::string path,
void DBTestBase::GetSstFiles(Env* env, std::string path,
std::vector<std::string>* files) {
env_->GetChildren(path, files);
env->GetChildren(path, files);
files->erase(
std::remove_if(files->begin(), files->end(), [](std::string name) {
@ -1031,7 +1039,7 @@ void DBTestBase::GetSstFiles(std::string path,
int DBTestBase::GetSstFileCount(std::string path) {
std::vector<std::string> files;
GetSstFiles(path, &files);
DBTestBase::GetSstFiles(env_, path, &files);
return static_cast<int>(files.size());
}

@ -804,7 +804,7 @@ class DBTestBase : public testing::Test {
void DestroyAndReopen(const Options& options);
void Destroy(const Options& options);
void Destroy(const Options& options, bool delete_cf_paths = false);
Status ReadOnlyReopen(const Options& options);
@ -904,7 +904,8 @@ class DBTestBase : public testing::Test {
std::string DumpSSTableList();
void GetSstFiles(std::string path, std::vector<std::string>* files);
static void GetSstFiles(Env* env, std::string path,
std::vector<std::string>* files);
int GetSstFileCount(std::string path);

@ -1343,6 +1343,146 @@ TEST_P(DBTestUniversalCompaction, UniversalCompactionFourPaths) {
Destroy(options);
}
TEST_P(DBTestUniversalCompaction, UniversalCompactionCFPathUse) {
Options options = CurrentOptions();
options.db_paths.emplace_back(dbname_, 300 * 1024);
options.db_paths.emplace_back(dbname_ + "_2", 300 * 1024);
options.db_paths.emplace_back(dbname_ + "_3", 500 * 1024);
options.db_paths.emplace_back(dbname_ + "_4", 1024 * 1024 * 1024);
options.memtable_factory.reset(
new SpecialSkipListFactory(KNumKeysByGenerateNewFile - 1));
options.compaction_style = kCompactionStyleUniversal;
options.compaction_options_universal.size_ratio = 5;
options.write_buffer_size = 111 << 10; // 114KB
options.arena_block_size = 4 << 10;
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 1;
std::vector<Options> option_vector;
option_vector.emplace_back(options);
ColumnFamilyOptions cf_opt1(options), cf_opt2(options);
// Configure CF1 specific paths.
cf_opt1.cf_paths.emplace_back(dbname_ + "cf1", 300 * 1024);
cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_2", 300 * 1024);
cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_3", 500 * 1024);
cf_opt1.cf_paths.emplace_back(dbname_ + "cf1_4", 1024 * 1024 * 1024);
option_vector.emplace_back(DBOptions(options), cf_opt1);
CreateColumnFamilies({"one"},option_vector[1]);
// Configura CF2 specific paths.
cf_opt2.cf_paths.emplace_back(dbname_ + "cf2", 300 * 1024);
cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_2", 300 * 1024);
cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_3", 500 * 1024);
cf_opt2.cf_paths.emplace_back(dbname_ + "cf2_4", 1024 * 1024 * 1024);
option_vector.emplace_back(DBOptions(options), cf_opt2);
CreateColumnFamilies({"two"},option_vector[2]);
ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
Random rnd(301);
int key_idx = 0;
int key_idx1 = 0;
int key_idx2 = 0;
auto generate_file = [&]() {
GenerateNewFile(0, &rnd, &key_idx);
GenerateNewFile(1, &rnd, &key_idx1);
GenerateNewFile(2, &rnd, &key_idx2);
};
auto check_sstfilecount = [&](int path_id, int expected) {
ASSERT_EQ(expected, GetSstFileCount(options.db_paths[path_id].path));
ASSERT_EQ(expected, GetSstFileCount(cf_opt1.cf_paths[path_id].path));
ASSERT_EQ(expected, GetSstFileCount(cf_opt2.cf_paths[path_id].path));
};
auto check_getvalues = [&]() {
for (int i = 0; i < key_idx; i++) {
auto v = Get(0, Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 990);
}
for (int i = 0; i < key_idx1; i++) {
auto v = Get(1, Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 990);
}
for (int i = 0; i < key_idx2; i++) {
auto v = Get(2, Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 990);
}
};
// First three 110KB files are not going to second path.
// After that, (100K, 200K)
for (int num = 0; num < 3; num++) {
generate_file();
}
// Another 110KB triggers a compaction to 400K file to second path
generate_file();
check_sstfilecount(2, 1);
// (1, 4)
generate_file();
check_sstfilecount(2, 1);
check_sstfilecount(0, 1);
// (1,1,4) -> (2, 4)
generate_file();
check_sstfilecount(2, 1);
check_sstfilecount(1, 1);
check_sstfilecount(0, 0);
// (1, 2, 4) -> (3, 4)
generate_file();
check_sstfilecount(2, 1);
check_sstfilecount(1, 1);
check_sstfilecount(0, 0);
// (1, 3, 4) -> (8)
generate_file();
check_sstfilecount(3, 1);
// (1, 8)
generate_file();
check_sstfilecount(3, 1);
check_sstfilecount(0, 1);
// (1, 1, 8) -> (2, 8)
generate_file();
check_sstfilecount(3, 1);
check_sstfilecount(1, 1);
// (1, 2, 8) -> (3, 8)
generate_file();
check_sstfilecount(3, 1);
check_sstfilecount(1, 1);
check_sstfilecount(0, 0);
// (1, 3, 8) -> (4, 8)
generate_file();
check_sstfilecount(2, 1);
check_sstfilecount(3, 1);
// (1, 4, 8) -> (5, 8)
generate_file();
check_sstfilecount(3, 1);
check_sstfilecount(2, 1);
check_sstfilecount(0, 0);
check_getvalues();
ReopenWithColumnFamilies({"default", "one", "two"}, option_vector);
check_getvalues();
Destroy(options, true);
}
TEST_P(DBTestUniversalCompaction, IncreaseUniversalCompactionNumLevels) {
std::function<void(int)> verify_func = [&](int num_keys_in_db) {
std::string keys_in_db;

@ -94,7 +94,8 @@ Status ExternalSstFileIngestionJob::Prepare(
const std::string path_outside_db = f.external_file_path;
const std::string path_inside_db =
TableFileName(db_options_.db_paths, f.fd.GetNumber(), f.fd.GetPathId());
TableFileName(cfd_->ioptions()->cf_paths, f.fd.GetNumber(),
f.fd.GetPathId());
if (ingestion_options_.move_files) {
status = env_->LinkFile(path_outside_db, path_inside_db);

@ -92,11 +92,12 @@ struct JobContext {
// Structure to store information for candidate files to delete.
struct CandidateFileInfo {
std::string file_name;
uint32_t path_id;
CandidateFileInfo(std::string name, uint32_t path)
: file_name(std::move(name)), path_id(path) {}
std::string file_path;
CandidateFileInfo(std::string name, std::string path)
: file_name(std::move(name)), file_path(path) {}
bool operator==(const CandidateFileInfo& other) const {
return file_name == other.file_name && path_id == other.path_id;
return file_name == other.file_name &&
file_path == other.file_path;
}
};
@ -113,7 +114,7 @@ struct JobContext {
std::vector<FileDescriptor> sst_live;
// a list of sst files that we need to delete
std::vector<FileMetaData*> sst_delete_files;
std::vector<ObsoleteFileInfo> sst_delete_files;
// a list of log files that we need to delete
std::vector<uint64_t> log_delete_files;

@ -101,10 +101,12 @@ class Repairer {
db_options_(SanitizeOptions(dbname_, db_options)),
immutable_db_options_(ImmutableDBOptions(db_options_)),
icmp_(default_cf_opts.comparator),
default_cf_opts_(default_cf_opts),
default_cf_opts_(
SanitizeOptions(immutable_db_options_, default_cf_opts)),
default_cf_iopts_(
ImmutableCFOptions(immutable_db_options_, default_cf_opts)),
unknown_cf_opts_(unknown_cf_opts),
ImmutableCFOptions(immutable_db_options_, default_cf_opts_)),
unknown_cf_opts_(
SanitizeOptions(immutable_db_options_, unknown_cf_opts)),
create_unknown_cfs_(create_unknown_cfs),
raw_table_cache_(
// TableCache can be small since we expect each table to be opened

@ -92,7 +92,7 @@ Status TableCache::GetTableReader(
bool skip_filters, int level, bool prefetch_index_and_filter_in_cache,
bool for_compaction) {
std::string fname =
TableFileName(ioptions_.db_paths, fd.GetNumber(), fd.GetPathId());
TableFileName(ioptions_.cf_paths, fd.GetNumber(), fd.GetPathId());
unique_ptr<RandomAccessFile> file;
Status s = ioptions_.env->NewRandomAccessFile(fname, &file, env_options);

@ -355,7 +355,11 @@ Version::~Version() {
assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
vset_->obsolete_files_.push_back(f);
assert(cfd_ != nullptr);
uint32_t path_id = f->fd.GetPathId();
assert(path_id < cfd_->ioptions()->cf_paths.size());
vset_->obsolete_files_.push_back(
ObsoleteFileInfo(f, cfd_->ioptions()->cf_paths[path_id].path));
}
}
}
@ -756,7 +760,7 @@ Status Version::GetTableProperties(std::shared_ptr<const TableProperties>* tp,
file_name = *fname;
} else {
file_name =
TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(),
TableFileName(ioptions->cf_paths, file_meta->fd.GetNumber(),
file_meta->fd.GetPathId());
}
s = ioptions->env->NewRandomAccessFile(file_name, &file, env_options_);
@ -797,7 +801,7 @@ Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props,
int level) {
for (const auto& file_meta : storage_info_.files_[level]) {
auto fname =
TableFileName(vset_->db_options_->db_paths, file_meta->fd.GetNumber(),
TableFileName(cfd_->ioptions()->cf_paths, file_meta->fd.GetNumber(),
file_meta->fd.GetPathId());
// 1. If the table is already present in table cache, load table
// properties from there.
@ -825,7 +829,7 @@ Status Version::GetPropertiesOfTablesInRange(
false);
for (const auto& file_meta : files) {
auto fname =
TableFileName(vset_->db_options_->db_paths,
TableFileName(cfd_->ioptions()->cf_paths,
file_meta->fd.GetNumber(), file_meta->fd.GetPathId());
if (props->count(fname) == 0) {
// 1. If the table is already present in table cache, load table
@ -897,11 +901,11 @@ void Version::GetColumnFamilyMetaData(ColumnFamilyMetaData* cf_meta) {
for (const auto& file : vstorage->LevelFiles(level)) {
uint32_t path_id = file->fd.GetPathId();
std::string file_path;
if (path_id < ioptions->db_paths.size()) {
file_path = ioptions->db_paths[path_id].path;
if (path_id < ioptions->cf_paths.size()) {
file_path = ioptions->cf_paths[path_id].path;
} else {
assert(!ioptions->db_paths.empty());
file_path = ioptions->db_paths.back().path;
assert(!ioptions->cf_paths.empty());
file_path = ioptions->cf_paths.back().path;
}
files.emplace_back(
MakeTableFileName("", file->fd.GetNumber()), file_path,
@ -2687,12 +2691,12 @@ VersionSet::~VersionSet() {
Cache* table_cache = column_family_set_->get_table_cache();
table_cache->ApplyToAllCacheEntries(&CloseTables, false /* thread_safe */);
column_family_set_.reset();
for (auto file : obsolete_files_) {
if (file->table_reader_handle) {
table_cache->Release(file->table_reader_handle);
TableCache::Evict(table_cache, file->fd.GetNumber());
for (auto& file : obsolete_files_) {
if (file.metadata->table_reader_handle) {
table_cache->Release(file.metadata->table_reader_handle);
TableCache::Evict(table_cache, file.metadata->fd.GetNumber());
}
delete file;
file.DeleteMetadata();
}
obsolete_files_.clear();
}
@ -4101,11 +4105,11 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
LiveFileMetaData filemetadata;
filemetadata.column_family_name = cfd->GetName();
uint32_t path_id = file->fd.GetPathId();
if (path_id < db_options_->db_paths.size()) {
filemetadata.db_path = db_options_->db_paths[path_id].path;
if (path_id < cfd->ioptions()->cf_paths.size()) {
filemetadata.db_path = cfd->ioptions()->cf_paths[path_id].path;
} else {
assert(!db_options_->db_paths.empty());
filemetadata.db_path = db_options_->db_paths.back().path;
assert(!cfd->ioptions()->cf_paths.empty());
filemetadata.db_path = cfd->ioptions()->cf_paths.back().path;
}
filemetadata.name = MakeTableFileName("", file->fd.GetNumber());
filemetadata.level = level;
@ -4120,17 +4124,17 @@ void VersionSet::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
}
}
void VersionSet::GetObsoleteFiles(std::vector<FileMetaData*>* files,
void VersionSet::GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
std::vector<std::string>* manifest_filenames,
uint64_t min_pending_output) {
assert(manifest_filenames->empty());
obsolete_manifests_.swap(*manifest_filenames);
std::vector<FileMetaData*> pending_files;
for (auto f : obsolete_files_) {
if (f->fd.GetNumber() < min_pending_output) {
files->push_back(f);
std::vector<ObsoleteFileInfo> pending_files;
for (auto& f : obsolete_files_) {
if (f.metadata->fd.GetNumber() < min_pending_output) {
files->push_back(std::move(f));
} else {
pending_files.push_back(f);
pending_files.push_back(std::move(f));
}
}
obsolete_files_.swap(pending_files);

@ -695,6 +695,36 @@ class Version {
void operator=(const Version&);
};
struct ObsoleteFileInfo {
FileMetaData* metadata;
std::string path;
ObsoleteFileInfo() noexcept : metadata(nullptr) {}
ObsoleteFileInfo(FileMetaData* f, const std::string& file_path)
: metadata(f), path(file_path) {}
ObsoleteFileInfo(const ObsoleteFileInfo&) = delete;
ObsoleteFileInfo& operator=(const ObsoleteFileInfo&) = delete;
ObsoleteFileInfo(ObsoleteFileInfo&& rhs) noexcept :
ObsoleteFileInfo() {
*this = std::move(rhs);
}
ObsoleteFileInfo& operator=(ObsoleteFileInfo&& rhs) noexcept {
path = std::move(rhs.path);
metadata = rhs.metadata;
rhs.metadata = nullptr;
return *this;
}
void DeleteMetadata() {
delete metadata;
metadata = nullptr;
}
};
class VersionSet {
public:
VersionSet(const std::string& dbname, const ImmutableDBOptions* db_options,
@ -876,7 +906,7 @@ class VersionSet {
// This function doesn't support leveldb SST filenames
void GetLiveFilesMetaData(std::vector<LiveFileMetaData> *metadata);
void GetObsoleteFiles(std::vector<FileMetaData*>* files,
void GetObsoleteFiles(std::vector<ObsoleteFileInfo>* files,
std::vector<std::string>* manifest_filenames,
uint64_t min_pending_output);
@ -959,7 +989,7 @@ class VersionSet {
// Current size of manifest file
uint64_t manifest_file_size_;
std::vector<FileMetaData*> obsolete_files_;
std::vector<ObsoleteFileInfo> obsolete_files_;
std::vector<std::string> obsolete_manifests_;
// env options for all reads and writes except compactions

@ -1167,7 +1167,9 @@ class DB {
// Destroy the contents of the specified database.
// Be very careful using this method.
Status DestroyDB(const std::string& name, const Options& options);
Status DestroyDB(const std::string& name, const Options& options,
const std::vector<ColumnFamilyDescriptor>& column_families =
std::vector<ColumnFamilyDescriptor>());
#ifndef ROCKSDB_LITE
// If a DB cannot be opened, you may attempt to call this method to

@ -77,6 +77,7 @@ enum CompressionType : unsigned char {
};
struct Options;
struct DbPath;
struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// The function recovers options to a previous version. Only 4.6 or later
@ -263,6 +264,20 @@ struct ColumnFamilyOptions : public AdvancedColumnFamilyOptions {
// BlockBasedTableOptions.
std::shared_ptr<TableFactory> table_factory;
// A list of paths where SST files for this column family
// can be put into, with its target size. Similar to db_paths,
// newer data is placed into paths specified earlier in the
// vector while older data gradually moves to paths specified
// later in the vector.
// Note that, if a path is supplied to multiple column
// families, it would have files and total size from all
// the column families combined. User should privision for the
// total size(from all the column families) in such cases.
//
// If left empty, db_paths will be used.
// Default: empty
std::vector<DbPath> cf_paths;
// Create ColumnFamilyOptions with default values for all fields
ColumnFamilyOptions();
// Create ColumnFamilyOptions from Options

@ -75,7 +75,8 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
max_subcompactions(db_options.max_subcompactions),
memtable_insert_with_hint_prefix_extractor(
cf_options.memtable_insert_with_hint_prefix_extractor.get()),
ttl(cf_options.ttl) {}
ttl(cf_options.ttl),
cf_paths(cf_options.cf_paths) {}
// Multiple two operands. If they overflow, return op1.
uint64_t MultiplyCheckOverflow(uint64_t op1, double op2) {

@ -120,6 +120,8 @@ struct ImmutableCFOptions {
const SliceTransform* memtable_insert_with_hint_prefix_extractor;
uint64_t ttl;
std::vector<DbPath> cf_paths;
};
struct MutableCFOptions {

@ -1601,6 +1601,7 @@ std::unordered_map<std::string, OptionTypeInfo>
uint34_t* existing_value_size,
Slice delta_value,
std::string* merged_value);
std::vector<DbPath> cf_paths;
*/
{"report_bg_io_stats",
{offset_of(&ColumnFamilyOptions::report_bg_io_stats),

@ -345,6 +345,8 @@ TEST_F(OptionsSettableTest, ColumnFamilyOptionsAllFieldsSettable) {
sizeof(std::shared_ptr<const SliceTransform>)},
{offset_of(&ColumnFamilyOptions::table_factory),
sizeof(std::shared_ptr<TableFactory>)},
{offset_of(&ColumnFamilyOptions::cf_paths),
sizeof(std::vector<DbPath>)},
};
char* options_ptr = new char[sizeof(ColumnFamilyOptions)];

@ -1017,8 +1017,10 @@ struct ThreadState {
class DbStressListener : public EventListener {
public:
DbStressListener(const std::string& db_name,
const std::vector<DbPath>& db_paths)
: db_name_(db_name), db_paths_(db_paths) {}
const std::vector<DbPath>& db_paths,
const std::vector<ColumnFamilyDescriptor>& column_families)
: db_name_(db_name), db_paths_(db_paths),
column_families_(column_families) {}
virtual ~DbStressListener() {}
#ifndef ROCKSDB_LITE
virtual void OnFlushCompleted(DB* db, const FlushJobInfo& info) override {
@ -1085,6 +1087,13 @@ class DbStressListener : public EventListener {
return;
}
}
for (auto& cf : column_families_) {
for (const auto& cf_path : cf.options.cf_paths) {
if (cf_path.path == file_dir) {
return;
}
}
}
assert(false);
#endif // !NDEBUG
}
@ -1117,6 +1126,7 @@ class DbStressListener : public EventListener {
private:
std::string db_name_;
std::vector<DbPath> db_paths_;
std::vector<ColumnFamilyDescriptor> column_families_;
};
} // namespace
@ -2547,7 +2557,7 @@ class StressTest {
}
options_.listeners.clear();
options_.listeners.emplace_back(
new DbStressListener(FLAGS_db, options_.db_paths));
new DbStressListener(FLAGS_db, options_.db_paths, cf_descriptors));
options_.create_missing_column_families = true;
if (!FLAGS_use_txn) {
s = DB::Open(DBOptions(options_), FLAGS_db, cf_descriptors,

@ -83,7 +83,7 @@ Status CreateFile(Env* env, const std::string& destination,
}
Status DeleteSSTFile(const ImmutableDBOptions* db_options,
const std::string& fname, uint32_t path_id) {
const std::string& fname) {
#ifndef ROCKSDB_LITE
auto sfm =
static_cast<SstFileManagerImpl*>(db_options->sst_file_manager.get());

@ -22,6 +22,6 @@ extern Status CreateFile(Env* env, const std::string& destination,
const std::string& contents);
extern Status DeleteSSTFile(const ImmutableDBOptions* db_options,
const std::string& fname, uint32_t path_id);
const std::string& fname);
} // namespace rocksdb

Loading…
Cancel
Save