From 5273c814835227c85fe26f724f7d5d80c8413a20 Mon Sep 17 00:00:00 2001
From: Dhruba Borthakur
Date: Mon, 29 Oct 2012 01:13:41 -0700
Subject: [PATCH] Ability to invoke application hook for every key during
 compaction.

Summary: There are certain use-cases where the application intends to
delete older keys after a certain time period has expired. One option
for those applications is to periodically scan the entire database and
delete the appropriate keys. A better way is to allow the application
to hook into the compaction process. This patch allows the application
to set a callback method that is invoked for every key being compacted.
If this method returns true, then the key is not preserved in the
output of the compaction.

Test Plan: This is mostly to preview the proposed new public api.
Since it is a public api, please do due diligence on reviewing it.
I will be writing test cases for this api in my next version of this
patch.

Reviewers: MarkCallaghan, heyongqiang

Reviewed By: heyongqiang

CC: sheki, adsharma

Differential Revision: https://reviews.facebook.net/D6285
---
 db/db_impl.cc             |  19 ++++-
 db/db_test.cc             | 169 ++++++++++++++++++++++++++++++++++++++
 include/leveldb/options.h |  16 ++++
 util/options.cc           |   5 +-
 4 files changed, 207 insertions(+), 2 deletions(-)

diff --git a/db/db_impl.cc b/db/db_impl.cc
index 62f8592ca..409b877ac 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -1099,6 +1099,8 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
     }
 
     Slice key = input->key();
+    Slice value = input->value();
+    Slice* compaction_filter_value = NULL;
     if (compact->compaction->ShouldStopBefore(key) &&
         compact->builder != NULL) {
       status = FinishCompactionOutputFile(compact, input);
@@ -1138,6 +1140,21 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
         // few iterations of this loop (by rule (A) above).
         // Therefore this deletion marker is obsolete and can be dropped.
         drop = true;
+      } else if (options_.CompactionFilter != NULL &&
+                 ikey.type != kTypeDeletion &&
+                 ikey.sequence < compact->smallest_snapshot) {
+        // If the user has specified a compaction filter, then invoke
+        // it. This key is not visible via any snapshot, so if the
+        // compaction filter returns true, drop this key from the
+        // output of the compaction.
+        drop = options_.CompactionFilter(compact->compaction->level(),
+                 ikey.user_key, value, &compaction_filter_value);
+
+        // If the application wants to change the value, then do so here.
+        if (compaction_filter_value != NULL) {
+          value = *compaction_filter_value;
+          delete compaction_filter_value;
+        }
       }
 
       last_sequence_for_key = ikey.sequence;
@@ -1164,7 +1181,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
         compact->current_output()->smallest.DecodeFrom(key);
       }
       compact->current_output()->largest.DecodeFrom(key);
-      compact->builder->Add(key, input->value());
+      compact->builder->Add(key, value);
 
       // Close output file if it is big enough
       if (compact->builder->FileSize() >=
diff --git a/db/db_test.cc b/db/db_test.cc
index 491e88072..7a2256e49 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -1195,6 +1195,175 @@ TEST(DBTest, RepeatedWritesToSameKey) {
   }
 }
 
+// These are static filters used for filtering
+// kvs during the compaction process.
+static int cfilter_count;
+static std::string NEW_VALUE = "NewValue";
+static bool keep_filter(int level, const Slice& key,
+                        const Slice& value, Slice** new_value) {
+  cfilter_count++;
+  return false;
+}
+static bool delete_filter(int level, const Slice& key,
+                          const Slice& value, Slice** new_value) {
+  cfilter_count++;
+  return true;
+}
+static bool change_filter(int level, const Slice& key,
+                          const Slice& value, Slice** new_value) {
+  assert(new_value != NULL);
+  *new_value = new Slice(NEW_VALUE);
+  return false;
+}
+
+TEST(DBTest, CompactionFilter) {
+  Options options = CurrentOptions();
+  options.num_levels = 3;
+  options.max_mem_compaction_level = 0;
+  options.CompactionFilter = keep_filter;
+  Reopen(&options);
+
+  // Write 100K+1 keys; these are written to a few files
+  // in L0. We do this so that the current snapshot points
+  // to the 100001st key. The compaction filter is not invoked
+  // on keys that are visible via a snapshot because we
+  // cannot delete them anyway.
+  const std::string value(10, 'x');
+  for (int i = 0; i < 100001; i++) {
+    char key[100];
+    snprintf(key, sizeof(key), "B%010d", i);
+    Put(key, value);
+  }
+  dbfull()->TEST_CompactMemTable();
+
+  // Push all files to the highest level L2. Verify that
+  // the compaction at each level invokes the filter for
+  // all the keys in that level.
+  cfilter_count = 0;
+  dbfull()->TEST_CompactRange(0, NULL, NULL);
+  ASSERT_EQ(cfilter_count, 100000);
+  cfilter_count = 0;
+  dbfull()->TEST_CompactRange(1, NULL, NULL);
+  ASSERT_EQ(cfilter_count, 100000);
+
+  ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+  ASSERT_NE(NumTableFilesAtLevel(2), 0);
+  cfilter_count = 0;
+
+  // Overwrite all the 100K+1 keys once again.
+  for (int i = 0; i < 100001; i++) {
+    char key[100];
+    snprintf(key, sizeof(key), "B%010d", i);
+    Put(key, value);
+  }
+  dbfull()->TEST_CompactMemTable();
+
+  // Push all files to the highest level L2. This
+  // means that all keys should pass through the
+  // compaction filter at least once.
+  cfilter_count = 0;
+  dbfull()->TEST_CompactRange(0, NULL, NULL);
+  ASSERT_EQ(cfilter_count, 100000);
+  cfilter_count = 0;
+  dbfull()->TEST_CompactRange(1, NULL, NULL);
+  ASSERT_EQ(cfilter_count, 100000);
+  ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+  ASSERT_NE(NumTableFilesAtLevel(2), 0);
+
+  // Create a new database with a compaction
+  // filter that deletes all keys.
+  options.CompactionFilter = delete_filter;
+  options.create_if_missing = true;
+  DestroyAndReopen(&options);
+
+  // Write all the keys once again.
+  for (int i = 0; i < 100001; i++) {
+    char key[100];
+    snprintf(key, sizeof(key), "B%010d", i);
+    Put(key, value);
+  }
+  dbfull()->TEST_CompactMemTable();
+  ASSERT_NE(NumTableFilesAtLevel(0), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
+
+  // Push all files to the highest level L2. This
+  // triggers the compaction filter to delete all keys.
+  // Verify that at the end of the compaction process,
+  // nothing is left.
+  cfilter_count = 0;
+  dbfull()->TEST_CompactRange(0, NULL, NULL);
+  ASSERT_EQ(cfilter_count, 100000);
+  cfilter_count = 0;
+  dbfull()->TEST_CompactRange(1, NULL, NULL);
+  ASSERT_EQ(cfilter_count, 0);
+  ASSERT_EQ(NumTableFilesAtLevel(0), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+
+  // Scan the entire database to ensure that only the
+  // 100001st key is left in the db. That key has the most
+  // recent sequence number, is visible via the implicit
+  // current snapshot, and therefore cannot be deleted.
+  Iterator* iter = db_->NewIterator(ReadOptions());
+  iter->SeekToFirst();
+  int count = 0;
+  while (iter->Valid()) {
+    count++;
+    iter->Next();
+  }
+  ASSERT_EQ(count, 1);
+  delete iter;
+}
+
+TEST(DBTest, CompactionFilterWithValueChange) {
+  Options options = CurrentOptions();
+  options.num_levels = 3;
+  options.max_mem_compaction_level = 0;
+  options.CompactionFilter = change_filter;
+  Reopen(&options);
+
+  // Write 100K+1 keys; these are written to a few files
+  // in L0. We do this so that the current snapshot points
+  // to the 100001st key. The compaction filter is not invoked
+  // on keys that are visible via a snapshot because we
+  // cannot delete them anyway.
+  const std::string value(10, 'x');
+  for (int i = 0; i < 100001; i++) {
+    char key[100];
+    snprintf(key, sizeof(key), "B%010d", i);
+    Put(key, value);
+  }
+
+  // Push all files to lower levels.
+  dbfull()->TEST_CompactMemTable();
+  dbfull()->TEST_CompactRange(0, NULL, NULL);
+  dbfull()->TEST_CompactRange(1, NULL, NULL);
+
+  // Re-write all the data again.
+  for (int i = 0; i < 100001; i++) {
+    char key[100];
+    snprintf(key, sizeof(key), "B%010d", i);
+    Put(key, value);
+  }
+
+  // Push all files to lower levels. This should
+  // invoke the compaction filter for all 100000 keys.
+  dbfull()->TEST_CompactMemTable();
+  dbfull()->TEST_CompactRange(0, NULL, NULL);
+  dbfull()->TEST_CompactRange(1, NULL, NULL);
+
+  // Verify that all keys now have the new value
+  // set by the compaction process.
+  for (int i = 0; i < 100000; i++) {
+    char key[100];
+    snprintf(key, sizeof(key), "B%010d", i);
+    std::string newvalue = Get(key);
+    ASSERT_EQ(newvalue.compare(NEW_VALUE), 0);
+  }
+}
+
 TEST(DBTest, SparseMerge) {
   Options options = CurrentOptions();
   options.compression = kNoCompression;
diff --git a/include/leveldb/options.h b/include/leveldb/options.h
index 530d8d6d3..cf7f84ee7 100644
--- a/include/leveldb/options.h
+++ b/include/leveldb/options.h
@@ -8,6 +8,7 @@
 #include <stddef.h>
 #include <string>
 #include <stdint.h>
+#include "leveldb/slice.h"
 
 namespace leveldb {
 
@@ -299,6 +300,21 @@ struct Options {
   Options();
 
   void Dump(Logger * log) const;
+
+  // This method allows an application to modify or delete a key-value
+  // at the time of compaction. The compaction process invokes this
+  // method for every kv that is being compacted. A return value
+  // of false indicates that the kv should be preserved in the
+  // output of this compaction run; a return value of true
+  // indicates that this key-value should be removed from the
+  // output of the compaction. The application can inspect
+  // the existing value of the key, modify it if needed, and
+  // return the new value for this key. The application should
+  // allocate heap memory for the Slice object that is used to
+  // return the new value, and the leveldb framework will
+  // free that memory.
+  bool (*CompactionFilter)(int level, const Slice& key,
+                           const Slice& existing_value, Slice** new_value);
 };
 
 // Options that control read operations
diff --git a/util/options.cc b/util/options.cc
index 765b72e5e..0ade9600d 100644
--- a/util/options.cc
+++ b/util/options.cc
@@ -47,7 +47,8 @@ Options::Options()
       table_cache_numshardbits(4),
       max_log_file_size(0),
       delete_obsolete_files_period_micros(0),
-      rate_limit(0.0) {
+      rate_limit(0.0),
+      CompactionFilter(NULL) {
 }
 
 void
@@ -123,6 +124,8 @@ Options::Dump(
       delete_obsolete_files_period_micros);
   Log(log,"  Options.rate_limit: %.2f",
       rate_limit);
+  Log(log,"  Options.CompactionFilter: %p",
+      CompactionFilter);
 }  // Options::Dump
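
As a usage sketch of the new hook, the snippet below shows how an
application could implement the TTL use-case described in the Summary:
drop keys whose values carry an expired timestamp. This is an editor's
illustration, not code from the patch; the TtlFilter name, the
kTtlSeconds constant, and the assumed value layout (an 8-byte unix
timestamp prepended by the application at Put() time) are all made up
for the example.

#include <stdint.h>
#include <string.h>
#include <time.h>

#include "leveldb/db.h"
#include "leveldb/options.h"
#include "leveldb/slice.h"

// Hypothetical TTL policy: the application stores every value with a
// leading 8-byte unix timestamp (seconds since epoch).
static const uint64_t kTtlSeconds = 7 * 24 * 60 * 60;  // one week

// Matches the CompactionFilter signature introduced by this patch:
// bool (*)(int, const Slice&, const Slice&, Slice**).
// Returning true drops the kv from the compaction output. The filter
// is only invoked for keys that no snapshot can see, so dropping here
// never removes data a reader could still require.
static bool TtlFilter(int level, const leveldb::Slice& key,
                      const leveldb::Slice& existing_value,
                      leveldb::Slice** new_value) {
  if (existing_value.size() < sizeof(uint64_t)) {
    return false;  // malformed value: keep it rather than drop it
  }
  uint64_t write_time = 0;
  memcpy(&write_time, existing_value.data(), sizeof(write_time));
  uint64_t now = static_cast<uint64_t>(time(NULL));
  // Drop the key once its timestamp is older than the TTL.
  return now > write_time && now - write_time > kTtlSeconds;
}

int main() {
  leveldb::Options options;
  options.create_if_missing = true;
  options.CompactionFilter = TtlFilter;  // install the hook

  leveldb::DB* db;
  leveldb::Status s = leveldb::DB::Open(options, "/tmp/ttl_db", &db);
  // ... use the database; expired keys disappear as compactions run.
  delete db;
  return s.ok() ? 0 : 1;
}

One ownership detail worth noting for filters that rewrite values via
*new_value: DoCompactionWork copies the returned Slice and immediately
deletes the heap-allocated Slice object, but a Slice does not own its
bytes. The underlying buffer must therefore stay valid until the
compaction adds the kv to the output table, which is why change_filter
in the tests returns a Slice over the static NEW_VALUE string.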