You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
rocksdb/utilities/ttl/db_ttl_impl.h

251 lines
8.2 KiB

// Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved.
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#ifndef ROCKSDB_LITE
#include <deque>
#include <string>
#include <vector>
#include "db/db_impl/db_impl.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"
#include "rocksdb/system_clock.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/utilities/utility_db.h"
#include "utilities/compaction_filters/layered_compaction_filter_base.h"
#ifdef _WIN32
// Windows API macro interference
#undef GetCurrentTime
#endif
namespace ROCKSDB_NAMESPACE {
// Forward declarations only. Within this header ConfigOptions and
// ObjectLibrary appear solely as reference parameters, so their full
// definitions are not required here.
struct ConfigOptions;
class ObjectLibrary;
class ObjectRegistry;
class DBWithTTLImpl : public DBWithTTL {
public:
static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
SystemClock* clock);
static void RegisterTtlClasses();
explicit DBWithTTLImpl(DB* db);
virtual ~DBWithTTLImpl();
virtual Status Close() override;
Status CreateColumnFamilyWithTtl(const ColumnFamilyOptions& options,
const std::string& column_family_name,
ColumnFamilyHandle** handle,
int ttl) override;
Status CreateColumnFamily(const ColumnFamilyOptions& options,
const std::string& column_family_name,
ColumnFamilyHandle** handle) override;
using StackableDB::Put;
virtual Status Put(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& val) override;
using StackableDB::Get;
virtual Status Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
PinnableSlice* value) override;
using StackableDB::MultiGet;
virtual std::vector<Status> MultiGet(
const ReadOptions& options,
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys,
std::vector<std::string>* values) override;
using StackableDB::KeyMayExist;
virtual bool KeyMayExist(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value,
bool* value_found = nullptr) override;
using StackableDB::Merge;
virtual Status Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) override;
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
using StackableDB::NewIterator;
virtual Iterator* NewIterator(const ReadOptions& opts,
ColumnFamilyHandle* column_family) override;
virtual DB* GetBaseDB() override { return db_; }
static bool IsStale(const Slice& value, int32_t ttl, SystemClock* clock);
static Status AppendTS(const Slice& val, std::string* val_with_ts,
SystemClock* clock);
static Status SanityCheckTimestamp(const Slice& str);
static Status StripTS(std::string* str);
static Status StripTS(PinnableSlice* str);
static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp
static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8
static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8
void SetTtl(int32_t ttl) override { SetTtl(DefaultColumnFamily(), ttl); }
void SetTtl(ColumnFamilyHandle *h, int32_t ttl) override;
private:
// remember whether the Close completes or not
bool closed_;
};
class TtlIterator : public Iterator {
public:
explicit TtlIterator(Iterator* iter) : iter_(iter) { assert(iter_); }
~TtlIterator() { delete iter_; }
bool Valid() const override { return iter_->Valid(); }
void SeekToFirst() override { iter_->SeekToFirst(); }
void SeekToLast() override { iter_->SeekToLast(); }
void Seek(const Slice& target) override { iter_->Seek(target); }
void SeekForPrev(const Slice& target) override { iter_->SeekForPrev(target); }
void Next() override { iter_->Next(); }
void Prev() override { iter_->Prev(); }
Slice key() const override { return iter_->key(); }
Iterator with timestamp (#6255) Summary: Preliminary support for iterator with user timestamp. Current implementation does not consider merge operator and reverse iterator. Auto compaction is also disabled in unit tests. Create an iterator with timestamp. ``` ... read_opts.timestamp = &ts; auto* iter = db->NewIterator(read_opts); // target is key without timestamp. for (iter->Seek(target); iter->Valid(); iter->Next()) {} for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {} delete iter; read_opts.timestamp = &ts1; // lower_bound and upper_bound are without timestamp. read_opts.iterate_lower_bound = &lower_bound; read_opts.iterate_upper_bound = &upper_bound; auto* iter1 = db->NewIterator(read_opts); // Do Seek or SeekToFirst() delete iter1; ``` Test plan (dev server) ``` $make check ``` Simple benchmarking (dev server) 1. The overhead introduced by this PR even when timestamp is disabled. key size: 16 bytes value size: 100 bytes Entries: 1000000 Data reside in main memory, and try to stress iterator. Repeated three times on master and this PR. - Seek without next ``` ./db_bench -db=/dev/shm/rocksdbtest-1000 -benchmarks=fillseq,seekrandom -enable_pipelined_write=false -disable_wal=true -format_version=3 ``` master: 159047.0 ops/sec this PR: 158922.3 ops/sec (2% drop in throughput) - Seek and next 10 times ``` ./db_bench -db=/dev/shm/rocksdbtest-1000 -benchmarks=fillseq,seekrandom -enable_pipelined_write=false -disable_wal=true -format_version=3 -seek_nexts=10 ``` master: 109539.3 ops/sec this PR: 107519.7 ops/sec (2% drop in throughput) Pull Request resolved: https://github.com/facebook/rocksdb/pull/6255 Differential Revision: D19438227 Pulled By: riversand963 fbshipit-source-id: b66b4979486f8474619f4aa6bdd88598870b0746
5 years ago
int32_t ttl_timestamp() const {
return DecodeFixed32(iter_->value().data() + iter_->value().size() -
DBWithTTLImpl::kTSLength);
}
Slice value() const override {
// TODO: handle timestamp corruption like in general iterator semantics
assert(DBWithTTLImpl::SanityCheckTimestamp(iter_->value()).ok());
Slice trimmed_value = iter_->value();
trimmed_value.size_ -= DBWithTTLImpl::kTSLength;
return trimmed_value;
}
Status status() const override { return iter_->status(); }
private:
Iterator* iter_;
};
class TtlCompactionFilter : public LayeredCompactionFilterBase {
public:
TtlCompactionFilter(int32_t ttl, SystemClock* clock,
const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter>
_user_comp_filter_from_factory = nullptr);
virtual bool Filter(int level, const Slice& key, const Slice& old_val,
std::string* new_val, bool* value_changed) const override;
const char* Name() const override { return kClassName(); }
static const char* kClassName() { return "TtlCompactionFilter"; }
bool IsInstanceOf(const std::string& name) const override {
if (name == "Delete By TTL") {
return true;
} else {
return LayeredCompactionFilterBase::IsInstanceOf(name);
}
}
Status PrepareOptions(const ConfigOptions& config_options) override;
Status ValidateOptions(const DBOptions& db_opts,
const ColumnFamilyOptions& cf_opts) const override;
private:
int32_t ttl_;
SystemClock* clock_;
};
// CompactionFilterFactory that stores a TTL, a clock and an optional
// user-supplied factory; the latter is what Inner() exposes.
// CreateCompactionFilter(), PrepareOptions() and ValidateOptions() are
// defined out of line.
class TtlCompactionFilterFactory : public CompactionFilterFactory {
 public:
  TtlCompactionFilterFactory(
      int32_t ttl, SystemClock* clock,
      std::shared_ptr<CompactionFilterFactory> comp_filter_factory);

  std::unique_ptr<CompactionFilter> CreateCompactionFilter(
      const CompactionFilter::Context& context) override;

  // Overwrites the stored TTL.
  void SetTtl(int32_t ttl) { ttl_ = ttl; }

  static const char* kClassName() { return "TtlCompactionFilterFactory"; }
  const char* Name() const override { return kClassName(); }

  Status PrepareOptions(const ConfigOptions& config_options) override;
  Status ValidateOptions(const DBOptions& db_opts,
                         const ColumnFamilyOptions& cf_opts) const override;

  // Exposes the wrapped user factory (may be null if none was supplied).
  const Customizable* Inner() const override {
    return user_comp_filter_factory_.get();
  }

 private:
  int32_t ttl_;
  SystemClock* clock_;
  std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
};
class TtlMergeOperator : public MergeOperator {
public:
explicit TtlMergeOperator(const std::shared_ptr<MergeOperator>& merge_op,
SystemClock* clock);
bool FullMergeV2(const MergeOperationInput& merge_in,
MergeOperationOutput* merge_out) const override;
[RocksDB] [MergeOperator] The new Merge Interface! Uses merge sequences. Summary: Here are the major changes to the Merge Interface. It has been expanded to handle cases where the MergeOperator is not associative. It does so by stacking up merge operations while scanning through the key history (i.e.: during Get() or Compaction), until a valid Put/Delete/end-of-history is encountered; it then applies all of the merge operations in the correct sequence starting with the base/sentinel value. I have also introduced an "AssociativeMerge" function which allows the user to take advantage of associative merge operations (such as in the case of counters). The implementation will always attempt to merge the operations/operands themselves together when they are encountered, and will resort to the "stacking" method if and only if the "associative-merge" fails. This implementation is conjectured to allow MergeOperator to handle the general case, while still providing the user with the ability to take advantage of certain efficiencies in their own merge-operator / data-structure. NOTE: This is a preliminary diff. This must still go through a lot of review, revision, and testing. Feedback welcome! Test Plan: -This is a preliminary diff. I have only just begun testing/debugging it. -I will be testing this with the existing MergeOperator use-cases and unit-tests (counters, string-append, and redis-lists) -I will be "desk-checking" and walking through the code with the help gdb. -I will find a way of stress-testing the new interface / implementation using db_bench, db_test, merge_test, and/or db_stress. -I will ensure that my tests cover all cases: Get-Memtable, Get-Immutable-Memtable, Get-from-Disk, Iterator-Range-Scan, Flush-Memtable-to-L0, Compaction-L0-L1, Compaction-Ln-L(n+1), Put/Delete found, Put/Delete not-found, end-of-history, end-of-file, etc. -A lot of feedback from the reviewers. 
Reviewers: haobo, dhruba, zshao, emayanke Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D11499
11 years ago
bool PartialMergeMulti(const Slice& key,
const std::deque<Slice>& operand_list,
std::string* new_value, Logger* logger) const override;
[RocksDB] [MergeOperator] The new Merge Interface! Uses merge sequences. Summary: Here are the major changes to the Merge Interface. It has been expanded to handle cases where the MergeOperator is not associative. It does so by stacking up merge operations while scanning through the key history (i.e.: during Get() or Compaction), until a valid Put/Delete/end-of-history is encountered; it then applies all of the merge operations in the correct sequence starting with the base/sentinel value. I have also introduced an "AssociativeMerge" function which allows the user to take advantage of associative merge operations (such as in the case of counters). The implementation will always attempt to merge the operations/operands themselves together when they are encountered, and will resort to the "stacking" method if and only if the "associative-merge" fails. This implementation is conjectured to allow MergeOperator to handle the general case, while still providing the user with the ability to take advantage of certain efficiencies in their own merge-operator / data-structure. NOTE: This is a preliminary diff. This must still go through a lot of review, revision, and testing. Feedback welcome! Test Plan: -This is a preliminary diff. I have only just begun testing/debugging it. -I will be testing this with the existing MergeOperator use-cases and unit-tests (counters, string-append, and redis-lists) -I will be "desk-checking" and walking through the code with the help gdb. -I will find a way of stress-testing the new interface / implementation using db_bench, db_test, merge_test, and/or db_stress. -I will ensure that my tests cover all cases: Get-Memtable, Get-Immutable-Memtable, Get-from-Disk, Iterator-Range-Scan, Flush-Memtable-to-L0, Compaction-L0-L1, Compaction-Ln-L(n+1), Put/Delete found, Put/Delete not-found, end-of-history, end-of-file, etc. -A lot of feedback from the reviewers. 
Reviewers: haobo, dhruba, zshao, emayanke Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D11499
11 years ago
static const char* kClassName() { return "TtlMergeOperator"; }
const char* Name() const override { return kClassName(); }
bool IsInstanceOf(const std::string& name) const override {
if (name == "Merge By TTL") {
[RocksDB] [MergeOperator] The new Merge Interface! Uses merge sequences. Summary: Here are the major changes to the Merge Interface. It has been expanded to handle cases where the MergeOperator is not associative. It does so by stacking up merge operations while scanning through the key history (i.e.: during Get() or Compaction), until a valid Put/Delete/end-of-history is encountered; it then applies all of the merge operations in the correct sequence starting with the base/sentinel value. I have also introduced an "AssociativeMerge" function which allows the user to take advantage of associative merge operations (such as in the case of counters). The implementation will always attempt to merge the operations/operands themselves together when they are encountered, and will resort to the "stacking" method if and only if the "associative-merge" fails. This implementation is conjectured to allow MergeOperator to handle the general case, while still providing the user with the ability to take advantage of certain efficiencies in their own merge-operator / data-structure. NOTE: This is a preliminary diff. This must still go through a lot of review, revision, and testing. Feedback welcome! Test Plan: -This is a preliminary diff. I have only just begun testing/debugging it. -I will be testing this with the existing MergeOperator use-cases and unit-tests (counters, string-append, and redis-lists) -I will be "desk-checking" and walking through the code with the help gdb. -I will find a way of stress-testing the new interface / implementation using db_bench, db_test, merge_test, and/or db_stress. -I will ensure that my tests cover all cases: Get-Memtable, Get-Immutable-Memtable, Get-from-Disk, Iterator-Range-Scan, Flush-Memtable-to-L0, Compaction-L0-L1, Compaction-Ln-L(n+1), Put/Delete found, Put/Delete not-found, end-of-history, end-of-file, etc. -A lot of feedback from the reviewers. 
Reviewers: haobo, dhruba, zshao, emayanke Reviewed By: haobo CC: leveldb Differential Revision: https://reviews.facebook.net/D11499
11 years ago
return true;
} else {
return MergeOperator::IsInstanceOf(name);
}
}
Status PrepareOptions(const ConfigOptions& config_options) override;
Status ValidateOptions(const DBOptions& db_opts,
const ColumnFamilyOptions& cf_opts) const override;
const Customizable* Inner() const override { return user_merge_op_.get(); }
private:
std::shared_ptr<MergeOperator> user_merge_op_;
SystemClock* clock_;
};
// Declared with C linkage; the definition is not in this header.
// The second (string) parameter is left unnamed in this declaration.
// NOTE(review): presumably registers the TTL classes declared above with
// `library` and returns a count — confirm against the definition.
extern "C" {
int RegisterTtlObjects(ObjectLibrary& library, const std::string& /*arg*/);
} // extern "C"
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE