diff --git a/db/compact_files_test.cc b/db/compact_files_test.cc index 051cca8a4..76042b6e3 100644 --- a/db/compact_files_test.cc +++ b/db/compact_files_test.cc @@ -253,6 +253,61 @@ TEST_F(CompactFilesTest, CapturingPendingFiles) { delete db; } +TEST_F(CompactFilesTest, CompactionFilterWithGetSv) { + class FilterWithGet : public CompactionFilter { + public: + virtual bool Filter(int level, const Slice& key, const Slice& value, + std::string* new_value, + bool* value_changed) const override { + if (db_ == nullptr) { + return true; + } + std::string res; + db_->Get(ReadOptions(), "", &res); + return true; + } + + void SetDB(DB* db) { + db_ = db; + } + + virtual const char* Name() const override { return "FilterWithGet"; } + + private: + DB* db_; + }; + + + std::shared_ptr cf(new FilterWithGet()); + + Options options; + options.create_if_missing = true; + options.compaction_filter = cf.get(); + + DB* db = nullptr; + DestroyDB(db_name_, options); + Status s = DB::Open(options, db_name_, &db); + ASSERT_OK(s); + + cf->SetDB(db); + + // Write one L0 file + db->Put(WriteOptions(), "K1", "V1"); + db->Flush(FlushOptions()); + + // Compact all L0 files using CompactFiles + rocksdb::ColumnFamilyMetaData meta; + db->GetColumnFamilyMetaData(&meta); + for (auto& file : meta.levels[0].files) { + std::string fname = file.db_path + "/" + file.name; + ASSERT_OK( + db->CompactFiles(rocksdb::CompactionOptions(), {fname}, 0)); + } + + + delete db; +} + } // namespace rocksdb int main(int argc, char** argv) { diff --git a/db/db_impl.cc b/db/db_impl.cc index f22f13ae3..eec62be1a 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2153,7 +2153,7 @@ Status DBImpl::CompactFiles( immutable_db_options_.info_log.get()); // Perform CompactFiles - SuperVersion* sv = GetAndRefSuperVersion(cfd); + SuperVersion* sv = cfd->GetReferencedSuperVersion(&mutex_); { InstrumentedMutexLock l(&mutex_); @@ -2165,7 +2165,12 @@ Status DBImpl::CompactFiles( input_file_names, output_level, output_path_id, &job_context, &log_buffer); } - ReturnAndCleanupSuperVersion(cfd, sv); + if (sv->Unref()) { + mutex_.Lock(); + sv->Cleanup(); + mutex_.Unlock(); + delete sv; + } // Find and delete obsolete files {