Support for column families in TTL DB

Summary:
This enables TTL DB to be used with multiple column families, with a separate TTL specified for each one.
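
For example, with the new OpenTtlDB() overload a database can be opened with one TTL per column family. A minimal sketch (the "metrics" family name, the TTL values, and the function name are illustrative; the non-default family has to be created beforehand, as the new test does, since CreateColumnFamily() is not wired into the TTL wrapper yet):

  #include <string>
  #include <vector>
  #include "rocksdb/db.h"
  #include "utilities/utility_db.h"

  rocksdb::Status OpenWithPerFamilyTtl(
      const std::string& dbname, rocksdb::StackableDB** db,
      std::vector<rocksdb::ColumnFamilyHandle*>* handles) {
    rocksdb::Options options;
    options.create_if_missing = true;

    std::vector<rocksdb::ColumnFamilyDescriptor> column_families;
    column_families.push_back(rocksdb::ColumnFamilyDescriptor(
        rocksdb::kDefaultColumnFamilyName, rocksdb::ColumnFamilyOptions(options)));
    column_families.push_back(rocksdb::ColumnFamilyDescriptor(
        "metrics", rocksdb::ColumnFamilyOptions(options)));

    // One TTL (in seconds) per column family, in the same order as above.
    std::vector<int32_t> ttls = {3600, 86400};
    return rocksdb::UtilityDB::OpenTtlDB(rocksdb::DBOptions(options), dbname,
                                         column_families, handles, db, ttls,
                                         false /* read_only */);
  }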

TODO: Implement CreateColumnFamily() in TTL world.

Test Plan: Added a very simple sanity test.

Reviewers: dhruba, haobo, ljin, sdong, yhchiang

Reviewed By: haobo

CC: leveldb, alberts

Differential Revision: https://reviews.facebook.net/D17859
Commit: 3992aec8fa (parent: e557297acc)
Author: Igor Canadi
Changed files (lines changed):

  db/write_batch.cc                95
  db/write_batch_internal.h        13
  include/rocksdb/write_batch.h     4
  include/utilities/utility_db.h   15
  utilities/ttl/db_ttl.cc          75
  utilities/ttl/db_ttl.h            6
  utilities/ttl/ttl_test.cc       111

db/write_batch.cc
@@ -173,78 +173,85 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
   EncodeFixed64(&b->rep_[0], seq);
 }

-void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
-                     const Slice& value) {
-  uint32_t column_family_id = 0;
-  if (column_family != nullptr) {
-    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-    column_family_id = cfh->GetID();
-  }
-
-  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
-  if (column_family_id == 0) {
-    rep_.push_back(static_cast<char>(kTypeValue));
-  } else {
-    rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
-    PutVarint32(&rep_, column_family_id);
-  }
-  PutLengthPrefixedSlice(&rep_, key);
-  PutLengthPrefixedSlice(&rep_, value);
+void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
+                             const Slice& key, const Slice& value) {
+  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
+  if (column_family_id == 0) {
+    b->rep_.push_back(static_cast<char>(kTypeValue));
+  } else {
+    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
+    PutVarint32(&b->rep_, column_family_id);
+  }
+  PutLengthPrefixedSlice(&b->rep_, key);
+  PutLengthPrefixedSlice(&b->rep_, value);
+}
+
+namespace {
+inline uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
+  uint32_t column_family_id = 0;
+  if (column_family != nullptr) {
+    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
+    column_family_id = cfh->GetID();
+  }
+  return column_family_id;
+}
+}  // namespace
+
+void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
+                     const Slice& value) {
+  WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value);
 }

-void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
-                     const SliceParts& value) {
-  uint32_t column_family_id = 0;
-  if (column_family != nullptr) {
-    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-    column_family_id = cfh->GetID();
-  }
-
-  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
-  if (column_family_id == 0) {
-    rep_.push_back(static_cast<char>(kTypeValue));
-  } else {
-    rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
-    PutVarint32(&rep_, column_family_id);
-  }
-  PutLengthPrefixedSliceParts(&rep_, key);
-  PutLengthPrefixedSliceParts(&rep_, value);
+void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
+                             const SliceParts& key, const SliceParts& value) {
+  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
+  if (column_family_id == 0) {
+    b->rep_.push_back(static_cast<char>(kTypeValue));
+  } else {
+    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
+    PutVarint32(&b->rep_, column_family_id);
+  }
+  PutLengthPrefixedSliceParts(&b->rep_, key);
+  PutLengthPrefixedSliceParts(&b->rep_, value);
+}
+
+void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
+                     const SliceParts& value) {
+  WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value);
 }

-void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
-  uint32_t column_family_id = 0;
-  if (column_family != nullptr) {
-    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-    column_family_id = cfh->GetID();
-  }
-
-  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
-  if (column_family_id == 0) {
-    rep_.push_back(static_cast<char>(kTypeDeletion));
-  } else {
-    rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
-    PutVarint32(&rep_, column_family_id);
-  }
-  PutLengthPrefixedSlice(&rep_, key);
+void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
+                                const Slice& key) {
+  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
+  if (column_family_id == 0) {
+    b->rep_.push_back(static_cast<char>(kTypeDeletion));
+  } else {
+    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
+    PutVarint32(&b->rep_, column_family_id);
+  }
+  PutLengthPrefixedSlice(&b->rep_, key);
+}
+
+void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
+  WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key);
 }

-void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
-                       const Slice& value) {
-  uint32_t column_family_id = 0;
-  if (column_family != nullptr) {
-    auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
-    column_family_id = cfh->GetID();
-  }
-
-  WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
-  if (column_family_id == 0) {
-    rep_.push_back(static_cast<char>(kTypeMerge));
-  } else {
-    rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
-    PutVarint32(&rep_, column_family_id);
-  }
-  PutLengthPrefixedSlice(&rep_, key);
-  PutLengthPrefixedSlice(&rep_, value);
+void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
+                               const Slice& key, const Slice& value) {
+  WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
+  if (column_family_id == 0) {
+    b->rep_.push_back(static_cast<char>(kTypeMerge));
+  } else {
+    b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
+    PutVarint32(&b->rep_, column_family_id);
+  }
+  PutLengthPrefixedSlice(&b->rep_, key);
+  PutLengthPrefixedSlice(&b->rep_, value);
+}
+
+void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
+                       const Slice& value) {
+  WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key, value);
 }

 void WriteBatch::PutLogData(const Slice& blob) {
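
With the refactor above, the public WriteBatch methods only resolve the column family id (via GetColumnFamilyID) and forward to the WriteBatchInternal overloads, so a single batch can mix operations on several families. A small usage sketch (assumes an already-open DB and a valid ColumnFamilyHandle; names are illustrative):

  #include "rocksdb/db.h"
  #include "rocksdb/write_batch.h"

  rocksdb::Status WriteToTwoFamilies(rocksdb::DB* db,
                                     rocksdb::ColumnFamilyHandle* cf) {
    rocksdb::WriteBatch batch;
    batch.Put("k1", "v1");      // default family: kTypeValue record
    batch.Put(cf, "k2", "v2");  // non-default: kTypeColumnFamilyValue + varint32 id
    batch.Delete(cf, "k3");     // non-default: kTypeColumnFamilyDeletion + varint32 id
    return db->Write(rocksdb::WriteOptions(), &batch);
  }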

db/write_batch_internal.h
@@ -64,6 +64,19 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
 // WriteBatch that we don't want in the public WriteBatch interface.
 class WriteBatchInternal {
  public:
+  // WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
+  static void Put(WriteBatch* batch, uint32_t column_family_id,
+                  const Slice& key, const Slice& value);
+
+  static void Put(WriteBatch* batch, uint32_t column_family_id,
+                  const SliceParts& key, const SliceParts& value);
+
+  static void Delete(WriteBatch* batch, uint32_t column_family_id,
+                     const Slice& key);
+
+  static void Merge(WriteBatch* batch, uint32_t column_family_id,
+                    const Slice& key, const Slice& value);
+
   // Return the number of entries in the batch.
   static int Count(const WriteBatch* batch);

include/rocksdb/write_batch.h
@@ -65,9 +65,7 @@ class WriteBatch {
   // If the database contains a mapping for "key", erase it. Else do nothing.
   void Delete(ColumnFamilyHandle* column_family, const Slice& key);
-  void Delete(const Slice& key) {
-    Delete(nullptr, key);
-  }
+  void Delete(const Slice& key) { Delete(nullptr, key); }

   // Append a blob of arbitrary size to the records in this batch. The blob will
   // be stored in the transaction log but not in any other file. In particular,

include/utilities/utility_db.h
@@ -2,9 +2,13 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
-#ifndef ROCKSDB_LITE
 #pragma once
-#include "stackable_db.h"
+#ifndef ROCKSDB_LITE
+
+#include <vector>
+#include <string>
+
+#include "utilities/stackable_db.h"
+#include "rocksdb/db.h"

 namespace rocksdb {
@@ -46,6 +50,13 @@ class UtilityDB {
       StackableDB** dbptr,
       int32_t ttl = 0,
       bool read_only = false);
+
+  // OpenTtlDB with column family support
+  static Status OpenTtlDB(
+      const DBOptions& db_options, const std::string& name,
+      const std::vector<ColumnFamilyDescriptor>& column_families,
+      std::vector<ColumnFamilyHandle*>* handles, StackableDB** dbptr,
+      std::vector<int32_t> ttls, bool read_only = false);
 };

 }  // namespace rocksdb

utilities/ttl/db_ttl.cc
@@ -5,13 +5,14 @@
 #include "utilities/ttl/db_ttl.h"
 #include "db/filename.h"
+#include "db/write_batch_internal.h"
 #include "util/coding.h"
 #include "include/rocksdb/env.h"
 #include "include/rocksdb/iterator.h"

 namespace rocksdb {

-void DBWithTTL::SanitizeOptions(int32_t ttl, Options* options) {
+void DBWithTTL::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options) {
   if (options->compaction_filter) {
     options->compaction_filter =
         new TtlCompactionFilter(ttl, options->compaction_filter);
@@ -40,20 +41,53 @@ Status UtilityDB::OpenTtlDB(
     StackableDB** dbptr,
     int32_t ttl,
     bool read_only) {
-  Status st;
-  Options options_to_open = options;
-  DBWithTTL::SanitizeOptions(ttl, &options_to_open);
+  DBOptions db_options(options);
+  ColumnFamilyOptions cf_options(options);
+  std::vector<ColumnFamilyDescriptor> column_families;
+  column_families.push_back(
+      ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
+  std::vector<ColumnFamilyHandle*> handles;
+  Status s = UtilityDB::OpenTtlDB(db_options, dbname, column_families, &handles,
+                                  dbptr, {ttl}, read_only);
+  if (s.ok()) {
+    assert(handles.size() == 1);
+    // i can delete the handle since DBImpl is always holding a reference to
+    // default column family
+    delete handles[0];
+  }
+  return s;
+}
+
+Status UtilityDB::OpenTtlDB(
+    const DBOptions& db_options, const std::string& dbname,
+    const std::vector<ColumnFamilyDescriptor>& column_families,
+    std::vector<ColumnFamilyHandle*>* handles, StackableDB** dbptr,
+    std::vector<int32_t> ttls, bool read_only) {
+  if (ttls.size() != column_families.size()) {
+    return Status::InvalidArgument(
+        "ttls size has to be the same as number of column families");
+  }
+
+  std::vector<ColumnFamilyDescriptor> column_families_sanitized =
+      column_families;
+  for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
+    DBWithTTL::SanitizeOptions(ttls[i], &column_families_sanitized[i].options);
+  }
   DB* db;
+
+  Status st;
   if (read_only) {
-    st = DB::OpenForReadOnly(options_to_open, dbname, &db);
+    st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized,
+                             handles, &db);
   } else {
-    st = DB::Open(options_to_open, dbname, &db);
+    st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
   }
   if (st.ok()) {
     *dbptr = new DBWithTTL(db);
   } else {
-    delete db;
+    *dbptr = nullptr;
   }
   return st;
 }
@@ -124,14 +158,14 @@ Status DBWithTTL::Put(const WriteOptions& options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       const Slice& val) {
   WriteBatch batch;
-  batch.Put(key, val);
+  batch.Put(column_family, key, val);
   return Write(options, &batch);
 }

 Status DBWithTTL::Get(const ReadOptions& options,
                       ColumnFamilyHandle* column_family, const Slice& key,
                       std::string* value) {
-  Status st = db_->Get(options, key, value);
+  Status st = db_->Get(options, column_family, key, value);
   if (!st.ok()) {
     return st;
   }
@@ -154,7 +188,7 @@ std::vector<Status> DBWithTTL::MultiGet(
 bool DBWithTTL::KeyMayExist(const ReadOptions& options,
                             ColumnFamilyHandle* column_family, const Slice& key,
                             std::string* value, bool* value_found) {
-  bool ret = db_->KeyMayExist(options, key, value, value_found);
+  bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
   if (ret && value != nullptr && value_found != nullptr && *value_found) {
     if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
       return false;
@@ -167,7 +201,7 @@ Status DBWithTTL::Merge(const WriteOptions& options,
                         ColumnFamilyHandle* column_family, const Slice& key,
                         const Slice& value) {
   WriteBatch batch;
-  batch.Merge(key, value);
+  batch.Merge(column_family, key, value);
   return Write(options, &batch);
 }
@@ -176,26 +210,33 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
    public:
     WriteBatch updates_ttl;
     Status batch_rewrite_status;
-    virtual void Put(const Slice& key, const Slice& value) {
+    virtual Status PutCF(uint32_t column_family_id, const Slice& key,
+                         const Slice& value) {
       std::string value_with_ts;
       Status st = AppendTS(value, value_with_ts);
       if (!st.ok()) {
         batch_rewrite_status = st;
       } else {
-        updates_ttl.Put(key, value_with_ts);
+        WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
+                                value_with_ts);
       }
+      return Status::OK();
     }
-    virtual void Merge(const Slice& key, const Slice& value) {
+    virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
+                           const Slice& value) {
       std::string value_with_ts;
       Status st = AppendTS(value, value_with_ts);
       if (!st.ok()) {
         batch_rewrite_status = st;
       } else {
-        updates_ttl.Merge(key, value_with_ts);
+        WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
+                                  value_with_ts);
       }
+      return Status::OK();
     }
-    virtual void Delete(const Slice& key) {
-      updates_ttl.Delete(key);
+    virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
+      WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
+      return Status::OK();
     }
     virtual void LogData(const Slice& blob) {
       updates_ttl.PutLogData(blob);
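
DBWithTTL::Write() now replays the caller's batch through a WriteBatch::Handler and rebuilds it with the column-family-aware PutCF/MergeCF/DeleteCF callbacks. The same pattern in a stripped-down sketch that just counts operations per column family (WriteBatch::Iterate() drives the handler; the class name is illustrative):

  #include <map>
  #include "rocksdb/slice.h"
  #include "rocksdb/write_batch.h"

  class CountingHandler : public rocksdb::WriteBatch::Handler {
   public:
    std::map<uint32_t, int> ops_per_cf;  // column family id -> operation count

    virtual rocksdb::Status PutCF(uint32_t column_family_id,
                                  const rocksdb::Slice& /*key*/,
                                  const rocksdb::Slice& /*value*/) {
      ops_per_cf[column_family_id]++;
      return rocksdb::Status::OK();
    }
    virtual rocksdb::Status MergeCF(uint32_t column_family_id,
                                    const rocksdb::Slice& /*key*/,
                                    const rocksdb::Slice& /*value*/) {
      ops_per_cf[column_family_id]++;
      return rocksdb::Status::OK();
    }
    virtual rocksdb::Status DeleteCF(uint32_t column_family_id,
                                     const rocksdb::Slice& /*key*/) {
      ops_per_cf[column_family_id]++;
      return rocksdb::Status::OK();
    }
  };

  // Usage: CountingHandler handler; batch.Iterate(&handler);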

utilities/ttl/db_ttl.h
@@ -2,10 +2,12 @@
 // Use of this source code is governed by a BSD-style license that can be
 // found in the LICENSE file. See the AUTHORS file for names of contributors.
-#ifndef ROCKSDB_LITE
 #pragma once
+#ifndef ROCKSDB_LITE
+
 #include <deque>
 #include <string>
+#include <vector>

 #include "rocksdb/db.h"
 #include "rocksdb/env.h"
@@ -18,7 +20,7 @@ namespace rocksdb {
 class DBWithTTL : public StackableDB {
  public:
-  static void SanitizeOptions(int32_t ttl, Options* options);
+  static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options);

   explicit DBWithTTL(DB* db);

utilities/ttl/ttl_test.cc
@@ -43,13 +43,14 @@ class TtlTest {
   // Open database with TTL support when TTL not provided with db_ttl_ pointer
   void OpenTtl() {
-    assert(db_ttl_ == nullptr);  // db should be closed before opening again
+    ASSERT_TRUE(db_ttl_ ==
+                nullptr);  // db should be closed before opening again
     ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_));
   }

   // Open database with TTL support when TTL provided with db_ttl_ pointer
   void OpenTtl(int32_t ttl) {
-    assert(db_ttl_ == nullptr);
+    ASSERT_TRUE(db_ttl_ == nullptr);
     ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_, ttl));
   }
@@ -63,7 +64,7 @@ class TtlTest {
   // Open database with TTL support in read_only mode
   void OpenReadOnlyTtl(int32_t ttl) {
-    assert(db_ttl_ == nullptr);
+    ASSERT_TRUE(db_ttl_ == nullptr);
     ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_, ttl, true));
   }
@@ -97,7 +98,7 @@ class TtlTest {
   // Makes a write-batch with key-vals from kvmap_ and 'Write''s it
   void MakePutWriteBatch(const BatchOperation* batch_ops, int num_ops) {
-    assert(num_ops <= (int)kvmap_.size());
+    ASSERT_LE(num_ops, (int)kvmap_.size());
     static WriteOptions wopts;
     static FlushOptions flush_opts;
     WriteBatch batch;
@@ -111,7 +112,7 @@ class TtlTest {
           batch.Delete(kv_it_->first);
           break;
         default:
-          assert(false);
+          ASSERT_TRUE(false);
       }
     }
     db_ttl_->Write(wopts, &batch);
@@ -119,26 +120,38 @@
   }

   // Puts num_entries starting from start_pos_map from kvmap_ into the database
-  void PutValues(int start_pos_map, int num_entries, bool flush = true) {
-    assert(db_ttl_);
+  void PutValues(int start_pos_map, int num_entries, bool flush = true,
+                 ColumnFamilyHandle* cf = nullptr) {
+    ASSERT_TRUE(db_ttl_);
     ASSERT_LE(start_pos_map + num_entries, (int)kvmap_.size());
     static WriteOptions wopts;
     static FlushOptions flush_opts;
     kv_it_ = kvmap_.begin();
     advance(kv_it_, start_pos_map);
     for (int i = 0; kv_it_ != kvmap_.end() && i < num_entries; i++, kv_it_++) {
-      ASSERT_OK(db_ttl_->Put(wopts, kv_it_->first, kv_it_->second));
+      ASSERT_OK(cf == nullptr
                    ? db_ttl_->Put(wopts, kv_it_->first, kv_it_->second)
                    : db_ttl_->Put(wopts, cf, kv_it_->first, kv_it_->second));
     }
     // Put a mock kv at the end because CompactionFilter doesn't delete last key
-    ASSERT_OK(db_ttl_->Put(wopts, "keymock", "valuemock"));
+    ASSERT_OK(cf == nullptr ? db_ttl_->Put(wopts, "keymock", "valuemock")
+                            : db_ttl_->Put(wopts, cf, "keymock", "valuemock"));
     if (flush) {
-      db_ttl_->Flush(flush_opts);
+      if (cf == nullptr) {
+        db_ttl_->Flush(flush_opts);
+      } else {
+        db_ttl_->Flush(flush_opts, cf);
+      }
     }
   }

   // Runs a manual compaction
-  void ManualCompact() {
-    db_ttl_->CompactRange(nullptr, nullptr);
+  void ManualCompact(ColumnFamilyHandle* cf = nullptr) {
+    if (cf == nullptr) {
+      db_ttl_->CompactRange(nullptr, nullptr);
+    } else {
+      db_ttl_->CompactRange(cf, nullptr, nullptr);
+    }
   }

   // checks the whole kvmap_ to return correct values using KeyMayExist
@@ -151,12 +164,12 @@ class TtlTest {
       if (ret == false || value_found == false) {
         fprintf(stderr, "KeyMayExist could not find key=%s in the database but"
                         " should have\n", kv.first.c_str());
-        assert(false);
+        ASSERT_TRUE(false);
       } else if (val.compare(kv.second) != 0) {
         fprintf(stderr, " value for key=%s present in database is %s but"
                         " should be %s\n", kv.first.c_str(), val.c_str(),
                         kv.second.c_str());
-        assert(false);
+        ASSERT_TRUE(false);
       }
     }
   }
@@ -167,16 +180,18 @@ class TtlTest {
   // Also checks that value that we got is the same as inserted; and =kNewValue
   //   if test_compaction_change is true
   void SleepCompactCheck(int slp_tim, int st_pos, int span, bool check = true,
-                         bool test_compaction_change = false) {
-    assert(db_ttl_);
+                         bool test_compaction_change = false,
+                         ColumnFamilyHandle* cf = nullptr) {
+    ASSERT_TRUE(db_ttl_);
     sleep(slp_tim);
-    ManualCompact();
+    ManualCompact(cf);
     static ReadOptions ropts;
     kv_it_ = kvmap_.begin();
     advance(kv_it_, st_pos);
     std::string v;
     for (int i = 0; kv_it_ != kvmap_.end() && i < span; i++, kv_it_++) {
-      Status s = db_ttl_->Get(ropts, kv_it_->first, &v);
+      Status s = (cf == nullptr) ? db_ttl_->Get(ropts, kv_it_->first, &v)
+                                 : db_ttl_->Get(ropts, cf, kv_it_->first, &v);
       if (s.ok() != check) {
         fprintf(stderr, "key=%s ", kv_it_->first.c_str());
         if (!s.ok()) {
@@ -184,18 +199,18 @@ class TtlTest {
         } else {
           fprintf(stderr, "is present in db but was expected to be absent\n");
         }
-        assert(false);
+        ASSERT_TRUE(false);
       } else if (s.ok()) {
         if (test_compaction_change && v.compare(kNewValue_) != 0) {
           fprintf(stderr, " value for key=%s present in database is %s but "
                           " should be %s\n", kv_it_->first.c_str(), v.c_str(),
                           kNewValue_.c_str());
-          assert(false);
+          ASSERT_TRUE(false);
         } else if (!test_compaction_change && v.compare(kv_it_->second) !=0) {
           fprintf(stderr, " value for key=%s present in database is %s but "
                           " should be %s\n", kv_it_->first.c_str(), v.c_str(),
                           kv_it_->second.c_str());
-          assert(false);
+          ASSERT_TRUE(false);
         }
       }
     }
@@ -203,7 +218,7 @@ class TtlTest {
   // Similar as SleepCompactCheck but uses TtlIterator to read from db
   void SleepCompactCheckIter(int slp, int st_pos, int span, bool check=true) {
-    assert(db_ttl_);
+    ASSERT_TRUE(db_ttl_);
     sleep(slp);
     ManualCompact();
     static ReadOptions ropts;
@@ -301,10 +316,10 @@ class TtlTest {
   // Choose carefully so that Put, Gets & Compaction complete in 1 second buffer
   const int64_t kSampleSize_ = 100;

- private:
   std::string dbname_;
   StackableDB* db_ttl_;
+ private:
   Options options_;
   KVMap kvmap_;
   KVMap::iterator kv_it_;
@@ -496,6 +511,54 @@ TEST(TtlTest, KeyMayExist) {
   CloseTtl();
 }

+TEST(TtlTest, ColumnFamiliesTest) {
+  DB* db;
+  Options options;
+  options.create_if_missing = true;
+
+  DB::Open(options, dbname_, &db);
+  ColumnFamilyHandle* handle;
+  ASSERT_OK(db->CreateColumnFamily(ColumnFamilyOptions(options),
+                                   "ttl_column_family", &handle));
+  delete handle;
+  delete db;
+
+  std::vector<ColumnFamilyDescriptor> column_families;
+  column_families.push_back(ColumnFamilyDescriptor(
+      kDefaultColumnFamilyName, ColumnFamilyOptions(options)));
+  column_families.push_back(ColumnFamilyDescriptor(
+      "ttl_column_family", ColumnFamilyOptions(options)));
+
+  std::vector<ColumnFamilyHandle*> handles;
+
+  ASSERT_OK(UtilityDB::OpenTtlDB(DBOptions(options), dbname_, column_families,
+                                 &handles, &db_ttl_, {2, 4}, false));
+  ASSERT_EQ(handles.size(), 2U);
+
+  MakeKVMap(kSampleSize_);
+  PutValues(0, kSampleSize_, false, handles[0]);
+  PutValues(0, kSampleSize_, false, handles[1]);
+
+  // everything should be there after 1 second
+  SleepCompactCheck(1, 0, kSampleSize_, true, false, handles[0]);
+  SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[1]);
+
+  // only column family 1 should be alive after 3 seconds
+  SleepCompactCheck(2, 0, kSampleSize_, false, false, handles[0]);
+  SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[1]);
+
+  // nothing should be there after 5 seconds
+  SleepCompactCheck(2, 0, kSampleSize_, false, false, handles[0]);
+  SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[1]);
+
+  for (auto h : handles) {
+    delete h;
+  }
+  delete db_ttl_;
+  db_ttl_ = nullptr;
+}
+
 }  // namespace rocksdb

 // A black-box test for the ttl wrapper around rocksdb
