Blob DB TTL extractor

Summary:
Introduce blob_db::TTLExtractor to replace extract_ttl_fn. The TTL
extractor can be used to extract TTL from keys inserted with Put or
WriteBatch. Changes over the existing extract_ttl_fn are:
* If the value is changed, it is returned via std::string* (rather than Slice*). With Slice* the new value had to be a sub-slice of the existing value; with std::string* that limitation is removed.
* It can return either a TTL or an absolute expiration time.

Other changes in this PR:
* replace `std::chrono::system_clock` with `Env::NowMicros` so that I can mock time in tests.
* add several TTL tests.
* other minor naming changes.
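
For illustration, a minimal sketch of how a client can plug in the new API (the SuffixTTLExtractor name and the "ttl:" + fixed64 value suffix are hypothetical, mirroring the TTLExtractor_ChangeValue test added in this PR):

    #include "utilities/blob_db/blob_db.h"
    #include "util/coding.h"

    using namespace rocksdb;
    using namespace rocksdb::blob_db;

    // Hypothetical extractor: values ending in "ttl:" followed by a fixed64
    // TTL get that TTL; the suffix is stripped via new_value (with std::string*
    // the new value no longer has to alias the original Slice).
    class SuffixTTLExtractor : public TTLExtractor {
     public:
      bool ExtractTTL(const Slice& /*key*/, const Slice& value, uint64_t* ttl,
                      std::string* new_value, bool* value_changed) override {
        if (value.size() < 12 ||
            Slice(value.data() + value.size() - 12, 4) != Slice("ttl:")) {
          return false;  // no TTL; the key never expires
        }
        *ttl = DecodeFixed64(value.data() + value.size() - 8);
        *new_value = Slice(value.data(), value.size() - 12).ToString();
        *value_changed = true;
        return true;
      }
    };

    // Wiring it up (sketch):
    //   BlobDBOptions bdb_options;
    //   bdb_options.ttl_extractor = std::make_shared<SuffixTTLExtractor>();
    //   ... open the BlobDB with bdb_options; plain Put()/WriteBatch writes
    //   then get their TTL (and possibly a rewritten value) from the extractor.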
Closes https://github.com/facebook/rocksdb/pull/2659

Differential Revision: D5512627

Pulled By: yiwu-arbug

fbshipit-source-id: 0dfcb00d74d060b8534c6130c808e4d5d0a54440
Branch: main
Author: Yi Wu (committed by Facebook GitHub Bot)
Parent: 710411aea6
Commit: 6083bc79f8
Changed files (lines changed):
 1. CMakeLists.txt (1)
 2. TARGETS (1)
 3. src.mk (1)
 4. utilities/blob_db/blob_db.cc (2)
 5. utilities/blob_db/blob_db.h (23)
 6. utilities/blob_db/blob_db_impl.cc (162)
 7. utilities/blob_db/blob_db_impl.h (36)
 8. utilities/blob_db/blob_db_test.cc (276)
 9. utilities/blob_db/blob_log_format.h (5)
 10. utilities/blob_db/ttl_extractor.cc (31)
 11. utilities/blob_db/ttl_extractor.h (43)

--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -482,6 +482,7 @@ set(SOURCES
   utilities/blob_db/blob_log_reader.cc
   utilities/blob_db/blob_log_writer.cc
   utilities/blob_db/blob_log_format.cc
+  utilities/blob_db/ttl_extractor.cc
   utilities/cassandra/cassandra_compaction_filter.cc
   utilities/cassandra/format.cc
   utilities/cassandra/merge_operator.cc

--- a/TARGETS
+++ b/TARGETS
@@ -211,6 +211,7 @@ cpp_library(
     "utilities/blob_db/blob_log_reader.cc",
     "utilities/blob_db/blob_log_writer.cc",
     "utilities/blob_db/blob_log_format.cc",
+    "utilities/blob_db/ttl_extractor.cc",
     "utilities/cassandra/cassandra_compaction_filter.cc",
     "utilities/cassandra/format.cc",
     "utilities/cassandra/merge_operator.cc",

--- a/src.mk
+++ b/src.mk
@@ -159,6 +159,7 @@ LIB_SOURCES = \
   utilities/blob_db/blob_log_reader.cc \
   utilities/blob_db/blob_log_writer.cc \
   utilities/blob_db/blob_log_format.cc \
+  utilities/blob_db/ttl_extractor.cc \
   utilities/cassandra/cassandra_compaction_filter.cc \
   utilities/cassandra/format.cc \
   utilities/cassandra/merge_operator.cc \

--- a/utilities/blob_db/blob_db.cc
+++ b/utilities/blob_db/blob_db.cc
@@ -17,7 +17,6 @@
 #include "table/block.h"
 #include "table/block_based_table_builder.h"
 #include "table/block_builder.h"
-#include "util/crc32c.h"
 #include "util/file_reader_writer.h"
 #include "util/filename.h"
 #include "utilities/blob_db/blob_db_impl.h"
@@ -163,7 +162,6 @@ BlobDBOptions::BlobDBOptions()
       bytes_per_sync(0),
       blob_file_size(256 * 1024 * 1024),
       num_concurrent_simple_blobs(4),
-      default_ttl_extractor(false),
       compression(kNoCompression) {}
 }  // namespace blob_db

--- a/utilities/blob_db/blob_db.h
+++ b/utilities/blob_db/blob_db.h
@@ -13,6 +13,7 @@
 #include "rocksdb/db.h"
 #include "rocksdb/status.h"
 #include "rocksdb/utilities/stackable_db.h"
+#include "utilities/blob_db/ttl_extractor.h"

 namespace rocksdb {
@@ -64,15 +65,10 @@ struct BlobDBOptions {
   // how many files to use for simple blobs at one time
   uint32_t num_concurrent_simple_blobs;

-  // this function is to be provided by client if they intend to
-  // use Put API to provide TTL.
-  // the first argument is the value in the Put API
-  // in case you want to do some modifications to the value,
-  // return a new Slice in the second.
-  // otherwise just copy the input value into output.
-  // the ttl should be extracted and returned in last pointer.
-  // otherwise assign it to -1
-  std::function<bool(const Slice&, Slice*, int32_t*)> extract_ttl_fn;
+  // Instead of setting TTL explicitly by calling PutWithTTL or PutUntil,
+  // applications can set a TTLExtractor which can extract TTL from key-value
+  // pairs.
+  std::shared_ptr<TTLExtractor> ttl_extractor;

   // eviction callback.
   // this function will be called for every blob that is getting
@@ -80,9 +76,6 @@ struct BlobDBOptions {
   std::function<void(const ColumnFamilyHandle*, const Slice&, const Slice&)>
       gc_evict_cb_fn;

-  // default ttl extactor
-  bool default_ttl_extractor;

   // what compression to use for Blob's
   CompressionType compression;
@@ -95,10 +88,6 @@
 };

 class BlobDB : public StackableDB {
- public:
-  // the suffix to a blob value to represent "ttl:TTLVAL"
-  static const uint64_t kTTLSuffixLength = 8;

  public:
   using rocksdb::StackableDB::Put;
@@ -120,6 +109,8 @@ class BlobDB : public StackableDB {
     return PutWithTTL(options, DefaultColumnFamily(), key, value, ttl);
   }

+  // Put with expiration. Key with expiration time equal to -1
+  // means the key don't expire.
   virtual Status PutUntil(const WriteOptions& options,
                           ColumnFamilyHandle* column_family, const Slice& key,
                           const Slice& value, int32_t expiration) = 0;
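
To make the relationship between the three write paths concrete, a short sketch based on the implementation changes shown further below (the 100-second TTL is an arbitrary example):

    // All three end up in PutUntil internally (see blob_db_impl.cc):
    //   Put(opts, key, value)              -> expiration taken from ttl_extractor,
    //                                         or -1 if none is configured/found
    //   PutWithTTL(opts, key, value, 100)  -> expiration = EpochNow() + 100
    //   PutUntil(opts, key, value, t)      -> expiration = t; -1 means no expiry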

--- a/utilities/blob_db/blob_db_impl.cc
+++ b/utilities/blob_db/blob_db_impl.cc
@@ -6,9 +6,7 @@
 #include "utilities/blob_db/blob_db_impl.h"

 #include <algorithm>
-#include <chrono>
 #include <cinttypes>
-#include <ctime>
 #include <iomanip>
 #include <limits>
 #include <memory>
@@ -58,17 +56,6 @@ namespace rocksdb {
 namespace blob_db {

-struct GCStats {
-  uint64_t blob_count = 0;
-  uint64_t num_deletes = 0;
-  uint64_t deleted_size = 0;
-  uint64_t num_relocs = 0;
-  uint64_t succ_deletes_lsm = 0;
-  uint64_t overrided_while_delete = 0;
-  uint64_t succ_relocs = 0;
-  std::shared_ptr<BlobFile> newfile = nullptr;
-};

 // BlobHandle is a pointer to the blob that is stored in the LSM
 class BlobHandle {
  public:
@@ -192,7 +179,8 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
                        const DBOptions& db_options)
     : BlobDB(nullptr),
       db_impl_(nullptr),
-      myenv_(db_options.env),
+      env_(db_options.env),
+      ttl_extractor_(blob_db_options.ttl_extractor.get()),
       wo_set_(false),
       bdb_options_(blob_db_options),
       db_options_(db_options),
@@ -218,10 +206,6 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
   blob_dir_ = (bdb_options_.path_relative)
                   ? dbname + "/" + bdb_options_.blob_dir
                   : bdb_options_.blob_dir;
-  if (bdb_options_.default_ttl_extractor) {
-    bdb_options_.extract_ttl_fn = &BlobDBImpl::ExtractTTLFromBlob;
-  }
 }

 Status BlobDBImpl::LinkToBaseDB(DB* db) {
@@ -238,17 +222,17 @@ Status BlobDBImpl::LinkToBaseDB(DB* db) {
     db_impl_ = dynamic_cast<DBImpl*>(db);
   }

-  myenv_ = db_->GetEnv();
+  env_ = db_->GetEnv();
   opt_db_.reset(new OptimisticTransactionDBImpl(db, false));

-  Status s = myenv_->CreateDirIfMissing(blob_dir_);
+  Status s = env_->CreateDirIfMissing(blob_dir_);
   if (!s.ok()) {
     ROCKS_LOG_WARN(db_options_.info_log,
                    "Failed to create blob directory: %s status: '%s'",
                    blob_dir_.c_str(), s.ToString().c_str());
   }
-  s = myenv_->NewDirectory(blob_dir_, &dir_ent_);
+  s = env_->NewDirectory(blob_dir_, &dir_ent_);
   if (!s.ok()) {
     ROCKS_LOG_WARN(db_options_.info_log,
                    "Failed to open blob directory: %s status: '%s'",
@@ -293,10 +277,6 @@ BlobDBImpl::BlobDBImpl(DB* db, const BlobDBOptions& blob_db_options)
   blob_dir_ = (bdb_options_.path_relative)
                   ? db_->GetName() + "/" + bdb_options_.blob_dir
                   : bdb_options_.blob_dir;
-  if (bdb_options_.default_ttl_extractor) {
-    bdb_options_.extract_ttl_fn = &BlobDBImpl::ExtractTTLFromBlob;
-  }
 }
 BlobDBImpl::~BlobDBImpl() {
@@ -311,7 +291,7 @@ Status BlobDBImpl::OpenPhase1() {
     return Status::NotSupported("No blob directory in options");

   std::unique_ptr<Directory> dir_ent;
-  Status s = myenv_->NewDirectory(blob_dir_, &dir_ent);
+  Status s = env_->NewDirectory(blob_dir_, &dir_ent);
   if (!s.ok()) {
     ROCKS_LOG_WARN(db_options_.info_log,
                    "Failed to open blob directory: %s status: '%s'",
@@ -366,7 +346,7 @@ void BlobDBImpl::OnFlushBeginHandler(DB* db, const FlushJobInfo& info) {
 Status BlobDBImpl::GetAllLogFiles(
     std::set<std::pair<uint64_t, std::string>>* file_nums) {
   std::vector<std::string> all_files;
-  Status status = myenv_->GetChildren(blob_dir_, &all_files);
+  Status status = env_->GetChildren(blob_dir_, &all_files);
   if (!status.ok()) {
     return status;
   }
@@ -413,7 +393,7 @@ Status BlobDBImpl::OpenAllFiles() {
   for (auto f_iter : file_nums) {
     std::string bfpath = BlobFileName(blob_dir_, f_iter.first);
     uint64_t size_bytes;
-    Status s1 = myenv_->GetFileSize(bfpath, &size_bytes);
+    Status s1 = env_->GetFileSize(bfpath, &size_bytes);
     if (!s1.ok()) {
       ROCKS_LOG_WARN(
           db_options_.info_log,
@@ -436,7 +416,7 @@
     // read header
     std::shared_ptr<Reader> reader;
-    reader = bfptr->OpenSequentialReader(myenv_, db_options_, env_options_);
+    reader = bfptr->OpenSequentialReader(env_, db_options_, env_options_);
     s1 = reader->ReadHeader(&bfptr->header_);
     if (!s1.ok()) {
       ROCKS_LOG_ERROR(db_options_.info_log,
@@ -448,7 +428,7 @@
     bfptr->header_valid_ = true;

     std::shared_ptr<RandomAccessFileReader> ra_reader =
-        GetOrOpenRandomAccessReader(bfptr, myenv_, env_options_);
+        GetOrOpenRandomAccessReader(bfptr, env_, env_options_);

     BlobLogFooter bf;
     s1 = bfptr->ReadFooter(&bf);
@@ -586,13 +566,13 @@ Status BlobDBImpl::CreateWriterLocked(const std::shared_ptr<BlobFile>& bfile) {
   EnvOptions env_options = env_options_;
   env_options.writable_file_max_buffer_size = 0;
-  Status s = myenv_->ReopenWritableFile(fpath, &wfile, env_options);
+  Status s = env_->ReopenWritableFile(fpath, &wfile, env_options);
   if (!s.ok()) {
     ROCKS_LOG_ERROR(db_options_.info_log,
                     "Failed to open blob file for write: %s status: '%s'"
                     " exists: '%s'",
                     fpath.c_str(), s.ToString().c_str(),
-                    myenv_->FileExists(fpath).ToString().c_str());
+                    env_->FileExists(fpath).ToString().c_str());
     return s;
   }
@@ -788,39 +768,13 @@ std::shared_ptr<BlobFile> BlobDBImpl::SelectBlobFileTTL(uint32_t expiration) {
   return bfile;
 }

-bool BlobDBImpl::ExtractTTLFromBlob(const Slice& value, Slice* newval,
-                                    int32_t* ttl_val) {
-  *newval = value;
-  *ttl_val = -1;
-  if (value.size() <= BlobDB::kTTLSuffixLength) return false;
-  int32_t ttl_tmp =
-      DecodeFixed32(value.data() + value.size() - sizeof(int32_t));
-  std::string ttl_exp(value.data() + value.size() - BlobDB::kTTLSuffixLength,
-                      4);
-  if (ttl_exp != "ttl:") return false;
-  newval->remove_suffix(BlobDB::kTTLSuffixLength);
-  *ttl_val = ttl_tmp;
-  return true;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-// A specific pattern is looked up at the end of the value part.
-// ttl:TTLVAL . if this pattern is found, PutWithTTL is called, otherwise
-// regular Put is called.
-////////////////////////////////////////////////////////////////////////////////
 Status BlobDBImpl::Put(const WriteOptions& options,
                        ColumnFamilyHandle* column_family, const Slice& key,
                        const Slice& value) {
-  Slice newval;
-  int32_t ttl_val;
-  if (bdb_options_.extract_ttl_fn) {
-    bdb_options_.extract_ttl_fn(value, &newval, &ttl_val);
-    return PutWithTTL(options, column_family, key, newval, ttl_val);
-  }
-  return PutWithTTL(options, column_family, key, value, -1);
+  std::string new_value;
+  Slice value_slice;
+  int32_t expiration = ExtractExpiration(key, value, &value_slice, &new_value);
+  return PutUntil(options, column_family, key, value_slice, expiration);
 }

 Status BlobDBImpl::Delete(const WriteOptions& options,
@@ -852,6 +806,7 @@ Status BlobDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
     Status batch_rewrite_status_;
     std::shared_ptr<BlobFile> last_file_;
     bool has_put_;
+    std::string new_value_;

    public:
     explicit BlobInserter(BlobDBImpl* impl, SequenceNumber seq)
@@ -866,23 +821,13 @@
     bool has_put() { return has_put_; }

     virtual Status PutCF(uint32_t column_family_id, const Slice& key,
-                         const Slice& value_unc) override {
-      Slice newval;
-      int32_t ttl_val = -1;
-      if (impl_->bdb_options_.extract_ttl_fn) {
-        impl_->bdb_options_.extract_ttl_fn(value_unc, &newval, &ttl_val);
-      } else {
-        newval = value_unc;
-      }
-      int32_t expiration = -1;
-      if (ttl_val != -1) {
-        std::time_t cur_t = std::chrono::system_clock::to_time_t(
-            std::chrono::system_clock::now());
-        expiration = ttl_val + static_cast<int32_t>(cur_t);
-      }
+                         const Slice& value_slice) override {
+      Slice value_unc;
+      int32_t expiration =
+          impl_->ExtractExpiration(key, value_slice, &value_unc, &new_value_);
       std::shared_ptr<BlobFile> bfile =
-          (ttl_val != -1)
+          (expiration != -1)
               ? impl_->SelectBlobFileTTL(expiration)
               : ((last_file_) ? last_file_ : impl_->SelectBlobFile());
       if (last_file_ && last_file_ != bfile) {
@@ -1004,12 +949,8 @@ Status BlobDBImpl::PutWithTTL(const WriteOptions& options,
                               ColumnFamilyHandle* column_family,
                               const Slice& key, const Slice& value,
                               int32_t ttl) {
-  return PutUntil(
-      options, column_family, key, value,
-      (ttl != -1)
-          ? ttl + static_cast<int32_t>(std::chrono::system_clock::to_time_t(
-                      std::chrono::system_clock::now()))
-          : -1);
+  return PutUntil(options, column_family, key, value,
+                  static_cast<int32_t>(EpochNow()) + ttl);
 }

 Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
@@ -1024,6 +965,7 @@ Slice BlobDBImpl::GetCompressedSlice(const Slice& raw,
   return *compression_output;
 }

+// TODO(yiwu): We should use uint64_t for expiration.
 Status BlobDBImpl::PutUntil(const WriteOptions& options,
                             ColumnFamilyHandle* column_family, const Slice& key,
                             const Slice& value_unc, int32_t expiration) {
@@ -1097,6 +1039,24 @@ Status BlobDBImpl::PutUntil(const WriteOptions& options,
   return s;
 }
// TODO(yiwu): We should return uint64_t after updating the rest of the code
// to use uint64_t for expiration.
int32_t BlobDBImpl::ExtractExpiration(const Slice& key, const Slice& value,
Slice* value_slice,
std::string* new_value) {
uint64_t expiration = kNoExpiration;
bool value_changed = false;
if (ttl_extractor_ != nullptr) {
bool has_ttl = ttl_extractor_->ExtractExpiration(
key, value, EpochNow(), &expiration, new_value, &value_changed);
if (!has_ttl) {
expiration = kNoExpiration;
}
}
*value_slice = value_changed ? Slice(*new_value) : value;
return (expiration == kNoExpiration) ? -1 : static_cast<int32_t>(expiration);
}
 Status BlobDBImpl::AppendBlob(const std::shared_ptr<BlobFile>& bfile,
                               const std::string& headerbuf, const Slice& key,
                               const Slice& value, std::string* index_entry) {
@@ -1240,7 +1200,7 @@ Status BlobDBImpl::CommonGet(const ColumnFamilyData* cfd, const Slice& key,
   // takes locks when called
   std::shared_ptr<RandomAccessFileReader> reader =
-      GetOrOpenRandomAccessReader(bfile, myenv_, env_options_);
+      GetOrOpenRandomAccessReader(bfile, env_, env_options_);

   if (value != nullptr) {
     std::string* valueptr = value;
@@ -1377,14 +1337,13 @@ std::pair<bool, int64_t> BlobDBImpl::SanityCheck(bool aborted) {
     assert(!bfile->Immutable());
   }

-  std::time_t epoch_now =
-      std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
+  uint64_t epoch_now = EpochNow();

   for (auto bfile_pair : blob_files_) {
     auto bfile = bfile_pair.second;
     ROCKS_LOG_INFO(
         db_options_.info_log,
-        "Blob File %s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %d",
+        "Blob File %s %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64 " %" PRIu64,
         bfile->PathName().c_str(), bfile->GetFileSize(), bfile->BlobCount(),
         bfile->deleted_count_, bfile->deleted_size_,
         (bfile->ttl_range_.second - epoch_now));
@@ -1603,8 +1562,7 @@ std::pair<bool, int64_t> BlobDBImpl::CheckSeqFiles(bool aborted) {
   std::vector<std::shared_ptr<BlobFile>> process_files;
   {
-    std::time_t epoch_now =
-        std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
+    uint64_t epoch_now = EpochNow();
     ReadLock rl(&mutex_);
     for (auto bfile : open_blob_files_) {
@@ -1713,11 +1671,10 @@ std::pair<bool, int64_t> BlobDBImpl::WaStats(bool aborted) {
 ////////////////////////////////////////////////////////////////////////////////
 Status BlobDBImpl::GCFileAndUpdateLSM(const std::shared_ptr<BlobFile>& bfptr,
                                       GCStats* gcstats) {
-  std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
-  std::time_t tt = std::chrono::system_clock::to_time_t(now);
+  uint64_t tt = EpochNow();

   std::shared_ptr<Reader> reader =
-      bfptr->OpenSequentialReader(myenv_, db_options_, env_options_);
+      bfptr->OpenSequentialReader(env_, db_options_, env_options_);
   if (!reader) {
     ROCKS_LOG_ERROR(db_options_.info_log,
                     "File sequential reader could not be opened",
@@ -1987,7 +1944,7 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsFiles(bool aborted) {
     }
   }

-  Status s = myenv_->DeleteFile(bfile->PathName());
+  Status s = env_->DeleteFile(bfile->PathName());
   if (!s.ok()) {
     ROCKS_LOG_ERROR(db_options_.info_log,
                     "File failed to be deleted as obsolete %s",
@@ -2019,7 +1976,7 @@ std::pair<bool, int64_t> BlobDBImpl::DeleteObsFiles(bool aborted) {
 bool BlobDBImpl::CallbackEvictsImpl(std::shared_ptr<BlobFile> bfile) {
   std::shared_ptr<Reader> reader =
-      bfile->OpenSequentialReader(myenv_, db_options_, env_options_);
+      bfile->OpenSequentialReader(env_, db_options_, env_options_);
   if (!reader) {
     ROCKS_LOG_ERROR(
         db_options_.info_log,
@@ -2264,6 +2221,23 @@ Status BlobDBImpl::TEST_GetSequenceNumber(const Slice& key,
   auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(DefaultColumnFamily());
   return CommonGet(cfh->cfd(), key, index_entry, nullptr, sequence);
 }
std::vector<std::shared_ptr<BlobFile>> BlobDBImpl::TEST_GetBlobFiles() const {
std::vector<std::shared_ptr<BlobFile>> blob_files;
for (auto& p : blob_files_) {
blob_files.emplace_back(p.second);
}
return blob_files;
}
void BlobDBImpl::TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile) {
CloseSeqWrite(bfile, false /*abort*/);
}
Status BlobDBImpl::TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
GCStats* gc_stats) {
return GCFileAndUpdateLSM(bfile, gc_stats);
}
 #endif  // !NDEBUG

 }  // namespace blob_db

--- a/utilities/blob_db/blob_db_impl.h
+++ b/utilities/blob_db/blob_db_impl.h
@@ -10,6 +10,7 @@
 #include <atomic>
 #include <condition_variable>
 #include <ctime>
+#include <limits>
 #include <list>
 #include <memory>
 #include <set>
@@ -45,7 +46,6 @@ namespace blob_db {
 class BlobFile;
 class BlobDBImpl;
-struct GCStats;

 class BlobDBFlushBeginListener : public EventListener {
  public:
@@ -134,6 +134,17 @@ struct blobf_compare_ttl {
                   const std::shared_ptr<BlobFile>& rhs) const;
 };
struct GCStats {
uint64_t blob_count = 0;
uint64_t num_deletes = 0;
uint64_t deleted_size = 0;
uint64_t num_relocs = 0;
uint64_t succ_deletes_lsm = 0;
uint64_t overrided_while_delete = 0;
uint64_t succ_relocs = 0;
std::shared_ptr<BlobFile> newfile = nullptr;
};
 /**
  * The implementation class for BlobDB. This manages the value
  * part in TTL aware sequentially written files. These files are
@@ -147,6 +158,9 @@ class BlobDBImpl : public BlobDB {
   friend class BlobDBIterator;

  public:
+  static constexpr uint64_t kNoExpiration =
+      std::numeric_limits<uint64_t>::max();
+
   using rocksdb::StackableDB::Put;
   Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
              const Slice& key, const Slice& value) override;
@@ -200,12 +214,16 @@ class BlobDBImpl : public BlobDB {
 #ifndef NDEBUG
   Status TEST_GetSequenceNumber(const Slice& key, SequenceNumber* sequence);

+  std::vector<std::shared_ptr<BlobFile>> TEST_GetBlobFiles() const;
+
+  void TEST_CloseBlobFile(std::shared_ptr<BlobFile>& bfile);
+
+  Status TEST_GCFileAndUpdateLSM(std::shared_ptr<BlobFile>& bfile,
+                                 GCStats* gc_stats);
 #endif  // !NDEBUG

  private:
-  static bool ExtractTTLFromBlob(const Slice& value, Slice* newval,
-                                 int32_t* ttl_val);
-
   Status OpenPhase1();

   Status CommonGet(const ColumnFamilyData* cfd, const Slice& key,
@@ -237,6 +255,9 @@ class BlobDBImpl : public BlobDB {
   // appends a task into timer queue to close the file
   void CloseIf(const std::shared_ptr<BlobFile>& bfile);

+  int32_t ExtractExpiration(const Slice& key, const Slice& value,
+                            Slice* value_slice, std::string* new_value);
+
   Status AppendBlob(const std::shared_ptr<BlobFile>& bfile,
                     const std::string& headerbuf, const Slice& key,
                     const Slice& value, std::string* index_entry);
@@ -346,11 +367,12 @@
       std::vector<std::shared_ptr<BlobFile>>* to_process, uint64_t epoch,
       uint64_t last_id, size_t files_to_collect);

- private:
+  uint64_t EpochNow() { return env_->NowMicros() / 1000000; }

+ private:
   // the base DB
   DBImpl* db_impl_;
-  Env* myenv_;
+  Env* env_;
+  TTLExtractor* ttl_extractor_;

   // Optimistic Transaction DB used during Garbage collection
   // for atomicity

--- a/utilities/blob_db/blob_db_test.cc
+++ b/utilities/blob_db/blob_db_test.cc
@@ -25,7 +25,22 @@ class BlobDBTest : public testing::Test {
  public:
   const int kMaxBlobSize = 1 << 14;

+  class MockEnv : public EnvWrapper {
+   public:
+    MockEnv() : EnvWrapper(Env::Default()) {}
+
+    void set_now_micros(uint64_t now_micros) { now_micros_ = now_micros; }
+
+    uint64_t NowMicros() override { return now_micros_; }
+
+   private:
+    uint64_t now_micros_ = 0;
+  };
+
-  BlobDBTest() : dbname_(test::TmpDir() + "/blob_db_test"), blob_db_(nullptr) {
+  BlobDBTest()
+      : dbname_(test::TmpDir() + "/blob_db_test"),
+        mock_env_(new MockEnv()),
+        blob_db_(nullptr) {
     Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
     assert(s.ok());
   }
@@ -59,9 +74,25 @@ class BlobDBTest : public testing::Test {
     }
   }
void PutRandomUntil(const std::string &key, int32_t expiration, Random *rnd,
std::map<std::string, std::string> *data = nullptr) {
int len = rnd->Next() % kMaxBlobSize + 1;
std::string value = test::RandomHumanReadableString(rnd, len);
ASSERT_OK(blob_db_->PutUntil(WriteOptions(), Slice(key), Slice(value),
expiration));
if (data != nullptr) {
(*data)[key] = value;
}
}
   void PutRandom(const std::string &key, Random *rnd,
                  std::map<std::string, std::string> *data = nullptr) {
-    PutRandomWithTTL(key, -1, rnd, data);
+    int len = rnd->Next() % kMaxBlobSize + 1;
+    std::string value = test::RandomHumanReadableString(rnd, len);
+    ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value)));
+    if (data != nullptr) {
+      (*data)[key] = value;
+    }
   }

   void PutRandomToWriteBatch(
@@ -115,6 +146,8 @@ class BlobDBTest : public testing::Test {
   }

   const std::string dbname_;
+  std::unique_ptr<MockEnv> mock_env_;
+  std::shared_ptr<TTLExtractor> ttl_extractor_;
   BlobDB *blob_db_;
 };  // class BlobDBTest
@@ -130,6 +163,245 @@ TEST_F(BlobDBTest, Put) {
   VerifyDB(data);
 }
TEST_F(BlobDBTest, PutWithTTL) {
Random rnd(301);
Options options;
options.env = mock_env_.get();
BlobDBOptionsImpl bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
std::map<std::string, std::string> data;
mock_env_->set_now_micros(50 * 1000000);
for (size_t i = 0; i < 100; i++) {
int32_t ttl = rnd.Next() % 100;
PutRandomWithTTL("key" + ToString(i), ttl, &rnd,
(ttl < 50 ? nullptr : &data));
}
mock_env_->set_now_micros(100 * 1000000);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
bdb_impl->TEST_CloseBlobFile(blob_files[0]);
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocs);
VerifyDB(data);
}
TEST_F(BlobDBTest, PutUntil) {
Random rnd(301);
Options options;
options.env = mock_env_.get();
BlobDBOptionsImpl bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
std::map<std::string, std::string> data;
mock_env_->set_now_micros(50 * 1000000);
for (size_t i = 0; i < 100; i++) {
int32_t expiration = rnd.Next() % 100 + 50;
PutRandomUntil("key" + ToString(i), expiration, &rnd,
(expiration < 100 ? nullptr : &data));
}
mock_env_->set_now_micros(100 * 1000000);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
bdb_impl->TEST_CloseBlobFile(blob_files[0]);
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocs);
VerifyDB(data);
}
TEST_F(BlobDBTest, TTLExtrator_NoTTL) {
// The default ttl extractor return no ttl for every key.
ttl_extractor_.reset(new TTLExtractor());
Random rnd(301);
Options options;
options.env = mock_env_.get();
BlobDBOptionsImpl bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.num_concurrent_simple_blobs = 1;
bdb_options.ttl_extractor = ttl_extractor_;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
std::map<std::string, std::string> data;
mock_env_->set_now_micros(0);
for (size_t i = 0; i < 100; i++) {
PutRandom("key" + ToString(i), &rnd, &data);
}
// very far in the future..
mock_env_->set_now_micros(std::numeric_limits<uint64_t>::max() - 10);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_FALSE(blob_files[0]->HasTTL());
bdb_impl->TEST_CloseBlobFile(blob_files[0]);
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(0, gc_stats.num_deletes);
ASSERT_EQ(100, gc_stats.num_relocs);
VerifyDB(data);
}
TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) {
Random rnd(301);
class TestTTLExtractor : public TTLExtractor {
public:
explicit TestTTLExtractor(Random *r) : rnd(r) {}
virtual bool ExtractTTL(const Slice &key, const Slice &value, uint64_t *ttl,
std::string * /*new_value*/,
bool * /*value_changed*/) override {
*ttl = rnd->Next() % 100;
if (*ttl >= 50) {
data[key.ToString()] = value.ToString();
}
return true;
}
Random *rnd;
std::map<std::string, std::string> data;
};
ttl_extractor_.reset(new TestTTLExtractor(&rnd));
Options options;
options.env = mock_env_.get();
BlobDBOptionsImpl bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.ttl_extractor = ttl_extractor_;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
mock_env_->set_now_micros(50 * 1000000);
for (size_t i = 0; i < 100; i++) {
PutRandom("key" + ToString(i), &rnd);
}
mock_env_->set_now_micros(100 * 1000000);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
bdb_impl->TEST_CloseBlobFile(blob_files[0]);
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocs);
VerifyDB(data);
}
TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) {
Random rnd(301);
class TestTTLExtractor : public TTLExtractor {
public:
explicit TestTTLExtractor(Random *r) : rnd(r) {}
virtual bool ExtractExpiration(const Slice &key, const Slice &value,
uint64_t /*now*/, uint64_t *expiration,
std::string * /*new_value*/,
bool * /*value_changed*/) override {
*expiration = rnd->Next() % 100 + 50;
if (*expiration >= 100) {
data[key.ToString()] = value.ToString();
}
return true;
}
Random *rnd;
std::map<std::string, std::string> data;
};
ttl_extractor_.reset(new TestTTLExtractor(&rnd));
Options options;
options.env = mock_env_.get();
BlobDBOptionsImpl bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.ttl_extractor = ttl_extractor_;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
mock_env_->set_now_micros(50 * 1000000);
for (size_t i = 0; i < 100; i++) {
PutRandom("key" + ToString(i), &rnd);
}
mock_env_->set_now_micros(100 * 1000000);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
bdb_impl->TEST_CloseBlobFile(blob_files[0]);
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
auto &data = static_cast<TestTTLExtractor *>(ttl_extractor_.get())->data;
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocs);
VerifyDB(data);
}
TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
class TestTTLExtractor : public TTLExtractor {
public:
const Slice kTTLSuffix = Slice("ttl:");
bool ExtractTTL(const Slice & /*key*/, const Slice &value, uint64_t *ttl,
std::string *new_value, bool *value_changed) override {
if (value.size() < 12) {
return false;
}
const char *p = value.data() + value.size() - 12;
if (kTTLSuffix != Slice(p, 4)) {
return false;
}
*ttl = DecodeFixed64(p + 4);
*new_value = Slice(value.data(), value.size() - 12).ToString();
*value_changed = true;
return true;
}
};
Random rnd(301);
Options options;
options.env = mock_env_.get();
BlobDBOptionsImpl bdb_options;
bdb_options.ttl_range_secs = 1000;
bdb_options.blob_file_size = 256 * 1000 * 1000;
bdb_options.ttl_extractor = std::make_shared<TestTTLExtractor>();
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
std::map<std::string, std::string> data;
mock_env_->set_now_micros(50 * 1000000);
for (size_t i = 0; i < 100; i++) {
int len = rnd.Next() % kMaxBlobSize + 1;
std::string key = "key" + ToString(i);
std::string value = test::RandomHumanReadableString(&rnd, len);
uint64_t ttl = rnd.Next() % 100;
std::string value_ttl = value + "ttl:";
PutFixed64(&value_ttl, ttl);
ASSERT_OK(blob_db_->Put(WriteOptions(), Slice(key), Slice(value_ttl)));
if (ttl >= 50) {
data[key] = value;
}
}
mock_env_->set_now_micros(100 * 1000000);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_TRUE(blob_files[0]->HasTTL());
bdb_impl->TEST_CloseBlobFile(blob_files[0]);
GCStats gc_stats;
ASSERT_OK(bdb_impl->TEST_GCFileAndUpdateLSM(blob_files[0], &gc_stats));
ASSERT_EQ(100 - data.size(), gc_stats.num_deletes);
ASSERT_EQ(data.size(), gc_stats.num_relocs);
VerifyDB(data);
}
 TEST_F(BlobDBTest, StackableDBGet) {
   Random rnd(301);
   BlobDBOptionsImpl bdb_options;

--- a/utilities/blob_db/blob_log_format.h
+++ b/utilities/blob_db/blob_log_format.h
@@ -11,6 +11,7 @@
 #include <cstddef>
 #include <cstdint>
+#include <limits>
 #include <memory>
 #include <string>
 #include <utility>
@@ -229,6 +230,10 @@ class BlobLogRecord {
   uint64_t GetBlobSize() const { return blob_size_; }

+  bool HasTTL() const {
+    return ttl_val_ != std::numeric_limits<uint32_t>::max();
+  }
+
   uint32_t GetTTL() const { return ttl_val_; }

   uint64_t GetTimeVal() const { return time_val_; }

--- /dev/null
+++ b/utilities/blob_db/ttl_extractor.cc
@@ -0,0 +1,31 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include "ttl_extractor.h"
#include "util/coding.h"
namespace rocksdb {
namespace blob_db {
bool TTLExtractor::ExtractTTL(const Slice& /*key*/, const Slice& /*value*/,
uint64_t* /*ttl*/, std::string* /*new_value*/,
bool* /*value_changed*/) {
return false;
}
bool TTLExtractor::ExtractExpiration(const Slice& key, const Slice& value,
uint64_t now, uint64_t* expiration,
std::string* new_value,
bool* value_changed) {
uint64_t ttl;
bool has_ttl = ExtractTTL(key, value, &ttl, new_value, value_changed);
if (has_ttl) {
*expiration = now + ttl;
}
return has_ttl;
}
} // namespace blob_db
} // namespace rocksdb

--- /dev/null
+++ b/utilities/blob_db/ttl_extractor.h
@@ -0,0 +1,43 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#include <memory>
#include <string>
#include "rocksdb/slice.h"
namespace rocksdb {
namespace blob_db {
// TTLExtractor allow applications to extract TTL from key-value pairs.
// This useful for applications using Put or WriteBatch to write keys and
// don't intend to migrate to PutWithTTL or PutUntil.
//
// Applications can implement either ExtractTTL or ExtractExpiration. If both
// are implemented, ExtractExpiration will take precedence.
class TTLExtractor {
public:
// Extract TTL from key-value pair.
// Return true if the key has TTL, false otherwise. If key has TTL,
// TTL is pass back through ttl. The method can optionally modify the value,
// pass the result back through new_value, and also set value_changed to true.
virtual bool ExtractTTL(const Slice& key, const Slice& value, uint64_t* ttl,
std::string* new_value, bool* value_changed);
// Extract expiration time from key-value pair.
// Return true if the key has expiration time, false otherwise. If key has
// expiration time, it is pass back through expiration. The method can
// optionally modify the value, pass the result back through new_value,
// and also set value_changed to true.
virtual bool ExtractExpiration(const Slice& key, const Slice& value,
uint64_t now, uint64_t* expiration,
std::string* new_value, bool* value_changed);
virtual ~TTLExtractor() = default;
};
} // namespace blob_db
} // namespace rocksdb