Delete files in given key range

Summary:
This is an initial diff for providing the ability to delete
files which are completely within a given range of keys.

Test Plan: DBCompactionTest.DeleteRange

Reviewers: IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: yoshinorim, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D52293
main
Venkatesh Radhakrishnan 9 years ago
parent d8677a8d2c
commit 63ddb783db
  1. 7
      db/convenience.cc
  2. 107
      db/db_compaction_test.cc
  3. 70
      db/db_impl.cc
  4. 2
      db/db_impl.h
  5. 10
      include/rocksdb/convenience.h

@ -18,6 +18,13 @@ namespace rocksdb {
void CancelAllBackgroundWork(DB* db, bool wait) {
(dynamic_cast<DBImpl*>(db))->CancelAllBackgroundWork(wait);
}
Status DeleteFilesInRange(DB* db, ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) {
return (dynamic_cast<DBImpl*>(db))
->DeleteFilesInRange(column_family, begin, end);
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -10,6 +10,7 @@
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/experimental.h"
#include "rocksdb/utilities/convenience.h"
#include "util/sync_point.h"
namespace rocksdb {
@ -1140,6 +1141,112 @@ TEST_F(DBCompactionTest, ManualPartialFill) {
}
}
TEST_F(DBCompactionTest, DeleteFileRange) {
Options options = CurrentOptions();
options.write_buffer_size = 10 * 1024 * 1024;
options.max_bytes_for_level_multiplier = 2;
options.num_levels = 4;
options.level0_file_num_compaction_trigger = 3;
options.max_background_compactions = 3;
DestroyAndReopen(options);
int32_t value_size = 10 * 1024; // 10 KB
// Add 2 non-overlapping files
Random rnd(301);
std::map<int32_t, std::string> values;
// file 1 [0 => 100]
for (int32_t i = 0; i < 100; i++) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
ASSERT_OK(Flush());
// file 2 [100 => 300]
for (int32_t i = 100; i < 300; i++) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
ASSERT_OK(Flush());
// 2 files in L0
ASSERT_EQ("2", FilesPerLevel(0));
CompactRangeOptions compact_options;
compact_options.change_level = true;
compact_options.target_level = 2;
ASSERT_OK(db_->CompactRange(compact_options, nullptr, nullptr));
// 2 files in L2
ASSERT_EQ("0,0,2", FilesPerLevel(0));
// file 3 [ 0 => 200]
for (int32_t i = 0; i < 200; i++) {
values[i] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(i), values[i]));
}
ASSERT_OK(Flush());
// Many files 4 [300 => 4300)
for (int32_t i = 0; i <= 5; i++) {
for (int32_t j = 300; j < 4300; j++) {
if (j == 2300) {
ASSERT_OK(Flush());
dbfull()->TEST_WaitForFlushMemTable();
}
values[j] = RandomString(&rnd, value_size);
ASSERT_OK(Put(Key(j), values[j]));
}
}
ASSERT_OK(Flush());
dbfull()->TEST_WaitForFlushMemTable();
dbfull()->TEST_WaitForCompact();
// Verify level sizes
uint64_t target_size = 4 * options.max_bytes_for_level_base;
for (int32_t i = 1; i < options.num_levels; i++) {
ASSERT_LE(SizeAtLevel(i), target_size);
target_size *= options.max_bytes_for_level_multiplier;
}
int32_t old_num_files = CountFiles();
std::string begin_string = Key(1000);
std::string end_string = Key(2000);
Slice begin(begin_string);
Slice end(end_string);
ASSERT_OK(DeleteFilesInRange(db_, db_->DefaultColumnFamily(), &begin, &end));
int32_t deleted_count = 0;
for (int32_t i = 0; i < 4300; i++) {
if (i < 1000 || i > 2000) {
ASSERT_EQ(Get(Key(i)), values[i]);
} else {
ReadOptions roptions;
std::string result;
Status s = db_->Get(roptions, Key(i), &result);
ASSERT_TRUE(s.IsNotFound() || s.ok());
if (s.IsNotFound()) {
deleted_count++;
}
}
}
ASSERT_GT(deleted_count, 0);
ASSERT_OK(
DeleteFilesInRange(db_, db_->DefaultColumnFamily(), nullptr, nullptr));
int32_t deleted_count2 = 0;
for (int32_t i = 0; i < 4300; i++) {
ReadOptions roptions;
std::string result;
Status s = db_->Get(roptions, Key(i), &result);
ASSERT_TRUE(s.IsNotFound());
deleted_count2++;
}
ASSERT_GT(deleted_count2, deleted_count);
int32_t new_num_files = CountFiles();
ASSERT_GT(old_num_files, new_num_files);
}
TEST_P(DBCompactionTestWithParam, TrivialMoveToLastLevelWithFiles) {
int32_t trivial_move = 0;
int32_t non_trivial_move = 0;

@ -5007,6 +5007,76 @@ Status DBImpl::DeleteFile(std::string name) {
return status;
}
Status DBImpl::DeleteFilesInRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end) {
Status status;
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
ColumnFamilyData* cfd = cfh->cfd();
VersionEdit edit;
JobContext job_context(next_job_id_.fetch_add(1), true);
{
InstrumentedMutexLock l(&mutex_);
auto* vstorage = cfd->current()->storage_info();
for (int i = 0; i < cfd->NumberLevels(); i++) {
if (vstorage->LevelFiles(i).empty() ||
!vstorage->OverlapInLevel(i, begin, end)) {
continue;
}
std::vector<FileMetaData*> level_files;
InternalKey begin_storage, end_storage, *begin_key, *end_key;
if (begin == nullptr) {
begin_key = nullptr;
} else {
begin_storage.SetMaxPossibleForUserKey(*begin);
begin_key = &begin_storage;
}
if (end == nullptr) {
end_key = nullptr;
} else {
end_storage.SetMinPossibleForUserKey(*end);
end_key = &end_storage;
}
vstorage->GetOverlappingInputs(i, begin_key, end_key, &level_files, -1,
nullptr, false);
for (const auto* level_file : level_files) {
if (((begin == nullptr) ||
(cfd->internal_comparator().user_comparator()->Compare(
level_file->smallest.user_key(), *begin) >= 0)) &&
((end == nullptr) ||
(cfd->internal_comparator().user_comparator()->Compare(
level_file->largest.user_key(), *end) <= 0))) {
if (level_file->being_compacted) {
continue;
}
edit.SetColumnFamily(cfd->GetID());
edit.DeleteFile(i, level_file->fd.GetNumber());
}
}
}
if (edit.GetDeletedFiles().empty()) {
return Status::OK();
}
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit, &mutex_, directories_.GetDbDir());
if (status.ok()) {
InstallSuperVersionAndScheduleWorkWrapper(
cfd, &job_context, *cfd->GetLatestMutableCFOptions());
}
FindObsoleteFiles(&job_context, false);
} // lock released here
LogFlush(db_options_.info_log);
// remove files outside the db-lock
if (job_context.HaveSomethingToDelete()) {
// Call PurgeObsoleteFiles() without holding mutex.
PurgeObsoleteFiles(job_context);
}
job_context.Clean();
return status;
}
void DBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
InstrumentedMutexLock l(&mutex_);
versions_->GetLiveFilesMetaData(metadata);

@ -188,6 +188,8 @@ class DBImpl : public DB {
const TransactionLogIterator::ReadOptions&
read_options = TransactionLogIterator::ReadOptions()) override;
virtual Status DeleteFile(std::string name) override;
Status DeleteFilesInRange(ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end);
virtual void GetLiveFilesMetaData(
std::vector<LiveFileMetaData>* metadata) override;

@ -7,6 +7,7 @@
#include <unordered_map>
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"
@ -92,8 +93,15 @@ Status GetMemTableRepFactoryFromString(
Status GetOptionsFromString(const Options& base_options,
const std::string& opts_str, Options* new_options);
/// Request stopping background work, if wait is true wait until it's done
// Request stopping background work, if wait is true wait until it's done
void CancelAllBackgroundWork(DB* db, bool wait = false);
// Delete files which are entirely in the given range
// Could leave some keys in the range which are in files which are not
// entirely in the range.
// Snapshots before the delete might not see the data in the given range.
Status DeleteFilesInRange(DB* db, ColumnFamilyHandle* column_family,
const Slice* begin, const Slice* end);
#endif // ROCKSDB_LITE
} // namespace rocksdb

Loading…
Cancel
Save