BlobDB: Cleanup TTLExtractor interface (#4229)

Summary:
Cleanup TTLExtractor interface. The original purpose of it is to allow our users keep using existing `Write()` interface but allow it to accept TTL via `TTLExtractor`. However the interface is confusing. Will replace it with something like `WriteWithTTL(batch, ttl)` in the future.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4229

Differential Revision: D9174390

Pulled By: yiwu-arbug

fbshipit-source-id: 68201703d784408b851336ab4dd9b84188245b2d
main
Yi Wu 6 years ago committed by Facebook Github Bot
parent ceb5fea1e3
commit 140f256da2
  1. 1
      CMakeLists.txt
  2. 1
      TARGETS
  3. 1
      src.mk
  4. 3
      utilities/blob_db/blob_db.cc
  5. 34
      utilities/blob_db/blob_db.h
  6. 28
      utilities/blob_db/blob_db_impl.cc
  7. 4
      utilities/blob_db/blob_db_impl.h
  8. 186
      utilities/blob_db/blob_db_test.cc
  9. 34
      utilities/blob_db/ttl_extractor.cc

@ -615,7 +615,6 @@ set(SOURCES
utilities/blob_db/blob_log_reader.cc utilities/blob_db/blob_log_reader.cc
utilities/blob_db/blob_log_writer.cc utilities/blob_db/blob_log_writer.cc
utilities/blob_db/blob_log_format.cc utilities/blob_db/blob_log_format.cc
utilities/blob_db/ttl_extractor.cc
utilities/cassandra/cassandra_compaction_filter.cc utilities/cassandra/cassandra_compaction_filter.cc
utilities/cassandra/format.cc utilities/cassandra/format.cc
utilities/cassandra/merge_operator.cc utilities/cassandra/merge_operator.cc

@ -236,7 +236,6 @@ cpp_library(
"utilities/blob_db/blob_log_format.cc", "utilities/blob_db/blob_log_format.cc",
"utilities/blob_db/blob_log_reader.cc", "utilities/blob_db/blob_log_reader.cc",
"utilities/blob_db/blob_log_writer.cc", "utilities/blob_db/blob_log_writer.cc",
"utilities/blob_db/ttl_extractor.cc",
"utilities/cassandra/cassandra_compaction_filter.cc", "utilities/cassandra/cassandra_compaction_filter.cc",
"utilities/cassandra/format.cc", "utilities/cassandra/format.cc",
"utilities/cassandra/merge_operator.cc", "utilities/cassandra/merge_operator.cc",

@ -164,7 +164,6 @@ LIB_SOURCES = \
utilities/blob_db/blob_log_format.cc \ utilities/blob_db/blob_log_format.cc \
utilities/blob_db/blob_log_reader.cc \ utilities/blob_db/blob_log_reader.cc \
utilities/blob_db/blob_log_writer.cc \ utilities/blob_db/blob_log_writer.cc \
utilities/blob_db/ttl_extractor.cc \
utilities/cassandra/cassandra_compaction_filter.cc \ utilities/cassandra/cassandra_compaction_filter.cc \
utilities/cassandra/format.cc \ utilities/cassandra/format.cc \
utilities/cassandra/merge_operator.cc \ utilities/cassandra/merge_operator.cc \

@ -87,9 +87,6 @@ void BlobDBOptions::Dump(Logger* log) const {
ROCKS_LOG_HEADER( ROCKS_LOG_HEADER(
log, " BlobDBOptions.blob_file_size: %" PRIu64, log, " BlobDBOptions.blob_file_size: %" PRIu64,
blob_file_size); blob_file_size);
ROCKS_LOG_HEADER(
log, " BlobDBOptions.ttl_extractor: %p",
ttl_extractor.get());
ROCKS_LOG_HEADER( ROCKS_LOG_HEADER(
log, " BlobDBOptions.compression: %d", log, " BlobDBOptions.compression: %d",
static_cast<int>(compression)); static_cast<int>(compression));

@ -18,8 +18,6 @@ namespace rocksdb {
namespace blob_db { namespace blob_db {
class TTLExtractor;
// A wrapped database which puts values of KV pairs in a separate log // A wrapped database which puts values of KV pairs in a separate log
// and store location to the log in the underlying DB. // and store location to the log in the underlying DB.
// It lacks lots of importatant functionalities, e.g. DB restarts, // It lacks lots of importatant functionalities, e.g. DB restarts,
@ -67,11 +65,6 @@ struct BlobDBOptions {
// after it exceeds that size // after it exceeds that size
uint64_t blob_file_size = 256 * 1024 * 1024; uint64_t blob_file_size = 256 * 1024 * 1024;
// Instead of setting TTL explicitly by calling PutWithTTL or PutUntil,
// applications can set a TTLExtractor which can extract TTL from key-value
// pairs.
std::shared_ptr<TTLExtractor> ttl_extractor = nullptr;
// what compression to use for Blob's // what compression to use for Blob's
CompressionType compression = kNoCompression; CompressionType compression = kNoCompression;
@ -228,33 +221,6 @@ class BlobDB : public StackableDB {
Status DestroyBlobDB(const std::string& dbname, const Options& options, Status DestroyBlobDB(const std::string& dbname, const Options& options,
const BlobDBOptions& bdb_options); const BlobDBOptions& bdb_options);
// TTLExtractor allow applications to extract TTL from key-value pairs.
// This useful for applications using Put or WriteBatch to write keys and
// don't intend to migrate to PutWithTTL or PutUntil.
//
// Applications can implement either ExtractTTL or ExtractExpiration. If both
// are implemented, ExtractExpiration will take precedence.
class TTLExtractor {
public:
// Extract TTL from key-value pair.
// Return true if the key has TTL, false otherwise. If key has TTL,
// TTL is pass back through ttl. The method can optionally modify the value,
// pass the result back through new_value, and also set value_changed to true.
virtual bool ExtractTTL(const Slice& key, const Slice& value, uint64_t* ttl,
std::string* new_value, bool* value_changed);
// Extract expiration time from key-value pair.
// Return true if the key has expiration time, false otherwise. If key has
// expiration time, it is pass back through expiration. The method can
// optionally modify the value, pass the result back through new_value,
// and also set value_changed to true.
virtual bool ExtractExpiration(const Slice& key, const Slice& value,
uint64_t now, uint64_t* expiration,
std::string* new_value, bool* value_changed);
virtual ~TTLExtractor() = default;
};
} // namespace blob_db } // namespace blob_db
} // namespace rocksdb } // namespace rocksdb
#endif // ROCKSDB_LITE #endif // ROCKSDB_LITE

@ -79,7 +79,6 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
dbname_(dbname), dbname_(dbname),
db_impl_(nullptr), db_impl_(nullptr),
env_(db_options.env), env_(db_options.env),
ttl_extractor_(blob_db_options.ttl_extractor.get()),
bdb_options_(blob_db_options), bdb_options_(blob_db_options),
db_options_(db_options), db_options_(db_options),
cf_options_(cf_options), cf_options_(cf_options),
@ -563,12 +562,8 @@ class BlobDBImpl::BlobInserter : public WriteBatch::Handler {
return Status::NotSupported( return Status::NotSupported(
"Blob DB doesn't support non-default column family."); "Blob DB doesn't support non-default column family.");
} }
std::string new_value; Status s = blob_db_impl_->PutBlobValue(options_, key, value,
Slice value_slice; kNoExpiration, &batch_);
uint64_t expiration =
blob_db_impl_->ExtractExpiration(key, value, &value_slice, &new_value);
Status s = blob_db_impl_->PutBlobValue(options_, key, value_slice,
expiration, &batch_);
return s; return s;
} }
@ -661,10 +656,7 @@ void BlobDBImpl::GetLiveFilesMetaData(std::vector<LiveFileMetaData>* metadata) {
Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key, Status BlobDBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& value) { const Slice& value) {
std::string new_value; return PutUntil(options, key, value, kNoExpiration);
Slice value_slice;
uint64_t expiration = ExtractExpiration(key, value, &value_slice, &new_value);
return PutUntil(options, key, value_slice, expiration);
} }
Status BlobDBImpl::PutWithTTL(const WriteOptions& options, Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
@ -786,20 +778,6 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
return *compression_output; return *compression_output;
} }
uint64_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
Slice* value_slice,
std::string* new_value) {
uint64_t expiration = kNoExpiration;
bool has_expiration = false;
bool value_changed = false;
if (ttl_extractor_ != nullptr) {
has_expiration = ttl_extractor_->ExtractExpiration(
key, value, EpochNow(), &expiration, new_value, &value_changed);
}
*value_slice = value_changed ? Slice(*new_value) : value;
return has_expiration ? expiration : kNoExpiration;
}
void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) { void BlobDBImpl::GetCompactionContext(BlobCompactionContext* context) {
ReadLock l(&mutex_); ReadLock l(&mutex_);

@ -235,9 +235,6 @@ class BlobDBImpl : public BlobDB {
void ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file, void ObsoleteBlobFile(std::shared_ptr<BlobFile> blob_file,
SequenceNumber obsolete_seq, bool update_size); SequenceNumber obsolete_seq, bool update_size);
uint64_t ExtractExpiration(const Slice& key, const Slice& value,
Slice* value_slice, std::string* new_value);
Status PutBlobValue(const WriteOptions& options, const Slice& key, Status PutBlobValue(const WriteOptions& options, const Slice& key,
const Slice& value, uint64_t expiration, const Slice& value, uint64_t expiration,
WriteBatch* batch); WriteBatch* batch);
@ -337,7 +334,6 @@ class BlobDBImpl : public BlobDB {
// the base DB // the base DB
DBImpl* db_impl_; DBImpl* db_impl_;
Env* env_; Env* env_;
TTLExtractor* ttl_extractor_;
// the options that govern the behavior of Blob Storage // the options that govern the behavior of Blob Storage
BlobDBOptions bdb_options_; BlobDBOptions bdb_options_;

@ -235,7 +235,6 @@ class BlobDBTest : public testing::Test {
const std::string dbname_; const std::string dbname_;
std::unique_ptr<MockTimeEnv> mock_env_; std::unique_ptr<MockTimeEnv> mock_env_;
std::shared_ptr<TTLExtractor> ttl_extractor_;
BlobDB *blob_db_; BlobDB *blob_db_;
}; // class BlobDBTest }; // class BlobDBTest
@ -312,191 +311,6 @@ TEST_F(BlobDBTest, PutUntil) {
VerifyDB(data); VerifyDB(data);
} }
TEST_F(BlobDBTest, TTLExtrator_NoTTL) {
// The default ttl extractor return no ttl for every key.
ttl_extractor_.reset(new TTLExtractor());
Random rnd(301);
Options options;
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.min_blob_size = 0;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.ttl_extractor = ttl_extractor_;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
std::map<std::string, std::string> data;
mock_env_->set_current_time(0);
for (size_t i = 0; i < 100; i++) {
PutRandom("key" + ToString(i), &rnd, &data);
}
// very far in the future..
mock_env_->set_current_time(std::numeric_limits<uint64_t>::max() / 1000000 -
10);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_FALSE(blob_files[0]->HasTTL());
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(0, gc_stats.num_keys_expired);
ASSERT_EQ(100, gc_stats.num_keys_relocated);
VerifyDB(data);
}
TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) {
Random rnd(301);
class TestTTLExtractor : public TTLExtractor {
public:
explicit TestTTLExtractor(Random *r) : rnd(r) {}
virtual bool ExtractTTL(const Slice &key, const Slice &value, uint64_t *ttl,
std::string * /*new_value*/,
bool * /*value_changed*/) override {
*ttl = rnd->Next() % 100;
if (*ttl > 50) {
data[key.ToString()] = value.ToString();
}
return true;
}
Random *rnd;
std::map<std::string, std::string> data;
};
ttl_extractor_.reset(new TestTTLExtractor(&rnd));
Options options;
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.min_blob_size = 0;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.ttl_extractor = ttl_extractor_;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
mock_env_->set_current_time(50);
for (size_t i = 0; i < 100; i++) {
PutRandom("key" + ToString(i), &rnd);
}
mock_env_->set_current_time(100);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired);
ASSERT_EQ(data.size(), gc_stats.num_keys_relocated);
VerifyDB(data);
}
TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) {
Random rnd(301);
class TestTTLExtractor : public TTLExtractor {
public:
explicit TestTTLExtractor(Random *r) : rnd(r) {}
virtual bool ExtractExpiration(const Slice &key, const Slice &value,
uint64_t /*now*/, uint64_t *expiration,
std::string * /*new_value*/,
bool * /*value_changed*/) override {
*expiration = rnd->Next() % 100 + 50;
if (*expiration > 100) {
data[key.ToString()] = value.ToString();
}
return true;
}
Random *rnd;
std::map<std::string, std::string> data;
};
ttl_extractor_.reset(new TestTTLExtractor(&rnd));
Options options;
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.min_blob_size = 0;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.ttl_extractor = ttl_extractor_;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
mock_env_->set_current_time(50);
for (size_t i = 0; i < 100; i++) {
PutRandom("key" + ToString(i), &rnd);
}
mock_env_->set_current_time(100);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired);
ASSERT_EQ(data.size(), gc_stats.num_keys_relocated);
VerifyDB(data);
}
TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
class TestTTLExtractor : public TTLExtractor {
public:
const Slice kTTLSuffix = Slice("ttl:");
bool ExtractTTL(const Slice & /*key*/, const Slice &value, uint64_t *ttl,
std::string *new_value, bool *value_changed) override {
if (value.size() < 12) {
return false;
}
const char *p = value.data() + value.size() - 12;
if (kTTLSuffix != Slice(p, 4)) {
return false;
}
*ttl = DecodeFixed64(p + 4);
*new_value = Slice(value.data(), value.size() - 12).ToString();
*value_changed = true;
return true;
}
};
Random rnd(301);
Options options;
options.env = mock_env_.get();
BlobDBOptions bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.min_blob_size = 0;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.ttl_extractor = std::make_shared<TestTTLExtractor>();
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
std::map<std::string, std::string> data;
mock_env_->set_current_time(50);
for (size_t i = 0; i < 100; i++) {
int len = rnd.Next() % kMaxBlobSize + 1;
std::string key = "key" + ToString(i);
std::string value = test::RandomHumanReadableString(&rnd, len);
uint64_t ttl = rnd.Next() % 100;
std::string value_ttl = value + "ttl:";
PutFixed64(&value_ttl, ttl);
ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value_ttl)));
if (ttl > 50) {
data[key] = value;
}
}
mock_env_->set_current_time(100);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
ASSERT_OK(bdb_impl->TEST_CloseBlobFile(blob_files[0]));
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_keys_expired);
ASSERT_EQ(data.size(), gc_stats.num_keys_relocated);
VerifyDB(data);
}
TEST_F(BlobDBTest, StackableDBGet) { TEST_F(BlobDBTest, StackableDBGet) {
Random rnd(301); Random rnd(301);
BlobDBOptions bdb_options; BlobDBOptions bdb_options;

@ -1,34 +0,0 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#ifndef ROCKSDB_LITE
#include "utilities/blob_db/blob_db.h"
#include "util/coding.h"
namespace rocksdb {
namespace blob_db {
bool TTLExtractor::ExtractTTL(const Slice& /*key*/, const Slice& /*value*/,
uint64_t* /*ttl*/, std::string* /*new_value*/,
bool* /*value_changed*/) {
return false;
}
bool TTLExtractor::ExtractExpiration(const Slice& key, const Slice& value,
uint64_t now, uint64_t* expiration,
std::string* new_value,
bool* value_changed) {
uint64_t ttl;
bool has_ttl = ExtractTTL(key, value, &ttl, new_value, value_changed);
if (has_ttl) {
*expiration = now + ttl;
}
return has_ttl;
}
} // namespace blob_db
} // namespace rocksdb
#endif // ROCKSDB_LITE
Loading…
Cancel
Save