From 66a2c44ef4b6ecaacc1d9d505cde56b8f5da565e Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Mon, 23 Oct 2017 15:22:05 -0700 Subject: [PATCH] Add DB::Properties::kEstimateOldestKeyTime Summary: With FIFO compaction we would like to get the oldest data time for monitoring. The problem is we don't have timestamp for each key in the DB. As an approximation, we expose the earliest of sst file "creation_time" property. My plan is to override the property with a more accurate value with blob db, where we actually have timestamp. Closes https://github.com/facebook/rocksdb/pull/2842 Differential Revision: D5770600 Pulled By: yiwu-arbug fbshipit-source-id: 03833c8f10bbfbee62f8ea5c0d03c0cafb5d853a --- HISTORY.md | 1 + db/builder.cc | 17 ++++--- db/builder.h | 7 ++- db/compaction_picker.cc | 27 ++++++----- db/db_properties_test.cc | 74 ++++++++++++++++++++++++++++++ db/db_sst_test.cc | 14 ++++-- db/db_test_util.h | 31 +++++++++++++ db/flush_job.cc | 7 ++- db/internal_stats.cc | 34 +++++++++++++- db/internal_stats.h | 2 + db/memtable.cc | 22 ++++++++- db/memtable.h | 9 ++++ db/memtable_list.cc | 8 ++++ db/memtable_list.h | 3 ++ include/rocksdb/db.h | 7 +++ include/rocksdb/table_properties.h | 6 ++- table/block_based_table_builder.cc | 14 ++++-- table/block_based_table_builder.h | 3 +- table/block_based_table_factory.cc | 3 +- table/meta_blocks.cc | 3 ++ table/table_builder.h | 8 +++- table/table_properties.cc | 5 ++ utilities/blob_db/blob_db_test.cc | 47 ++++++++----------- 23 files changed, 282 insertions(+), 70 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index bd67f779b..c8d55736b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * Support dynamic adjustment of rate limit according to demand for background I/O. It can be enabled by passing `true` to the `auto_tuned` parameter in `NewGenericRateLimiter()`. The value passed as `rate_bytes_per_sec` will still be respected as an upper-bound. * Support dynamically changing `ColumnFamilyOptions::compaction_options_fifo`. 
* Introduce `EventListener::OnStallConditionsChanged()` callback. Users can implement it to be notified when user writes are stalled, stopped, or resumed. +* Add a new db property "rocksdb.estimate-oldest-key-time" to return oldest data timestamp. The property is available only for FIFO compaction with compaction_options_fifo.allow_compaction = false. ### Bug Fixes * Fix a potential data inconsistency issue during point-in-time recovery. `DB:Open()` will abort if column family inconsistency is found during PIT recovery. diff --git a/db/builder.cc b/db/builder.cc index 0c3cc0919..11b8fc783 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -47,15 +47,15 @@ TableBuilder* NewTableBuilder( WritableFileWriter* file, const CompressionType compression_type, const CompressionOptions& compression_opts, int level, const std::string* compression_dict, const bool skip_filters, - const uint64_t creation_time) { + const uint64_t creation_time, const uint64_t oldest_key_time) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); return ioptions.table_factory->NewTableBuilder( - TableBuilderOptions(ioptions, internal_comparator, - int_tbl_prop_collector_factories, compression_type, - compression_opts, compression_dict, skip_filters, - column_family_name, level, creation_time), + TableBuilderOptions( + ioptions, internal_comparator, int_tbl_prop_collector_factories, + compression_type, compression_opts, compression_dict, skip_filters, + column_family_name, level, creation_time, oldest_key_time), column_family_id, file); } @@ -74,8 +74,8 @@ Status BuildTable( const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, TableFileCreationReason reason, EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, - TableProperties* table_properties, int level, - const uint64_t creation_time) { + TableProperties* table_properties, int level, const uint64_t 
creation_time, + const uint64_t oldest_key_time) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); @@ -120,12 +120,11 @@ Status BuildTable( file_writer.reset(new WritableFileWriter(std::move(file), env_options, ioptions.statistics)); - builder = NewTableBuilder( ioptions, internal_comparator, int_tbl_prop_collector_factories, column_family_id, column_family_name, file_writer.get(), compression, compression_opts, level, nullptr /* compression_dict */, - false /* skip_filters */, creation_time); + false /* skip_filters */, creation_time, oldest_key_time); } MergeHelper merge(env, internal_comparator.user_comparator(), diff --git a/db/builder.h b/db/builder.h index 17190f8fc..39ec25cbc 100644 --- a/db/builder.h +++ b/db/builder.h @@ -6,6 +6,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include #include #include #include @@ -51,7 +52,8 @@ TableBuilder* NewTableBuilder( WritableFileWriter* file, const CompressionType compression_type, const CompressionOptions& compression_opts, int level, const std::string* compression_dict = nullptr, - const bool skip_filters = false, const uint64_t creation_time = 0); + const bool skip_filters = false, const uint64_t creation_time = 0, + const uint64_t oldest_key_time = std::numeric_limits::max()); // Build a Table file from the contents of *iter. The generated file // will be named according to number specified in meta. 
On success, the rest of @@ -78,6 +80,7 @@ extern Status BuildTable( EventLogger* event_logger = nullptr, int job_id = 0, const Env::IOPriority io_priority = Env::IO_HIGH, TableProperties* table_properties = nullptr, int level = -1, - const uint64_t creation_time = 0); + const uint64_t creation_time = 0, + const uint64_t oldest_key_time = std::numeric_limits::max()); } // namespace rocksdb diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 119a8c453..2bdb38c9d 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1446,19 +1446,22 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction( inputs.emplace_back(); inputs[0].level = 0; - for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { - auto f = *ritr; - if (f->fd.table_reader != nullptr && - f->fd.table_reader->GetTableProperties() != nullptr) { - auto creation_time = - f->fd.table_reader->GetTableProperties()->creation_time; - if (creation_time == 0 || - creation_time >= - (current_time - mutable_cf_options.compaction_options_fifo.ttl)) { - break; + // avoid underflow + if (current_time > mutable_cf_options.compaction_options_fifo.ttl) { + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { + auto f = *ritr; + if (f->fd.table_reader != nullptr && + f->fd.table_reader->GetTableProperties() != nullptr) { + auto creation_time = + f->fd.table_reader->GetTableProperties()->creation_time; + if (creation_time == 0 || + creation_time >= (current_time - + mutable_cf_options.compaction_options_fifo.ttl)) { + break; + } + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); } - total_size -= f->compensated_file_size; - inputs[0].files.push_back(f); } } diff --git a/db/db_properties_test.cc b/db/db_properties_test.cc index b09fe1ffa..0da64b136 100644 --- a/db/db_properties_test.cc +++ b/db/db_properties_test.cc @@ -1309,6 +1309,80 @@ TEST_F(DBPropertiesTest, EstimateNumKeysUnderflow) { ASSERT_EQ(0, num_keys); } 
+TEST_F(DBPropertiesTest, EstimateOldestKeyTime) { + std::unique_ptr mock_env(new MockTimeEnv(Env::Default())); + uint64_t oldest_key_time = 0; + Options options; + options.env = mock_env.get(); + + // "rocksdb.estimate-oldest-key-time" only available to fifo compaction. + mock_env->set_current_time(100); + for (auto compaction : {kCompactionStyleLevel, kCompactionStyleUniversal, + kCompactionStyleNone}) { + options.compaction_style = compaction; + options.create_if_missing = true; + DestroyAndReopen(options); + ASSERT_OK(Put("foo", "bar")); + ASSERT_FALSE(dbfull()->GetIntProperty( + DB::Properties::kEstimateOldestKeyTime, &oldest_key_time)); + } + + options.compaction_style = kCompactionStyleFIFO; + options.compaction_options_fifo.ttl = 300; + options.compaction_options_fifo.allow_compaction = false; + DestroyAndReopen(options); + + mock_env->set_current_time(100); + ASSERT_OK(Put("k1", "v1")); + ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, + &oldest_key_time)); + ASSERT_EQ(100, oldest_key_time); + ASSERT_OK(Flush()); + ASSERT_EQ("1", FilesPerLevel()); + ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, + &oldest_key_time)); + ASSERT_EQ(100, oldest_key_time); + + mock_env->set_current_time(200); + ASSERT_OK(Put("k2", "v2")); + ASSERT_OK(Flush()); + ASSERT_EQ("2", FilesPerLevel()); + ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, + &oldest_key_time)); + ASSERT_EQ(100, oldest_key_time); + + mock_env->set_current_time(300); + ASSERT_OK(Put("k3", "v3")); + ASSERT_OK(Flush()); + ASSERT_EQ("3", FilesPerLevel()); + ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, + &oldest_key_time)); + ASSERT_EQ(100, oldest_key_time); + + mock_env->set_current_time(450); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("2", FilesPerLevel()); + ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, + 
&oldest_key_time)); + ASSERT_EQ(200, oldest_key_time); + + mock_env->set_current_time(550); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("1", FilesPerLevel()); + ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, + &oldest_key_time)); + ASSERT_EQ(300, oldest_key_time); + + mock_env->set_current_time(650); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("", FilesPerLevel()); + ASSERT_FALSE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, + &oldest_key_time)); + + // Close before mock_env destructs. + Close(); +} + #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index 73c6fe801..56e00df83 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -701,9 +701,13 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) { ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", &total_sst_files_size)); // Live SST files = 1 (compacted file) - // Total SST files = 6 (5 original files + compacted file) - ASSERT_EQ(live_sst_files_size, 1 * single_file_size); - ASSERT_EQ(total_sst_files_size, 6 * single_file_size); + // The 5 bytes difference comes from oldest-key-time table property isn't + // propagated on compaction. It is written with default value + // std::numeric_limits::max as varint64. 
+ ASSERT_EQ(live_sst_files_size, 1 * single_file_size + 5); + + // Total SST files = 5 original files + compacted file + ASSERT_EQ(total_sst_files_size, 5 * single_file_size + live_sst_files_size); // hold current version std::unique_ptr iter2(dbfull()->NewIterator(ReadOptions())); @@ -724,14 +728,14 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) { &total_sst_files_size)); // Live SST files = 0 // Total SST files = 6 (5 original files + compacted file) - ASSERT_EQ(total_sst_files_size, 6 * single_file_size); + ASSERT_EQ(total_sst_files_size, 5 * single_file_size + live_sst_files_size); iter1.reset(); ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", &total_sst_files_size)); // Live SST files = 0 // Total SST files = 1 (compacted file) - ASSERT_EQ(total_sst_files_size, 1 * single_file_size); + ASSERT_EQ(total_sst_files_size, live_sst_files_size); iter2.reset(); ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", diff --git a/db/db_test_util.h b/db/db_test_util.h index cd1265e21..f2caa46ca 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -572,6 +572,37 @@ class SpecialEnv : public EnvWrapper { std::atomic is_wal_sync_thread_safe_{true}; }; +class MockTimeEnv : public EnvWrapper { + public: + explicit MockTimeEnv(Env* base) : EnvWrapper(base) {} + + virtual Status GetCurrentTime(int64_t* time) override { + assert(time != nullptr); + assert(current_time_ <= + static_cast(std::numeric_limits::max())); + *time = static_cast(current_time_); + return Status::OK(); + } + + virtual uint64_t NowMicros() override { + assert(current_time_ <= std::numeric_limits::max() / 1000000); + return current_time_ * 1000000; + } + + virtual uint64_t NowNanos() override { + assert(current_time_ <= std::numeric_limits::max() / 1000000000); + return current_time_ * 1000000000; + } + + void set_current_time(uint64_t time) { + assert(time >= current_time_); + current_time_ = time; + } + + private: + uint64_t current_time_ = 0; +}; + #ifndef ROCKSDB_LITE 
class OnFileDeletionListener : public EventListener { public: diff --git a/db/flush_job.cc b/db/flush_job.cc index ad844be64..9c18ee5ff 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -16,6 +16,7 @@ #include #include +#include #include #include "db/builder.h" @@ -299,6 +300,9 @@ Status FlushJob::WriteLevel0Table() { db_options_.env->GetCurrentTime(&_current_time); // ignore error const uint64_t current_time = static_cast(_current_time); + uint64_t oldest_key_time = + mems_.front()->ApproximateOldestKeyTime(); + s = BuildTable( dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_, env_options_, cfd_->table_cache(), iter.get(), @@ -309,7 +313,8 @@ Status FlushJob::WriteLevel0Table() { output_compression_, cfd_->ioptions()->compression_opts, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), TableFileCreationReason::kFlush, event_logger_, job_context_->job_id, - Env::IO_HIGH, &table_properties_, 0 /* level */, current_time); + Env::IO_HIGH, &table_properties_, 0 /* level */, current_time, + oldest_key_time); LogFlush(db_options_.info_log); } ROCKS_LOG_INFO(db_options_.info_log, diff --git a/db/internal_stats.cc b/db/internal_stats.cc index a56c05001..6196b55df 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -13,8 +13,9 @@ #endif #include -#include #include +#include +#include #include #include #include "db/column_family.h" @@ -243,6 +244,7 @@ static const std::string num_running_flushes = "num-running-flushes"; static const std::string actual_delayed_write_rate = "actual-delayed-write-rate"; static const std::string is_write_stopped = "is-write-stopped"; +static const std::string estimate_oldest_key_time = "estimate-oldest-key-time"; const std::string DB::Properties::kNumFilesAtLevelPrefix = rocksdb_prefix + num_files_at_level_prefix; @@ -316,6 +318,8 @@ const std::string DB::Properties::kActualDelayedWriteRate = rocksdb_prefix + actual_delayed_write_rate; const std::string DB::Properties::kIsWriteStopped = 
rocksdb_prefix + is_write_stopped; +const std::string DB::Properties::kEstimateOldestKeyTime = + rocksdb_prefix + estimate_oldest_key_time; const std::unordered_map InternalStats::ppt_name_to_info = { @@ -414,6 +418,9 @@ const std::unordered_map nullptr}}, {DB::Properties::kIsWriteStopped, {false, nullptr, &InternalStats::HandleIsWriteStopped, nullptr}}, + {DB::Properties::kEstimateOldestKeyTime, + {false, nullptr, &InternalStats::HandleEstimateOldestKeyTime, + nullptr}}, }; const DBPropertyInfo* GetPropertyInfo(const Slice& property) { @@ -776,6 +783,31 @@ bool InternalStats::HandleIsWriteStopped(uint64_t* value, DBImpl* db, return true; } +bool InternalStats::HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* /*db*/, + Version* /*version*/) { + // TODO(yiwu): The property is currently available for fifo compaction + // with allow_compaction = false. This is because we don't propagate + // oldest_key_time on compaction. + if (cfd_->ioptions()->compaction_style != kCompactionStyleFIFO || + cfd_->GetCurrentMutableCFOptions() + ->compaction_options_fifo.allow_compaction) { + return false; + } + + TablePropertiesCollection collection; + auto s = cfd_->current()->GetPropertiesOfAllTables(&collection); + if (!s.ok()) { + return false; + } + *value = std::numeric_limits::max(); + for (auto& p : collection) { + *value = std::min(*value, p.second->oldest_key_time); + } + *value = std::min({cfd_->mem()->ApproximateOldestKeyTime(), + cfd_->imm()->ApproximateOldestKeyTime(), *value}); + return *value < std::numeric_limits::max(); +} + void InternalStats::DumpDBStats(std::string* value) { char buf[1000]; // DB-level stats, only available from default column family diff --git a/db/internal_stats.h b/db/internal_stats.h index 342e7d7d0..dea9c0987 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -477,6 +477,8 @@ class InternalStats { bool HandleActualDelayedWriteRate(uint64_t* value, DBImpl* db, Version* version); bool HandleIsWriteStopped(uint64_t* value, DBImpl* 
db, Version* version); + bool HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* db, + Version* version); // Total number of background errors encountered. Every time a flush task // or compaction task fails, this counter is incremented. The failure can diff --git a/db/memtable.cc b/db/memtable.cc index ec6cd81be..84e9028e7 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -9,8 +9,9 @@ #include "db/memtable.h" -#include #include +#include +#include #include "db/dbformat.h" #include "db/merge_context.h" @@ -97,7 +98,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, flush_state_(FLUSH_NOT_REQUESTED), env_(ioptions.env), insert_with_hint_prefix_extractor_( - ioptions.memtable_insert_with_hint_prefix_extractor) { + ioptions.memtable_insert_with_hint_prefix_extractor), + oldest_key_time_(std::numeric_limits::max()) { UpdateFlushState(); // something went wrong if we need to flush before inserting anything assert(!ShouldScheduleFlush()); @@ -203,6 +205,21 @@ void MemTable::UpdateFlushState() { } } +void MemTable::UpdateOldestKeyTime() { + uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed); + if (oldest_key_time == std::numeric_limits::max()) { + int64_t current_time = 0; + auto s = env_->GetCurrentTime(¤t_time); + if (s.ok()) { + assert(current_time >= 0); + // If fail, the timestamp is already set. + oldest_key_time_.compare_exchange_strong( + oldest_key_time, static_cast(current_time), + std::memory_order_relaxed, std::memory_order_relaxed); + } + } +} + int MemTable::KeyComparator::operator()(const char* prefix_len_key1, const char* prefix_len_key2) const { // Internal keys are encoded as length-prefixed strings. 
@@ -517,6 +534,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, if (is_range_del_table_empty_ && type == kTypeRangeDeletion) { is_range_del_table_empty_ = false; } + UpdateOldestKeyTime(); } // Callback from MemTable::Get() diff --git a/db/memtable.h b/db/memtable.h index fb01b5a82..e1fe59c4d 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -352,6 +352,10 @@ class MemTable { const MemTableOptions* GetMemTableOptions() const { return &moptions_; } + uint64_t ApproximateOldestKeyTime() const { + return oldest_key_time_.load(std::memory_order_relaxed); + } + private: enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; @@ -415,12 +419,17 @@ class MemTable { // Insert hints for each prefix. std::unordered_map insert_hints_; + // Timestamp of oldest key + std::atomic oldest_key_time_; + // Returns a heuristic flush decision bool ShouldFlushNow() const; // Updates flush_state_ using ShouldFlushNow() void UpdateFlushState(); + void UpdateOldestKeyTime(); + // No copying allowed MemTable(const MemTable&); MemTable& operator=(const MemTable&); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index d8eb1fefc..a09a118b9 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -10,6 +10,7 @@ #endif #include +#include #include #include "db/memtable.h" #include "db/version_set.h" @@ -448,6 +449,13 @@ size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() { size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; } +uint64_t MemTableList::ApproximateOldestKeyTime() const { + if (!current_->memlist_.empty()) { + return current_->memlist_.back()->ApproximateOldestKeyTime(); + } + return std::numeric_limits::max(); +} + void MemTableList::InstallNewVersion() { if (current_->refs_ == 1) { // we're the only one using the version, just keep using it diff --git a/db/memtable_list.h b/db/memtable_list.h index ca0fd9d77..1e46642a9 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -224,6 +224,9 @@ class 
MemTableList { // the unflushed mem-tables. size_t ApproximateUnflushedMemTablesMemoryUsage(); + // Returns an estimate of the timestamp of the earliest key. + uint64_t ApproximateOldestKeyTime() const; + // Request a flush of all existing memtables to storage. This will // cause future calls to IsFlushPending() to return true if this list is // non-empty (regardless of the min_write_buffer_number_to_merge diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 32392c2f7..e91815877 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -582,6 +582,12 @@ class DB { // "rocksdb.is-write-stopped" - Return 1 if write has been stopped. static const std::string kIsWriteStopped; + + // "rocksdb.estimate-oldest-key-time" - returns an estimation of + // oldest key timestamp in the DB. Currently only available for + // FIFO compaction with + // compaction_options_fifo.allow_compaction = false. + static const std::string kEstimateOldestKeyTime; }; #endif /* ROCKSDB_LITE */ @@ -632,6 +638,7 @@ class DB { // "rocksdb.num-running-flushes" // "rocksdb.actual-delayed-write-rate" // "rocksdb.is-write-stopped" + // "rocksdb.estimate-oldest-key-time" virtual bool GetIntProperty(ColumnFamilyHandle* column_family, const Slice& property, uint64_t* value) = 0; virtual bool GetIntProperty(const Slice& property, uint64_t* value) { diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index 08360d179..e8bbabc3b 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -4,8 +4,9 @@ #pragma once #include -#include +#include #include +#include #include "rocksdb/status.h" #include "rocksdb/types.h" @@ -49,6 +50,7 @@ struct TablePropertiesNames { static const std::string kPropertyCollectors; static const std::string kCompression; static const std::string kCreationTime; + static const std::string kOldestKeyTime; }; extern const std::string kPropertiesBlock; @@ -162,6 +164,8 @@ struct TableProperties { // The time 
when the SST file was created. // Since SST files are immutable, this is equivalent to last modified time. uint64_t creation_time = 0; + // Timestamp of the earliest key + uint64_t oldest_key_time = std::numeric_limits::max(); // Name of the column family with which this SST file is associated. // If column family is unknown, `column_family_name` will be an empty string. diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index d356cb5c0..34b68c0e1 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -276,6 +276,7 @@ struct BlockBasedTableBuilder::Rep { uint32_t column_family_id; const std::string& column_family_name; uint64_t creation_time = 0; + uint64_t oldest_key_time = std::numeric_limits::max(); std::vector> table_properties_collectors; @@ -288,7 +289,8 @@ struct BlockBasedTableBuilder::Rep { const CompressionType _compression_type, const CompressionOptions& _compression_opts, const std::string* _compression_dict, const bool skip_filters, - const std::string& _column_family_name, const uint64_t _creation_time) + const std::string& _column_family_name, const uint64_t _creation_time, + const uint64_t _oldest_key_time) : ioptions(_ioptions), table_options(table_opt), internal_comparator(icomparator), @@ -305,7 +307,8 @@ struct BlockBasedTableBuilder::Rep { table_options, data_block)), column_family_id(_column_family_id), column_family_name(_column_family_name), - creation_time(_creation_time) { + creation_time(_creation_time), + oldest_key_time(_oldest_key_time) { if (table_options.index_type == BlockBasedTableOptions::kTwoLevelIndexSearch) { p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( @@ -344,7 +347,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( const CompressionType compression_type, const CompressionOptions& compression_opts, const std::string* compression_dict, const bool skip_filters, - const std::string& column_family_name, const uint64_t creation_time) { + 
const std::string& column_family_name, const uint64_t creation_time, + const uint64_t oldest_key_time) { BlockBasedTableOptions sanitized_table_options(table_options); if (sanitized_table_options.format_version == 0 && sanitized_table_options.checksum != kCRC32c) { @@ -360,7 +364,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator, int_tbl_prop_collector_factories, column_family_id, file, compression_type, compression_opts, compression_dict, - skip_filters, column_family_name, creation_time); + skip_filters, column_family_name, creation_time, + oldest_key_time); if (rep_->filter_builder != nullptr) { rep_->filter_builder->StartBlock(0); @@ -738,6 +743,7 @@ Status BlockBasedTableBuilder::Finish() { r->p_index_builder_->EstimateTopLevelIndexSize(r->offset); } r->props.creation_time = r->creation_time; + r->props.oldest_key_time = r->oldest_key_time; // Add basic properties property_block_builder.AddTableProperty(r->props); diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index 2e8606271..c9197ef5c 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -47,7 +47,8 @@ class BlockBasedTableBuilder : public TableBuilder { const CompressionType compression_type, const CompressionOptions& compression_opts, const std::string* compression_dict, const bool skip_filters, - const std::string& column_family_name, const uint64_t creation_time = 0); + const std::string& column_family_name, const uint64_t creation_time = 0, + const uint64_t oldest_key_time = std::numeric_limits::max()); // REQUIRES: Either Finish() or Abandon() has been called. 
~BlockBasedTableBuilder(); diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 309886c0d..7ee7f9fd5 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -85,7 +85,8 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder( table_builder_options.compression_dict, table_builder_options.skip_filters, table_builder_options.column_family_name, - table_builder_options.creation_time); + table_builder_options.creation_time, + table_builder_options.oldest_key_time); return table_builder; } diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index f8ea933f1..3089f463e 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -79,6 +79,7 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) { Add(TablePropertiesNames::kFixedKeyLen, props.fixed_key_len); Add(TablePropertiesNames::kColumnFamilyId, props.column_family_id); Add(TablePropertiesNames::kCreationTime, props.creation_time); + Add(TablePropertiesNames::kOldestKeyTime, props.oldest_key_time); if (!props.filter_policy_name.empty()) { Add(TablePropertiesNames::kFilterPolicy, props.filter_policy_name); @@ -213,6 +214,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, &new_table_properties->column_family_id}, {TablePropertiesNames::kCreationTime, &new_table_properties->creation_time}, + {TablePropertiesNames::kOldestKeyTime, + &new_table_properties->oldest_key_time}, }; std::string last_key; diff --git a/table/table_builder.h b/table/table_builder.h index ef2e608ed..d0ca0678e 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -10,6 +10,7 @@ #pragma once #include +#include #include #include #include @@ -55,7 +56,8 @@ struct TableBuilderOptions { const CompressionOptions& _compression_opts, const std::string* _compression_dict, bool _skip_filters, const std::string& _column_family_name, int _level, - const uint64_t _creation_time = 0) + const uint64_t _creation_time = 
0, + const uint64_t _oldest_key_time = std::numeric_limits::max()) : ioptions(_ioptions), internal_comparator(_internal_comparator), int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories), @@ -65,7 +67,8 @@ struct TableBuilderOptions { skip_filters(_skip_filters), column_family_name(_column_family_name), level(_level), - creation_time(_creation_time) {} + creation_time(_creation_time), + oldest_key_time(_oldest_key_time) {} const ImmutableCFOptions& ioptions; const InternalKeyComparator& internal_comparator; const std::vector>* @@ -78,6 +81,7 @@ struct TableBuilderOptions { const std::string& column_family_name; int level; // what level this table/file is on, -1 for "not set, don't know" const uint64_t creation_time; + const uint64_t oldest_key_time; }; // TableBuilder provides the interface used to build a Table diff --git a/table/table_properties.cc b/table/table_properties.cc index ef77ae566..24453f6f9 100644 --- a/table/table_properties.cc +++ b/table/table_properties.cc @@ -139,6 +139,9 @@ std::string TableProperties::ToString( AppendProperty(result, "creation time", creation_time, prop_delim, kv_delim); + AppendProperty(result, "time stamp of earliest key", oldest_key_time, + prop_delim, kv_delim); + return result; } @@ -191,6 +194,8 @@ const std::string TablePropertiesNames::kPropertyCollectors = "rocksdb.property.collectors"; const std::string TablePropertiesNames::kCompression = "rocksdb.compression"; const std::string TablePropertiesNames::kCreationTime = "rocksdb.creation.time"; +const std::string TablePropertiesNames::kOldestKeyTime = + "rocksdb.oldest.key.time"; extern const std::string kPropertiesBlock = "rocksdb.properties"; // Old property block name for backward compatibility diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 535b86a6e..b4907eef1 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -26,21 +26,9 @@ class BlobDBTest : public testing::Test { public: 
const int kMaxBlobSize = 1 << 14; - class MockEnv : public EnvWrapper { - public: - MockEnv() : EnvWrapper(Env::Default()) {} - - void set_now_micros(uint64_t now_micros) { now_micros_ = now_micros; } - - uint64_t NowMicros() override { return now_micros_; } - - private: - uint64_t now_micros_ = 0; - }; - BlobDBTest() : dbname_(test::TmpDir() + "/blob_db_test"), - mock_env_(new MockEnv()), + mock_env_(new MockTimeEnv(Env::Default())), blob_db_(nullptr) { Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions()); assert(s.ok()); @@ -155,7 +143,7 @@ class BlobDBTest : public testing::Test { } const std::string dbname_; - std::unique_ptr mock_env_; + std::unique_ptr mock_env_; std::shared_ptr ttl_extractor_; BlobDB *blob_db_; }; // class BlobDBTest @@ -182,13 +170,13 @@ TEST_F(BlobDBTest, PutWithTTL) { bdb_options.disable_background_tasks = true; Open(bdb_options, options); std::map data; - mock_env_->set_now_micros(50 * 1000000); + mock_env_->set_current_time(50); for (size_t i = 0; i < 100; i++) { uint64_t ttl = rnd.Next() % 100; PutRandomWithTTL("key" + ToString(i), ttl, &rnd, (ttl <= 50 ? nullptr : &data)); } - mock_env_->set_now_micros(100 * 1000000); + mock_env_->set_current_time(100); auto *bdb_impl = static_cast(blob_db_); auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); @@ -211,13 +199,13 @@ TEST_F(BlobDBTest, PutUntil) { bdb_options.disable_background_tasks = true; Open(bdb_options, options); std::map data; - mock_env_->set_now_micros(50 * 1000000); + mock_env_->set_current_time(50); for (size_t i = 0; i < 100; i++) { uint64_t expiration = rnd.Next() % 100 + 50; PutRandomUntil("key" + ToString(i), expiration, &rnd, (expiration <= 100 ? 
nullptr : &data)); } - mock_env_->set_now_micros(100 * 1000000); + mock_env_->set_current_time(100); auto *bdb_impl = static_cast(blob_db_); auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); @@ -244,12 +232,13 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) { bdb_options.disable_background_tasks = true; Open(bdb_options, options); std::map data; - mock_env_->set_now_micros(0); + mock_env_->set_current_time(0); for (size_t i = 0; i < 100; i++) { PutRandom("key" + ToString(i), &rnd, &data); } // very far in the future.. - mock_env_->set_now_micros(std::numeric_limits::max() - 10); + mock_env_->set_current_time(std::numeric_limits::max() / 1000000 - + 10); auto *bdb_impl = static_cast(blob_db_); auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); @@ -290,11 +279,11 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) { bdb_options.ttl_extractor = ttl_extractor_; bdb_options.disable_background_tasks = true; Open(bdb_options, options); - mock_env_->set_now_micros(50 * 1000000); + mock_env_->set_current_time(50); for (size_t i = 0; i < 100; i++) { PutRandom("key" + ToString(i), &rnd); } - mock_env_->set_now_micros(100 * 1000000); + mock_env_->set_current_time(100); auto *bdb_impl = static_cast(blob_db_); auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); @@ -337,11 +326,11 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) { bdb_options.ttl_extractor = ttl_extractor_; bdb_options.disable_background_tasks = true; Open(bdb_options, options); - mock_env_->set_now_micros(50 * 1000000); + mock_env_->set_current_time(50); for (size_t i = 0; i < 100; i++) { PutRandom("key" + ToString(i), &rnd); } - mock_env_->set_now_micros(100 * 1000000); + mock_env_->set_current_time(100); auto *bdb_impl = static_cast(blob_db_); auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); @@ -385,7 +374,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) { bdb_options.disable_background_tasks 
= true; Open(bdb_options, options); std::map data; - mock_env_->set_now_micros(50 * 1000000); + mock_env_->set_current_time(50); for (size_t i = 0; i < 100; i++) { int len = rnd.Next() % kMaxBlobSize + 1; std::string key = "key" + ToString(i); @@ -398,7 +387,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) { data[key] = value; } } - mock_env_->set_now_micros(100 * 1000000); + mock_env_->set_current_time(100); auto *bdb_impl = static_cast(blob_db_); auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); @@ -628,14 +617,14 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { BlobDBOptions bdb_options; bdb_options.disable_background_tasks = true; Open(bdb_options, options); - mock_env_->set_now_micros(100 * 1000000); + mock_env_->set_current_time(100); ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v1", 200)); BlobDBImpl *blob_db_impl = static_cast_with_check(blob_db_); auto blob_files = blob_db_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0])); - mock_env_->set_now_micros(300 * 1000000); + mock_env_->set_current_time(300); SyncPoint::GetInstance()->LoadDependency( {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB", @@ -775,7 +764,7 @@ TEST_F(BlobDBTest, ReadWhileGC) { TEST_F(BlobDBTest, ColumnFamilyNotSupported) { Options options; options.env = mock_env_.get(); - mock_env_->set_now_micros(0); + mock_env_->set_current_time(0); Open(BlobDBOptions(), options); ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily(); ColumnFamilyHandle *handle = nullptr;