Add DB::Properties::kEstimateOldestKeyTime

Summary:
With FIFO compaction we would like to get the oldest data time for monitoring. The problem is we don't have timestamp for each key in the DB. As an approximation, we expose the earliest of sst file "creation_time" property.

My plan is to override the property with a more accurate value with blob db, where we actually have timestamp.
Closes https://github.com/facebook/rocksdb/pull/2842

Differential Revision: D5770600

Pulled By: yiwu-arbug

fbshipit-source-id: 03833c8f10bbfbee62f8ea5c0d03c0cafb5d853a
main
Yi Wu 7 years ago committed by Facebook Github Bot
parent d2a65c59e1
commit 66a2c44ef4
  1. 1
      HISTORY.md
  2. 17
      db/builder.cc
  3. 7
      db/builder.h
  4. 27
      db/compaction_picker.cc
  5. 74
      db/db_properties_test.cc
  6. 14
      db/db_sst_test.cc
  7. 31
      db/db_test_util.h
  8. 7
      db/flush_job.cc
  9. 34
      db/internal_stats.cc
  10. 2
      db/internal_stats.h
  11. 22
      db/memtable.cc
  12. 9
      db/memtable.h
  13. 8
      db/memtable_list.cc
  14. 3
      db/memtable_list.h
  15. 7
      include/rocksdb/db.h
  16. 6
      include/rocksdb/table_properties.h
  17. 14
      table/block_based_table_builder.cc
  18. 3
      table/block_based_table_builder.h
  19. 3
      table/block_based_table_factory.cc
  20. 3
      table/meta_blocks.cc
  21. 8
      table/table_builder.h
  22. 5
      table/table_properties.cc
  23. 47
      utilities/blob_db/blob_db_test.cc

@ -8,6 +8,7 @@
* Support dynamic adjustment of rate limit according to demand for background I/O. It can be enabled by passing `true` to the `auto_tuned` parameter in `NewGenericRateLimiter()`. The value passed as `rate_bytes_per_sec` will still be respected as an upper-bound.
* Support dynamically changing `ColumnFamilyOptions::compaction_options_fifo`.
* Introduce `EventListener::OnStallConditionsChanged()` callback. Users can implement it to be notified when user writes are stalled, stopped, or resumed.
* Add a new db property "rocksdb.estimate-oldest-key-time" to return oldest data timestamp. The property is available only for FIFO compaction with compaction_options_fifo.allow_compaction = false.
### Bug Fixes
* Fix a potential data inconsistency issue during point-in-time recovery. `DB:Open()` will abort if column family inconsistency is found during PIT recovery.

@ -47,15 +47,15 @@ TableBuilder* NewTableBuilder(
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, int level,
const std::string* compression_dict, const bool skip_filters,
const uint64_t creation_time) {
const uint64_t creation_time, const uint64_t oldest_key_time) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
return ioptions.table_factory->NewTableBuilder(
TableBuilderOptions(ioptions, internal_comparator,
int_tbl_prop_collector_factories, compression_type,
compression_opts, compression_dict, skip_filters,
column_family_name, level, creation_time),
TableBuilderOptions(
ioptions, internal_comparator, int_tbl_prop_collector_factories,
compression_type, compression_opts, compression_dict, skip_filters,
column_family_name, level, creation_time, oldest_key_time),
column_family_id, file);
}
@ -74,8 +74,8 @@ Status BuildTable(
const CompressionOptions& compression_opts, bool paranoid_file_checks,
InternalStats* internal_stats, TableFileCreationReason reason,
EventLogger* event_logger, int job_id, const Env::IOPriority io_priority,
TableProperties* table_properties, int level,
const uint64_t creation_time) {
TableProperties* table_properties, int level, const uint64_t creation_time,
const uint64_t oldest_key_time) {
assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
@ -120,12 +120,11 @@ Status BuildTable(
file_writer.reset(new WritableFileWriter(std::move(file), env_options,
ioptions.statistics));
builder = NewTableBuilder(
ioptions, internal_comparator, int_tbl_prop_collector_factories,
column_family_id, column_family_name, file_writer.get(), compression,
compression_opts, level, nullptr /* compression_dict */,
false /* skip_filters */, creation_time);
false /* skip_filters */, creation_time, oldest_key_time);
}
MergeHelper merge(env, internal_comparator.user_comparator(),

@ -6,6 +6,7 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#pragma once
#include <limits>
#include <string>
#include <utility>
#include <vector>
@ -51,7 +52,8 @@ TableBuilder* NewTableBuilder(
WritableFileWriter* file, const CompressionType compression_type,
const CompressionOptions& compression_opts, int level,
const std::string* compression_dict = nullptr,
const bool skip_filters = false, const uint64_t creation_time = 0);
const bool skip_filters = false, const uint64_t creation_time = 0,
const uint64_t oldest_key_time = std::numeric_limits<uint64_t>::max());
// Build a Table file from the contents of *iter. The generated file
// will be named according to number specified in meta. On success, the rest of
@ -78,6 +80,7 @@ extern Status BuildTable(
EventLogger* event_logger = nullptr, int job_id = 0,
const Env::IOPriority io_priority = Env::IO_HIGH,
TableProperties* table_properties = nullptr, int level = -1,
const uint64_t creation_time = 0);
const uint64_t creation_time = 0,
const uint64_t oldest_key_time = std::numeric_limits<uint64_t>::max());
} // namespace rocksdb

@ -1446,19 +1446,22 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
inputs.emplace_back();
inputs[0].level = 0;
for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
auto f = *ritr;
if (f->fd.table_reader != nullptr &&
f->fd.table_reader->GetTableProperties() != nullptr) {
auto creation_time =
f->fd.table_reader->GetTableProperties()->creation_time;
if (creation_time == 0 ||
creation_time >=
(current_time - mutable_cf_options.compaction_options_fifo.ttl)) {
break;
// avoid underflow
if (current_time > mutable_cf_options.compaction_options_fifo.ttl) {
for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
auto f = *ritr;
if (f->fd.table_reader != nullptr &&
f->fd.table_reader->GetTableProperties() != nullptr) {
auto creation_time =
f->fd.table_reader->GetTableProperties()->creation_time;
if (creation_time == 0 ||
creation_time >= (current_time -
mutable_cf_options.compaction_options_fifo.ttl)) {
break;
}
total_size -= f->compensated_file_size;
inputs[0].files.push_back(f);
}
total_size -= f->compensated_file_size;
inputs[0].files.push_back(f);
}
}

@ -1309,6 +1309,80 @@ TEST_F(DBPropertiesTest, EstimateNumKeysUnderflow) {
ASSERT_EQ(0, num_keys);
}
TEST_F(DBPropertiesTest, EstimateOldestKeyTime) {
std::unique_ptr<MockTimeEnv> mock_env(new MockTimeEnv(Env::Default()));
uint64_t oldest_key_time = 0;
Options options;
options.env = mock_env.get();
// "rocksdb.estimate-oldest-key-time" only available to fifo compaction.
mock_env->set_current_time(100);
for (auto compaction : {kCompactionStyleLevel, kCompactionStyleUniversal,
kCompactionStyleNone}) {
options.compaction_style = compaction;
options.create_if_missing = true;
DestroyAndReopen(options);
ASSERT_OK(Put("foo", "bar"));
ASSERT_FALSE(dbfull()->GetIntProperty(
DB::Properties::kEstimateOldestKeyTime, &oldest_key_time));
}
options.compaction_style = kCompactionStyleFIFO;
options.compaction_options_fifo.ttl = 300;
options.compaction_options_fifo.allow_compaction = false;
DestroyAndReopen(options);
mock_env->set_current_time(100);
ASSERT_OK(Put("k1", "v1"));
ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime,
&oldest_key_time));
ASSERT_EQ(100, oldest_key_time);
ASSERT_OK(Flush());
ASSERT_EQ("1", FilesPerLevel());
ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime,
&oldest_key_time));
ASSERT_EQ(100, oldest_key_time);
mock_env->set_current_time(200);
ASSERT_OK(Put("k2", "v2"));
ASSERT_OK(Flush());
ASSERT_EQ("2", FilesPerLevel());
ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime,
&oldest_key_time));
ASSERT_EQ(100, oldest_key_time);
mock_env->set_current_time(300);
ASSERT_OK(Put("k3", "v3"));
ASSERT_OK(Flush());
ASSERT_EQ("3", FilesPerLevel());
ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime,
&oldest_key_time));
ASSERT_EQ(100, oldest_key_time);
mock_env->set_current_time(450);
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("2", FilesPerLevel());
ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime,
&oldest_key_time));
ASSERT_EQ(200, oldest_key_time);
mock_env->set_current_time(550);
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("1", FilesPerLevel());
ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime,
&oldest_key_time));
ASSERT_EQ(300, oldest_key_time);
mock_env->set_current_time(650);
ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr));
ASSERT_EQ("", FilesPerLevel());
ASSERT_FALSE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime,
&oldest_key_time));
// Close before mock_env destructs.
Close();
}
#endif // ROCKSDB_LITE
} // namespace rocksdb

@ -701,9 +701,13 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) {
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
&total_sst_files_size));
// Live SST files = 1 (compacted file)
// Total SST files = 6 (5 original files + compacted file)
ASSERT_EQ(live_sst_files_size, 1 * single_file_size);
ASSERT_EQ(total_sst_files_size, 6 * single_file_size);
// The 5 bytes difference comes from oldest-key-time table property isn't
// propagated on compaction. It is written with default value
// std::numeric_limits<uint64_t>::max as varint64.
ASSERT_EQ(live_sst_files_size, 1 * single_file_size + 5);
// Total SST files = 5 original files + compacted file
ASSERT_EQ(total_sst_files_size, 5 * single_file_size + live_sst_files_size);
// hold current version
std::unique_ptr<Iterator> iter2(dbfull()->NewIterator(ReadOptions()));
@ -724,14 +728,14 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) {
&total_sst_files_size));
// Live SST files = 0
// Total SST files = 6 (5 original files + compacted file)
ASSERT_EQ(total_sst_files_size, 6 * single_file_size);
ASSERT_EQ(total_sst_files_size, 5 * single_file_size + live_sst_files_size);
iter1.reset();
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",
&total_sst_files_size));
// Live SST files = 0
// Total SST files = 1 (compacted file)
ASSERT_EQ(total_sst_files_size, 1 * single_file_size);
ASSERT_EQ(total_sst_files_size, live_sst_files_size);
iter2.reset();
ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size",

@ -572,6 +572,37 @@ class SpecialEnv : public EnvWrapper {
std::atomic<bool> is_wal_sync_thread_safe_{true};
};
class MockTimeEnv : public EnvWrapper {
public:
explicit MockTimeEnv(Env* base) : EnvWrapper(base) {}
virtual Status GetCurrentTime(int64_t* time) override {
assert(time != nullptr);
assert(current_time_ <=
static_cast<uint64_t>(std::numeric_limits<int64_t>::max()));
*time = static_cast<int64_t>(current_time_);
return Status::OK();
}
virtual uint64_t NowMicros() override {
assert(current_time_ <= std::numeric_limits<uint64_t>::max() / 1000000);
return current_time_ * 1000000;
}
virtual uint64_t NowNanos() override {
assert(current_time_ <= std::numeric_limits<uint64_t>::max() / 1000000000);
return current_time_ * 1000000000;
}
void set_current_time(uint64_t time) {
assert(time >= current_time_);
current_time_ = time;
}
private:
uint64_t current_time_ = 0;
};
#ifndef ROCKSDB_LITE
class OnFileDeletionListener : public EventListener {
public:

@ -16,6 +16,7 @@
#include <inttypes.h>
#include <algorithm>
#include <limits>
#include <vector>
#include "db/builder.h"
@ -299,6 +300,9 @@ Status FlushJob::WriteLevel0Table() {
db_options_.env->GetCurrentTime(&_current_time); // ignore error
const uint64_t current_time = static_cast<uint64_t>(_current_time);
uint64_t oldest_key_time =
mems_.front()->ApproximateOldestKeyTime();
s = BuildTable(
dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_,
env_options_, cfd_->table_cache(), iter.get(),
@ -309,7 +313,8 @@ Status FlushJob::WriteLevel0Table() {
output_compression_, cfd_->ioptions()->compression_opts,
mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(),
TableFileCreationReason::kFlush, event_logger_, job_context_->job_id,
Env::IO_HIGH, &table_properties_, 0 /* level */, current_time);
Env::IO_HIGH, &table_properties_, 0 /* level */, current_time,
oldest_key_time);
LogFlush(db_options_.info_log);
}
ROCKS_LOG_INFO(db_options_.info_log,

@ -13,8 +13,9 @@
#endif
#include <inttypes.h>
#include <string>
#include <algorithm>
#include <limits>
#include <string>
#include <utility>
#include <vector>
#include "db/column_family.h"
@ -243,6 +244,7 @@ static const std::string num_running_flushes = "num-running-flushes";
static const std::string actual_delayed_write_rate =
"actual-delayed-write-rate";
static const std::string is_write_stopped = "is-write-stopped";
static const std::string estimate_oldest_key_time = "estimate-oldest-key-time";
const std::string DB::Properties::kNumFilesAtLevelPrefix =
rocksdb_prefix + num_files_at_level_prefix;
@ -316,6 +318,8 @@ const std::string DB::Properties::kActualDelayedWriteRate =
rocksdb_prefix + actual_delayed_write_rate;
const std::string DB::Properties::kIsWriteStopped =
rocksdb_prefix + is_write_stopped;
const std::string DB::Properties::kEstimateOldestKeyTime =
rocksdb_prefix + estimate_oldest_key_time;
const std::unordered_map<std::string, DBPropertyInfo>
InternalStats::ppt_name_to_info = {
@ -414,6 +418,9 @@ const std::unordered_map<std::string, DBPropertyInfo>
nullptr}},
{DB::Properties::kIsWriteStopped,
{false, nullptr, &InternalStats::HandleIsWriteStopped, nullptr}},
{DB::Properties::kEstimateOldestKeyTime,
{false, nullptr, &InternalStats::HandleEstimateOldestKeyTime,
nullptr}},
};
const DBPropertyInfo* GetPropertyInfo(const Slice& property) {
@ -776,6 +783,31 @@ bool InternalStats::HandleIsWriteStopped(uint64_t* value, DBImpl* db,
return true;
}
bool InternalStats::HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* /*db*/,
Version* /*version*/) {
// TODO(yiwu): The property is currently available for fifo compaction
// with allow_compaction = false. This is because we don't propagate
// oldest_key_time on compaction.
if (cfd_->ioptions()->compaction_style != kCompactionStyleFIFO ||
cfd_->GetCurrentMutableCFOptions()
->compaction_options_fifo.allow_compaction) {
return false;
}
TablePropertiesCollection collection;
auto s = cfd_->current()->GetPropertiesOfAllTables(&collection);
if (!s.ok()) {
return false;
}
*value = std::numeric_limits<uint64_t>::max();
for (auto& p : collection) {
*value = std::min(*value, p.second->oldest_key_time);
}
*value = std::min({cfd_->mem()->ApproximateOldestKeyTime(),
cfd_->imm()->ApproximateOldestKeyTime(), *value});
return *value < std::numeric_limits<uint64_t>::max();
}
void InternalStats::DumpDBStats(std::string* value) {
char buf[1000];
// DB-level stats, only available from default column family

@ -477,6 +477,8 @@ class InternalStats {
bool HandleActualDelayedWriteRate(uint64_t* value, DBImpl* db,
Version* version);
bool HandleIsWriteStopped(uint64_t* value, DBImpl* db, Version* version);
bool HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* db,
Version* version);
// Total number of background errors encountered. Every time a flush task
// or compaction task fails, this counter is incremented. The failure can

@ -9,8 +9,9 @@
#include "db/memtable.h"
#include <memory>
#include <algorithm>
#include <limits>
#include <memory>
#include "db/dbformat.h"
#include "db/merge_context.h"
@ -97,7 +98,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
flush_state_(FLUSH_NOT_REQUESTED),
env_(ioptions.env),
insert_with_hint_prefix_extractor_(
ioptions.memtable_insert_with_hint_prefix_extractor) {
ioptions.memtable_insert_with_hint_prefix_extractor),
oldest_key_time_(std::numeric_limits<uint64_t>::max()) {
UpdateFlushState();
// something went wrong if we need to flush before inserting anything
assert(!ShouldScheduleFlush());
@ -203,6 +205,21 @@ void MemTable::UpdateFlushState() {
}
}
void MemTable::UpdateOldestKeyTime() {
uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed);
if (oldest_key_time == std::numeric_limits<uint64_t>::max()) {
int64_t current_time = 0;
auto s = env_->GetCurrentTime(&current_time);
if (s.ok()) {
assert(current_time >= 0);
// If fail, the timestamp is already set.
oldest_key_time_.compare_exchange_strong(
oldest_key_time, static_cast<uint64_t>(current_time),
std::memory_order_relaxed, std::memory_order_relaxed);
}
}
}
int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
const char* prefix_len_key2) const {
// Internal keys are encoded as length-prefixed strings.
@ -517,6 +534,7 @@ void MemTable::Add(SequenceNumber s, ValueType type,
if (is_range_del_table_empty_ && type == kTypeRangeDeletion) {
is_range_del_table_empty_ = false;
}
UpdateOldestKeyTime();
}
// Callback from MemTable::Get()

@ -352,6 +352,10 @@ class MemTable {
const MemTableOptions* GetMemTableOptions() const { return &moptions_; }
uint64_t ApproximateOldestKeyTime() const {
return oldest_key_time_.load(std::memory_order_relaxed);
}
private:
enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED };
@ -415,12 +419,17 @@ class MemTable {
// Insert hints for each prefix.
std::unordered_map<Slice, void*, SliceHasher> insert_hints_;
// Timestamp of oldest key
std::atomic<uint64_t> oldest_key_time_;
// Returns a heuristic flush decision
bool ShouldFlushNow() const;
// Updates flush_state_ using ShouldFlushNow()
void UpdateFlushState();
void UpdateOldestKeyTime();
// No copying allowed
MemTable(const MemTable&);
MemTable& operator=(const MemTable&);

@ -10,6 +10,7 @@
#endif
#include <inttypes.h>
#include <limits>
#include <string>
#include "db/memtable.h"
#include "db/version_set.h"
@ -448,6 +449,13 @@ size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() {
size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; }
uint64_t MemTableList::ApproximateOldestKeyTime() const {
if (!current_->memlist_.empty()) {
return current_->memlist_.back()->ApproximateOldestKeyTime();
}
return std::numeric_limits<uint64_t>::max();
}
void MemTableList::InstallNewVersion() {
if (current_->refs_ == 1) {
// we're the only one using the version, just keep using it

@ -224,6 +224,9 @@ class MemTableList {
// the unflushed mem-tables.
size_t ApproximateUnflushedMemTablesMemoryUsage();
// Returns an estimate of the timestamp of the earliest key.
uint64_t ApproximateOldestKeyTime() const;
// Request a flush of all existing memtables to storage. This will
// cause future calls to IsFlushPending() to return true if this list is
// non-empty (regardless of the min_write_buffer_number_to_merge

@ -582,6 +582,12 @@ class DB {
// "rocksdb.is-write-stopped" - Return 1 if write has been stopped.
static const std::string kIsWriteStopped;
// "rocksdb.estimate-oldest-key-time" - returns an estimation of
// oldest key timestamp in the DB. Currently only available for
// FIFO compaction with
// compaction_options_fifo.allow_compaction = false.
static const std::string kEstimateOldestKeyTime;
};
#endif /* ROCKSDB_LITE */
@ -632,6 +638,7 @@ class DB {
// "rocksdb.num-running-flushes"
// "rocksdb.actual-delayed-write-rate"
// "rocksdb.is-write-stopped"
// "rocksdb.estimate-oldest-key-time"
virtual bool GetIntProperty(ColumnFamilyHandle* column_family,
const Slice& property, uint64_t* value) = 0;
virtual bool GetIntProperty(const Slice& property, uint64_t* value) {

@ -4,8 +4,9 @@
#pragma once
#include <stdint.h>
#include <string>
#include <limits>
#include <map>
#include <string>
#include "rocksdb/status.h"
#include "rocksdb/types.h"
@ -49,6 +50,7 @@ struct TablePropertiesNames {
static const std::string kPropertyCollectors;
static const std::string kCompression;
static const std::string kCreationTime;
static const std::string kOldestKeyTime;
};
extern const std::string kPropertiesBlock;
@ -162,6 +164,8 @@ struct TableProperties {
// The time when the SST file was created.
// Since SST files are immutable, this is equivalent to last modified time.
uint64_t creation_time = 0;
// Timestamp of the earliest key
uint64_t oldest_key_time = std::numeric_limits<uint64_t>::max();
// Name of the column family with which this SST file is associated.
// If column family is unknown, `column_family_name` will be an empty string.

@ -276,6 +276,7 @@ struct BlockBasedTableBuilder::Rep {
uint32_t column_family_id;
const std::string& column_family_name;
uint64_t creation_time = 0;
uint64_t oldest_key_time = std::numeric_limits<uint64_t>::max();
std::vector<std::unique_ptr<IntTblPropCollector>> table_properties_collectors;
@ -288,7 +289,8 @@ struct BlockBasedTableBuilder::Rep {
const CompressionType _compression_type,
const CompressionOptions& _compression_opts,
const std::string* _compression_dict, const bool skip_filters,
const std::string& _column_family_name, const uint64_t _creation_time)
const std::string& _column_family_name, const uint64_t _creation_time,
const uint64_t _oldest_key_time)
: ioptions(_ioptions),
table_options(table_opt),
internal_comparator(icomparator),
@ -305,7 +307,8 @@ struct BlockBasedTableBuilder::Rep {
table_options, data_block)),
column_family_id(_column_family_id),
column_family_name(_column_family_name),
creation_time(_creation_time) {
creation_time(_creation_time),
oldest_key_time(_oldest_key_time) {
if (table_options.index_type ==
BlockBasedTableOptions::kTwoLevelIndexSearch) {
p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder(
@ -344,7 +347,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const std::string* compression_dict, const bool skip_filters,
const std::string& column_family_name, const uint64_t creation_time) {
const std::string& column_family_name, const uint64_t creation_time,
const uint64_t oldest_key_time) {
BlockBasedTableOptions sanitized_table_options(table_options);
if (sanitized_table_options.format_version == 0 &&
sanitized_table_options.checksum != kCRC32c) {
@ -360,7 +364,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder(
rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator,
int_tbl_prop_collector_factories, column_family_id, file,
compression_type, compression_opts, compression_dict,
skip_filters, column_family_name, creation_time);
skip_filters, column_family_name, creation_time,
oldest_key_time);
if (rep_->filter_builder != nullptr) {
rep_->filter_builder->StartBlock(0);
@ -738,6 +743,7 @@ Status BlockBasedTableBuilder::Finish() {
r->p_index_builder_->EstimateTopLevelIndexSize(r->offset);
}
r->props.creation_time = r->creation_time;
r->props.oldest_key_time = r->oldest_key_time;
// Add basic properties
property_block_builder.AddTableProperty(r->props);

@ -47,7 +47,8 @@ class BlockBasedTableBuilder : public TableBuilder {
const CompressionType compression_type,
const CompressionOptions& compression_opts,
const std::string* compression_dict, const bool skip_filters,
const std::string& column_family_name, const uint64_t creation_time = 0);
const std::string& column_family_name, const uint64_t creation_time = 0,
const uint64_t oldest_key_time = std::numeric_limits<uint64_t>::max());
// REQUIRES: Either Finish() or Abandon() has been called.
~BlockBasedTableBuilder();

@ -85,7 +85,8 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder(
table_builder_options.compression_dict,
table_builder_options.skip_filters,
table_builder_options.column_family_name,
table_builder_options.creation_time);
table_builder_options.creation_time,
table_builder_options.oldest_key_time);
return table_builder;
}

@ -79,6 +79,7 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) {
Add(TablePropertiesNames::kFixedKeyLen, props.fixed_key_len);
Add(TablePropertiesNames::kColumnFamilyId, props.column_family_id);
Add(TablePropertiesNames::kCreationTime, props.creation_time);
Add(TablePropertiesNames::kOldestKeyTime, props.oldest_key_time);
if (!props.filter_policy_name.empty()) {
Add(TablePropertiesNames::kFilterPolicy, props.filter_policy_name);
@ -213,6 +214,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file,
&new_table_properties->column_family_id},
{TablePropertiesNames::kCreationTime,
&new_table_properties->creation_time},
{TablePropertiesNames::kOldestKeyTime,
&new_table_properties->oldest_key_time},
};
std::string last_key;

@ -10,6 +10,7 @@
#pragma once
#include <stdint.h>
#include <limits>
#include <string>
#include <utility>
#include <vector>
@ -55,7 +56,8 @@ struct TableBuilderOptions {
const CompressionOptions& _compression_opts,
const std::string* _compression_dict, bool _skip_filters,
const std::string& _column_family_name, int _level,
const uint64_t _creation_time = 0)
const uint64_t _creation_time = 0,
const int64_t _oldest_key_time = std::numeric_limits<uint64_t>::max())
: ioptions(_ioptions),
internal_comparator(_internal_comparator),
int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories),
@ -65,7 +67,8 @@ struct TableBuilderOptions {
skip_filters(_skip_filters),
column_family_name(_column_family_name),
level(_level),
creation_time(_creation_time) {}
creation_time(_creation_time),
oldest_key_time(_oldest_key_time) {}
const ImmutableCFOptions& ioptions;
const InternalKeyComparator& internal_comparator;
const std::vector<std::unique_ptr<IntTblPropCollectorFactory>>*
@ -78,6 +81,7 @@ struct TableBuilderOptions {
const std::string& column_family_name;
int level; // what level this table/file is on, -1 for "not set, don't know"
const uint64_t creation_time;
const int64_t oldest_key_time;
};
// TableBuilder provides the interface used to build a Table

@ -139,6 +139,9 @@ std::string TableProperties::ToString(
AppendProperty(result, "creation time", creation_time, prop_delim, kv_delim);
AppendProperty(result, "time stamp of earliest key", oldest_key_time,
prop_delim, kv_delim);
return result;
}
@ -191,6 +194,8 @@ const std::string TablePropertiesNames::kPropertyCollectors =
"rocksdb.property.collectors";
const std::string TablePropertiesNames::kCompression = "rocksdb.compression";
const std::string TablePropertiesNames::kCreationTime = "rocksdb.creation.time";
const std::string TablePropertiesNames::kOldestKeyTime =
"rocksdb.oldest.key.time";
extern const std::string kPropertiesBlock = "rocksdb.properties";
// Old property block name for backward compatibility

@ -26,21 +26,9 @@ class BlobDBTest : public testing::Test {
public:
const int kMaxBlobSize = 1 << 14;
class MockEnv : public EnvWrapper {
public:
MockEnv() : EnvWrapper(Env::Default()) {}
void set_now_micros(uint64_t now_micros) { now_micros_ = now_micros; }
uint64_t NowMicros() override { return now_micros_; }
private:
uint64_t now_micros_ = 0;
};
BlobDBTest()
: dbname_(test::TmpDir() + "/blob_db_test"),
mock_env_(new MockEnv()),
mock_env_(new MockTimeEnv(Env::Default())),
blob_db_(nullptr) {
Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions());
assert(s.ok());
@ -155,7 +143,7 @@ class BlobDBTest : public testing::Test {
}
const std::string dbname_;
std::unique_ptr<MockEnv> mock_env_;
std::unique_ptr<MockTimeEnv> mock_env_;
std::shared_ptr<TTLExtractor> ttl_extractor_;
BlobDB *blob_db_;
}; // class BlobDBTest
@ -182,13 +170,13 @@ TEST_F(BlobDBTest, PutWithTTL) {
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
std::map<std::string, std::string> data;
mock_env_->set_now_micros(50 * 1000000);
mock_env_->set_current_time(50);
for (size_t i = 0; i < 100; i++) {
uint64_t ttl = rnd.Next() % 100;
PutRandomWithTTL("key" + ToString(i), ttl, &rnd,
(ttl <= 50 ? nullptr : &data));
}
mock_env_->set_now_micros(100 * 1000000);
mock_env_->set_current_time(100);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
@ -211,13 +199,13 @@ TEST_F(BlobDBTest, PutUntil) {
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
std::map<std::string, std::string> data;
mock_env_->set_now_micros(50 * 1000000);
mock_env_->set_current_time(50);
for (size_t i = 0; i < 100; i++) {
uint64_t expiration = rnd.Next() % 100 + 50;
PutRandomUntil("key" + ToString(i), expiration, &rnd,
(expiration <= 100 ? nullptr : &data));
}
mock_env_->set_now_micros(100 * 1000000);
mock_env_->set_current_time(100);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
@ -244,12 +232,13 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) {
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
std::map<std::string, std::string> data;
mock_env_->set_now_micros(0);
mock_env_->set_current_time(0);
for (size_t i = 0; i < 100; i++) {
PutRandom("key" + ToString(i), &rnd, &data);
}
// very far in the future..
mock_env_->set_now_micros(std::numeric_limits<uint64_t>::max() - 10);
mock_env_->set_current_time(std::numeric_limits<uint64_t>::max() / 1000000 -
10);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
@ -290,11 +279,11 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) {
bdb_options.ttl_extractor = ttl_extractor_;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
mock_env_->set_now_micros(50 * 1000000);
mock_env_->set_current_time(50);
for (size_t i = 0; i < 100; i++) {
PutRandom("key" + ToString(i), &rnd);
}
mock_env_->set_now_micros(100 * 1000000);
mock_env_->set_current_time(100);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
@ -337,11 +326,11 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) {
bdb_options.ttl_extractor = ttl_extractor_;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
mock_env_->set_now_micros(50 * 1000000);
mock_env_->set_current_time(50);
for (size_t i = 0; i < 100; i++) {
PutRandom("key" + ToString(i), &rnd);
}
mock_env_->set_now_micros(100 * 1000000);
mock_env_->set_current_time(100);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
@ -385,7 +374,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
std::map<std::string, std::string> data;
mock_env_->set_now_micros(50 * 1000000);
mock_env_->set_current_time(50);
for (size_t i = 0; i < 100; i++) {
int len = rnd.Next() % kMaxBlobSize + 1;
std::string key = "key" + ToString(i);
@ -398,7 +387,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) {
data[key] = value;
}
}
mock_env_->set_now_micros(100 * 1000000);
mock_env_->set_current_time(100);
auto *bdb_impl = static_cast<BlobDBImpl *>(blob_db_);
auto blob_files = bdb_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
@ -628,14 +617,14 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) {
BlobDBOptions bdb_options;
bdb_options.disable_background_tasks = true;
Open(bdb_options, options);
mock_env_->set_now_micros(100 * 1000000);
mock_env_->set_current_time(100);
ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v1", 200));
BlobDBImpl *blob_db_impl =
static_cast_with_check<BlobDBImpl, BlobDB>(blob_db_);
auto blob_files = blob_db_impl->TEST_GetBlobFiles();
ASSERT_EQ(1, blob_files.size());
ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0]));
mock_env_->set_now_micros(300 * 1000000);
mock_env_->set_current_time(300);
SyncPoint::GetInstance()->LoadDependency(
{{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB",
@ -775,7 +764,7 @@ TEST_F(BlobDBTest, ReadWhileGC) {
TEST_F(BlobDBTest, ColumnFamilyNotSupported) {
Options options;
options.env = mock_env_.get();
mock_env_->set_now_micros(0);
mock_env_->set_current_time(0);
Open(BlobDBOptions(), options);
ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily();
ColumnFamilyHandle *handle = nullptr;

Loading…
Cancel
Save