diff --git a/CMakeLists.txt b/CMakeLists.txt index 82d9cf1c2..db80bd5ba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -250,6 +250,7 @@ set(SOURCES utilities/backupable/backupable_db.cc utilities/checkpoint/checkpoint.cc utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc + utilities/date_tiered/date_tiered_db_impl.cc utilities/document/document_db.cc utilities/document/json_document.cc utilities/document/json_document_builder.cc @@ -434,6 +435,7 @@ set(TESTS util/thread_local_test.cc utilities/backupable/backupable_db_test.cc utilities/checkpoint/checkpoint_test.cc + utilities/date_tiered/date_tiered_test.cc utilities/document/document_db_test.cc utilities/document/json_document_test.cc utilities/env_registry_test.cc diff --git a/Makefile b/Makefile index 5be5fdd65..1565ce3c6 100644 --- a/Makefile +++ b/Makefile @@ -336,6 +336,7 @@ TESTS = \ skiplist_test \ stringappend_test \ ttl_test \ + date_tiered_test \ backupable_db_test \ document_db_test \ json_document_test \ @@ -1027,6 +1028,9 @@ env_registry_test: utilities/env_registry_test.o $(LIBOBJECTS) $(TESTHARNESS) ttl_test: utilities/ttl/ttl_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) +date_tiered_test: utilities/date_tiered/date_tiered_test.o $(LIBOBJECTS) $(TESTHARNESS) + $(AM_LINK) + write_batch_with_index_test: utilities/write_batch_with_index/write_batch_with_index_test.o $(LIBOBJECTS) $(TESTHARNESS) $(AM_LINK) diff --git a/include/rocksdb/utilities/date_tiered_db.h b/include/rocksdb/utilities/date_tiered_db.h new file mode 100644 index 000000000..5403f9adb --- /dev/null +++ b/include/rocksdb/utilities/date_tiered_db.h @@ -0,0 +1,108 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once +#ifndef ROCKSDB_LITE + +#include +#include +#include + +#include "rocksdb/db.h" + +namespace rocksdb { + +// Date tiered database is a wrapper of DB that implements +// a simplified DateTieredCompactionStrategy by using multiple column famillies +// as time windows. +// +// DateTieredDB provides an interface similar to DB, but it assumes that user +// provides keys with last 8 bytes encoded as timestamp in seconds. DateTieredDB +// is assigned with a TTL to declare when data should be deleted. +// +// DateTieredDB hides column families layer from standard RocksDB instance. It +// uses multiple column families to manage time series data, each containing a +// specific range of time. Column families are named by its maximum possible +// timestamp. A column family is created automatically when data newer than +// latest timestamp of all existing column families. The time range of a column +// family is configurable by `column_family_interval`. By doing this, we +// guarantee that compaction will only happen in a column family. +// +// DateTieredDB is assigned with a TTL. When all data in a column family are +// expired (CF_Timestamp <= CUR_Timestamp - TTL), we directly drop the whole +// column family. +// +// TODO(jhli): This is only a simplified version of DTCS. In a complete DTCS, +// time windows can be merged over time, so that older time windows will have +// larger time range. Also, compaction are executed only for adjacent SST files +// to guarantee there is no time overlap between SST files. + +class DateTieredDB { + public: + // Open a DateTieredDB whose name is `dbname`. + // Similar to DB::Open(), created database object is stored in dbptr. + // + // Two parameters can be configured: `ttl` to specify the length of time that + // keys should exist in the database, and `column_family_interval` to specify + // the time range of a column family interval. + // + // Open a read only database if read only is set as true. + // TODO(jhli): Should use an option object that includes ttl and + // column_family_interval. + static Status Open(const Options& options, const std::string& dbname, + DateTieredDB** dbptr, int64_t ttl, + int64_t column_family_interval, bool read_only = false); + + explicit DateTieredDB() {} + + virtual ~DateTieredDB() {} + + // Wrapper for Put method. Similar to DB::Put(), but column family to be + // inserted is decided by the timestamp in keys, i.e. the last 8 bytes of user + // key. If key is already obsolete, it will not be inserted. + // + // When client put a key value pair in DateTieredDB, it assumes last 8 bytes + // of keys are encoded as timestamp. Timestamp is a 64-bit signed integer + // encoded as the number of seconds since 1970-01-01 00:00:00 (UTC) (Same as + // Env::GetCurrentTime()). Timestamp should be encoded in big endian. + virtual Status Put(const WriteOptions& options, const Slice& key, + const Slice& val) = 0; + + // Wrapper for Get method. Similar to DB::Get() but column family is decided + // by timestamp in keys. If key is already obsolete, it will not be found. + virtual Status Get(const ReadOptions& options, const Slice& key, + std::string* value) = 0; + + // Wrapper for Delete method. Similar to DB::Delete() but column family is + // decided by timestamp in keys. If key is already obsolete, return NotFound + // status. + virtual Status Delete(const WriteOptions& options, const Slice& key) = 0; + + // Wrapper for KeyMayExist method. Similar to DB::KeyMayExist() but column + // family is decided by timestamp in keys. Return false when key is already + // obsolete. + virtual bool KeyMayExist(const ReadOptions& options, const Slice& key, + std::string* value, bool* value_found = nullptr) = 0; + + // Wrapper for Merge method. Similar to DB::Merge() but column family is + // decided by timestamp in keys. + virtual Status Merge(const WriteOptions& options, const Slice& key, + const Slice& value) = 0; + + // Create an iterator that hides low level details. This iterator internally + // merge results from all active time series column families. Note that + // column families are not deleted until all data are obsolete, so this + // iterator can possibly access obsolete key value pairs. + virtual Iterator* NewIterator(const ReadOptions& opts) = 0; + + // Explicitly drop column families in which all keys are obsolete. This + // process is also inplicitly done in Put() operation. + virtual Status DropObsoleteColumnFamilies() = 0; + + static const uint64_t kTSLength = sizeof(int64_t); // size of timestamp +}; + +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/src.mk b/src.mk index 0b479e2e7..0f41cb94f 100644 --- a/src.mk +++ b/src.mk @@ -151,6 +151,7 @@ LIB_SOURCES = \ utilities/transactions/transaction_impl.cc \ utilities/transactions/transaction_util.cc \ utilities/ttl/db_ttl_impl.cc \ + utilities/date_tiered/date_tiered_db_impl.cc \ utilities/write_batch_with_index/write_batch_with_index.cc \ utilities/write_batch_with_index/write_batch_with_index_internal.cc \ util/event_logger.cc \ @@ -302,6 +303,7 @@ MAIN_SOURCES = \ utilities/transactions/optimistic_transaction_test.cc \ utilities/transactions/transaction_test.cc \ utilities/ttl/ttl_test.cc \ + utilities/date_tiered/date_tiered_test.cc \ utilities/write_batch_with_index/write_batch_with_index_test.cc \ utilities/column_aware_encoding_test.cc \ util/iostats_context_test.cc \ diff --git a/util/options_helper.cc b/util/options_helper.cc index 259c27c81..5c9e8331f 100644 --- a/util/options_helper.cc +++ b/util/options_helper.cc @@ -89,6 +89,31 @@ std::string UnescapeOptionString(const std::string& escaped_string) { return output; } +uint64_t ParseUint64(const std::string& value) { + size_t endchar; +#ifndef CYGWIN + uint64_t num = std::stoull(value.c_str(), &endchar); +#else + char* endptr; + uint64_t num = std::strtoul(value.c_str(), &endptr, 0); + endchar = endptr - value.c_str(); +#endif + + if (endchar < value.length()) { + char c = value[endchar]; + if (c == 'k' || c == 'K') + num <<= 10LL; + else if (c == 'm' || c == 'M') + num <<= 20LL; + else if (c == 'g' || c == 'G') + num <<= 30LL; + else if (c == 't' || c == 'T') + num <<= 40LL; + } + + return num; +} + namespace { std::string trim(const std::string& str) { if (str.empty()) return std::string(); @@ -158,31 +183,6 @@ bool ParseBoolean(const std::string& type, const std::string& value) { throw std::invalid_argument(type); } -uint64_t ParseUint64(const std::string& value) { - size_t endchar; -#ifndef CYGWIN - uint64_t num = std::stoull(value.c_str(), &endchar); -#else - char* endptr; - uint64_t num = std::strtoul(value.c_str(), &endptr, 0); - endchar = endptr - value.c_str(); -#endif - - if (endchar < value.length()) { - char c = value[endchar]; - if (c == 'k' || c == 'K') - num <<= 10LL; - else if (c == 'm' || c == 'M') - num <<= 20LL; - else if (c == 'g' || c == 'G') - num <<= 30LL; - else if (c == 't' || c == 'T') - num <<= 40LL; - } - - return num; -} - size_t ParseSizeT(const std::string& value) { return static_cast(ParseUint64(value)); } diff --git a/util/options_helper.h b/util/options_helper.h index b30cb4b90..67979ab25 100644 --- a/util/options_helper.h +++ b/util/options_helper.h @@ -53,6 +53,8 @@ std::string EscapeOptionString(const std::string& raw_string); // @return the raw string of the input "escaped_string" std::string UnescapeOptionString(const std::string& escaped_string); +uint64_t ParseUint64(const std::string& value); + Status GetMutableOptionsFromStrings( const MutableCFOptions& base_options, const std::unordered_map& options_map, diff --git a/utilities/date_tiered/date_tiered_db_impl.cc b/utilities/date_tiered/date_tiered_db_impl.cc new file mode 100644 index 000000000..2352aa2e9 --- /dev/null +++ b/utilities/date_tiered/date_tiered_db_impl.cc @@ -0,0 +1,395 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +#ifndef ROCKSDB_LITE + +#include "utilities/date_tiered/date_tiered_db_impl.h" + +#include + +#include "db/db_impl.h" +#include "db/db_iter.h" +#include "db/filename.h" +#include "db/write_batch_internal.h" +#include "rocksdb/convenience.h" +#include "rocksdb/env.h" +#include "rocksdb/iterator.h" +#include "rocksdb/utilities/date_tiered_db.h" +#include "table/merger.h" +#include "util/coding.h" +#include "util/instrumented_mutex.h" +#include "util/options_helper.h" +#include "util/string_util.h" + +namespace rocksdb { + +// Open the db inside DateTieredDBImpl because options needs pointer to its ttl +DateTieredDBImpl::DateTieredDBImpl( + DB* db, Options options, + const std::vector& descriptors, + const std::vector& handles, int64_t ttl, + int64_t column_family_interval) + : db_(db), + cf_options_(ColumnFamilyOptions(options)), + ioptions_(ImmutableCFOptions(options)), + ttl_(ttl), + column_family_interval_(column_family_interval), + mutex_(options.statistics.get(), db->GetEnv(), DB_MUTEX_WAIT_MICROS, + options.use_adaptive_mutex) { + latest_timebound_ = std::numeric_limits::min(); + for (size_t i = 0; i < handles.size(); ++i) { + const auto& name = descriptors[i].name; + int64_t timestamp = 0; + try { + timestamp = ParseUint64(name); + } catch (const std::invalid_argument&) { + // Bypass unrelated column family, e.g. default + db_->DestroyColumnFamilyHandle(handles[i]); + continue; + } + if (timestamp > latest_timebound_) { + latest_timebound_ = timestamp; + } + handle_map_.insert(std::make_pair(timestamp, handles[i])); + } +} + +DateTieredDBImpl::~DateTieredDBImpl() { + for (auto handle : handle_map_) { + db_->DestroyColumnFamilyHandle(handle.second); + } + delete db_; + db_ = nullptr; +} + +Status DateTieredDB::Open(const Options& options, const std::string& dbname, + DateTieredDB** dbptr, int64_t ttl, + int64_t column_family_interval, bool read_only) { + DBOptions db_options(options); + ColumnFamilyOptions cf_options(options); + std::vector descriptors; + std::vector handles; + DB* db; + Status s; + + // Get column families + std::vector column_family_names; + s = DB::ListColumnFamilies(db_options, dbname, &column_family_names); + if (!s.ok()) { + // No column family found. Use default + s = DB::Open(options, dbname, &db); + if (!s.ok()) { + return s; + } + } else { + for (auto name : column_family_names) { + descriptors.emplace_back(ColumnFamilyDescriptor(name, cf_options)); + } + + // Open database + if (read_only) { + s = DB::OpenForReadOnly(db_options, dbname, descriptors, &handles, &db); + } else { + s = DB::Open(db_options, dbname, descriptors, &handles, &db); + } + } + + if (s.ok()) { + *dbptr = new DateTieredDBImpl(db, options, descriptors, handles, ttl, + column_family_interval); + } + return s; +} + +// Checks if the string is stale or not according to TTl provided +bool DateTieredDBImpl::IsStale(int64_t keytime, int64_t ttl, Env* env) { + if (ttl <= 0) { + // Data is fresh if TTL is non-positive + return false; + } + int64_t curtime; + if (!env->GetCurrentTime(&curtime).ok()) { + // Treat the data as fresh if could not get current time + return false; + } + return curtime >= keytime + ttl; +} + +// Drop column family when all data in that column family is expired +// TODO(jhli): Can be made a background job +Status DateTieredDBImpl::DropObsoleteColumnFamilies() { + int64_t curtime; + Status s; + s = db_->GetEnv()->GetCurrentTime(&curtime); + if (!s.ok()) { + return s; + } + { + InstrumentedMutexLock l(&mutex_); + auto iter = handle_map_.begin(); + while (iter != handle_map_.end()) { + if (iter->first <= curtime - ttl_) { + s = db_->DropColumnFamily(iter->second); + if (!s.ok()) { + return s; + } + delete iter->second; + iter = handle_map_.erase(iter); + } else { + break; + } + } + } + return Status::OK(); +} + +// Get timestamp from user key +Status DateTieredDBImpl::GetTimestamp(const Slice& key, int64_t* result) { + if (key.size() < kTSLength) { + return Status::Corruption("Bad timestamp in key"); + } + const char* pos = key.data() + key.size() - 8; + int64_t timestamp = 0; + if (port::kLittleEndian) { + int bytes_to_fill = 8; + for (int i = 0; i < bytes_to_fill; ++i) { + timestamp |= (static_cast(static_cast(pos[i])) + << ((bytes_to_fill - i - 1) << 3)); + } + } else { + memcpy(×tamp, pos, sizeof(timestamp)); + } + *result = timestamp; + return Status::OK(); +} + +Status DateTieredDBImpl::CreateColumnFamily( + ColumnFamilyHandle** column_family) { + int64_t curtime; + Status s; + mutex_.AssertHeld(); + s = db_->GetEnv()->GetCurrentTime(&curtime); + if (!s.ok()) { + return s; + } + int64_t new_timebound; + if (handle_map_.empty()) { + new_timebound = curtime + column_family_interval_; + } else { + new_timebound = + latest_timebound_ + + ((curtime - latest_timebound_) / column_family_interval_ + 1) * + column_family_interval_; + } + std::string cf_name = ToString(new_timebound); + latest_timebound_ = new_timebound; + s = db_->CreateColumnFamily(cf_options_, cf_name, column_family); + if (s.ok()) { + handle_map_.insert(std::make_pair(new_timebound, *column_family)); + } + return s; +} + +Status DateTieredDBImpl::FindColumnFamily(int64_t keytime, + ColumnFamilyHandle** column_family, + bool create_if_missing) { + *column_family = nullptr; + { + InstrumentedMutexLock l(&mutex_); + auto iter = handle_map_.upper_bound(keytime); + if (iter == handle_map_.end()) { + if (!create_if_missing) { + return Status::NotFound(); + } else { + return CreateColumnFamily(column_family); + } + } + // Move to previous element to get the appropriate time window + *column_family = iter->second; + } + return Status::OK(); +} + +Status DateTieredDBImpl::Put(const WriteOptions& options, const Slice& key, + const Slice& val) { + int64_t timestamp = 0; + Status s; + s = GetTimestamp(key, ×tamp); + if (!s.ok()) { + return s; + } + DropObsoleteColumnFamilies(); + + // Prune request to obsolete data + if (IsStale(timestamp, ttl_, db_->GetEnv())) { + return Status::InvalidArgument(); + } + + // Decide column family (i.e. the time window) to put into + ColumnFamilyHandle* column_family; + s = FindColumnFamily(timestamp, &column_family, true /*create_if_missing*/); + if (!s.ok()) { + return s; + } + + // Efficiently put with WriteBatch + WriteBatch batch; + batch.Put(column_family, key, val); + return Write(options, &batch); +} + +Status DateTieredDBImpl::Get(const ReadOptions& options, const Slice& key, + std::string* value) { + int64_t timestamp = 0; + Status s; + s = GetTimestamp(key, ×tamp); + if (!s.ok()) { + return s; + } + // Prune request to obsolete data + if (IsStale(timestamp, ttl_, db_->GetEnv())) { + return Status::NotFound(); + } + + // Decide column family to get from + ColumnFamilyHandle* column_family; + s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/); + if (!s.ok()) { + return s; + } + if (column_family == nullptr) { + // Cannot find column family + return Status::NotFound(); + } + + // Get value with key + return db_->Get(options, column_family, key, value); +} + +bool DateTieredDBImpl::KeyMayExist(const ReadOptions& options, const Slice& key, + std::string* value, bool* value_found) { + int64_t timestamp = 0; + Status s; + s = GetTimestamp(key, ×tamp); + if (!s.ok()) { + // Cannot get current time + return false; + } + // Decide column family to get from + ColumnFamilyHandle* column_family; + s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/); + if (!s.ok() || column_family == nullptr) { + // Cannot find column family + return false; + } + if (IsStale(timestamp, ttl_, db_->GetEnv())) { + return false; + } + return db_->KeyMayExist(options, column_family, key, value, value_found); +} + +Status DateTieredDBImpl::Delete(const WriteOptions& options, const Slice& key) { + int64_t timestamp = 0; + Status s; + s = GetTimestamp(key, ×tamp); + if (!s.ok()) { + return s; + } + DropObsoleteColumnFamilies(); + // Prune request to obsolete data + if (IsStale(timestamp, ttl_, db_->GetEnv())) { + return Status::NotFound(); + } + + // Decide column family to get from + ColumnFamilyHandle* column_family; + s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/); + if (!s.ok()) { + return s; + } + if (column_family == nullptr) { + // Cannot find column family + return Status::NotFound(); + } + + // Get value with key + return db_->Delete(options, column_family, key); +} + +Status DateTieredDBImpl::Merge(const WriteOptions& options, const Slice& key, + const Slice& value) { + // Decide column family to get from + int64_t timestamp = 0; + Status s; + s = GetTimestamp(key, ×tamp); + if (!s.ok()) { + // Cannot get current time + return s; + } + ColumnFamilyHandle* column_family; + s = FindColumnFamily(timestamp, &column_family, true /*create_if_missing*/); + if (!s.ok()) { + return s; + } + WriteBatch batch; + batch.Merge(column_family, key, value); + return Write(options, &batch); +} + +Status DateTieredDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) { + class Handler : public WriteBatch::Handler { + public: + explicit Handler() {} + WriteBatch updates_ttl; + Status batch_rewrite_status; + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + WriteBatchInternal::Put(&updates_ttl, column_family_id, key, value); + return Status::OK(); + } + virtual Status MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) override { + WriteBatchInternal::Merge(&updates_ttl, column_family_id, key, value); + return Status::OK(); + } + virtual Status DeleteCF(uint32_t column_family_id, + const Slice& key) override { + WriteBatchInternal::Delete(&updates_ttl, column_family_id, key); + return Status::OK(); + } + virtual void LogData(const Slice& blob) override { + updates_ttl.PutLogData(blob); + } + }; + Handler handler; + updates->Iterate(&handler); + if (!handler.batch_rewrite_status.ok()) { + return handler.batch_rewrite_status; + } else { + return db_->Write(opts, &(handler.updates_ttl)); + } +} + +Iterator* DateTieredDBImpl::NewIterator(const ReadOptions& opts) { + if (handle_map_.empty()) { + return NewEmptyIterator(); + } + + DBImpl* db_impl = reinterpret_cast(db_); + + auto db_iter = NewArenaWrappedDbIterator( + db_impl->GetEnv(), ioptions_, cf_options_.comparator, kMaxSequenceNumber, + cf_options_.max_sequential_skip_in_iterations, 0); + + auto arena = db_iter->GetArena(); + MergeIteratorBuilder builder(cf_options_.comparator, arena); + for (auto& item : handle_map_) { + auto handle = item.second; + builder.AddIterator(db_impl->NewInternalIterator(arena, handle)); + } + auto internal_iter = builder.Finish(); + db_iter->SetIterUnderDBIter(internal_iter); + return db_iter; +} +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/utilities/date_tiered/date_tiered_db_impl.h b/utilities/date_tiered/date_tiered_db_impl.h new file mode 100644 index 000000000..18e0e076a --- /dev/null +++ b/utilities/date_tiered/date_tiered_db_impl.h @@ -0,0 +1,88 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#pragma once +#ifndef ROCKSDB_LITE + +#include +#include +#include + +#include "rocksdb/db.h" +#include "rocksdb/utilities/date_tiered_db.h" +#include "util/instrumented_mutex.h" + +namespace rocksdb { + +// Implementation of DateTieredDB. +class DateTieredDBImpl : public DateTieredDB { + public: + DateTieredDBImpl(DB* db, Options options, + const std::vector& descriptors, + const std::vector& handles, int64_t ttl, + int64_t column_family_interval); + + virtual ~DateTieredDBImpl(); + + Status Put(const WriteOptions& options, const Slice& key, + const Slice& val) override; + + Status Get(const ReadOptions& options, const Slice& key, + std::string* value) override; + + Status Delete(const WriteOptions& options, const Slice& key) override; + + bool KeyMayExist(const ReadOptions& options, const Slice& key, + std::string* value, bool* value_found = nullptr) override; + + Status Merge(const WriteOptions& options, const Slice& key, + const Slice& value) override; + + Iterator* NewIterator(const ReadOptions& opts) override; + + Status DropObsoleteColumnFamilies() override; + + // Extract timestamp from key. + static Status GetTimestamp(const Slice& key, int64_t* result); + + private: + // Base database object + DB* db_; + + const ColumnFamilyOptions cf_options_; + + const ImmutableCFOptions ioptions_; + + // Storing all column family handles for time series data. + std::vector handles_; + + // Manages a mapping from a column family's maximum timestamp to its handle. + std::map handle_map_; + + // A time-to-live value to indicate when the data should be removed. + int64_t ttl_; + + // An variable to indicate the time range of a column family. + int64_t column_family_interval_; + + // Indicate largest maximum timestamp of a column family. + int64_t latest_timebound_; + + // Mutex to protect handle_map_ operations. + InstrumentedMutex mutex_; + + // Internal method to execute Put and Merge in batch. + Status Write(const WriteOptions& opts, WriteBatch* updates); + + Status CreateColumnFamily(ColumnFamilyHandle** column_family); + + Status FindColumnFamily(int64_t keytime, ColumnFamilyHandle** column_family, + bool create_if_missing); + + static bool IsStale(int64_t keytime, int64_t ttl, Env* env); +}; + +} // namespace rocksdb +#endif // ROCKSDB_LITE diff --git a/utilities/date_tiered/date_tiered_test.cc b/utilities/date_tiered/date_tiered_test.cc new file mode 100644 index 000000000..1a1c8a8ff --- /dev/null +++ b/utilities/date_tiered/date_tiered_test.cc @@ -0,0 +1,467 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#ifndef ROCKSDB_LITE + +#ifndef OS_WIN +#include +#endif + +#include +#include + +#include "rocksdb/compaction_filter.h" +#include "rocksdb/utilities/date_tiered_db.h" +#include "util/logging.h" +#include "util/testharness.h" + +namespace rocksdb { + +namespace { + +typedef std::map KVMap; +} + +class SpecialTimeEnv : public EnvWrapper { + public: + explicit SpecialTimeEnv(Env* base) : EnvWrapper(base) { + base->GetCurrentTime(¤t_time_); + } + + void Sleep(int64_t sleep_time) { current_time_ += sleep_time; } + virtual Status GetCurrentTime(int64_t* current_time) override { + *current_time = current_time_; + return Status::OK(); + } + + private: + int64_t current_time_; +}; + +class DateTieredTest : public testing::Test { + public: + DateTieredTest() { + env_.reset(new SpecialTimeEnv(Env::Default())); + dbname_ = test::TmpDir() + "/date_tiered"; + options_.create_if_missing = true; + options_.env = env_.get(); + date_tiered_db_.reset(nullptr); + DestroyDB(dbname_, Options()); + } + + ~DateTieredTest() { + CloseDateTieredDB(); + DestroyDB(dbname_, Options()); + } + + void OpenDateTieredDB(int64_t ttl, int64_t column_family_interval, + bool read_only = false) { + ASSERT_TRUE(date_tiered_db_.get() == nullptr); + DateTieredDB* date_tiered_db = nullptr; + ASSERT_OK(DateTieredDB::Open(options_, dbname_, &date_tiered_db, ttl, + column_family_interval, read_only)); + date_tiered_db_.reset(date_tiered_db); + } + + void CloseDateTieredDB() { date_tiered_db_.reset(nullptr); } + + Status AppendTimestamp(std::string* key) { + char ts[8]; + int bytes_to_fill = 8; + int64_t timestamp_value = 0; + Status s = env_->GetCurrentTime(×tamp_value); + if (!s.ok()) { + return s; + } + if (port::kLittleEndian) { + for (int i = 0; i < bytes_to_fill; ++i) { + ts[i] = (timestamp_value >> ((bytes_to_fill - i - 1) << 3)) & 0xFF; + } + } else { + memcpy(ts, static_cast(×tamp_value), bytes_to_fill); + } + key->append(ts, 8); + return Status::OK(); + } + + // Populates and returns a kv-map + void MakeKVMap(int64_t num_entries, KVMap* kvmap) { + kvmap->clear(); + int digits = 1; + for (int64_t dummy = num_entries; dummy /= 10; ++digits) { + } + int digits_in_i = 1; + for (int64_t i = 0; i < num_entries; i++) { + std::string key = "key"; + std::string value = "value"; + if (i % 10 == 0) { + digits_in_i++; + } + for (int j = digits_in_i; j < digits; j++) { + key.append("0"); + value.append("0"); + } + AppendNumberTo(&key, i); + AppendNumberTo(&value, i); + ASSERT_OK(AppendTimestamp(&key)); + (*kvmap)[key] = value; + } + // check all insertions done + ASSERT_EQ(num_entries, static_cast(kvmap->size())); + } + + size_t GetColumnFamilyCount() { + DBOptions db_options(options_); + std::vector cf; + DB::ListColumnFamilies(db_options, dbname_, &cf); + return cf.size(); + } + + void Sleep(int64_t sleep_time) { env_->Sleep(sleep_time); } + + static const int64_t kSampleSize_ = 100; + std::string dbname_; + std::unique_ptr date_tiered_db_; + std::unique_ptr env_; + KVMap kvmap_; + + private: + Options options_; + KVMap::iterator kv_it_; + const std::string kNewValue_ = "new_value"; + unique_ptr test_comp_filter_; +}; + +// Puts a set of values and checks its presence using Get during ttl +TEST_F(DateTieredTest, KeyLifeCycle) { + WriteOptions wopts; + ReadOptions ropts; + + // T=0, open the database and insert data + OpenDateTieredDB(2, 2); + ASSERT_TRUE(date_tiered_db_.get() != nullptr); + + // Create key value pairs to insert + KVMap map_insert; + MakeKVMap(kSampleSize_, &map_insert); + + // Put data in database + for (auto& kv : map_insert) { + ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second)); + } + + Sleep(1); + // T=1, keys should still reside in database + for (auto& kv : map_insert) { + std::string value; + ASSERT_OK(date_tiered_db_->Get(ropts, kv.first, &value)); + ASSERT_EQ(value, kv.second); + } + + Sleep(1); + // T=2, keys should not be retrieved + for (auto& kv : map_insert) { + std::string value; + auto s = date_tiered_db_->Get(ropts, kv.first, &value); + ASSERT_TRUE(s.IsNotFound()); + } + + CloseDateTieredDB(); +} + +TEST_F(DateTieredTest, DeleteTest) { + WriteOptions wopts; + ReadOptions ropts; + + // T=0, open the database and insert data + OpenDateTieredDB(2, 2); + ASSERT_TRUE(date_tiered_db_.get() != nullptr); + + // Create key value pairs to insert + KVMap map_insert; + MakeKVMap(kSampleSize_, &map_insert); + + // Put data in database + for (auto& kv : map_insert) { + ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second)); + } + + Sleep(1); + // Delete keys when they are not obsolete + for (auto& kv : map_insert) { + ASSERT_OK(date_tiered_db_->Delete(wopts, kv.first)); + } + + // Key should not be found + for (auto& kv : map_insert) { + std::string value; + auto s = date_tiered_db_->Get(ropts, kv.first, &value); + ASSERT_TRUE(s.IsNotFound()); + } +} + +TEST_F(DateTieredTest, KeyMayExistTest) { + WriteOptions wopts; + ReadOptions ropts; + + // T=0, open the database and insert data + OpenDateTieredDB(2, 2); + ASSERT_TRUE(date_tiered_db_.get() != nullptr); + + // Create key value pairs to insert + KVMap map_insert; + MakeKVMap(kSampleSize_, &map_insert); + + // Put data in database + for (auto& kv : map_insert) { + ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second)); + } + + Sleep(1); + // T=1, keys should still reside in database + for (auto& kv : map_insert) { + std::string value; + ASSERT_TRUE(date_tiered_db_->KeyMayExist(ropts, kv.first, &value)); + ASSERT_EQ(value, kv.second); + } +} + +// Database open and close should not affect +TEST_F(DateTieredTest, MultiOpen) { + WriteOptions wopts; + ReadOptions ropts; + + // T=0, open the database and insert data + OpenDateTieredDB(4, 4); + ASSERT_TRUE(date_tiered_db_.get() != nullptr); + + // Create key value pairs to insert + KVMap map_insert; + MakeKVMap(kSampleSize_, &map_insert); + + // Put data in database + for (auto& kv : map_insert) { + ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second)); + } + CloseDateTieredDB(); + + Sleep(1); + OpenDateTieredDB(2, 2); + // T=1, keys should still reside in database + for (auto& kv : map_insert) { + std::string value; + ASSERT_OK(date_tiered_db_->Get(ropts, kv.first, &value)); + ASSERT_EQ(value, kv.second); + } + + Sleep(1); + // T=2, keys should not be retrieved + for (auto& kv : map_insert) { + std::string value; + auto s = date_tiered_db_->Get(ropts, kv.first, &value); + ASSERT_TRUE(s.IsNotFound()); + } + + CloseDateTieredDB(); +} + +// If the key in Put() is obsolete, the data should not be written into database +TEST_F(DateTieredTest, InsertObsoleteDate) { + WriteOptions wopts; + ReadOptions ropts; + + // T=0, open the database and insert data + OpenDateTieredDB(2, 2); + ASSERT_TRUE(date_tiered_db_.get() != nullptr); + + // Create key value pairs to insert + KVMap map_insert; + MakeKVMap(kSampleSize_, &map_insert); + + Sleep(2); + // T=2, keys put into database are already obsolete + // Put data in database. Operations should not return OK + for (auto& kv : map_insert) { + auto s = date_tiered_db_->Put(wopts, kv.first, kv.second); + ASSERT_TRUE(s.IsInvalidArgument()); + } + + // Data should not be found in database + for (auto& kv : map_insert) { + std::string value; + auto s = date_tiered_db_->Get(ropts, kv.first, &value); + ASSERT_TRUE(s.IsNotFound()); + } + + CloseDateTieredDB(); +} + +// Resets the timestamp of a set of kvs by updating them and checks that they +// are not deleted according to the old timestamp +TEST_F(DateTieredTest, ColumnFamilyCounts) { + WriteOptions wopts; + ReadOptions ropts; + + // T=0, open the database and insert data + OpenDateTieredDB(4, 2); + ASSERT_TRUE(date_tiered_db_.get() != nullptr); + // Only default column family + ASSERT_EQ(1, GetColumnFamilyCount()); + + // Create key value pairs to insert + KVMap map_insert; + MakeKVMap(kSampleSize_, &map_insert); + for (auto& kv : map_insert) { + ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second)); + } + // A time series column family is created + ASSERT_EQ(2, GetColumnFamilyCount()); + + Sleep(2); + KVMap map_insert2; + MakeKVMap(kSampleSize_, &map_insert2); + for (auto& kv : map_insert2) { + ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second)); + } + // Another time series column family is created + ASSERT_EQ(3, GetColumnFamilyCount()); + + Sleep(4); + + // Data should not be found in database + for (auto& kv : map_insert) { + std::string value; + auto s = date_tiered_db_->Get(ropts, kv.first, &value); + ASSERT_TRUE(s.IsNotFound()); + } + + // Explicitly drop obsolete column families + date_tiered_db_->DropObsoleteColumnFamilies(); + + // The first column family is deleted from database + ASSERT_EQ(2, GetColumnFamilyCount()); + + CloseDateTieredDB(); +} + +// Puts a set of values and checks its presence using iterator during ttl +TEST_F(DateTieredTest, IteratorLifeCycle) { + WriteOptions wopts; + ReadOptions ropts; + + // T=0, open the database and insert data + OpenDateTieredDB(2, 2); + ASSERT_TRUE(date_tiered_db_.get() != nullptr); + + // Create key value pairs to insert + KVMap map_insert; + MakeKVMap(kSampleSize_, &map_insert); + Iterator* dbiter; + + // Put data in database + for (auto& kv : map_insert) { + ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second)); + } + + Sleep(1); + ASSERT_EQ(2, GetColumnFamilyCount()); + // T=1, keys should still reside in database + dbiter = date_tiered_db_->NewIterator(ropts); + dbiter->SeekToFirst(); + for (auto& kv : map_insert) { + ASSERT_TRUE(dbiter->Valid()); + ASSERT_EQ(0, dbiter->value().compare(kv.second)); + dbiter->Next(); + } + delete dbiter; + + Sleep(4); + // T=5, keys should not be retrieved + for (auto& kv : map_insert) { + std::string value; + auto s = date_tiered_db_->Get(ropts, kv.first, &value); + ASSERT_TRUE(s.IsNotFound()); + } + + // Explicitly drop obsolete column families + date_tiered_db_->DropObsoleteColumnFamilies(); + + // Only default column family + ASSERT_EQ(1, GetColumnFamilyCount()); + + // Empty iterator + dbiter = date_tiered_db_->NewIterator(ropts); + dbiter->Seek(map_insert.begin()->first); + ASSERT_FALSE(dbiter->Valid()); + delete dbiter; + + CloseDateTieredDB(); +} + +// Iterator should be able to merge data from multiple column families +TEST_F(DateTieredTest, IteratorMerge) { + WriteOptions wopts; + ReadOptions ropts; + + // T=0, open the database and insert data + OpenDateTieredDB(4, 2); + ASSERT_TRUE(date_tiered_db_.get() != nullptr); + + Iterator* dbiter; + + // Put data in database + KVMap map_insert1; + MakeKVMap(kSampleSize_, &map_insert1); + for (auto& kv : map_insert1) { + ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second)); + } + ASSERT_EQ(2, GetColumnFamilyCount()); + + Sleep(2); + // Put more data + KVMap map_insert2; + MakeKVMap(kSampleSize_, &map_insert2); + for (auto& kv : map_insert2) { + ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second)); + } + // Multiple column families for time series data + ASSERT_EQ(3, GetColumnFamilyCount()); + + // Iterator should be able to merge data from different column families + dbiter = date_tiered_db_->NewIterator(ropts); + dbiter->SeekToFirst(); + KVMap::iterator iter1 = map_insert1.begin(); + KVMap::iterator iter2 = map_insert2.begin(); + for (; iter1 != map_insert1.end() && iter2 != map_insert2.end(); + iter1++, iter2++) { + ASSERT_TRUE(dbiter->Valid()); + ASSERT_EQ(0, dbiter->value().compare(iter1->second)); + dbiter->Next(); + + ASSERT_TRUE(dbiter->Valid()); + ASSERT_EQ(0, dbiter->value().compare(iter2->second)); + dbiter->Next(); + } + delete dbiter; + + CloseDateTieredDB(); +} + +} // namespace rocksdb + +// A black-box test for the DateTieredDB around rocksdb +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +#else +#include + +int main(int argc, char** argv) { + fprintf(stderr, "SKIPPED as DateTieredDB is not supported in ROCKSDB_LITE\n"); + return 0; +} + +#endif // !ROCKSDB_LITE