support bulk loading with universal compaction

Summary:
Support buck load with universal compaction.
More test cases to be added.
Closes https://github.com/facebook/rocksdb/pull/2202

Differential Revision: D4935360

Pulled By: lightmark

fbshipit-source-id: cc3ca1b6f42faa503207dab1408d6bcf393ee5b5
main
Aaron Gao 8 years ago committed by Facebook Github Bot
parent 3b4d1b7a44
commit 7eddecce12
  1. 2
      db/compaction_picker_universal.cc
  2. 3
      db/db_impl.cc
  3. 30
      db/external_sst_file_basic_test.cc
  4. 68
      db/external_sst_file_ingestion_job.cc
  5. 10
      db/external_sst_file_ingestion_job.h
  6. 49
      db/external_sst_file_test.cc

@ -342,8 +342,6 @@ Compaction* UniversalCompactionPicker::PickCompaction(
assert(f->smallest_seqno <= f->largest_seqno);
if (is_first) {
is_first = false;
} else {
assert(prev_smallest_seqno > f->largest_seqno);
}
prev_smallest_seqno = f->smallest_seqno;
}

@ -2532,7 +2532,8 @@ Status DBImpl::IngestExternalFile(
if (status.ok()) {
bool need_flush = false;
status = ingestion_job.NeedsFlush(&need_flush);
TEST_SYNC_POINT_CALLBACK("DBImpl::IngestExternalFile:NeedFlush",
&need_flush);
if (status.ok() && need_flush) {
mutex_.Unlock();
status = FlushMemTable(cfd, FlushOptions(), true /* writes_stopped */);

@ -182,6 +182,7 @@ TEST_F(ExternalSSTFileBasicTest, NoCopy) {
}
TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) {
do {
Options options = CurrentOptions();
DestroyAndReopen(options);
std::map<std::string, std::string> true_data;
@ -203,8 +204,8 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) {
// File overwrite some keys, a seqno will be assigned
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 1);
ASSERT_OK(
GenerateAndAddExternalFile(options, {11, 15, 19}, file_id++, &true_data));
ASSERT_OK(GenerateAndAddExternalFile(options, {11, 15, 19}, file_id++,
&true_data));
// File overwrite some keys, a seqno will be assigned
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 2);
@ -225,18 +226,18 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) {
}
SequenceNumber last_seqno = dbfull()->GetLatestSequenceNumber();
ASSERT_OK(
GenerateAndAddExternalFile(options, {60, 61, 62}, file_id++, &true_data));
ASSERT_OK(GenerateAndAddExternalFile(options, {60, 61, 62}, file_id++,
&true_data));
// File dont overwrite any keys, No seqno needed
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno);
ASSERT_OK(
GenerateAndAddExternalFile(options, {40, 41, 42}, file_id++, &true_data));
ASSERT_OK(GenerateAndAddExternalFile(options, {40, 41, 42}, file_id++,
&true_data));
// File overwrite some keys, a seqno will be assigned
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 1);
ASSERT_OK(
GenerateAndAddExternalFile(options, {20, 30, 40}, file_id++, &true_data));
ASSERT_OK(GenerateAndAddExternalFile(options, {20, 30, 40}, file_id++,
&true_data));
// File overwrite some keys, a seqno will be assigned
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 2);
@ -244,13 +245,13 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) {
// We will need a seqno for the file regardless if the file overwrite
// keys in the DB or not because we have a snapshot
ASSERT_OK(
GenerateAndAddExternalFile(options, {1000, 1002}, file_id++, &true_data));
ASSERT_OK(GenerateAndAddExternalFile(options, {1000, 1002}, file_id++,
&true_data));
// A global seqno will be assigned anyway because of the snapshot
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 3);
ASSERT_OK(
GenerateAndAddExternalFile(options, {2000, 3002}, file_id++, &true_data));
ASSERT_OK(GenerateAndAddExternalFile(options, {2000, 3002}, file_id++,
&true_data));
// A global seqno will be assigned anyway because of the snapshot
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 4);
@ -261,13 +262,14 @@ TEST_F(ExternalSSTFileBasicTest, IngestFileWithGlobalSeqnoPickedSeqno) {
db_->ReleaseSnapshot(snapshot);
ASSERT_OK(
GenerateAndAddExternalFile(options, {5000, 5001}, file_id++, &true_data));
ASSERT_OK(GenerateAndAddExternalFile(options, {5000, 5001}, file_id++,
&true_data));
// No snapshot anymore, no need to assign a seqno
ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), last_seqno + 5);
size_t kcnt = 0;
VerifyDBFromMap(true_data, &kcnt, false);
} while (ChangeCompactOptions());
}
TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) {

@ -156,34 +156,34 @@ Status ExternalSstFileIngestionJob::Run() {
bool consumed_seqno = false;
bool force_global_seqno = false;
const SequenceNumber last_seqno = versions_->LastSequence();
if (ingestion_options_.snapshot_consistency && !db_snapshots_->empty()) {
// We need to assign a global sequence number to all the files even
// if the dont overlap with any ranges since we have snapshots
force_global_seqno = true;
}
const SequenceNumber last_seqno = versions_->LastSequence();
SuperVersion* super_version = cfd_->GetSuperVersion();
edit_.SetColumnFamily(cfd_->GetID());
// The levels that the files will be ingested into
for (IngestedFileInfo& f : files_to_ingest_) {
bool overlap_with_db = false;
status = AssignLevelForIngestedFile(super_version, &f, &overlap_with_db);
SequenceNumber assigned_seqno = 0;
status = AssignLevelAndSeqnoForIngestedFile(
super_version, force_global_seqno, cfd_->ioptions()->compaction_style,
&f, &assigned_seqno);
if (!status.ok()) {
return status;
}
if (overlap_with_db || force_global_seqno) {
status = AssignGlobalSeqnoForIngestedFile(&f, last_seqno + 1);
status = AssignGlobalSeqnoForIngestedFile(&f, assigned_seqno);
TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Run",
&assigned_seqno);
if (assigned_seqno == last_seqno + 1) {
consumed_seqno = true;
} else {
status = AssignGlobalSeqnoForIngestedFile(&f, 0);
}
if (!status.ok()) {
return status;
}
edit_.AddFile(f.picked_level, f.fd.GetNumber(), f.fd.GetPathId(),
f.fd.GetFileSize(), f.smallest_internal_key(),
f.largest_internal_key(), f.assigned_seqno, f.assigned_seqno,
@ -388,15 +388,25 @@ Status ExternalSstFileIngestionJob::IngestedFilesOverlapWithMemtables(
return status;
}
Status ExternalSstFileIngestionJob::AssignLevelForIngestedFile(
SuperVersion* sv, IngestedFileInfo* file_to_ingest, bool* overlap_with_db) {
*overlap_with_db = false;
Status ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile(
SuperVersion* sv, bool force_global_seqno, CompactionStyle compaction_style,
IngestedFileInfo* file_to_ingest, SequenceNumber* assigned_seqno) {
Status status;
*assigned_seqno = 0;
const SequenceNumber last_seqno = versions_->LastSequence();
if (force_global_seqno) {
*assigned_seqno = last_seqno + 1;
if (compaction_style == kCompactionStyleUniversal) {
file_to_ingest->picked_level = 0;
return status;
}
}
bool overlap_with_db = false;
Arena arena;
ReadOptions ro;
ro.total_order_seek = true;
Status status;
int target_level = 0;
auto* vstorage = cfd_->current()->storage_info();
for (int lvl = 0; lvl < cfd_->NumberLevels(); lvl++) {
@ -423,24 +433,46 @@ Status ExternalSstFileIngestionJob::AssignLevelForIngestedFile(
if (!status.ok()) {
return status;
}
if (overlap_with_level) {
// We must use L0 or any level higher than `lvl` to be able to overwrite
// the keys that we overlap with in this level, We also need to assign
// this file a seqno to overwrite the existing keys in level `lvl`
*overlap_with_db = true;
overlap_with_db = true;
break;
}
if (compaction_style == kCompactionStyleUniversal && lvl != 0) {
const std::vector<FileMetaData*>& level_files =
vstorage->LevelFiles(lvl);
const SequenceNumber level_largest_seqno =
(*max_element(level_files.begin(), level_files.end(),
[](FileMetaData* f1, FileMetaData* f2) {
return f1->largest_seqno < f2->largest_seqno;
}))
->largest_seqno;
if (level_largest_seqno != 0) {
*assigned_seqno = level_largest_seqno;
} else {
continue;
}
}
} else if (compaction_style == kCompactionStyleUniversal) {
continue;
}
// We dont overlap with any keys in this level, but we still need to check
// if our file can fit in it
if (IngestedFileFitInLevel(file_to_ingest, lvl)) {
target_level = lvl;
}
}
TEST_SYNC_POINT_CALLBACK(
"ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
&overlap_with_db);
file_to_ingest->picked_level = target_level;
if (overlap_with_db && *assigned_seqno == 0) {
*assigned_seqno = last_seqno + 1;
}
return status;
}

@ -114,12 +114,14 @@ class ExternalSstFileIngestionJob {
// REQUIRES: Mutex held
Status IngestedFilesOverlapWithMemtables(SuperVersion* sv, bool* overlap);
// Assign `file_to_ingest` the lowest possible level that it can
// be ingested to.
// Assign `file_to_ingest` the appropriate sequence number and the lowest
// possible level that it can be ingested to according to compaction_style.
// REQUIRES: Mutex held
Status AssignLevelForIngestedFile(SuperVersion* sv,
Status AssignLevelAndSeqnoForIngestedFile(SuperVersion* sv,
bool force_global_seqno,
CompactionStyle compaction_style,
IngestedFileInfo* file_to_ingest,
bool* overlap_with_db);
SequenceNumber* assigned_seqno);
// Set the file global sequence number to `seqno`
Status AssignGlobalSeqnoForIngestedFile(IngestedFileInfo* file_to_ingest,

@ -315,8 +315,7 @@ TEST_F(ExternalSSTFileTest, Basic) {
ASSERT_EQ(Get(Key(k)), value);
}
DestroyAndRecreateExternalSSTFilesDir();
} while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
kSkipFIFOCompaction));
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
}
class SstFileWriterCollector : public TablePropertiesCollector {
public:
@ -556,8 +555,7 @@ TEST_F(ExternalSSTFileTest, AddList) {
ASSERT_EQ(Get(Key(k)), value);
}
DestroyAndRecreateExternalSSTFilesDir();
} while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
kSkipFIFOCompaction));
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
}
TEST_F(ExternalSSTFileTest, AddListAtomicity) {
@ -599,8 +597,7 @@ TEST_F(ExternalSSTFileTest, AddListAtomicity) {
ASSERT_EQ(Get(Key(k)), value);
}
DestroyAndRecreateExternalSSTFilesDir();
} while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
kSkipFIFOCompaction));
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
}
// This test reporduce a bug that can happen in some cases if the DB started
// purging obsolete files when we are adding an external sst file.
@ -831,12 +828,31 @@ TEST_F(ExternalSSTFileTest, MultiThreaded) {
fprintf(stderr, "Verified %d values\n", num_files * keys_per_file);
DestroyAndRecreateExternalSSTFilesDir();
} while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
kSkipFIFOCompaction));
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
}
TEST_F(ExternalSSTFileTest, OverlappingRanges) {
Random rnd(301);
int picked_level = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"ExternalSstFileIngestionJob::Run", [&picked_level](void* arg) {
ASSERT_TRUE(arg != nullptr);
picked_level = *(static_cast<int*>(arg));
});
bool need_flush = false;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::IngestExternalFile:NeedFlush", [&need_flush](void* arg) {
ASSERT_TRUE(arg != nullptr);
need_flush = *(static_cast<bool*>(arg));
});
bool overlap_with_db = false;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"ExternalSstFileIngestionJob::AssignLevelAndSeqnoForIngestedFile",
[&overlap_with_db](void* arg) {
ASSERT_TRUE(arg != nullptr);
overlap_with_db = *(static_cast<bool*>(arg));
});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
do {
Options options = CurrentOptions();
DestroyAndReopen(options);
@ -889,8 +905,9 @@ TEST_F(ExternalSSTFileTest, OverlappingRanges) {
// Insert the generated file
s = DeprecatedAddFile({file_name});
auto it = true_data.lower_bound(Key(range_start));
if (option_config_ != kUniversalCompaction &&
option_config_ != kUniversalCompactionMultiLevel) {
if (it != true_data.end() && it->first <= Key(range_end)) {
// This range overlap with data already exist in DB
ASSERT_NOK(s);
@ -899,6 +916,17 @@ TEST_F(ExternalSSTFileTest, OverlappingRanges) {
ASSERT_OK(s);
success_add_file++;
}
} else {
if ((it != true_data.end() && it->first <= Key(range_end)) ||
need_flush || picked_level > 0 || overlap_with_db) {
// This range overlap with data already exist in DB
ASSERT_NOK(s);
failed_add_file++;
} else {
ASSERT_OK(s);
success_add_file++;
}
}
}
if (s.ok()) {
@ -930,8 +958,7 @@ TEST_F(ExternalSSTFileTest, OverlappingRanges) {
}
printf("keys/values verified\n");
DestroyAndRecreateExternalSSTFilesDir();
} while (ChangeOptions(kSkipPlainTable | kSkipUniversalCompaction |
kSkipFIFOCompaction));
} while (ChangeOptions(kSkipPlainTable | kSkipFIFOCompaction));
}
TEST_F(ExternalSSTFileTest, PickedLevel) {

Loading…
Cancel
Save