Pass parsed user key to prefix extractor in V2 compaction

Previously, the prefix extractor was being supplied with the RocksDB
key instead of a parsed user key. This makes correct interpretation
by calling application fragile or impossible.
main
Spencer Kimball 10 years ago
parent 54153ab07a
commit 3fcf7b26b9
  1. 30
      db/c_test.c
  2. 14
      db/db_impl.cc

@ -261,6 +261,29 @@ static void CompactionFilterV2Filter(
}
}
// Custom prefix extractor for compaction filter V2 which extracts first 3 characters.
static void CFV2PrefixExtractorDestroy(void* arg) { }
static char* CFV2PrefixExtractorTransform(void* arg, const char* key, size_t length, size_t* dst_length) {
// Verify keys are maximum length 4; this verifies fix for a
// prior bug which was passing the RocksDB-encoded key with
// logical timestamp suffix instead of parsed user key.
if (length > 4) {
fprintf(stderr, "%s:%d: %s: key %s is not user key\n", __FILE__, __LINE__, phase, key);
abort();
}
*dst_length = length < 3 ? length : 3;
return (char*)key;
}
static unsigned char CFV2PrefixExtractorInDomain(void* state, const char* key, size_t length) {
return 1;
}
static unsigned char CFV2PrefixExtractorInRange(void* state, const char* key, size_t length) {
return 1;
}
static const char* CFV2PrefixExtractorName(void* state) {
return "TestCFV2PrefixExtractor";
}
// Custom compaction filter factory V2.
static void CompactionFilterFactoryV2Destroy(void* arg) {
rocksdb_slicetransform_destroy((rocksdb_slicetransform_t*)arg);
@ -585,9 +608,12 @@ int main(int argc, char** argv) {
{
rocksdb_compactionfilterfactoryv2_t* factory;
rocksdb_slicetransform_t* prefix_extractor;
prefix_extractor = rocksdb_slicetransform_create_fixed_prefix(3);
prefix_extractor = rocksdb_slicetransform_create(
NULL, CFV2PrefixExtractorDestroy, CFV2PrefixExtractorTransform,
CFV2PrefixExtractorInDomain, CFV2PrefixExtractorInRange,
CFV2PrefixExtractorName);
factory = rocksdb_compactionfilterfactoryv2_create(
prefix_extractor, prefix_extractor, CompactionFilterFactoryV2Destroy,
NULL, prefix_extractor, CompactionFilterFactoryV2Destroy,
CompactionFilterFactoryV2Create, CompactionFilterFactoryV2Name);
// Create new database
rocksdb_close(db);

@ -3000,19 +3000,19 @@ Status DBImpl::DoCompactionWork(CompactionState* compact,
Slice key = backup_input->key();
Slice value = backup_input->value();
const SliceTransform* transformer =
cfd->options()->compaction_filter_factory_v2->GetPrefixExtractor();
const auto key_prefix = transformer->Transform(key);
if (!prefix_initialized) {
compact->cur_prefix_ = key_prefix.ToString();
prefix_initialized = true;
}
if (!ParseInternalKey(key, &ikey)) {
// log error
Log(options_.info_log, "[%s] Failed to parse key: %s",
cfd->GetName().c_str(), key.ToString().c_str());
continue;
} else {
const SliceTransform* transformer =
cfd->options()->compaction_filter_factory_v2->GetPrefixExtractor();
const auto key_prefix = transformer->Transform(ikey.user_key);
if (!prefix_initialized) {
compact->cur_prefix_ = key_prefix.ToString();
prefix_initialized = true;
}
// If the prefix remains the same, keep buffering
if (key_prefix.compare(Slice(compact->cur_prefix_)) == 0) {
// Apply the compaction filter V2 to all the kv pairs sharing

Loading…
Cancel
Save