From 3992aec8fad301bbbd73d5abe4005878403009ed Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 22 Apr 2014 11:27:33 -0700 Subject: [PATCH] Support for column families in TTL DB Summary: This will enable people using TTL DB to do so with multiple column families. They can also specify different TTLs for each one. TODO: Implement CreateColumnFamily() in TTL world. Test Plan: Added a very simple sanity test. Reviewers: dhruba, haobo, ljin, sdong, yhchiang Reviewed By: haobo CC: leveldb, alberts Differential Revision: https://reviews.facebook.net/D17859 --- db/write_batch.cc | 95 +++++++++++++++------------- db/write_batch_internal.h | 13 ++++ include/rocksdb/write_batch.h | 4 +- include/utilities/utility_db.h | 15 ++++- utilities/ttl/db_ttl.cc | 75 +++++++++++++++++----- utilities/ttl/db_ttl.h | 6 +- utilities/ttl/ttl_test.cc | 111 ++++++++++++++++++++++++++------- 7 files changed, 227 insertions(+), 92 deletions(-) diff --git a/db/write_batch.cc b/db/write_batch.cc index 8fffdbfbd..734d1e376 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -173,78 +173,85 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { EncodeFixed64(&b->rep_[0], seq); } -void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, - const Slice& value) { +void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, + const Slice& key, const Slice& value) { + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); + if (column_family_id == 0) { + b->rep_.push_back(static_cast(kTypeValue)); + } else { + b->rep_.push_back(static_cast(kTypeColumnFamilyValue)); + PutVarint32(&b->rep_, column_family_id); + } + PutLengthPrefixedSlice(&b->rep_, key); + PutLengthPrefixedSlice(&b->rep_, value); +} + +namespace { +inline uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) { uint32_t column_family_id = 0; if (column_family != nullptr) { auto cfh = reinterpret_cast(column_family); column_family_id = cfh->GetID(); } + return column_family_id; +} +} // namespace + +void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key, + const Slice& value) { + WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value); +} - WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); +void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id, + const SliceParts& key, const SliceParts& value) { + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); if (column_family_id == 0) { - rep_.push_back(static_cast(kTypeValue)); + b->rep_.push_back(static_cast(kTypeValue)); } else { - rep_.push_back(static_cast(kTypeColumnFamilyValue)); - PutVarint32(&rep_, column_family_id); + b->rep_.push_back(static_cast(kTypeColumnFamilyValue)); + PutVarint32(&b->rep_, column_family_id); } - PutLengthPrefixedSlice(&rep_, key); - PutLengthPrefixedSlice(&rep_, value); + PutLengthPrefixedSliceParts(&b->rep_, key); + PutLengthPrefixedSliceParts(&b->rep_, value); } void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key, const SliceParts& value) { - uint32_t column_family_id = 0; - if (column_family != nullptr) { - auto cfh = reinterpret_cast(column_family); - column_family_id = cfh->GetID(); - } + WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value); +} - WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); +void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id, + const Slice& key) { + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); if (column_family_id == 0) { - rep_.push_back(static_cast(kTypeValue)); + b->rep_.push_back(static_cast(kTypeDeletion)); } else { - rep_.push_back(static_cast(kTypeColumnFamilyValue)); - PutVarint32(&rep_, column_family_id); + b->rep_.push_back(static_cast(kTypeColumnFamilyDeletion)); + PutVarint32(&b->rep_, column_family_id); } - PutLengthPrefixedSliceParts(&rep_, key); - PutLengthPrefixedSliceParts(&rep_, value); + PutLengthPrefixedSlice(&b->rep_, key); } void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) { - uint32_t column_family_id = 0; - if (column_family != nullptr) { - auto cfh = reinterpret_cast(column_family); - column_family_id = cfh->GetID(); - } + WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key); +} - WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); +void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id, + const Slice& key, const Slice& value) { + WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1); if (column_family_id == 0) { - rep_.push_back(static_cast(kTypeDeletion)); + b->rep_.push_back(static_cast(kTypeMerge)); } else { - rep_.push_back(static_cast(kTypeColumnFamilyDeletion)); - PutVarint32(&rep_, column_family_id); + b->rep_.push_back(static_cast(kTypeColumnFamilyMerge)); + PutVarint32(&b->rep_, column_family_id); } - PutLengthPrefixedSlice(&rep_, key); + PutLengthPrefixedSlice(&b->rep_, key); + PutLengthPrefixedSlice(&b->rep_, value); } void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { - uint32_t column_family_id = 0; - if (column_family != nullptr) { - auto cfh = reinterpret_cast(column_family); - column_family_id = cfh->GetID(); - } - - WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); - if (column_family_id == 0) { - rep_.push_back(static_cast(kTypeMerge)); - } else { - rep_.push_back(static_cast(kTypeColumnFamilyMerge)); - PutVarint32(&rep_, column_family_id); - } - PutLengthPrefixedSlice(&rep_, key); - PutLengthPrefixedSlice(&rep_, value); + WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key, value); } void WriteBatch::PutLogData(const Slice& blob) { diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 793ee3e0e..85e85b33d 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -64,6 +64,19 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { // WriteBatch that we don't want in the public WriteBatch interface. class WriteBatchInternal { public: + // WriteBatch methods with column_family_id instead of ColumnFamilyHandle* + static void Put(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const Slice& value); + + static void Put(WriteBatch* batch, uint32_t column_family_id, + const SliceParts& key, const SliceParts& value); + + static void Delete(WriteBatch* batch, uint32_t column_family_id, + const Slice& key); + + static void Merge(WriteBatch* batch, uint32_t column_family_id, + const Slice& key, const Slice& value); + // Return the number of entries in the batch. static int Count(const WriteBatch* batch); diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 60817056f..74ee2ad16 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -65,9 +65,7 @@ class WriteBatch { // If the database contains a mapping for "key", erase it. Else do nothing. void Delete(ColumnFamilyHandle* column_family, const Slice& key); - void Delete(const Slice& key) { - Delete(nullptr, key); - } + void Delete(const Slice& key) { Delete(nullptr, key); } // Append a blob of arbitrary size to the records in this batch. The blob will // be stored in the transaction log but not in any other file. In particular, diff --git a/include/utilities/utility_db.h b/include/utilities/utility_db.h index ddec2b0b4..0d2e11fa8 100644 --- a/include/utilities/utility_db.h +++ b/include/utilities/utility_db.h @@ -2,9 +2,13 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#ifndef ROCKSDB_LITE #pragma once -#include "stackable_db.h" +#ifndef ROCKSDB_LITE +#include +#include + +#include "utilities/stackable_db.h" +#include "rocksdb/db.h" namespace rocksdb { @@ -46,6 +50,13 @@ class UtilityDB { StackableDB** dbptr, int32_t ttl = 0, bool read_only = false); + + // OpenTtlDB with column family support + static Status OpenTtlDB( + const DBOptions& db_options, const std::string& name, + const std::vector& column_families, + std::vector* handles, StackableDB** dbptr, + std::vector ttls, bool read_only = false); }; } // namespace rocksdb diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index ba5d1241a..fef2ec021 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -5,13 +5,14 @@ #include "utilities/ttl/db_ttl.h" #include "db/filename.h" +#include "db/write_batch_internal.h" #include "util/coding.h" #include "include/rocksdb/env.h" #include "include/rocksdb/iterator.h" namespace rocksdb { -void DBWithTTL::SanitizeOptions(int32_t ttl, Options* options) { +void DBWithTTL::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options) { if (options->compaction_filter) { options->compaction_filter = new TtlCompactionFilter(ttl, options->compaction_filter); @@ -40,20 +41,53 @@ Status UtilityDB::OpenTtlDB( StackableDB** dbptr, int32_t ttl, bool read_only) { - Status st; - Options options_to_open = options; - DBWithTTL::SanitizeOptions(ttl, &options_to_open); + + DBOptions db_options(options); + ColumnFamilyOptions cf_options(options); + std::vector column_families; + column_families.push_back( + ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options)); + std::vector handles; + Status s = UtilityDB::OpenTtlDB(db_options, dbname, column_families, &handles, + dbptr, {ttl}, read_only); + if (s.ok()) { + assert(handles.size() == 1); + // i can delete the handle since DBImpl is always holding a reference to + // default column family + delete handles[0]; + } + return s; +} + +Status UtilityDB::OpenTtlDB( + const DBOptions& db_options, const std::string& dbname, + const std::vector& column_families, + std::vector* handles, StackableDB** dbptr, + std::vector ttls, bool read_only) { + + if (ttls.size() != column_families.size()) { + return Status::InvalidArgument( + "ttls size has to be the same as number of column families"); + } + + std::vector column_families_sanitized = + column_families; + for (size_t i = 0; i < column_families_sanitized.size(); ++i) { + DBWithTTL::SanitizeOptions(ttls[i], &column_families_sanitized[i].options); + } DB* db; + Status st; if (read_only) { - st = DB::OpenForReadOnly(options_to_open, dbname, &db); + st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized, + handles, &db); } else { - st = DB::Open(options_to_open, dbname, &db); + st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db); } if (st.ok()) { *dbptr = new DBWithTTL(db); } else { - delete db; + *dbptr = nullptr; } return st; } @@ -124,14 +158,14 @@ Status DBWithTTL::Put(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& val) { WriteBatch batch; - batch.Put(key, val); + batch.Put(column_family, key, val); return Write(options, &batch); } Status DBWithTTL::Get(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value) { - Status st = db_->Get(options, key, value); + Status st = db_->Get(options, column_family, key, value); if (!st.ok()) { return st; } @@ -154,7 +188,7 @@ std::vector DBWithTTL::MultiGet( bool DBWithTTL::KeyMayExist(const ReadOptions& options, ColumnFamilyHandle* column_family, const Slice& key, std::string* value, bool* value_found) { - bool ret = db_->KeyMayExist(options, key, value, value_found); + bool ret = db_->KeyMayExist(options, column_family, key, value, value_found); if (ret && value != nullptr && value_found != nullptr && *value_found) { if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) { return false; @@ -167,7 +201,7 @@ Status DBWithTTL::Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, const Slice& key, const Slice& value) { WriteBatch batch; - batch.Merge(key, value); + batch.Merge(column_family, key, value); return Write(options, &batch); } @@ -176,26 +210,33 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { public: WriteBatch updates_ttl; Status batch_rewrite_status; - virtual void Put(const Slice& key, const Slice& value) { + virtual Status PutCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { std::string value_with_ts; Status st = AppendTS(value, value_with_ts); if (!st.ok()) { batch_rewrite_status = st; } else { - updates_ttl.Put(key, value_with_ts); + WriteBatchInternal::Put(&updates_ttl, column_family_id, key, + value_with_ts); } + return Status::OK(); } - virtual void Merge(const Slice& key, const Slice& value) { + virtual Status MergeCF(uint32_t column_family_id, const Slice& key, + const Slice& value) { std::string value_with_ts; Status st = AppendTS(value, value_with_ts); if (!st.ok()) { batch_rewrite_status = st; } else { - updates_ttl.Merge(key, value_with_ts); + WriteBatchInternal::Merge(&updates_ttl, column_family_id, key, + value_with_ts); } + return Status::OK(); } - virtual void Delete(const Slice& key) { - updates_ttl.Delete(key); + virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) { + WriteBatchInternal::Delete(&updates_ttl, column_family_id, key); + return Status::OK(); } virtual void LogData(const Slice& blob) { updates_ttl.PutLogData(blob); diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index 3ed6f7ea3..28fd3b41b 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -2,10 +2,12 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. -#ifndef ROCKSDB_LITE #pragma once + +#ifndef ROCKSDB_LITE #include #include +#include #include "rocksdb/db.h" #include "rocksdb/env.h" @@ -18,7 +20,7 @@ namespace rocksdb { class DBWithTTL : public StackableDB { public: - static void SanitizeOptions(int32_t ttl, Options* options); + static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options); explicit DBWithTTL(DB* db); diff --git a/utilities/ttl/ttl_test.cc b/utilities/ttl/ttl_test.cc index 789128729..660ebf780 100644 --- a/utilities/ttl/ttl_test.cc +++ b/utilities/ttl/ttl_test.cc @@ -43,13 +43,14 @@ class TtlTest { // Open database with TTL support when TTL not provided with db_ttl_ pointer void OpenTtl() { - assert(db_ttl_ == nullptr); // db should be closed before opening again + ASSERT_TRUE(db_ttl_ == + nullptr); // db should be closed before opening again ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_)); } // Open database with TTL support when TTL provided with db_ttl_ pointer void OpenTtl(int32_t ttl) { - assert(db_ttl_ == nullptr); + ASSERT_TRUE(db_ttl_ == nullptr); ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_, ttl)); } @@ -63,7 +64,7 @@ class TtlTest { // Open database with TTL support in read_only mode void OpenReadOnlyTtl(int32_t ttl) { - assert(db_ttl_ == nullptr); + ASSERT_TRUE(db_ttl_ == nullptr); ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_, ttl, true)); } @@ -97,7 +98,7 @@ class TtlTest { // Makes a write-batch with key-vals from kvmap_ and 'Write''s it void MakePutWriteBatch(const BatchOperation* batch_ops, int num_ops) { - assert(num_ops <= (int)kvmap_.size()); + ASSERT_LE(num_ops, (int)kvmap_.size()); static WriteOptions wopts; static FlushOptions flush_opts; WriteBatch batch; @@ -111,7 +112,7 @@ class TtlTest { batch.Delete(kv_it_->first); break; default: - assert(false); + ASSERT_TRUE(false); } } db_ttl_->Write(wopts, &batch); @@ -119,26 +120,38 @@ class TtlTest { } // Puts num_entries starting from start_pos_map from kvmap_ into the database - void PutValues(int start_pos_map, int num_entries, bool flush = true) { - assert(db_ttl_); + void PutValues(int start_pos_map, int num_entries, bool flush = true, + ColumnFamilyHandle* cf = nullptr) { + ASSERT_TRUE(db_ttl_); ASSERT_LE(start_pos_map + num_entries, (int)kvmap_.size()); static WriteOptions wopts; static FlushOptions flush_opts; kv_it_ = kvmap_.begin(); advance(kv_it_, start_pos_map); for (int i = 0; kv_it_ != kvmap_.end() && i < num_entries; i++, kv_it_++) { - ASSERT_OK(db_ttl_->Put(wopts, kv_it_->first, kv_it_->second)); + ASSERT_OK(cf == nullptr + ? db_ttl_->Put(wopts, kv_it_->first, kv_it_->second) + : db_ttl_->Put(wopts, cf, kv_it_->first, kv_it_->second)); } // Put a mock kv at the end because CompactionFilter doesn't delete last key - ASSERT_OK(db_ttl_->Put(wopts, "keymock", "valuemock")); + ASSERT_OK(cf == nullptr ? db_ttl_->Put(wopts, "keymock", "valuemock") + : db_ttl_->Put(wopts, cf, "keymock", "valuemock")); if (flush) { - db_ttl_->Flush(flush_opts); + if (cf == nullptr) { + db_ttl_->Flush(flush_opts); + } else { + db_ttl_->Flush(flush_opts, cf); + } } } // Runs a manual compaction - void ManualCompact() { - db_ttl_->CompactRange(nullptr, nullptr); + void ManualCompact(ColumnFamilyHandle* cf = nullptr) { + if (cf == nullptr) { + db_ttl_->CompactRange(nullptr, nullptr); + } else { + db_ttl_->CompactRange(cf, nullptr, nullptr); + } } // checks the whole kvmap_ to return correct values using KeyMayExist @@ -151,12 +164,12 @@ class TtlTest { if (ret == false || value_found == false) { fprintf(stderr, "KeyMayExist could not find key=%s in the database but" " should have\n", kv.first.c_str()); - assert(false); + ASSERT_TRUE(false); } else if (val.compare(kv.second) != 0) { fprintf(stderr, " value for key=%s present in database is %s but" " should be %s\n", kv.first.c_str(), val.c_str(), kv.second.c_str()); - assert(false); + ASSERT_TRUE(false); } } } @@ -167,16 +180,18 @@ class TtlTest { // Also checks that value that we got is the same as inserted; and =kNewValue // if test_compaction_change is true void SleepCompactCheck(int slp_tim, int st_pos, int span, bool check = true, - bool test_compaction_change = false) { - assert(db_ttl_); + bool test_compaction_change = false, + ColumnFamilyHandle* cf = nullptr) { + ASSERT_TRUE(db_ttl_); sleep(slp_tim); - ManualCompact(); + ManualCompact(cf); static ReadOptions ropts; kv_it_ = kvmap_.begin(); advance(kv_it_, st_pos); std::string v; for (int i = 0; kv_it_ != kvmap_.end() && i < span; i++, kv_it_++) { - Status s = db_ttl_->Get(ropts, kv_it_->first, &v); + Status s = (cf == nullptr) ? db_ttl_->Get(ropts, kv_it_->first, &v) + : db_ttl_->Get(ropts, cf, kv_it_->first, &v); if (s.ok() != check) { fprintf(stderr, "key=%s ", kv_it_->first.c_str()); if (!s.ok()) { @@ -184,18 +199,18 @@ class TtlTest { } else { fprintf(stderr, "is present in db but was expected to be absent\n"); } - assert(false); + ASSERT_TRUE(false); } else if (s.ok()) { if (test_compaction_change && v.compare(kNewValue_) != 0) { fprintf(stderr, " value for key=%s present in database is %s but " " should be %s\n", kv_it_->first.c_str(), v.c_str(), kNewValue_.c_str()); - assert(false); + ASSERT_TRUE(false); } else if (!test_compaction_change && v.compare(kv_it_->second) !=0) { fprintf(stderr, " value for key=%s present in database is %s but " " should be %s\n", kv_it_->first.c_str(), v.c_str(), kv_it_->second.c_str()); - assert(false); + ASSERT_TRUE(false); } } } @@ -203,7 +218,7 @@ class TtlTest { // Similar as SleepCompactCheck but uses TtlIterator to read from db void SleepCompactCheckIter(int slp, int st_pos, int span, bool check=true) { - assert(db_ttl_); + ASSERT_TRUE(db_ttl_); sleep(slp); ManualCompact(); static ReadOptions ropts; @@ -301,10 +316,10 @@ class TtlTest { // Choose carefully so that Put, Gets & Compaction complete in 1 second buffer const int64_t kSampleSize_ = 100; - - private: std::string dbname_; StackableDB* db_ttl_; + + private: Options options_; KVMap kvmap_; KVMap::iterator kv_it_; @@ -496,6 +511,54 @@ TEST(TtlTest, KeyMayExist) { CloseTtl(); } +TEST(TtlTest, ColumnFamiliesTest) { + DB* db; + Options options; + options.create_if_missing = true; + + DB::Open(options, dbname_, &db); + ColumnFamilyHandle* handle; + ASSERT_OK(db->CreateColumnFamily(ColumnFamilyOptions(options), + "ttl_column_family", &handle)); + + delete handle; + delete db; + + std::vector column_families; + column_families.push_back(ColumnFamilyDescriptor( + kDefaultColumnFamilyName, ColumnFamilyOptions(options))); + column_families.push_back(ColumnFamilyDescriptor( + "ttl_column_family", ColumnFamilyOptions(options))); + + std::vector handles; + + ASSERT_OK(UtilityDB::OpenTtlDB(DBOptions(options), dbname_, column_families, + &handles, &db_ttl_, {2, 4}, false)); + ASSERT_EQ(handles.size(), 2U); + + MakeKVMap(kSampleSize_); + PutValues(0, kSampleSize_, false, handles[0]); + PutValues(0, kSampleSize_, false, handles[1]); + + // everything should be there after 1 second + SleepCompactCheck(1, 0, kSampleSize_, true, false, handles[0]); + SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[1]); + + // only column family 1 should be alive after 3 seconds + SleepCompactCheck(2, 0, kSampleSize_, false, false, handles[0]); + SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[1]); + + // nothing should be there after 5 seconds + SleepCompactCheck(2, 0, kSampleSize_, false, false, handles[0]); + SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[1]); + + for (auto h : handles) { + delete h; + } + delete db_ttl_; + db_ttl_ = nullptr; +} + } // namespace rocksdb // A black-box test for the ttl wrapper around rocksdb