not cut compaction output when compact to level 0

Summary: we should not call ShouldStopBefore() in compaction when the compaction targets level 0. Otherwise, CheckConsistency will fail the assertion of seq number check on level 0.

Test Plan:
make all check -j64
I also manully test that using db_bench to compact files to level 0. Without this line change, the assertion files and multiple files are generated on level 0 after compaction.

Reviewers: yhchiang, andrewkr, yiwu, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D64269
main
Aaron Gao 8 years ago
parent 9ed928e7a9
commit c2a62a4cb2
  1. 43
      db/compact_files_test.cc
  2. 3
      db/compaction_job.cc

@ -50,6 +50,7 @@ class FlushedFileCollector : public EventListener {
} }
return result; return result;
} }
void ClearFlushedFiles() { flushed_files_.clear(); }
private: private:
std::vector<std::string> flushed_files_; std::vector<std::string> flushed_files_;
@ -116,7 +117,7 @@ TEST_F(CompactFilesTest, L0ConflictsFiles) {
TEST_F(CompactFilesTest, ObsoleteFiles) { TEST_F(CompactFilesTest, ObsoleteFiles) {
Options options; Options options;
// to trigger compaction more easily // to trigger compaction more easily
const int kWriteBufferSize = 10000; const int kWriteBufferSize = 65536;
options.create_if_missing = true; options.create_if_missing = true;
// Disable RocksDB background compaction. // Disable RocksDB background compaction.
options.compaction_style = kCompactionStyleNone; options.compaction_style = kCompactionStyleNone;
@ -154,6 +155,46 @@ TEST_F(CompactFilesTest, ObsoleteFiles) {
delete db; delete db;
} }
TEST_F(CompactFilesTest, NotCutOutputOnLevel0) {
Options options;
options.create_if_missing = true;
// Disable RocksDB background compaction.
options.compaction_style = kCompactionStyleNone;
options.level0_slowdown_writes_trigger = 1000;
options.level0_stop_writes_trigger = 1000;
options.write_buffer_size = 65536;
options.max_write_buffer_number = 2;
options.compression = kNoCompression;
options.max_compaction_bytes = 5000;
// Add listener
FlushedFileCollector* collector = new FlushedFileCollector();
options.listeners.emplace_back(collector);
DB* db = nullptr;
DestroyDB(db_name_, options);
Status s = DB::Open(options, db_name_, &db);
assert(s.ok());
assert(db);
// create couple files
for (int i = 0; i < 500; ++i) {
db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
}
reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
auto l0_files_1 = collector->GetFlushedFiles();
collector->ClearFlushedFiles();
for (int i = 0; i < 500; ++i) {
db->Put(WriteOptions(), ToString(i), std::string(1000, 'a' + (i % 26)));
}
reinterpret_cast<DBImpl*>(db)->TEST_WaitForFlushMemTable();
auto l0_files_2 = collector->GetFlushedFiles();
ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_1, 0));
ASSERT_OK(db->CompactFiles(CompactionOptions(), l0_files_2, 0));
// no assertion failure
delete db;
}
TEST_F(CompactFilesTest, CapturingPendingFiles) { TEST_F(CompactFilesTest, CapturingPendingFiles) {
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;

@ -762,7 +762,8 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
if (end != nullptr && if (end != nullptr &&
cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) { cfd->user_comparator()->Compare(c_iter->user_key(), *end) >= 0) {
break; break;
} else if (sub_compact->ShouldStopBefore( } else if (sub_compact->compaction->output_level() != 0 &&
sub_compact->ShouldStopBefore(
key, sub_compact->current_output_file_size) && key, sub_compact->current_output_file_size) &&
sub_compact->builder != nullptr) { sub_compact->builder != nullptr) {
status = FinishCompactionOutputFile(input->status(), sub_compact); status = FinishCompactionOutputFile(input->status(), sub_compact);

Loading…
Cancel
Save