Update DB::AddFile() to have less restrictions

Summary:
Update DB::AddFile() restrictions to be
  - Key range in loaded table file don't overlap with existing keys or tombstones in DB.
  - No other writes happen during AddFile call.

The updated AddFile() will verify that the file key range don't overlap with any keys or tombstones in the DB, and then add the file to L0

Test Plan: unit tests

Reviewers: igor, rven, anthony, kradhakrishnan, sdong

Reviewed By: sdong

Subscribers: adsharma, ameyag, dhruba

Differential Revision: https://reviews.facebook.net/D49233
main
Islam AbdelRahman 9 years ago
parent 11c71a365a
commit ff4499e297
  1. 64
      db/db_impl.cc
  2. 148
      db/db_test.cc
  3. 5
      db/version_builder.cc
  4. 10
      include/rocksdb/db.h

@ -3274,10 +3274,6 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
ColumnFamilyData* cfd = cfh->cfd(); ColumnFamilyData* cfd = cfh->cfd();
if (cfd->NumberLevels() <= 1) {
return Status::NotSupported(
"AddFile requires a database with at least 2 levels");
}
if (file_info->version != 1) { if (file_info->version != 1) {
return Status::InvalidArgument("Generated table version is not supported"); return Status::InvalidArgument("Generated table version is not supported");
} }
@ -3324,48 +3320,44 @@ Status DBImpl::AddFile(ColumnFamilyHandle* column_family,
WriteThread::Writer w; WriteThread::Writer w;
write_thread_.EnterUnbatched(&w, &mutex_); write_thread_.EnterUnbatched(&w, &mutex_);
// Make sure memtables are empty if (!snapshots_.empty()) {
if (!cfd->mem()->IsEmpty() || cfd->imm()->NumNotFlushed() > 0) { // Check that no snapshots are being held
// Cannot add the file since the keys in memtable status =
// will hide the keys in file Status::NotSupported("Cannot add a file while holding snapshots");
status = Status::NotSupported("Memtable is not empty");
} }
// Make sure last sequence number is 0, if there are existing files then
// they should have sequence number = 0
if (status.ok() && versions_->LastSequence() > 0) {
status = Status::NotSupported("Last Sequence number is not zero");
}
auto* vstorage = cfd->current()->storage_info();
if (status.ok()) { if (status.ok()) {
// Make sure that the key range in the file we will add does not overlap // Verify that added file key range dont overlap with any keys in DB
// with previously added files SuperVersion* sv = cfd->GetSuperVersion()->Ref();
Slice smallest_user_key = meta.smallest.user_key(); Arena arena;
Slice largest_user_key = meta.largest.user_key(); ReadOptions ro;
for (int level = 0; level < vstorage->num_non_empty_levels(); level++) { ro.total_order_seek = true;
if (vstorage->OverlapInLevel(level, &smallest_user_key, ScopedArenaIterator iter(NewInternalIterator(ro, cfd, sv, &arena));
&largest_user_key)) {
status = Status::NotSupported("Cannot add overlapping files"); InternalKey range_start(file_info->smallest_key, kMaxSequenceNumber,
break; kTypeValue);
iter->Seek(range_start.Encode());
status = iter->status();
if (status.ok() && iter->Valid()) {
ParsedInternalKey seek_result;
if (ParseInternalKey(iter->key(), &seek_result)) {
auto* vstorage = cfd->current()->storage_info();
if (vstorage->InternalComparator()->user_comparator()->Compare(
seek_result.user_key, file_info->largest_key) <= 0) {
status = Status::NotSupported("Cannot add overlapping range");
}
} else {
status = Status::Corruption("DB have corrupted keys");
} }
} }
} }
if (status.ok()) { if (status.ok()) {
// We add the file to the last level // Add file to L0
int target_level = cfd->NumberLevels() - 1;
if (cfd->ioptions()->level_compaction_dynamic_level_bytes == false) {
// If we are using dynamic level compaction we add the file to
// last level with files
target_level = vstorage->num_non_empty_levels() - 1;
if (target_level <= 0) {
target_level = 1;
}
}
VersionEdit edit; VersionEdit edit;
edit.SetColumnFamily(cfd->GetID()); edit.SetColumnFamily(cfd->GetID());
edit.AddFile(target_level, meta.fd.GetNumber(), meta.fd.GetPathId(), edit.AddFile(0, meta.fd.GetNumber(), meta.fd.GetPathId(),
meta.fd.GetFileSize(), meta.smallest, meta.largest, meta.fd.GetFileSize(), meta.smallest, meta.largest,
meta.smallest_seqno, meta.largest_seqno, meta.smallest_seqno, meta.largest_seqno,
meta.marked_for_compaction); meta.marked_for_compaction);

@ -9602,9 +9602,15 @@ TEST_F(DBTest, AddExternalSstFile) {
ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
} }
// Add file using file info // Add file while holding a snapshot will fail
s = db_->AddFile(&file2_info); const Snapshot* s1 = db_->GetSnapshot();
ASSERT_TRUE(s.ok()) << s.ToString(); if (s1 != nullptr) {
ASSERT_NOK(db_->AddFile(&file2_info));
db_->ReleaseSnapshot(s1);
}
// We can add the file after releaseing the snapshot
ASSERT_OK(db_->AddFile(&file2_info));
ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U); ASSERT_EQ(db_->GetLatestSequenceNumber(), 0U);
for (int k = 0; k < 200; k++) { for (int k = 0; k < 200; k++) {
ASSERT_EQ(Get(Key(k)), Key(k) + "_val"); ASSERT_EQ(Get(Key(k)), Key(k) + "_val");
@ -9624,9 +9630,8 @@ TEST_F(DBTest, AddExternalSstFile) {
} }
ASSERT_NE(db_->GetLatestSequenceNumber(), 0U); ASSERT_NE(db_->GetLatestSequenceNumber(), 0U);
// DB have values in memtable now, we cannot add files anymore // Key range of file5 (400 => 499) dont overlap with any keys in DB
s = db_->AddFile(file5); ASSERT_OK(db_->AddFile(file5));
ASSERT_FALSE(s.ok()) << s.ToString();
// Make sure values are correct before and after flush/compaction // Make sure values are correct before and after flush/compaction
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
@ -9637,13 +9642,37 @@ TEST_F(DBTest, AddExternalSstFile) {
} }
ASSERT_EQ(Get(Key(k)), value); ASSERT_EQ(Get(Key(k)), value);
} }
for (int k = 400; k < 500; k++) {
std::string value = Key(k) + "_val";
ASSERT_EQ(Get(Key(k)), value);
}
ASSERT_OK(Flush()); ASSERT_OK(Flush());
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
} }
// DB sequence number is not zero, cannot add files anymore Close();
s = db_->AddFile(file5); options.disable_auto_compactions = true;
ASSERT_FALSE(s.ok()) << s.ToString(); Reopen(options);
// Delete keys in range (400 => 499)
for (int k = 400; k < 500; k++) {
ASSERT_OK(Delete(Key(k)));
}
// We deleted range (400 => 499) but cannot add file5 because
// of the range tombstones
ASSERT_NOK(db_->AddFile(file5));
// Compacting the DB will remove the tombstones
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
// Now we can add the file
ASSERT_OK(db_->AddFile(file5));
// Verify values of file5 in DB
for (int k = 400; k < 500; k++) {
std::string value = Key(k) + "_val";
ASSERT_EQ(Get(Key(k)), value);
}
} while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
kSkipFIFOCompaction)); kSkipFIFOCompaction));
} }
@ -9816,6 +9845,107 @@ TEST_F(DBTest, AddExternalSstFileMultiThreaded) {
} while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction | } while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
kSkipFIFOCompaction)); kSkipFIFOCompaction));
} }
TEST_F(DBTest, AddExternalSstFileOverlappingRanges) {
std::string sst_files_folder = test::TmpDir(env_) + "/sst_files/";
Random rnd(301);
do {
env_->CreateDir(sst_files_folder);
Options options = CurrentOptions();
DestroyAndReopen(options);
const ImmutableCFOptions ioptions(options);
SstFileWriter sst_file_writer(EnvOptions(), ioptions, options.comparator);
printf("Option config = %d\n", option_config_);
std::vector<std::pair<int, int>> key_ranges;
for (int i = 0; i < 500; i++) {
int range_start = rnd.Uniform(20000);
int keys_per_range = 10 + rnd.Uniform(41);
key_ranges.emplace_back(range_start, range_start + keys_per_range);
}
int memtable_add = 0;
int success_add_file = 0;
int failed_add_file = 0;
std::map<std::string, std::string> true_data;
for (size_t i = 0; i < key_ranges.size(); i++) {
int range_start = key_ranges[i].first;
int range_end = key_ranges[i].second;
Status s;
std::string range_val = "range_" + ToString(i);
// For 20% of ranges we use DB::Put, for 80% we use DB::AddFile
if (i && i % 5 == 0) {
// Use DB::Put to insert range (insert into memtable)
range_val += "_put";
for (int k = range_start; k <= range_end; k++) {
s = Put(Key(k), range_val);
ASSERT_OK(s);
}
memtable_add++;
} else {
// Use DB::AddFile to insert range
range_val += "_add_file";
// Generate the file containing the range
std::string file_name = sst_files_folder + env_->GenerateUniqueId();
ASSERT_OK(sst_file_writer.Open(file_name));
for (int k = range_start; k <= range_end; k++) {
s = sst_file_writer.Add(Key(k), range_val);
ASSERT_OK(s);
}
ExternalSstFileInfo file_info;
s = sst_file_writer.Finish(&file_info);
ASSERT_OK(s);
// Insert the generated file
s = db_->AddFile(&file_info);
auto it = true_data.lower_bound(Key(range_start));
if (it != true_data.end() && it->first <= Key(range_end)) {
// This range overlap with data already exist in DB
ASSERT_NOK(s);
failed_add_file++;
} else {
ASSERT_OK(s);
success_add_file++;
}
}
if (s.ok()) {
// Update true_data map to include the new inserted data
for (int k = range_start; k <= range_end; k++) {
true_data[Key(k)] = range_val;
}
}
// Flush / Compact the DB
if (i && i % 50 == 0) {
Flush();
}
if (i && i % 75 == 0) {
db_->CompactRange(CompactRangeOptions(), nullptr, nullptr);
}
}
printf(
"Total: %zu ranges\n"
"AddFile()|Success: %d ranges\n"
"AddFile()|RangeConflict: %d ranges\n"
"Put(): %d ranges\n",
key_ranges.size(), success_add_file, failed_add_file, memtable_add);
// Verify the correctness of the data
for (const auto& kv : true_data) {
ASSERT_EQ(Get(kv.first), kv.second);
}
printf("keys/values verified\n");
} while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
kSkipFIFOCompaction));
}
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE
// 1 Create some SST files by inserting K-V pairs into DB // 1 Create some SST files by inserting K-V pairs into DB

@ -136,7 +136,10 @@ class VersionBuilder::Rep {
auto f2 = level_files[i]; auto f2 = level_files[i];
if (level == 0) { if (level == 0) {
assert(level_zero_cmp_(f1, f2)); assert(level_zero_cmp_(f1, f2));
assert(f1->largest_seqno > f2->largest_seqno); assert(f1->largest_seqno > f2->largest_seqno ||
// We can have multiple files with seqno = 0 as a result of
// using DB::AddFile()
(f1->largest_seqno == 0 && f2->largest_seqno == 0));
} else { } else {
assert(level_nonzero_cmp_(f1, f2)); assert(level_nonzero_cmp_(f1, f2));

@ -679,13 +679,11 @@ class DB {
// move the file instead of copying it. // move the file instead of copying it.
// //
// Current Requirements: // Current Requirements:
// (1) Memtable is empty. // (1) Key range in loaded table file don't overlap with
// (2) All existing files (if any) have sequence number = 0. // existing keys or tombstones in DB.
// (3) Key range in loaded table file don't overlap with existing // (2) No other writes happen during AddFile call, otherwise
// files key ranges.
// (4) No other writes happen during AddFile call, otherwise
// DB may get corrupted. // DB may get corrupted.
// (5) Database have at least 2 levels. // (3) No snapshots are held.
virtual Status AddFile(ColumnFamilyHandle* column_family, virtual Status AddFile(ColumnFamilyHandle* column_family,
const std::string& file_path, const std::string& file_path,
bool move_file = false) = 0; bool move_file = false) = 0;

Loading…
Cancel
Save