diff --git a/HISTORY.md b/HISTORY.md index bd67f779b..c8d55736b 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -8,6 +8,7 @@ * Support dynamic adjustment of rate limit according to demand for background I/O. It can be enabled by passing `true` to the `auto_tuned` parameter in `NewGenericRateLimiter()`. The value passed as `rate_bytes_per_sec` will still be respected as an upper-bound. * Support dynamically changing `ColumnFamilyOptions::compaction_options_fifo`. * Introduce `EventListener::OnStallConditionsChanged()` callback. Users can implement it to be notified when user writes are stalled, stopped, or resumed. +* Add a new db property "rocksdb.estimate-oldest-key-time" to return oldest data timestamp. The property is available only for FIFO compaction with compaction_options_fifo.allow_compaction = false. ### Bug Fixes * Fix a potential data inconsistency issue during point-in-time recovery. `DB:Open()` will abort if column family inconsistency is found during PIT recovery. diff --git a/db/builder.cc b/db/builder.cc index 0c3cc0919..11b8fc783 100644 --- a/db/builder.cc +++ b/db/builder.cc @@ -47,15 +47,15 @@ TableBuilder* NewTableBuilder( WritableFileWriter* file, const CompressionType compression_type, const CompressionOptions& compression_opts, int level, const std::string* compression_dict, const bool skip_filters, - const uint64_t creation_time) { + const uint64_t creation_time, const uint64_t oldest_key_time) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); return ioptions.table_factory->NewTableBuilder( - TableBuilderOptions(ioptions, internal_comparator, - int_tbl_prop_collector_factories, compression_type, - compression_opts, compression_dict, skip_filters, - column_family_name, level, creation_time), + TableBuilderOptions( + ioptions, internal_comparator, int_tbl_prop_collector_factories, + compression_type, compression_opts, compression_dict, skip_filters, + column_family_name, 
level, creation_time, oldest_key_time), column_family_id, file); } @@ -74,8 +74,8 @@ Status BuildTable( const CompressionOptions& compression_opts, bool paranoid_file_checks, InternalStats* internal_stats, TableFileCreationReason reason, EventLogger* event_logger, int job_id, const Env::IOPriority io_priority, - TableProperties* table_properties, int level, - const uint64_t creation_time) { + TableProperties* table_properties, int level, const uint64_t creation_time, + const uint64_t oldest_key_time) { assert((column_family_id == TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) == column_family_name.empty()); @@ -120,12 +120,11 @@ Status BuildTable( file_writer.reset(new WritableFileWriter(std::move(file), env_options, ioptions.statistics)); - builder = NewTableBuilder( ioptions, internal_comparator, int_tbl_prop_collector_factories, column_family_id, column_family_name, file_writer.get(), compression, compression_opts, level, nullptr /* compression_dict */, - false /* skip_filters */, creation_time); + false /* skip_filters */, creation_time, oldest_key_time); } MergeHelper merge(env, internal_comparator.user_comparator(), diff --git a/db/builder.h b/db/builder.h index 17190f8fc..39ec25cbc 100644 --- a/db/builder.h +++ b/db/builder.h @@ -6,6 +6,7 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #pragma once +#include #include #include #include @@ -51,7 +52,8 @@ TableBuilder* NewTableBuilder( WritableFileWriter* file, const CompressionType compression_type, const CompressionOptions& compression_opts, int level, const std::string* compression_dict = nullptr, - const bool skip_filters = false, const uint64_t creation_time = 0); + const bool skip_filters = false, const uint64_t creation_time = 0, + const uint64_t oldest_key_time = std::numeric_limits::max()); // Build a Table file from the contents of *iter. 
The generated file // will be named according to number specified in meta. On success, the rest of @@ -78,6 +80,7 @@ extern Status BuildTable( EventLogger* event_logger = nullptr, int job_id = 0, const Env::IOPriority io_priority = Env::IO_HIGH, TableProperties* table_properties = nullptr, int level = -1, - const uint64_t creation_time = 0); + const uint64_t creation_time = 0, + const uint64_t oldest_key_time = std::numeric_limits::max()); } // namespace rocksdb diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 119a8c453..2bdb38c9d 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -1446,19 +1446,22 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction( inputs.emplace_back(); inputs[0].level = 0; - for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { - auto f = *ritr; - if (f->fd.table_reader != nullptr && - f->fd.table_reader->GetTableProperties() != nullptr) { - auto creation_time = - f->fd.table_reader->GetTableProperties()->creation_time; - if (creation_time == 0 || - creation_time >= - (current_time - mutable_cf_options.compaction_options_fifo.ttl)) { - break; + // avoid underflow + if (current_time > mutable_cf_options.compaction_options_fifo.ttl) { + for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { + auto f = *ritr; + if (f->fd.table_reader != nullptr && + f->fd.table_reader->GetTableProperties() != nullptr) { + auto creation_time = + f->fd.table_reader->GetTableProperties()->creation_time; + if (creation_time == 0 || + creation_time >= (current_time - + mutable_cf_options.compaction_options_fifo.ttl)) { + break; + } + total_size -= f->compensated_file_size; + inputs[0].files.push_back(f); } - total_size -= f->compensated_file_size; - inputs[0].files.push_back(f); } } diff --git a/db/db_properties_test.cc b/db/db_properties_test.cc index b09fe1ffa..0da64b136 100644 --- a/db/db_properties_test.cc +++ b/db/db_properties_test.cc @@ -1309,6 +1309,80 @@ 
TEST_F(DBPropertiesTest, EstimateNumKeysUnderflow) { ASSERT_EQ(0, num_keys); } +TEST_F(DBPropertiesTest, EstimateOldestKeyTime) { + std::unique_ptr mock_env(new MockTimeEnv(Env::Default())); + uint64_t oldest_key_time = 0; + Options options; + options.env = mock_env.get(); + + // "rocksdb.estimate-oldest-key-time" only available to fifo compaction. + mock_env->set_current_time(100); + for (auto compaction : {kCompactionStyleLevel, kCompactionStyleUniversal, + kCompactionStyleNone}) { + options.compaction_style = compaction; + options.create_if_missing = true; + DestroyAndReopen(options); + ASSERT_OK(Put("foo", "bar")); + ASSERT_FALSE(dbfull()->GetIntProperty( + DB::Properties::kEstimateOldestKeyTime, &oldest_key_time)); + } + + options.compaction_style = kCompactionStyleFIFO; + options.compaction_options_fifo.ttl = 300; + options.compaction_options_fifo.allow_compaction = false; + DestroyAndReopen(options); + + mock_env->set_current_time(100); + ASSERT_OK(Put("k1", "v1")); + ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, + &oldest_key_time)); + ASSERT_EQ(100, oldest_key_time); + ASSERT_OK(Flush()); + ASSERT_EQ("1", FilesPerLevel()); + ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, + &oldest_key_time)); + ASSERT_EQ(100, oldest_key_time); + + mock_env->set_current_time(200); + ASSERT_OK(Put("k2", "v2")); + ASSERT_OK(Flush()); + ASSERT_EQ("2", FilesPerLevel()); + ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, + &oldest_key_time)); + ASSERT_EQ(100, oldest_key_time); + + mock_env->set_current_time(300); + ASSERT_OK(Put("k3", "v3")); + ASSERT_OK(Flush()); + ASSERT_EQ("3", FilesPerLevel()); + ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, + &oldest_key_time)); + ASSERT_EQ(100, oldest_key_time); + + mock_env->set_current_time(450); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("2", FilesPerLevel()); + 
ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, + &oldest_key_time)); + ASSERT_EQ(200, oldest_key_time); + + mock_env->set_current_time(550); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("1", FilesPerLevel()); + ASSERT_TRUE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, + &oldest_key_time)); + ASSERT_EQ(300, oldest_key_time); + + mock_env->set_current_time(650); + ASSERT_OK(dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr)); + ASSERT_EQ("", FilesPerLevel()); + ASSERT_FALSE(dbfull()->GetIntProperty(DB::Properties::kEstimateOldestKeyTime, + &oldest_key_time)); + + // Close before mock_env destructs. + Close(); +} + #endif // ROCKSDB_LITE } // namespace rocksdb diff --git a/db/db_sst_test.cc b/db/db_sst_test.cc index 73c6fe801..56e00df83 100644 --- a/db/db_sst_test.cc +++ b/db/db_sst_test.cc @@ -701,9 +701,13 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) { ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", &total_sst_files_size)); // Live SST files = 1 (compacted file) - // Total SST files = 6 (5 original files + compacted file) - ASSERT_EQ(live_sst_files_size, 1 * single_file_size); - ASSERT_EQ(total_sst_files_size, 6 * single_file_size); + // The 5 bytes difference comes from oldest-key-time table property isn't + // propagated on compaction. It is written with default value + // std::numeric_limits::max as varint64. 
+ ASSERT_EQ(live_sst_files_size, 1 * single_file_size + 5); + + // Total SST files = 5 original files + compacted file + ASSERT_EQ(total_sst_files_size, 5 * single_file_size + live_sst_files_size); // hold current version std::unique_ptr iter2(dbfull()->NewIterator(ReadOptions())); @@ -724,14 +728,14 @@ TEST_F(DBSSTTest, GetTotalSstFilesSize) { &total_sst_files_size)); // Live SST files = 0 // Total SST files = 6 (5 original files + compacted file) - ASSERT_EQ(total_sst_files_size, 6 * single_file_size); + ASSERT_EQ(total_sst_files_size, 5 * single_file_size + live_sst_files_size); iter1.reset(); ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", &total_sst_files_size)); // Live SST files = 0 // Total SST files = 1 (compacted file) - ASSERT_EQ(total_sst_files_size, 1 * single_file_size); + ASSERT_EQ(total_sst_files_size, live_sst_files_size); iter2.reset(); ASSERT_TRUE(dbfull()->GetIntProperty("rocksdb.total-sst-files-size", diff --git a/db/db_test_util.h b/db/db_test_util.h index cd1265e21..f2caa46ca 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -572,6 +572,37 @@ class SpecialEnv : public EnvWrapper { std::atomic is_wal_sync_thread_safe_{true}; }; +class MockTimeEnv : public EnvWrapper { + public: + explicit MockTimeEnv(Env* base) : EnvWrapper(base) {} + + virtual Status GetCurrentTime(int64_t* time) override { + assert(time != nullptr); + assert(current_time_ <= + static_cast(std::numeric_limits::max())); + *time = static_cast(current_time_); + return Status::OK(); + } + + virtual uint64_t NowMicros() override { + assert(current_time_ <= std::numeric_limits::max() / 1000000); + return current_time_ * 1000000; + } + + virtual uint64_t NowNanos() override { + assert(current_time_ <= std::numeric_limits::max() / 1000000000); + return current_time_ * 1000000000; + } + + void set_current_time(uint64_t time) { + assert(time >= current_time_); + current_time_ = time; + } + + private: + uint64_t current_time_ = 0; +}; + #ifndef ROCKSDB_LITE 
class OnFileDeletionListener : public EventListener { public: diff --git a/db/flush_job.cc b/db/flush_job.cc index ad844be64..9c18ee5ff 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -16,6 +16,7 @@ #include #include +#include #include #include "db/builder.h" @@ -299,6 +300,9 @@ Status FlushJob::WriteLevel0Table() { db_options_.env->GetCurrentTime(&_current_time); // ignore error const uint64_t current_time = static_cast(_current_time); + uint64_t oldest_key_time = + mems_.front()->ApproximateOldestKeyTime(); + s = BuildTable( dbname_, db_options_.env, *cfd_->ioptions(), mutable_cf_options_, env_options_, cfd_->table_cache(), iter.get(), @@ -309,7 +313,8 @@ Status FlushJob::WriteLevel0Table() { output_compression_, cfd_->ioptions()->compression_opts, mutable_cf_options_.paranoid_file_checks, cfd_->internal_stats(), TableFileCreationReason::kFlush, event_logger_, job_context_->job_id, - Env::IO_HIGH, &table_properties_, 0 /* level */, current_time); + Env::IO_HIGH, &table_properties_, 0 /* level */, current_time, + oldest_key_time); LogFlush(db_options_.info_log); } ROCKS_LOG_INFO(db_options_.info_log, diff --git a/db/internal_stats.cc b/db/internal_stats.cc index a56c05001..6196b55df 100644 --- a/db/internal_stats.cc +++ b/db/internal_stats.cc @@ -13,8 +13,9 @@ #endif #include -#include #include +#include +#include #include #include #include "db/column_family.h" @@ -243,6 +244,7 @@ static const std::string num_running_flushes = "num-running-flushes"; static const std::string actual_delayed_write_rate = "actual-delayed-write-rate"; static const std::string is_write_stopped = "is-write-stopped"; +static const std::string estimate_oldest_key_time = "estimate-oldest-key-time"; const std::string DB::Properties::kNumFilesAtLevelPrefix = rocksdb_prefix + num_files_at_level_prefix; @@ -316,6 +318,8 @@ const std::string DB::Properties::kActualDelayedWriteRate = rocksdb_prefix + actual_delayed_write_rate; const std::string DB::Properties::kIsWriteStopped = 
rocksdb_prefix + is_write_stopped; +const std::string DB::Properties::kEstimateOldestKeyTime = + rocksdb_prefix + estimate_oldest_key_time; const std::unordered_map InternalStats::ppt_name_to_info = { @@ -414,6 +418,9 @@ const std::unordered_map nullptr}}, {DB::Properties::kIsWriteStopped, {false, nullptr, &InternalStats::HandleIsWriteStopped, nullptr}}, + {DB::Properties::kEstimateOldestKeyTime, + {false, nullptr, &InternalStats::HandleEstimateOldestKeyTime, + nullptr}}, }; const DBPropertyInfo* GetPropertyInfo(const Slice& property) { @@ -776,6 +783,31 @@ bool InternalStats::HandleIsWriteStopped(uint64_t* value, DBImpl* db, return true; } +bool InternalStats::HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* /*db*/, + Version* /*version*/) { + // TODO(yiwu): The property is currently available for fifo compaction + // with allow_compaction = false. This is because we don't propagate + // oldest_key_time on compaction. + if (cfd_->ioptions()->compaction_style != kCompactionStyleFIFO || + cfd_->GetCurrentMutableCFOptions() + ->compaction_options_fifo.allow_compaction) { + return false; + } + + TablePropertiesCollection collection; + auto s = cfd_->current()->GetPropertiesOfAllTables(&collection); + if (!s.ok()) { + return false; + } + *value = std::numeric_limits::max(); + for (auto& p : collection) { + *value = std::min(*value, p.second->oldest_key_time); + } + *value = std::min({cfd_->mem()->ApproximateOldestKeyTime(), + cfd_->imm()->ApproximateOldestKeyTime(), *value}); + return *value < std::numeric_limits::max(); +} + void InternalStats::DumpDBStats(std::string* value) { char buf[1000]; // DB-level stats, only available from default column family diff --git a/db/internal_stats.h b/db/internal_stats.h index 342e7d7d0..dea9c0987 100644 --- a/db/internal_stats.h +++ b/db/internal_stats.h @@ -477,6 +477,8 @@ class InternalStats { bool HandleActualDelayedWriteRate(uint64_t* value, DBImpl* db, Version* version); bool HandleIsWriteStopped(uint64_t* value, DBImpl* 
db, Version* version); + bool HandleEstimateOldestKeyTime(uint64_t* value, DBImpl* db, + Version* version); // Total number of background errors encountered. Every time a flush task // or compaction task fails, this counter is incremented. The failure can diff --git a/db/memtable.cc b/db/memtable.cc index ec6cd81be..84e9028e7 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -9,8 +9,9 @@ #include "db/memtable.h" -#include #include +#include +#include #include "db/dbformat.h" #include "db/merge_context.h" @@ -97,7 +98,8 @@ MemTable::MemTable(const InternalKeyComparator& cmp, flush_state_(FLUSH_NOT_REQUESTED), env_(ioptions.env), insert_with_hint_prefix_extractor_( - ioptions.memtable_insert_with_hint_prefix_extractor) { + ioptions.memtable_insert_with_hint_prefix_extractor), + oldest_key_time_(std::numeric_limits<uint64_t>::max()) { UpdateFlushState(); // something went wrong if we need to flush before inserting anything assert(!ShouldScheduleFlush()); @@ -203,6 +205,21 @@ void MemTable::UpdateFlushState() { } } +void MemTable::UpdateOldestKeyTime() { + uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed); + if (oldest_key_time == std::numeric_limits<uint64_t>::max()) { + int64_t current_time = 0; + auto s = env_->GetCurrentTime(&current_time); + if (s.ok()) { + assert(current_time >= 0); + // If fail, the timestamp is already set. + oldest_key_time_.compare_exchange_strong( + oldest_key_time, static_cast<uint64_t>(current_time), + std::memory_order_relaxed, std::memory_order_relaxed); + } + } +} + int MemTable::KeyComparator::operator()(const char* prefix_len_key1, const char* prefix_len_key2) const { // Internal keys are encoded as length-prefixed strings. 
@@ -517,6 +534,7 @@ void MemTable::Add(SequenceNumber s, ValueType type, if (is_range_del_table_empty_ && type == kTypeRangeDeletion) { is_range_del_table_empty_ = false; } + UpdateOldestKeyTime(); } // Callback from MemTable::Get() diff --git a/db/memtable.h b/db/memtable.h index fb01b5a82..e1fe59c4d 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -352,6 +352,10 @@ class MemTable { const MemTableOptions* GetMemTableOptions() const { return &moptions_; } + uint64_t ApproximateOldestKeyTime() const { + return oldest_key_time_.load(std::memory_order_relaxed); + } + private: enum FlushStateEnum { FLUSH_NOT_REQUESTED, FLUSH_REQUESTED, FLUSH_SCHEDULED }; @@ -415,12 +419,17 @@ class MemTable { // Insert hints for each prefix. std::unordered_map insert_hints_; + // Timestamp of oldest key + std::atomic oldest_key_time_; + // Returns a heuristic flush decision bool ShouldFlushNow() const; // Updates flush_state_ using ShouldFlushNow() void UpdateFlushState(); + void UpdateOldestKeyTime(); + // No copying allowed MemTable(const MemTable&); MemTable& operator=(const MemTable&); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index d8eb1fefc..a09a118b9 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -10,6 +10,7 @@ #endif #include +#include #include #include "db/memtable.h" #include "db/version_set.h" @@ -448,6 +449,13 @@ size_t MemTableList::ApproximateUnflushedMemTablesMemoryUsage() { size_t MemTableList::ApproximateMemoryUsage() { return current_memory_usage_; } +uint64_t MemTableList::ApproximateOldestKeyTime() const { + if (!current_->memlist_.empty()) { + return current_->memlist_.back()->ApproximateOldestKeyTime(); + } + return std::numeric_limits::max(); +} + void MemTableList::InstallNewVersion() { if (current_->refs_ == 1) { // we're the only one using the version, just keep using it diff --git a/db/memtable_list.h b/db/memtable_list.h index ca0fd9d77..1e46642a9 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -224,6 +224,9 @@ class 
MemTableList { // the unflushed mem-tables. size_t ApproximateUnflushedMemTablesMemoryUsage(); + // Returns an estimate of the timestamp of the earliest key. + uint64_t ApproximateOldestKeyTime() const; + // Request a flush of all existing memtables to storage. This will // cause future calls to IsFlushPending() to return true if this list is // non-empty (regardless of the min_write_buffer_number_to_merge diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 32392c2f7..e91815877 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -582,6 +582,12 @@ class DB { // "rocksdb.is-write-stopped" - Return 1 if write has been stopped. static const std::string kIsWriteStopped; + + // "rocksdb.estimate-oldest-key-time" - returns an estimation of + // oldest key timestamp in the DB. Currently only available for + // FIFO compaction with + // compaction_options_fifo.allow_compaction = false. + static const std::string kEstimateOldestKeyTime; }; #endif /* ROCKSDB_LITE */ @@ -632,6 +638,7 @@ class DB { // "rocksdb.num-running-flushes" // "rocksdb.actual-delayed-write-rate" // "rocksdb.is-write-stopped" + // "rocksdb.estimate-oldest-key-time" virtual bool GetIntProperty(ColumnFamilyHandle* column_family, const Slice& property, uint64_t* value) = 0; virtual bool GetIntProperty(const Slice& property, uint64_t* value) { diff --git a/include/rocksdb/table_properties.h b/include/rocksdb/table_properties.h index 08360d179..e8bbabc3b 100644 --- a/include/rocksdb/table_properties.h +++ b/include/rocksdb/table_properties.h @@ -4,8 +4,9 @@ #pragma once #include -#include +#include #include +#include #include "rocksdb/status.h" #include "rocksdb/types.h" @@ -49,6 +50,7 @@ struct TablePropertiesNames { static const std::string kPropertyCollectors; static const std::string kCompression; static const std::string kCreationTime; + static const std::string kOldestKeyTime; }; extern const std::string kPropertiesBlock; @@ -162,6 +164,8 @@ struct TableProperties { // The time 
when the SST file was created. // Since SST files are immutable, this is equivalent to last modified time. uint64_t creation_time = 0; + // Timestamp of the earliest key + uint64_t oldest_key_time = std::numeric_limits::max(); // Name of the column family with which this SST file is associated. // If column family is unknown, `column_family_name` will be an empty string. diff --git a/table/block_based_table_builder.cc b/table/block_based_table_builder.cc index d356cb5c0..34b68c0e1 100644 --- a/table/block_based_table_builder.cc +++ b/table/block_based_table_builder.cc @@ -276,6 +276,7 @@ struct BlockBasedTableBuilder::Rep { uint32_t column_family_id; const std::string& column_family_name; uint64_t creation_time = 0; + uint64_t oldest_key_time = std::numeric_limits::max(); std::vector> table_properties_collectors; @@ -288,7 +289,8 @@ struct BlockBasedTableBuilder::Rep { const CompressionType _compression_type, const CompressionOptions& _compression_opts, const std::string* _compression_dict, const bool skip_filters, - const std::string& _column_family_name, const uint64_t _creation_time) + const std::string& _column_family_name, const uint64_t _creation_time, + const uint64_t _oldest_key_time) : ioptions(_ioptions), table_options(table_opt), internal_comparator(icomparator), @@ -305,7 +307,8 @@ struct BlockBasedTableBuilder::Rep { table_options, data_block)), column_family_id(_column_family_id), column_family_name(_column_family_name), - creation_time(_creation_time) { + creation_time(_creation_time), + oldest_key_time(_oldest_key_time) { if (table_options.index_type == BlockBasedTableOptions::kTwoLevelIndexSearch) { p_index_builder_ = PartitionedIndexBuilder::CreateIndexBuilder( @@ -344,7 +347,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( const CompressionType compression_type, const CompressionOptions& compression_opts, const std::string* compression_dict, const bool skip_filters, - const std::string& column_family_name, const uint64_t creation_time) { + 
const std::string& column_family_name, const uint64_t creation_time, + const uint64_t oldest_key_time) { BlockBasedTableOptions sanitized_table_options(table_options); if (sanitized_table_options.format_version == 0 && sanitized_table_options.checksum != kCRC32c) { @@ -360,7 +364,8 @@ BlockBasedTableBuilder::BlockBasedTableBuilder( rep_ = new Rep(ioptions, sanitized_table_options, internal_comparator, int_tbl_prop_collector_factories, column_family_id, file, compression_type, compression_opts, compression_dict, - skip_filters, column_family_name, creation_time); + skip_filters, column_family_name, creation_time, + oldest_key_time); if (rep_->filter_builder != nullptr) { rep_->filter_builder->StartBlock(0); @@ -738,6 +743,7 @@ Status BlockBasedTableBuilder::Finish() { r->p_index_builder_->EstimateTopLevelIndexSize(r->offset); } r->props.creation_time = r->creation_time; + r->props.oldest_key_time = r->oldest_key_time; // Add basic properties property_block_builder.AddTableProperty(r->props); diff --git a/table/block_based_table_builder.h b/table/block_based_table_builder.h index 2e8606271..c9197ef5c 100644 --- a/table/block_based_table_builder.h +++ b/table/block_based_table_builder.h @@ -47,7 +47,8 @@ class BlockBasedTableBuilder : public TableBuilder { const CompressionType compression_type, const CompressionOptions& compression_opts, const std::string* compression_dict, const bool skip_filters, - const std::string& column_family_name, const uint64_t creation_time = 0); + const std::string& column_family_name, const uint64_t creation_time = 0, + const uint64_t oldest_key_time = std::numeric_limits::max()); // REQUIRES: Either Finish() or Abandon() has been called. 
~BlockBasedTableBuilder(); diff --git a/table/block_based_table_factory.cc b/table/block_based_table_factory.cc index 309886c0d..7ee7f9fd5 100644 --- a/table/block_based_table_factory.cc +++ b/table/block_based_table_factory.cc @@ -85,7 +85,8 @@ TableBuilder* BlockBasedTableFactory::NewTableBuilder( table_builder_options.compression_dict, table_builder_options.skip_filters, table_builder_options.column_family_name, - table_builder_options.creation_time); + table_builder_options.creation_time, + table_builder_options.oldest_key_time); return table_builder; } diff --git a/table/meta_blocks.cc b/table/meta_blocks.cc index f8ea933f1..3089f463e 100644 --- a/table/meta_blocks.cc +++ b/table/meta_blocks.cc @@ -79,6 +79,7 @@ void PropertyBlockBuilder::AddTableProperty(const TableProperties& props) { Add(TablePropertiesNames::kFixedKeyLen, props.fixed_key_len); Add(TablePropertiesNames::kColumnFamilyId, props.column_family_id); Add(TablePropertiesNames::kCreationTime, props.creation_time); + Add(TablePropertiesNames::kOldestKeyTime, props.oldest_key_time); if (!props.filter_policy_name.empty()) { Add(TablePropertiesNames::kFilterPolicy, props.filter_policy_name); @@ -213,6 +214,8 @@ Status ReadProperties(const Slice& handle_value, RandomAccessFileReader* file, &new_table_properties->column_family_id}, {TablePropertiesNames::kCreationTime, &new_table_properties->creation_time}, + {TablePropertiesNames::kOldestKeyTime, + &new_table_properties->oldest_key_time}, }; std::string last_key; diff --git a/table/table_builder.h b/table/table_builder.h index ef2e608ed..d0ca0678e 100644 --- a/table/table_builder.h +++ b/table/table_builder.h @@ -10,6 +10,7 @@ #pragma once #include +#include #include #include #include @@ -55,7 +56,8 @@ struct TableBuilderOptions { const CompressionOptions& _compression_opts, const std::string* _compression_dict, bool _skip_filters, const std::string& _column_family_name, int _level, - const uint64_t _creation_time = 0) + const uint64_t _creation_time = 
0, + const int64_t _oldest_key_time = std::numeric_limits::max()) : ioptions(_ioptions), internal_comparator(_internal_comparator), int_tbl_prop_collector_factories(_int_tbl_prop_collector_factories), @@ -65,7 +67,8 @@ struct TableBuilderOptions { skip_filters(_skip_filters), column_family_name(_column_family_name), level(_level), - creation_time(_creation_time) {} + creation_time(_creation_time), + oldest_key_time(_oldest_key_time) {} const ImmutableCFOptions& ioptions; const InternalKeyComparator& internal_comparator; const std::vector>* @@ -78,6 +81,7 @@ struct TableBuilderOptions { const std::string& column_family_name; int level; // what level this table/file is on, -1 for "not set, don't know" const uint64_t creation_time; + const int64_t oldest_key_time; }; // TableBuilder provides the interface used to build a Table diff --git a/table/table_properties.cc b/table/table_properties.cc index ef77ae566..24453f6f9 100644 --- a/table/table_properties.cc +++ b/table/table_properties.cc @@ -139,6 +139,9 @@ std::string TableProperties::ToString( AppendProperty(result, "creation time", creation_time, prop_delim, kv_delim); + AppendProperty(result, "time stamp of earliest key", oldest_key_time, + prop_delim, kv_delim); + return result; } @@ -191,6 +194,8 @@ const std::string TablePropertiesNames::kPropertyCollectors = "rocksdb.property.collectors"; const std::string TablePropertiesNames::kCompression = "rocksdb.compression"; const std::string TablePropertiesNames::kCreationTime = "rocksdb.creation.time"; +const std::string TablePropertiesNames::kOldestKeyTime = + "rocksdb.oldest.key.time"; extern const std::string kPropertiesBlock = "rocksdb.properties"; // Old property block name for backward compatibility diff --git a/utilities/blob_db/blob_db_test.cc b/utilities/blob_db/blob_db_test.cc index 535b86a6e..b4907eef1 100644 --- a/utilities/blob_db/blob_db_test.cc +++ b/utilities/blob_db/blob_db_test.cc @@ -26,21 +26,9 @@ class BlobDBTest : public testing::Test { public: 
const int kMaxBlobSize = 1 << 14; - class MockEnv : public EnvWrapper { - public: - MockEnv() : EnvWrapper(Env::Default()) {} - - void set_now_micros(uint64_t now_micros) { now_micros_ = now_micros; } - - uint64_t NowMicros() override { return now_micros_; } - - private: - uint64_t now_micros_ = 0; - }; - BlobDBTest() : dbname_(test::TmpDir() + "/blob_db_test"), - mock_env_(new MockEnv()), + mock_env_(new MockTimeEnv(Env::Default())), blob_db_(nullptr) { Status s = DestroyBlobDB(dbname_, Options(), BlobDBOptions()); assert(s.ok()); @@ -155,7 +143,7 @@ class BlobDBTest : public testing::Test { } const std::string dbname_; - std::unique_ptr mock_env_; + std::unique_ptr mock_env_; std::shared_ptr ttl_extractor_; BlobDB *blob_db_; }; // class BlobDBTest @@ -182,13 +170,13 @@ TEST_F(BlobDBTest, PutWithTTL) { bdb_options.disable_background_tasks = true; Open(bdb_options, options); std::map data; - mock_env_->set_now_micros(50 * 1000000); + mock_env_->set_current_time(50); for (size_t i = 0; i < 100; i++) { uint64_t ttl = rnd.Next() % 100; PutRandomWithTTL("key" + ToString(i), ttl, &rnd, (ttl <= 50 ? nullptr : &data)); } - mock_env_->set_now_micros(100 * 1000000); + mock_env_->set_current_time(100); auto *bdb_impl = static_cast(blob_db_); auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); @@ -211,13 +199,13 @@ TEST_F(BlobDBTest, PutUntil) { bdb_options.disable_background_tasks = true; Open(bdb_options, options); std::map data; - mock_env_->set_now_micros(50 * 1000000); + mock_env_->set_current_time(50); for (size_t i = 0; i < 100; i++) { uint64_t expiration = rnd.Next() % 100 + 50; PutRandomUntil("key" + ToString(i), expiration, &rnd, (expiration <= 100 ? 
nullptr : &data)); } - mock_env_->set_now_micros(100 * 1000000); + mock_env_->set_current_time(100); auto *bdb_impl = static_cast(blob_db_); auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); @@ -244,12 +232,13 @@ TEST_F(BlobDBTest, TTLExtrator_NoTTL) { bdb_options.disable_background_tasks = true; Open(bdb_options, options); std::map data; - mock_env_->set_now_micros(0); + mock_env_->set_current_time(0); for (size_t i = 0; i < 100; i++) { PutRandom("key" + ToString(i), &rnd, &data); } // very far in the future.. - mock_env_->set_now_micros(std::numeric_limits::max() - 10); + mock_env_->set_current_time(std::numeric_limits::max() / 1000000 - + 10); auto *bdb_impl = static_cast(blob_db_); auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); @@ -290,11 +279,11 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractTTL) { bdb_options.ttl_extractor = ttl_extractor_; bdb_options.disable_background_tasks = true; Open(bdb_options, options); - mock_env_->set_now_micros(50 * 1000000); + mock_env_->set_current_time(50); for (size_t i = 0; i < 100; i++) { PutRandom("key" + ToString(i), &rnd); } - mock_env_->set_now_micros(100 * 1000000); + mock_env_->set_current_time(100); auto *bdb_impl = static_cast(blob_db_); auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); @@ -337,11 +326,11 @@ TEST_F(BlobDBTest, TTLExtractor_ExtractExpiration) { bdb_options.ttl_extractor = ttl_extractor_; bdb_options.disable_background_tasks = true; Open(bdb_options, options); - mock_env_->set_now_micros(50 * 1000000); + mock_env_->set_current_time(50); for (size_t i = 0; i < 100; i++) { PutRandom("key" + ToString(i), &rnd); } - mock_env_->set_now_micros(100 * 1000000); + mock_env_->set_current_time(100); auto *bdb_impl = static_cast(blob_db_); auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); @@ -385,7 +374,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) { bdb_options.disable_background_tasks 
= true; Open(bdb_options, options); std::map data; - mock_env_->set_now_micros(50 * 1000000); + mock_env_->set_current_time(50); for (size_t i = 0; i < 100; i++) { int len = rnd.Next() % kMaxBlobSize + 1; std::string key = "key" + ToString(i); @@ -398,7 +387,7 @@ TEST_F(BlobDBTest, TTLExtractor_ChangeValue) { data[key] = value; } } - mock_env_->set_now_micros(100 * 1000000); + mock_env_->set_current_time(100); auto *bdb_impl = static_cast(blob_db_); auto blob_files = bdb_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); @@ -628,14 +617,14 @@ TEST_F(BlobDBTest, GCExpiredKeyWhileOverwriting) { BlobDBOptions bdb_options; bdb_options.disable_background_tasks = true; Open(bdb_options, options); - mock_env_->set_now_micros(100 * 1000000); + mock_env_->set_current_time(100); ASSERT_OK(blob_db_->PutUntil(WriteOptions(), "foo", "v1", 200)); BlobDBImpl *blob_db_impl = static_cast_with_check(blob_db_); auto blob_files = blob_db_impl->TEST_GetBlobFiles(); ASSERT_EQ(1, blob_files.size()); ASSERT_OK(blob_db_impl->TEST_CloseBlobFile(blob_files[0])); - mock_env_->set_now_micros(300 * 1000000); + mock_env_->set_current_time(300); SyncPoint::GetInstance()->LoadDependency( {{"BlobDBImpl::GCFileAndUpdateLSM:AfterGetFromBaseDB", @@ -775,7 +764,7 @@ TEST_F(BlobDBTest, ReadWhileGC) { TEST_F(BlobDBTest, ColumnFamilyNotSupported) { Options options; options.env = mock_env_.get(); - mock_env_->set_now_micros(0); + mock_env_->set_current_time(0); Open(BlobDBOptions(), options); ColumnFamilyHandle *default_handle = blob_db_->DefaultColumnFamily(); ColumnFamilyHandle *handle = nullptr;