diff --git a/utilities/ttl/db_ttl_impl.cc b/utilities/ttl/db_ttl_impl.cc index 0006b56a4..f7a697fa6 100644 --- a/utilities/ttl/db_ttl_impl.cc +++ b/utilities/ttl/db_ttl_impl.cc @@ -14,19 +14,20 @@ namespace rocksdb { -void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options) { +void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options, + Env* env) { if (options->compaction_filter) { options->compaction_filter = - new TtlCompactionFilter(ttl, options->compaction_filter); + new TtlCompactionFilter(ttl, env, options->compaction_filter); } else { options->compaction_filter_factory = std::shared_ptr(new TtlCompactionFilterFactory( - ttl, options->compaction_filter_factory)); + ttl, env, options->compaction_filter_factory)); } if (options->merge_operator) { options->merge_operator.reset( - new TtlMergeOperator(options->merge_operator)); + new TtlMergeOperator(options->merge_operator, env)); } } @@ -81,8 +82,9 @@ Status DBWithTTL::Open( std::vector column_families_sanitized = column_families; for (size_t i = 0; i < column_families_sanitized.size(); ++i) { - DBWithTTLImpl::SanitizeOptions(ttls[i], - &column_families_sanitized[i].options); + DBWithTTLImpl::SanitizeOptions( + ttls[i], &column_families_sanitized[i].options, + db_options.env == nullptr ? Env::Default() : db_options.env); } DB* db; @@ -105,7 +107,7 @@ Status DBWithTTLImpl::CreateColumnFamilyWithTtl( const ColumnFamilyOptions& options, const std::string& column_family_name, ColumnFamilyHandle** handle, int ttl) { ColumnFamilyOptions sanitized_options = options; - DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options); + DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options, GetEnv()); return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name, handle); @@ -117,18 +119,14 @@ Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options, return CreateColumnFamilyWithTtl(options, column_family_name, handle, 0); } -// Gives back the current time -Status DBWithTTLImpl::GetCurrentTime(int64_t* curtime) { - return Env::Default()->GetCurrentTime(curtime); -} - // Appends the current timestamp to the string. // Returns false if could not get the current_time, true if append succeeds -Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts) { +Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts, + Env* env) { val_with_ts->reserve(kTSLength + val.size()); char ts_string[kTSLength]; int64_t curtime; - Status st = GetCurrentTime(&curtime); + Status st = env->GetCurrentTime(&curtime); if (!st.ok()) { return st; } @@ -154,12 +152,12 @@ Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) { } // Checks if the string is stale or not according to TTl provided -bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl) { +bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) { if (ttl <= 0) { // Data is fresh if TTL is non-positive return false; } int64_t curtime; - if (!GetCurrentTime(&curtime).ok()) { + if (!env->GetCurrentTime(&curtime).ok()) { return false; // Treat the data as fresh if could not get current time } int32_t timestamp_value = @@ -232,12 +230,13 @@ Status DBWithTTLImpl::Merge(const WriteOptions& options, Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) { class Handler : public WriteBatch::Handler { public: + explicit Handler(Env* env) : env_(env) {} WriteBatch updates_ttl; Status batch_rewrite_status; virtual Status PutCF(uint32_t column_family_id, const Slice& key, const Slice& value) { std::string value_with_ts; - Status st = AppendTS(value, &value_with_ts); + Status st = AppendTS(value, &value_with_ts, env_); if (!st.ok()) { batch_rewrite_status = st; } else { @@ -249,7 +248,7 @@ Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) { virtual Status MergeCF(uint32_t column_family_id, const Slice& key, const Slice& value) { std::string value_with_ts; - Status st = AppendTS(value, &value_with_ts); + Status st = AppendTS(value, &value_with_ts, env_); if (!st.ok()) { batch_rewrite_status = st; } else { @@ -263,8 +262,11 @@ Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) { return Status::OK(); } virtual void LogData(const Slice& blob) { updates_ttl.PutLogData(blob); } + + private: + Env* env_; }; - Handler handler; + Handler handler(GetEnv()); updates->Iterate(&handler); if (!handler.batch_rewrite_status.ok()) { return handler.batch_rewrite_status; diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h index 9f7b65822..a5c8fc8ca 100644 --- a/utilities/ttl/db_ttl_impl.h +++ b/utilities/ttl/db_ttl_impl.h @@ -21,7 +21,8 @@ namespace rocksdb { class DBWithTTLImpl : public DBWithTTL { public: - static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options); + static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options, + Env* env); explicit DBWithTTLImpl(DB* db); @@ -72,16 +73,14 @@ class DBWithTTLImpl : public DBWithTTL { virtual DB* GetBaseDB() { return db_; } - static bool IsStale(const Slice& value, int32_t ttl); + static bool IsStale(const Slice& value, int32_t ttl, Env* env); - static Status AppendTS(const Slice& val, std::string* val_with_ts); + static Status AppendTS(const Slice& val, std::string* val_with_ts, Env* env); static Status SanityCheckTimestamp(const Slice& str); static Status StripTS(std::string* str); - static Status GetCurrentTime(int64_t* curtime); - static const uint32_t kTSLength = sizeof(int32_t); // size of timestamp static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8 @@ -130,13 +129,13 @@ class TtlIterator : public Iterator { }; class TtlCompactionFilter : public CompactionFilter { - public: TtlCompactionFilter( - int32_t ttl, const CompactionFilter* user_comp_filter, + int32_t ttl, Env* env, const CompactionFilter* user_comp_filter, std::unique_ptr user_comp_filter_from_factory = nullptr) : ttl_(ttl), + env_(env), user_comp_filter_(user_comp_filter), user_comp_filter_from_factory_( std::move(user_comp_filter_from_factory)) { @@ -150,7 +149,7 @@ class TtlCompactionFilter : public CompactionFilter { virtual bool Filter(int level, const Slice& key, const Slice& old_val, std::string* new_val, bool* value_changed) const override { - if (DBWithTTLImpl::IsStale(old_val, ttl_)) { + if (DBWithTTLImpl::IsStale(old_val, ttl_, env_)) { return true; } if (user_comp_filter_ == nullptr) { @@ -175,6 +174,7 @@ class TtlCompactionFilter : public CompactionFilter { private: int32_t ttl_; + Env* env_; const CompactionFilter* user_comp_filter_; std::unique_ptr user_comp_filter_from_factory_; }; @@ -182,13 +182,14 @@ class TtlCompactionFilter : public CompactionFilter { class TtlCompactionFilterFactory : public CompactionFilterFactory { public: TtlCompactionFilterFactory( - int32_t ttl, std::shared_ptr comp_filter_factory) - : ttl_(ttl), user_comp_filter_factory_(comp_filter_factory) {} + int32_t ttl, Env* env, + std::shared_ptr comp_filter_factory) + : ttl_(ttl), env_(env), user_comp_filter_factory_(comp_filter_factory) {} virtual std::unique_ptr CreateCompactionFilter( const CompactionFilter::Context& context) { return std::unique_ptr(new TtlCompactionFilter( - ttl_, nullptr, + ttl_, env_, nullptr, std::move(user_comp_filter_factory_->CreateCompactionFilter(context)))); } @@ -198,15 +199,18 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory { private: int32_t ttl_; + Env* env_; std::shared_ptr user_comp_filter_factory_; }; class TtlMergeOperator : public MergeOperator { public: - explicit TtlMergeOperator(const std::shared_ptr merge_op) - : user_merge_op_(merge_op) { + explicit TtlMergeOperator(const std::shared_ptr merge_op, + Env* env) + : user_merge_op_(merge_op), env_(env) { assert(merge_op); + assert(env); } virtual bool FullMerge(const Slice& key, const Slice* existing_value, @@ -248,7 +252,7 @@ class TtlMergeOperator : public MergeOperator { // Augment the *new_value with the ttl time-stamp int64_t curtime; - if (!DBWithTTLImpl::GetCurrentTime(&curtime).ok()) { + if (!env_->GetCurrentTime(&curtime).ok()) { Log(logger, "Error: Could not get current time to be attached internally " "to the new value."); @@ -287,7 +291,7 @@ class TtlMergeOperator : public MergeOperator { // Augment the *new_value with the ttl time-stamp int64_t curtime; - if (!DBWithTTLImpl::GetCurrentTime(&curtime).ok()) { + if (!env_->GetCurrentTime(&curtime).ok()) { Log(logger, "Error: Could not get current time to be attached internally " "to the new value."); @@ -304,6 +308,7 @@ class TtlMergeOperator : public MergeOperator { private: std::shared_ptr user_merge_op_; + Env* env_; }; } #endif // ROCKSDB_LITE diff --git a/utilities/ttl/ttl_test.cc b/utilities/ttl/ttl_test.cc index 3b819f0c1..4791a2a77 100644 --- a/utilities/ttl/ttl_test.cc +++ b/utilities/ttl/ttl_test.cc @@ -20,14 +20,31 @@ enum BatchOperation { PUT = 0, DELETE = 1 }; - } +class SpecialTimeEnv : public EnvWrapper { + public: + explicit SpecialTimeEnv(Env* base) : EnvWrapper(base) { + base->GetCurrentTime(¤t_time_); + } + + void Sleep(int64_t sleep_time) { current_time_ += sleep_time; } + virtual Status GetCurrentTime(int64_t* current_time) { + *current_time = current_time_; + return Status::OK(); + } + + private: + int64_t current_time_; +}; + class TtlTest { public: TtlTest() { + env_.reset(new SpecialTimeEnv(Env::Default())); dbname_ = test::TmpDir() + "/db_ttl"; options_.create_if_missing = true; + options_.env = env_.get(); // ensure that compaction is kicked in to always strip timestamp from kvs options_.max_grandparent_overlap_factor = 0; // compaction should take place always from level0 for determinism @@ -183,7 +200,8 @@ class TtlTest { bool test_compaction_change = false, ColumnFamilyHandle* cf = nullptr) { ASSERT_TRUE(db_ttl_); - sleep(slp_tim); + + env_->Sleep(slp_tim); ManualCompact(cf); static ReadOptions ropts; kv_it_ = kvmap_.begin(); @@ -219,7 +237,7 @@ class TtlTest { // Similar as SleepCompactCheck but uses TtlIterator to read from db void SleepCompactCheckIter(int slp, int st_pos, int span, bool check=true) { ASSERT_TRUE(db_ttl_); - sleep(slp); + env_->Sleep(slp); ManualCompact(); static ReadOptions ropts; Iterator *dbiter = db_ttl_->NewIterator(ropts); @@ -318,6 +336,7 @@ class TtlTest { const int64_t kSampleSize_ = 100; std::string dbname_; DBWithTTL* db_ttl_; + unique_ptr env_; private: Options options_; @@ -379,7 +398,7 @@ TEST(TtlTest, ResetTimestamp) { OpenTtl(3); PutValues(0, kSampleSize_); // T=0: Insert Set1. Delete at t=3 - sleep(2); // T=2 + env_->Sleep(2); // T=2 PutValues(0, kSampleSize_); // T=2: Insert Set1. Delete at t=5 SleepCompactCheck(2, 0, kSampleSize_); // T=4: Set1 should still be there CloseTtl(); @@ -515,6 +534,7 @@ TEST(TtlTest, ColumnFamiliesTest) { DB* db; Options options; options.create_if_missing = true; + options.env = env_.get(); DB::Open(options, dbname_, &db); ColumnFamilyHandle* handle;