Add time series database (resubmitted)

Summary: Implement a time series database that supports DateTieredCompactionStrategy. It wraps a db object and separate SST files in different column families (time windows).

Test Plan: Add `date_tiered_test`.

Reviewers: dhruba, sdong

Reviewed By: sdong

Subscribers: andrewkr, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D61653
main
omegaga 8 years ago
parent 7c4615cf1f
commit 44f5cc57a5
  1. 2
      CMakeLists.txt
  2. 4
      Makefile
  3. 108
      include/rocksdb/utilities/date_tiered_db.h
  4. 2
      src.mk
  5. 50
      util/options_helper.cc
  6. 2
      util/options_helper.h
  7. 395
      utilities/date_tiered/date_tiered_db_impl.cc
  8. 88
      utilities/date_tiered/date_tiered_db_impl.h
  9. 467
      utilities/date_tiered/date_tiered_test.cc

@ -250,6 +250,7 @@ set(SOURCES
utilities/backupable/backupable_db.cc utilities/backupable/backupable_db.cc
utilities/checkpoint/checkpoint.cc utilities/checkpoint/checkpoint.cc
utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc utilities/compaction_filters/remove_emptyvalue_compactionfilter.cc
utilities/date_tiered/date_tiered_db_impl.cc
utilities/document/document_db.cc utilities/document/document_db.cc
utilities/document/json_document.cc utilities/document/json_document.cc
utilities/document/json_document_builder.cc utilities/document/json_document_builder.cc
@ -434,6 +435,7 @@ set(TESTS
util/thread_local_test.cc util/thread_local_test.cc
utilities/backupable/backupable_db_test.cc utilities/backupable/backupable_db_test.cc
utilities/checkpoint/checkpoint_test.cc utilities/checkpoint/checkpoint_test.cc
utilities/date_tiered/date_tiered_test.cc
utilities/document/document_db_test.cc utilities/document/document_db_test.cc
utilities/document/json_document_test.cc utilities/document/json_document_test.cc
utilities/env_registry_test.cc utilities/env_registry_test.cc

@ -336,6 +336,7 @@ TESTS = \
skiplist_test \ skiplist_test \
stringappend_test \ stringappend_test \
ttl_test \ ttl_test \
date_tiered_test \
backupable_db_test \ backupable_db_test \
document_db_test \ document_db_test \
json_document_test \ json_document_test \
@ -1027,6 +1028,9 @@ env_registry_test: utilities/env_registry_test.o $(LIBOBJECTS) $(TESTHARNESS)
ttl_test: utilities/ttl/ttl_test.o $(LIBOBJECTS) $(TESTHARNESS) ttl_test: utilities/ttl/ttl_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)
date_tiered_test: utilities/date_tiered/date_tiered_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
write_batch_with_index_test: utilities/write_batch_with_index/write_batch_with_index_test.o $(LIBOBJECTS) $(TESTHARNESS) write_batch_with_index_test: utilities/write_batch_with_index/write_batch_with_index_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK) $(AM_LINK)

@ -0,0 +1,108 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include <map>
#include <string>
#include <vector>
#include "rocksdb/db.h"
namespace rocksdb {
// Date tiered database is a wrapper of DB that implements
// a simplified DateTieredCompactionStrategy by using multiple column famillies
// as time windows.
//
// DateTieredDB provides an interface similar to DB, but it assumes that user
// provides keys with last 8 bytes encoded as timestamp in seconds. DateTieredDB
// is assigned with a TTL to declare when data should be deleted.
//
// DateTieredDB hides column families layer from standard RocksDB instance. It
// uses multiple column families to manage time series data, each containing a
// specific range of time. Column families are named by its maximum possible
// timestamp. A column family is created automatically when data newer than
// latest timestamp of all existing column families. The time range of a column
// family is configurable by `column_family_interval`. By doing this, we
// guarantee that compaction will only happen in a column family.
//
// DateTieredDB is assigned with a TTL. When all data in a column family are
// expired (CF_Timestamp <= CUR_Timestamp - TTL), we directly drop the whole
// column family.
//
// TODO(jhli): This is only a simplified version of DTCS. In a complete DTCS,
// time windows can be merged over time, so that older time windows will have
// larger time range. Also, compaction are executed only for adjacent SST files
// to guarantee there is no time overlap between SST files.
class DateTieredDB {
public:
// Open a DateTieredDB whose name is `dbname`.
// Similar to DB::Open(), created database object is stored in dbptr.
//
// Two parameters can be configured: `ttl` to specify the length of time that
// keys should exist in the database, and `column_family_interval` to specify
// the time range of a column family interval.
//
// Open a read only database if read only is set as true.
// TODO(jhli): Should use an option object that includes ttl and
// column_family_interval.
static Status Open(const Options& options, const std::string& dbname,
DateTieredDB** dbptr, int64_t ttl,
int64_t column_family_interval, bool read_only = false);
explicit DateTieredDB() {}
virtual ~DateTieredDB() {}
// Wrapper for Put method. Similar to DB::Put(), but column family to be
// inserted is decided by the timestamp in keys, i.e. the last 8 bytes of user
// key. If key is already obsolete, it will not be inserted.
//
// When client put a key value pair in DateTieredDB, it assumes last 8 bytes
// of keys are encoded as timestamp. Timestamp is a 64-bit signed integer
// encoded as the number of seconds since 1970-01-01 00:00:00 (UTC) (Same as
// Env::GetCurrentTime()). Timestamp should be encoded in big endian.
virtual Status Put(const WriteOptions& options, const Slice& key,
const Slice& val) = 0;
// Wrapper for Get method. Similar to DB::Get() but column family is decided
// by timestamp in keys. If key is already obsolete, it will not be found.
virtual Status Get(const ReadOptions& options, const Slice& key,
std::string* value) = 0;
// Wrapper for Delete method. Similar to DB::Delete() but column family is
// decided by timestamp in keys. If key is already obsolete, return NotFound
// status.
virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;
// Wrapper for KeyMayExist method. Similar to DB::KeyMayExist() but column
// family is decided by timestamp in keys. Return false when key is already
// obsolete.
virtual bool KeyMayExist(const ReadOptions& options, const Slice& key,
std::string* value, bool* value_found = nullptr) = 0;
// Wrapper for Merge method. Similar to DB::Merge() but column family is
// decided by timestamp in keys.
virtual Status Merge(const WriteOptions& options, const Slice& key,
const Slice& value) = 0;
// Create an iterator that hides low level details. This iterator internally
// merge results from all active time series column families. Note that
// column families are not deleted until all data are obsolete, so this
// iterator can possibly access obsolete key value pairs.
virtual Iterator* NewIterator(const ReadOptions& opts) = 0;
// Explicitly drop column families in which all keys are obsolete. This
// process is also inplicitly done in Put() operation.
virtual Status DropObsoleteColumnFamilies() = 0;
static const uint64_t kTSLength = sizeof(int64_t); // size of timestamp
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -151,6 +151,7 @@ LIB_SOURCES = \
utilities/transactions/transaction_impl.cc \ utilities/transactions/transaction_impl.cc \
utilities/transactions/transaction_util.cc \ utilities/transactions/transaction_util.cc \
utilities/ttl/db_ttl_impl.cc \ utilities/ttl/db_ttl_impl.cc \
utilities/date_tiered/date_tiered_db_impl.cc \
utilities/write_batch_with_index/write_batch_with_index.cc \ utilities/write_batch_with_index/write_batch_with_index.cc \
utilities/write_batch_with_index/write_batch_with_index_internal.cc \ utilities/write_batch_with_index/write_batch_with_index_internal.cc \
util/event_logger.cc \ util/event_logger.cc \
@ -302,6 +303,7 @@ MAIN_SOURCES = \
utilities/transactions/optimistic_transaction_test.cc \ utilities/transactions/optimistic_transaction_test.cc \
utilities/transactions/transaction_test.cc \ utilities/transactions/transaction_test.cc \
utilities/ttl/ttl_test.cc \ utilities/ttl/ttl_test.cc \
utilities/date_tiered/date_tiered_test.cc \
utilities/write_batch_with_index/write_batch_with_index_test.cc \ utilities/write_batch_with_index/write_batch_with_index_test.cc \
utilities/column_aware_encoding_test.cc \ utilities/column_aware_encoding_test.cc \
util/iostats_context_test.cc \ util/iostats_context_test.cc \

@ -89,6 +89,31 @@ std::string UnescapeOptionString(const std::string& escaped_string) {
return output; return output;
} }
uint64_t ParseUint64(const std::string& value) {
size_t endchar;
#ifndef CYGWIN
uint64_t num = std::stoull(value.c_str(), &endchar);
#else
char* endptr;
uint64_t num = std::strtoul(value.c_str(), &endptr, 0);
endchar = endptr - value.c_str();
#endif
if (endchar < value.length()) {
char c = value[endchar];
if (c == 'k' || c == 'K')
num <<= 10LL;
else if (c == 'm' || c == 'M')
num <<= 20LL;
else if (c == 'g' || c == 'G')
num <<= 30LL;
else if (c == 't' || c == 'T')
num <<= 40LL;
}
return num;
}
namespace { namespace {
std::string trim(const std::string& str) { std::string trim(const std::string& str) {
if (str.empty()) return std::string(); if (str.empty()) return std::string();
@ -158,31 +183,6 @@ bool ParseBoolean(const std::string& type, const std::string& value) {
throw std::invalid_argument(type); throw std::invalid_argument(type);
} }
uint64_t ParseUint64(const std::string& value) {
size_t endchar;
#ifndef CYGWIN
uint64_t num = std::stoull(value.c_str(), &endchar);
#else
char* endptr;
uint64_t num = std::strtoul(value.c_str(), &endptr, 0);
endchar = endptr - value.c_str();
#endif
if (endchar < value.length()) {
char c = value[endchar];
if (c == 'k' || c == 'K')
num <<= 10LL;
else if (c == 'm' || c == 'M')
num <<= 20LL;
else if (c == 'g' || c == 'G')
num <<= 30LL;
else if (c == 't' || c == 'T')
num <<= 40LL;
}
return num;
}
size_t ParseSizeT(const std::string& value) { size_t ParseSizeT(const std::string& value) {
return static_cast<size_t>(ParseUint64(value)); return static_cast<size_t>(ParseUint64(value));
} }

@ -53,6 +53,8 @@ std::string EscapeOptionString(const std::string& raw_string);
// @return the raw string of the input "escaped_string" // @return the raw string of the input "escaped_string"
std::string UnescapeOptionString(const std::string& escaped_string); std::string UnescapeOptionString(const std::string& escaped_string);
uint64_t ParseUint64(const std::string& value);
Status GetMutableOptionsFromStrings( Status GetMutableOptionsFromStrings(
const MutableCFOptions& base_options, const MutableCFOptions& base_options,
const std::unordered_map<std::string, std::string>& options_map, const std::unordered_map<std::string, std::string>& options_map,

@ -0,0 +1,395 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef ROCKSDB_LITE
#include "utilities/date_tiered/date_tiered_db_impl.h"
#include <limits>
#include "db/db_impl.h"
#include "db/db_iter.h"
#include "db/filename.h"
#include "db/write_batch_internal.h"
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
#include "rocksdb/utilities/date_tiered_db.h"
#include "table/merger.h"
#include "util/coding.h"
#include "util/instrumented_mutex.h"
#include "util/options_helper.h"
#include "util/string_util.h"
namespace rocksdb {
// Open the db inside DateTieredDBImpl because options needs pointer to its ttl
DateTieredDBImpl::DateTieredDBImpl(
DB* db, Options options,
const std::vector<ColumnFamilyDescriptor>& descriptors,
const std::vector<ColumnFamilyHandle*>& handles, int64_t ttl,
int64_t column_family_interval)
: db_(db),
cf_options_(ColumnFamilyOptions(options)),
ioptions_(ImmutableCFOptions(options)),
ttl_(ttl),
column_family_interval_(column_family_interval),
mutex_(options.statistics.get(), db->GetEnv(), DB_MUTEX_WAIT_MICROS,
options.use_adaptive_mutex) {
latest_timebound_ = std::numeric_limits<int64_t>::min();
for (size_t i = 0; i < handles.size(); ++i) {
const auto& name = descriptors[i].name;
int64_t timestamp = 0;
try {
timestamp = ParseUint64(name);
} catch (const std::invalid_argument&) {
// Bypass unrelated column family, e.g. default
db_->DestroyColumnFamilyHandle(handles[i]);
continue;
}
if (timestamp > latest_timebound_) {
latest_timebound_ = timestamp;
}
handle_map_.insert(std::make_pair(timestamp, handles[i]));
}
}
DateTieredDBImpl::~DateTieredDBImpl() {
for (auto handle : handle_map_) {
db_->DestroyColumnFamilyHandle(handle.second);
}
delete db_;
db_ = nullptr;
}
Status DateTieredDB::Open(const Options& options, const std::string& dbname,
DateTieredDB** dbptr, int64_t ttl,
int64_t column_family_interval, bool read_only) {
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> descriptors;
std::vector<ColumnFamilyHandle*> handles;
DB* db;
Status s;
// Get column families
std::vector<std::string> column_family_names;
s = DB::ListColumnFamilies(db_options, dbname, &column_family_names);
if (!s.ok()) {
// No column family found. Use default
s = DB::Open(options, dbname, &db);
if (!s.ok()) {
return s;
}
} else {
for (auto name : column_family_names) {
descriptors.emplace_back(ColumnFamilyDescriptor(name, cf_options));
}
// Open database
if (read_only) {
s = DB::OpenForReadOnly(db_options, dbname, descriptors, &handles, &db);
} else {
s = DB::Open(db_options, dbname, descriptors, &handles, &db);
}
}
if (s.ok()) {
*dbptr = new DateTieredDBImpl(db, options, descriptors, handles, ttl,
column_family_interval);
}
return s;
}
// Checks if the string is stale or not according to TTl provided
bool DateTieredDBImpl::IsStale(int64_t keytime, int64_t ttl, Env* env) {
if (ttl <= 0) {
// Data is fresh if TTL is non-positive
return false;
}
int64_t curtime;
if (!env->GetCurrentTime(&curtime).ok()) {
// Treat the data as fresh if could not get current time
return false;
}
return curtime >= keytime + ttl;
}
// Drop column family when all data in that column family is expired
// TODO(jhli): Can be made a background job
Status DateTieredDBImpl::DropObsoleteColumnFamilies() {
int64_t curtime;
Status s;
s = db_->GetEnv()->GetCurrentTime(&curtime);
if (!s.ok()) {
return s;
}
{
InstrumentedMutexLock l(&mutex_);
auto iter = handle_map_.begin();
while (iter != handle_map_.end()) {
if (iter->first <= curtime - ttl_) {
s = db_->DropColumnFamily(iter->second);
if (!s.ok()) {
return s;
}
delete iter->second;
iter = handle_map_.erase(iter);
} else {
break;
}
}
}
return Status::OK();
}
// Get timestamp from user key
Status DateTieredDBImpl::GetTimestamp(const Slice& key, int64_t* result) {
if (key.size() < kTSLength) {
return Status::Corruption("Bad timestamp in key");
}
const char* pos = key.data() + key.size() - 8;
int64_t timestamp = 0;
if (port::kLittleEndian) {
int bytes_to_fill = 8;
for (int i = 0; i < bytes_to_fill; ++i) {
timestamp |= (static_cast<uint64_t>(static_cast<unsigned char>(pos[i]))
<< ((bytes_to_fill - i - 1) << 3));
}
} else {
memcpy(&timestamp, pos, sizeof(timestamp));
}
*result = timestamp;
return Status::OK();
}
Status DateTieredDBImpl::CreateColumnFamily(
ColumnFamilyHandle** column_family) {
int64_t curtime;
Status s;
mutex_.AssertHeld();
s = db_->GetEnv()->GetCurrentTime(&curtime);
if (!s.ok()) {
return s;
}
int64_t new_timebound;
if (handle_map_.empty()) {
new_timebound = curtime + column_family_interval_;
} else {
new_timebound =
latest_timebound_ +
((curtime - latest_timebound_) / column_family_interval_ + 1) *
column_family_interval_;
}
std::string cf_name = ToString(new_timebound);
latest_timebound_ = new_timebound;
s = db_->CreateColumnFamily(cf_options_, cf_name, column_family);
if (s.ok()) {
handle_map_.insert(std::make_pair(new_timebound, *column_family));
}
return s;
}
Status DateTieredDBImpl::FindColumnFamily(int64_t keytime,
ColumnFamilyHandle** column_family,
bool create_if_missing) {
*column_family = nullptr;
{
InstrumentedMutexLock l(&mutex_);
auto iter = handle_map_.upper_bound(keytime);
if (iter == handle_map_.end()) {
if (!create_if_missing) {
return Status::NotFound();
} else {
return CreateColumnFamily(column_family);
}
}
// Move to previous element to get the appropriate time window
*column_family = iter->second;
}
return Status::OK();
}
Status DateTieredDBImpl::Put(const WriteOptions& options, const Slice& key,
const Slice& val) {
int64_t timestamp = 0;
Status s;
s = GetTimestamp(key, &timestamp);
if (!s.ok()) {
return s;
}
DropObsoleteColumnFamilies();
// Prune request to obsolete data
if (IsStale(timestamp, ttl_, db_->GetEnv())) {
return Status::InvalidArgument();
}
// Decide column family (i.e. the time window) to put into
ColumnFamilyHandle* column_family;
s = FindColumnFamily(timestamp, &column_family, true /*create_if_missing*/);
if (!s.ok()) {
return s;
}
// Efficiently put with WriteBatch
WriteBatch batch;
batch.Put(column_family, key, val);
return Write(options, &batch);
}
Status DateTieredDBImpl::Get(const ReadOptions& options, const Slice& key,
std::string* value) {
int64_t timestamp = 0;
Status s;
s = GetTimestamp(key, &timestamp);
if (!s.ok()) {
return s;
}
// Prune request to obsolete data
if (IsStale(timestamp, ttl_, db_->GetEnv())) {
return Status::NotFound();
}
// Decide column family to get from
ColumnFamilyHandle* column_family;
s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/);
if (!s.ok()) {
return s;
}
if (column_family == nullptr) {
// Cannot find column family
return Status::NotFound();
}
// Get value with key
return db_->Get(options, column_family, key, value);
}
bool DateTieredDBImpl::KeyMayExist(const ReadOptions& options, const Slice& key,
std::string* value, bool* value_found) {
int64_t timestamp = 0;
Status s;
s = GetTimestamp(key, &timestamp);
if (!s.ok()) {
// Cannot get current time
return false;
}
// Decide column family to get from
ColumnFamilyHandle* column_family;
s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/);
if (!s.ok() || column_family == nullptr) {
// Cannot find column family
return false;
}
if (IsStale(timestamp, ttl_, db_->GetEnv())) {
return false;
}
return db_->KeyMayExist(options, column_family, key, value, value_found);
}
Status DateTieredDBImpl::Delete(const WriteOptions& options, const Slice& key) {
int64_t timestamp = 0;
Status s;
s = GetTimestamp(key, &timestamp);
if (!s.ok()) {
return s;
}
DropObsoleteColumnFamilies();
// Prune request to obsolete data
if (IsStale(timestamp, ttl_, db_->GetEnv())) {
return Status::NotFound();
}
// Decide column family to get from
ColumnFamilyHandle* column_family;
s = FindColumnFamily(timestamp, &column_family, false /*create_if_missing*/);
if (!s.ok()) {
return s;
}
if (column_family == nullptr) {
// Cannot find column family
return Status::NotFound();
}
// Get value with key
return db_->Delete(options, column_family, key);
}
Status DateTieredDBImpl::Merge(const WriteOptions& options, const Slice& key,
const Slice& value) {
// Decide column family to get from
int64_t timestamp = 0;
Status s;
s = GetTimestamp(key, &timestamp);
if (!s.ok()) {
// Cannot get current time
return s;
}
ColumnFamilyHandle* column_family;
s = FindColumnFamily(timestamp, &column_family, true /*create_if_missing*/);
if (!s.ok()) {
return s;
}
WriteBatch batch;
batch.Merge(column_family, key, value);
return Write(options, &batch);
}
Status DateTieredDBImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
class Handler : public WriteBatch::Handler {
public:
explicit Handler() {}
WriteBatch updates_ttl;
Status batch_rewrite_status;
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
WriteBatchInternal::Put(&updates_ttl, column_family_id, key, value);
return Status::OK();
}
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
WriteBatchInternal::Merge(&updates_ttl, column_family_id, key, value);
return Status::OK();
}
virtual Status DeleteCF(uint32_t column_family_id,
const Slice& key) override {
WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
return Status::OK();
}
virtual void LogData(const Slice& blob) override {
updates_ttl.PutLogData(blob);
}
};
Handler handler;
updates->Iterate(&handler);
if (!handler.batch_rewrite_status.ok()) {
return handler.batch_rewrite_status;
} else {
return db_->Write(opts, &(handler.updates_ttl));
}
}
Iterator* DateTieredDBImpl::NewIterator(const ReadOptions& opts) {
if (handle_map_.empty()) {
return NewEmptyIterator();
}
DBImpl* db_impl = reinterpret_cast<DBImpl*>(db_);
auto db_iter = NewArenaWrappedDbIterator(
db_impl->GetEnv(), ioptions_, cf_options_.comparator, kMaxSequenceNumber,
cf_options_.max_sequential_skip_in_iterations, 0);
auto arena = db_iter->GetArena();
MergeIteratorBuilder builder(cf_options_.comparator, arena);
for (auto& item : handle_map_) {
auto handle = item.second;
builder.AddIterator(db_impl->NewInternalIterator(arena, handle));
}
auto internal_iter = builder.Finish();
db_iter->SetIterUnderDBIter(internal_iter);
return db_iter;
}
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,88 @@
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under the BSD-style license found in the
// LICENSE file in the root directory of this source tree. An additional grant
// of patent rights can be found in the PATENTS file in the same directory.
#pragma once
#ifndef ROCKSDB_LITE
#include <map>
#include <string>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/utilities/date_tiered_db.h"
#include "util/instrumented_mutex.h"
namespace rocksdb {
// Implementation of DateTieredDB.
class DateTieredDBImpl : public DateTieredDB {
public:
DateTieredDBImpl(DB* db, Options options,
const std::vector<ColumnFamilyDescriptor>& descriptors,
const std::vector<ColumnFamilyHandle*>& handles, int64_t ttl,
int64_t column_family_interval);
virtual ~DateTieredDBImpl();
Status Put(const WriteOptions& options, const Slice& key,
const Slice& val) override;
Status Get(const ReadOptions& options, const Slice& key,
std::string* value) override;
Status Delete(const WriteOptions& options, const Slice& key) override;
bool KeyMayExist(const ReadOptions& options, const Slice& key,
std::string* value, bool* value_found = nullptr) override;
Status Merge(const WriteOptions& options, const Slice& key,
const Slice& value) override;
Iterator* NewIterator(const ReadOptions& opts) override;
Status DropObsoleteColumnFamilies() override;
// Extract timestamp from key.
static Status GetTimestamp(const Slice& key, int64_t* result);
private:
// Base database object
DB* db_;
const ColumnFamilyOptions cf_options_;
const ImmutableCFOptions ioptions_;
// Storing all column family handles for time series data.
std::vector<ColumnFamilyHandle*> handles_;
// Manages a mapping from a column family's maximum timestamp to its handle.
std::map<int64_t, ColumnFamilyHandle*> handle_map_;
// A time-to-live value to indicate when the data should be removed.
int64_t ttl_;
// An variable to indicate the time range of a column family.
int64_t column_family_interval_;
// Indicate largest maximum timestamp of a column family.
int64_t latest_timebound_;
// Mutex to protect handle_map_ operations.
InstrumentedMutex mutex_;
// Internal method to execute Put and Merge in batch.
Status Write(const WriteOptions& opts, WriteBatch* updates);
Status CreateColumnFamily(ColumnFamilyHandle** column_family);
Status FindColumnFamily(int64_t keytime, ColumnFamilyHandle** column_family,
bool create_if_missing);
static bool IsStale(int64_t keytime, int64_t ttl, Env* env);
};
} // namespace rocksdb
#endif // ROCKSDB_LITE

@ -0,0 +1,467 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef ROCKSDB_LITE
#ifndef OS_WIN
#include <unistd.h>
#endif
#include <map>
#include <memory>
#include "rocksdb/compaction_filter.h"
#include "rocksdb/utilities/date_tiered_db.h"
#include "util/logging.h"
#include "util/testharness.h"
namespace rocksdb {
namespace {
typedef std::map<std::string, std::string> KVMap;
}
class SpecialTimeEnv : public EnvWrapper {
public:
explicit SpecialTimeEnv(Env* base) : EnvWrapper(base) {
base->GetCurrentTime(&current_time_);
}
void Sleep(int64_t sleep_time) { current_time_ += sleep_time; }
virtual Status GetCurrentTime(int64_t* current_time) override {
*current_time = current_time_;
return Status::OK();
}
private:
int64_t current_time_;
};
class DateTieredTest : public testing::Test {
public:
DateTieredTest() {
env_.reset(new SpecialTimeEnv(Env::Default()));
dbname_ = test::TmpDir() + "/date_tiered";
options_.create_if_missing = true;
options_.env = env_.get();
date_tiered_db_.reset(nullptr);
DestroyDB(dbname_, Options());
}
~DateTieredTest() {
CloseDateTieredDB();
DestroyDB(dbname_, Options());
}
void OpenDateTieredDB(int64_t ttl, int64_t column_family_interval,
bool read_only = false) {
ASSERT_TRUE(date_tiered_db_.get() == nullptr);
DateTieredDB* date_tiered_db = nullptr;
ASSERT_OK(DateTieredDB::Open(options_, dbname_, &date_tiered_db, ttl,
column_family_interval, read_only));
date_tiered_db_.reset(date_tiered_db);
}
void CloseDateTieredDB() { date_tiered_db_.reset(nullptr); }
Status AppendTimestamp(std::string* key) {
char ts[8];
int bytes_to_fill = 8;
int64_t timestamp_value = 0;
Status s = env_->GetCurrentTime(&timestamp_value);
if (!s.ok()) {
return s;
}
if (port::kLittleEndian) {
for (int i = 0; i < bytes_to_fill; ++i) {
ts[i] = (timestamp_value >> ((bytes_to_fill - i - 1) << 3)) & 0xFF;
}
} else {
memcpy(ts, static_cast<void*>(&timestamp_value), bytes_to_fill);
}
key->append(ts, 8);
return Status::OK();
}
// Populates and returns a kv-map
void MakeKVMap(int64_t num_entries, KVMap* kvmap) {
kvmap->clear();
int digits = 1;
for (int64_t dummy = num_entries; dummy /= 10; ++digits) {
}
int digits_in_i = 1;
for (int64_t i = 0; i < num_entries; i++) {
std::string key = "key";
std::string value = "value";
if (i % 10 == 0) {
digits_in_i++;
}
for (int j = digits_in_i; j < digits; j++) {
key.append("0");
value.append("0");
}
AppendNumberTo(&key, i);
AppendNumberTo(&value, i);
ASSERT_OK(AppendTimestamp(&key));
(*kvmap)[key] = value;
}
// check all insertions done
ASSERT_EQ(num_entries, static_cast<int64_t>(kvmap->size()));
}
size_t GetColumnFamilyCount() {
DBOptions db_options(options_);
std::vector<std::string> cf;
DB::ListColumnFamilies(db_options, dbname_, &cf);
return cf.size();
}
void Sleep(int64_t sleep_time) { env_->Sleep(sleep_time); }
static const int64_t kSampleSize_ = 100;
std::string dbname_;
std::unique_ptr<DateTieredDB> date_tiered_db_;
std::unique_ptr<SpecialTimeEnv> env_;
KVMap kvmap_;
private:
Options options_;
KVMap::iterator kv_it_;
const std::string kNewValue_ = "new_value";
unique_ptr<CompactionFilter> test_comp_filter_;
};
// Puts a set of values and checks its presence using Get during ttl
TEST_F(DateTieredTest, KeyLifeCycle) {
WriteOptions wopts;
ReadOptions ropts;
// T=0, open the database and insert data
OpenDateTieredDB(2, 2);
ASSERT_TRUE(date_tiered_db_.get() != nullptr);
// Create key value pairs to insert
KVMap map_insert;
MakeKVMap(kSampleSize_, &map_insert);
// Put data in database
for (auto& kv : map_insert) {
ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second));
}
Sleep(1);
// T=1, keys should still reside in database
for (auto& kv : map_insert) {
std::string value;
ASSERT_OK(date_tiered_db_->Get(ropts, kv.first, &value));
ASSERT_EQ(value, kv.second);
}
Sleep(1);
// T=2, keys should not be retrieved
for (auto& kv : map_insert) {
std::string value;
auto s = date_tiered_db_->Get(ropts, kv.first, &value);
ASSERT_TRUE(s.IsNotFound());
}
CloseDateTieredDB();
}
TEST_F(DateTieredTest, DeleteTest) {
WriteOptions wopts;
ReadOptions ropts;
// T=0, open the database and insert data
OpenDateTieredDB(2, 2);
ASSERT_TRUE(date_tiered_db_.get() != nullptr);
// Create key value pairs to insert
KVMap map_insert;
MakeKVMap(kSampleSize_, &map_insert);
// Put data in database
for (auto& kv : map_insert) {
ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second));
}
Sleep(1);
// Delete keys when they are not obsolete
for (auto& kv : map_insert) {
ASSERT_OK(date_tiered_db_->Delete(wopts, kv.first));
}
// Key should not be found
for (auto& kv : map_insert) {
std::string value;
auto s = date_tiered_db_->Get(ropts, kv.first, &value);
ASSERT_TRUE(s.IsNotFound());
}
}
TEST_F(DateTieredTest, KeyMayExistTest) {
WriteOptions wopts;
ReadOptions ropts;
// T=0, open the database and insert data
OpenDateTieredDB(2, 2);
ASSERT_TRUE(date_tiered_db_.get() != nullptr);
// Create key value pairs to insert
KVMap map_insert;
MakeKVMap(kSampleSize_, &map_insert);
// Put data in database
for (auto& kv : map_insert) {
ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second));
}
Sleep(1);
// T=1, keys should still reside in database
for (auto& kv : map_insert) {
std::string value;
ASSERT_TRUE(date_tiered_db_->KeyMayExist(ropts, kv.first, &value));
ASSERT_EQ(value, kv.second);
}
}
// Database open and close should not affect
TEST_F(DateTieredTest, MultiOpen) {
WriteOptions wopts;
ReadOptions ropts;
// T=0, open the database and insert data
OpenDateTieredDB(4, 4);
ASSERT_TRUE(date_tiered_db_.get() != nullptr);
// Create key value pairs to insert
KVMap map_insert;
MakeKVMap(kSampleSize_, &map_insert);
// Put data in database
for (auto& kv : map_insert) {
ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second));
}
CloseDateTieredDB();
Sleep(1);
OpenDateTieredDB(2, 2);
// T=1, keys should still reside in database
for (auto& kv : map_insert) {
std::string value;
ASSERT_OK(date_tiered_db_->Get(ropts, kv.first, &value));
ASSERT_EQ(value, kv.second);
}
Sleep(1);
// T=2, keys should not be retrieved
for (auto& kv : map_insert) {
std::string value;
auto s = date_tiered_db_->Get(ropts, kv.first, &value);
ASSERT_TRUE(s.IsNotFound());
}
CloseDateTieredDB();
}
// If the key in Put() is obsolete, the data should not be written into database
TEST_F(DateTieredTest, InsertObsoleteDate) {
WriteOptions wopts;
ReadOptions ropts;
// T=0, open the database and insert data
OpenDateTieredDB(2, 2);
ASSERT_TRUE(date_tiered_db_.get() != nullptr);
// Create key value pairs to insert
KVMap map_insert;
MakeKVMap(kSampleSize_, &map_insert);
Sleep(2);
// T=2, keys put into database are already obsolete
// Put data in database. Operations should not return OK
for (auto& kv : map_insert) {
auto s = date_tiered_db_->Put(wopts, kv.first, kv.second);
ASSERT_TRUE(s.IsInvalidArgument());
}
// Data should not be found in database
for (auto& kv : map_insert) {
std::string value;
auto s = date_tiered_db_->Get(ropts, kv.first, &value);
ASSERT_TRUE(s.IsNotFound());
}
CloseDateTieredDB();
}
// Resets the timestamp of a set of kvs by updating them and checks that they
// are not deleted according to the old timestamp
TEST_F(DateTieredTest, ColumnFamilyCounts) {
WriteOptions wopts;
ReadOptions ropts;
// T=0, open the database and insert data
OpenDateTieredDB(4, 2);
ASSERT_TRUE(date_tiered_db_.get() != nullptr);
// Only default column family
ASSERT_EQ(1, GetColumnFamilyCount());
// Create key value pairs to insert
KVMap map_insert;
MakeKVMap(kSampleSize_, &map_insert);
for (auto& kv : map_insert) {
ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second));
}
// A time series column family is created
ASSERT_EQ(2, GetColumnFamilyCount());
Sleep(2);
KVMap map_insert2;
MakeKVMap(kSampleSize_, &map_insert2);
for (auto& kv : map_insert2) {
ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second));
}
// Another time series column family is created
ASSERT_EQ(3, GetColumnFamilyCount());
Sleep(4);
// Data should not be found in database
for (auto& kv : map_insert) {
std::string value;
auto s = date_tiered_db_->Get(ropts, kv.first, &value);
ASSERT_TRUE(s.IsNotFound());
}
// Explicitly drop obsolete column families
date_tiered_db_->DropObsoleteColumnFamilies();
// The first column family is deleted from database
ASSERT_EQ(2, GetColumnFamilyCount());
CloseDateTieredDB();
}
// Puts a set of values and checks its presence using iterator during ttl
TEST_F(DateTieredTest, IteratorLifeCycle) {
WriteOptions wopts;
ReadOptions ropts;
// T=0, open the database and insert data
OpenDateTieredDB(2, 2);
ASSERT_TRUE(date_tiered_db_.get() != nullptr);
// Create key value pairs to insert
KVMap map_insert;
MakeKVMap(kSampleSize_, &map_insert);
Iterator* dbiter;
// Put data in database
for (auto& kv : map_insert) {
ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second));
}
Sleep(1);
ASSERT_EQ(2, GetColumnFamilyCount());
// T=1, keys should still reside in database
dbiter = date_tiered_db_->NewIterator(ropts);
dbiter->SeekToFirst();
for (auto& kv : map_insert) {
ASSERT_TRUE(dbiter->Valid());
ASSERT_EQ(0, dbiter->value().compare(kv.second));
dbiter->Next();
}
delete dbiter;
Sleep(4);
// T=5, keys should not be retrieved
for (auto& kv : map_insert) {
std::string value;
auto s = date_tiered_db_->Get(ropts, kv.first, &value);
ASSERT_TRUE(s.IsNotFound());
}
// Explicitly drop obsolete column families
date_tiered_db_->DropObsoleteColumnFamilies();
// Only default column family
ASSERT_EQ(1, GetColumnFamilyCount());
// Empty iterator
dbiter = date_tiered_db_->NewIterator(ropts);
dbiter->Seek(map_insert.begin()->first);
ASSERT_FALSE(dbiter->Valid());
delete dbiter;
CloseDateTieredDB();
}
// Iterator should be able to merge data from multiple column families
TEST_F(DateTieredTest, IteratorMerge) {
WriteOptions wopts;
ReadOptions ropts;
// T=0, open the database and insert data
OpenDateTieredDB(4, 2);
ASSERT_TRUE(date_tiered_db_.get() != nullptr);
Iterator* dbiter;
// Put data in database
KVMap map_insert1;
MakeKVMap(kSampleSize_, &map_insert1);
for (auto& kv : map_insert1) {
ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second));
}
ASSERT_EQ(2, GetColumnFamilyCount());
Sleep(2);
// Put more data
KVMap map_insert2;
MakeKVMap(kSampleSize_, &map_insert2);
for (auto& kv : map_insert2) {
ASSERT_OK(date_tiered_db_->Put(wopts, kv.first, kv.second));
}
// Multiple column families for time series data
ASSERT_EQ(3, GetColumnFamilyCount());
// Iterator should be able to merge data from different column families
dbiter = date_tiered_db_->NewIterator(ropts);
dbiter->SeekToFirst();
KVMap::iterator iter1 = map_insert1.begin();
KVMap::iterator iter2 = map_insert2.begin();
for (; iter1 != map_insert1.end() && iter2 != map_insert2.end();
iter1++, iter2++) {
ASSERT_TRUE(dbiter->Valid());
ASSERT_EQ(0, dbiter->value().compare(iter1->second));
dbiter->Next();
ASSERT_TRUE(dbiter->Valid());
ASSERT_EQ(0, dbiter->value().compare(iter2->second));
dbiter->Next();
}
delete dbiter;
CloseDateTieredDB();
}
} // namespace rocksdb
// A black-box test for the DateTieredDB around rocksdb
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else
#include <stdio.h>
int main(int argc, char** argv) {
fprintf(stderr, "SKIPPED as DateTieredDB is not supported in ROCKSDB_LITE\n");
return 0;
}
#endif // !ROCKSDB_LITE
Loading…
Cancel
Save