Expose base db object from ttl wrapper

Summary: rocksdb replicaiton will need this when writing value+TS from master to slave 'as is'

Test Plan: make

Reviewers: dhruba, vamsi, haobo

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D11919
main
Mayank Agarwal 11 years ago
parent 1036537c94
commit 1d7b4765c3
  1. 4
      db/merge_test.cc
  2. 154
      include/utilities/stackable_db.h
  3. 4
      include/utilities/utility_db.h
  4. 4
      tools/db_stress.cc
  5. 6
      util/ldb_cmd.cc
  6. 6
      util/ldb_cmd.h
  7. 6
      utilities/ttl/db_ttl.cc
  8. 7
      utilities/ttl/db_ttl.h
  9. 2
      utilities/ttl/ttl_test.cc

@ -20,6 +20,7 @@ auto mergeOperator = MergeOperators::CreateUInt64AddOperator();
std::shared_ptr<DB> OpenDb(const string& dbname, const bool ttl = false) { std::shared_ptr<DB> OpenDb(const string& dbname, const bool ttl = false) {
DB* db; DB* db;
StackableDB* sdb;
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.merge_operator = mergeOperator.get(); options.merge_operator = mergeOperator.get();
@ -27,7 +28,8 @@ std::shared_ptr<DB> OpenDb(const string& dbname, const bool ttl = false) {
DestroyDB(dbname, Options()); DestroyDB(dbname, Options());
if (ttl) { if (ttl) {
cout << "Opening database with TTL\n"; cout << "Opening database with TTL\n";
s = UtilityDB::OpenTtlDB(options, test::TmpDir() + "/merge_testdbttl", &db); s = UtilityDB::OpenTtlDB(options, test::TmpDir() + "/merge_testdbttl",&sdb);
db = sdb;
} else { } else {
s = DB::Open(options, test::TmpDir() + "/merge_testdb", &db); s = DB::Open(options, test::TmpDir() + "/merge_testdb", &db);
} }

@ -0,0 +1,154 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef LEVELDB_INCLUDE_UTILITIES_STACKABLE_DB_H_
#define LEVELDB_INCLUDE_UTILITIES_STACKABLE_DB_H_
#include "leveldb/db.h"
namespace leveldb {
// This class contains APIs to stack rocksdb wrappers.Eg. Stack TTL over base d
class StackableDB : public DB {
public:
explicit StackableDB(StackableDB* sdb) : sdb_(sdb) {}
// Returns the DB object that is the lowermost component in the stack of DBs
virtual DB* GetRawDB() {
return sdb_->GetRawDB();
}
// convert a DB to StackableDB
static StackableDB* DBToStackableDB(DB* db) {
class NewStackableDB : public StackableDB {
public:
NewStackableDB(DB* db)
: StackableDB(nullptr),
db_(db) {}
DB* GetRawDB() {
return db_;
}
private:
DB* db_;
};
return new NewStackableDB(db);
}
virtual Status Put(const WriteOptions& options,
const Slice& key,
const Slice& val) override {
return sdb_->Put(options, key, val);
}
virtual Status Get(const ReadOptions& options,
const Slice& key,
std::string* value) override {
return sdb_->Get(options, key, value);
}
virtual std::vector<Status> MultiGet(const ReadOptions& options,
const std::vector<Slice>& keys,
std::vector<std::string>* values)
override {
return sdb_->MultiGet(options, keys, values);
}
virtual bool KeyMayExist(const ReadOptions& options,
const Slice& key,
std::string* value,
bool* value_found = nullptr) override {
return KeyMayExist(options, key, value, value_found);
}
virtual Status Delete(const WriteOptions& wopts, const Slice& key) override {
return sdb_->Delete(wopts, key);
}
virtual Status Merge(const WriteOptions& options,
const Slice& key,
const Slice& value) override {
return sdb_->Merge(options, key, value);
}
virtual Status Write(const WriteOptions& opts, WriteBatch* updates)
override {
return sdb_->Write(opts, updates);
}
virtual Iterator* NewIterator(const ReadOptions& opts) override {
return sdb_->NewIterator(opts);
}
virtual const Snapshot* GetSnapshot() override {
return sdb_->GetSnapshot();
}
virtual void ReleaseSnapshot(const Snapshot* snapshot) override {
return sdb_->ReleaseSnapshot(snapshot);
}
virtual bool GetProperty(const Slice& property, std::string* value)
override {
return sdb_->GetProperty(property, value);
}
virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes)
override {
return sdb_->GetApproximateSizes(r, n, sizes);
}
virtual void CompactRange(const Slice* begin, const Slice* end,
bool reduce_level = false) override {
return sdb_->CompactRange(begin, end, reduce_level);
}
virtual int NumberLevels() override {
return sdb_->NumberLevels();
}
virtual int MaxMemCompactionLevel() override {
return sdb_->MaxMemCompactionLevel();
}
virtual int Level0StopWriteTrigger() override {
return sdb_->Level0StopWriteTrigger();
}
virtual Status Flush(const FlushOptions& fopts) override {
return sdb_->Flush(fopts);
}
virtual Status DisableFileDeletions() override {
return sdb_->DisableFileDeletions();
}
virtual Status EnableFileDeletions() override {
return sdb_->EnableFileDeletions();
}
virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs)
override {
return sdb_->GetLiveFiles(vec, mfs);
}
virtual SequenceNumber GetLatestSequenceNumber() override {
return sdb_->GetLatestSequenceNumber();
}
virtual Status GetUpdatesSince(SequenceNumber seq_number,
unique_ptr<TransactionLogIterator>* iter)
override {
return sdb_->GetUpdatesSince(seq_number, iter);
}
protected:
StackableDB* sdb_;
};
} // namespace leveldb
#endif // LEVELDB_INCLUDE_UTILITIES_STACKABLE_DB_H_

@ -5,7 +5,7 @@
#ifndef LEVELDB_INCLUDE_UTILITIES_UTILITY_DB_H_ #ifndef LEVELDB_INCLUDE_UTILITIES_UTILITY_DB_H_
#define LEVELDB_INCLUDE_UTILITIES_UTILITY_DB_H_ #define LEVELDB_INCLUDE_UTILITIES_UTILITY_DB_H_
#include "leveldb/db.h" #include "stackable_db.h"
namespace leveldb { namespace leveldb {
@ -44,7 +44,7 @@ class UtilityDB {
// whole database may be deleted in a small amount of time // whole database may be deleted in a small amount of time
static Status OpenTtlDB(const Options& options, static Status OpenTtlDB(const Options& options,
const std::string& name, const std::string& name,
DB** dbptr, StackableDB** dbptr,
int32_t ttl = 0, int32_t ttl = 0,
bool read_only = false); bool read_only = false);
}; };

@ -972,7 +972,8 @@ class StressTest {
if (FLAGS_ttl == -1) { if (FLAGS_ttl == -1) {
s = DB::Open(options, FLAGS_db, &db_); s = DB::Open(options, FLAGS_db, &db_);
} else { } else {
s = UtilityDB::OpenTtlDB(options, FLAGS_db, &db_, FLAGS_ttl); s = UtilityDB::OpenTtlDB(options, FLAGS_db, &sdb_, FLAGS_ttl);
db_ = sdb_;
} }
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str()); fprintf(stderr, "open error: %s\n", s.ToString().c_str());
@ -1008,6 +1009,7 @@ class StressTest {
shared_ptr<Cache> cache_; shared_ptr<Cache> cache_;
const FilterPolicy* filter_policy_; const FilterPolicy* filter_policy_;
DB* db_; DB* db_;
StackableDB* sdb_;
int num_times_reopened_; int num_times_reopened_;
}; };

@ -821,7 +821,8 @@ void DBDumperCommand::DoCommand() {
if (max_keys == 0) if (max_keys == 0)
break; break;
if (is_db_ttl_) { if (is_db_ttl_) {
TtlIterator* it_ttl = (TtlIterator*)iter; TtlIterator* it_ttl = dynamic_cast<TtlIterator*>(iter);
assert(it_ttl);
rawtime = it_ttl->timestamp(); rawtime = it_ttl->timestamp();
if (rawtime < ttl_start || rawtime >= ttl_end) { if (rawtime < ttl_start || rawtime >= ttl_end) {
continue; continue;
@ -1342,7 +1343,8 @@ void ScanCommand::DoCommand() {
it->Next()) { it->Next()) {
string key = it->key().ToString(); string key = it->key().ToString();
if (is_db_ttl_) { if (is_db_ttl_) {
TtlIterator* it_ttl = (TtlIterator*)it; TtlIterator* it_ttl = dynamic_cast<TtlIterator*>(it);
assert(it_ttl);
int rawtime = it_ttl->timestamp(); int rawtime = it_ttl->timestamp();
if (rawtime < ttl_start || rawtime >= ttl_end) { if (rawtime < ttl_start || rawtime >= ttl_end) {
continue; continue;

@ -150,6 +150,7 @@ protected:
LDBCommandExecuteResult exec_state_; LDBCommandExecuteResult exec_state_;
string db_path_; string db_path_;
DB* db_; DB* db_;
StackableDB* sdb_;
/** /**
* true implies that this command can work if the db is opened in read-only * true implies that this command can work if the db is opened in read-only
@ -217,10 +218,11 @@ protected:
Status st; Status st;
if (is_db_ttl_) { if (is_db_ttl_) {
if (is_read_only_) { if (is_read_only_) {
st = UtilityDB::OpenTtlDB(opt, db_path_, &db_, 0, true); st = UtilityDB::OpenTtlDB(opt, db_path_, &sdb_, 0, true);
} else { } else {
st = UtilityDB::OpenTtlDB(opt, db_path_, &db_); st = UtilityDB::OpenTtlDB(opt, db_path_, &sdb_);
} }
db_ = sdb_;
} else if (is_read_only_) { } else if (is_read_only_) {
st = DB::OpenForReadOnly(opt, db_path_, &db_); st = DB::OpenForReadOnly(opt, db_path_, &db_);
} else { } else {

@ -3,7 +3,6 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "utilities/ttl/db_ttl.h" #include "utilities/ttl/db_ttl.h"
#include "include/utilities/utility_db.h"
#include "db/filename.h" #include "db/filename.h"
#include "util/coding.h" #include "util/coding.h"
#include "include/leveldb/env.h" #include "include/leveldb/env.h"
@ -17,7 +16,8 @@ DBWithTTL::DBWithTTL(const int32_t ttl,
const std::string& dbname, const std::string& dbname,
Status& st, Status& st,
bool read_only) bool read_only)
: ttl_(ttl) { : StackableDB(nullptr),
ttl_(ttl) {
Options options_to_open = options; Options options_to_open = options;
ttl_comp_filter_.reset(new TtlCompactionFilter(ttl, ttl_comp_filter_.reset(new TtlCompactionFilter(ttl,
@ -43,7 +43,7 @@ DBWithTTL::~DBWithTTL() {
Status UtilityDB::OpenTtlDB( Status UtilityDB::OpenTtlDB(
const Options& options, const Options& options,
const std::string& dbname, const std::string& dbname,
DB** dbptr, StackableDB** dbptr,
int32_t ttl, int32_t ttl,
bool read_only) { bool read_only) {
Status st; Status st;

@ -9,11 +9,12 @@
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/compaction_filter.h" #include "leveldb/compaction_filter.h"
#include "leveldb/merge_operator.h" #include "leveldb/merge_operator.h"
#include "utilities/utility_db.h"
#include "db/db_impl.h" #include "db/db_impl.h"
namespace leveldb { namespace leveldb {
class DBWithTTL : public DB { class DBWithTTL : public StackableDB {
public: public:
DBWithTTL(const int32_t ttl, DBWithTTL(const int32_t ttl,
const Options& options, const Options& options,
@ -84,6 +85,10 @@ class DBWithTTL : public DB {
// Simulate a db crash, no elegant closing of database. // Simulate a db crash, no elegant closing of database.
void TEST_Destroy_DBWithTtl(); void TEST_Destroy_DBWithTtl();
virtual DB* GetRawDB() {
return db_;
}
static bool IsStale(const Slice& value, int32_t ttl); static bool IsStale(const Slice& value, int32_t ttl);
static Status AppendTS(const Slice& val, std::string& val_with_ts); static Status AppendTS(const Slice& val, std::string& val_with_ts);

@ -257,7 +257,7 @@ class TtlTest {
private: private:
std::string dbname_; std::string dbname_;
DB* db_ttl_; StackableDB* db_ttl_;
Options options_; Options options_;
KVMap kvmap_; KVMap kvmap_;
KVMap::iterator kv_it_; KVMap::iterator kv_it_;

Loading…
Cancel
Save