Made merge_oprator a shared_ptr; and added TTL unit tests

Test Plan:
- make all check;
- make release;
- make stringappend_test; ./stringappend_test

Reviewers: haobo, emayanke

Reviewed By: haobo

CC: leveldb, kailiu

Differential Revision: https://reviews.facebook.net/D12381
main
Deon Nicholas 12 years ago
parent 3ab2792f93
commit b87dcae1a3
  1. 2
      db/builder.cc
  2. 8
      db/db_bench.cc
  3. 2
      db/db_impl.cc
  4. 2
      db/db_iter.cc
  5. 5
      db/db_test.cc
  6. 2
      db/memtable.cc
  7. 3
      db/merge_test.cc
  8. 2
      db/version_set.cc
  9. 2
      include/leveldb/options.h
  10. 5
      tools/db_stress.cc
  11. 455
      utilities/merge_operators/string_append/stringappend_test.cc
  12. 4
      utilities/ttl/db_ttl.cc
  13. 5
      utilities/ttl/db_ttl.h

@ -54,7 +54,7 @@ Status BuildTable(const std::string& dbname,
meta->smallest_seqno = GetInternalKeySeqno(key); meta->smallest_seqno = GetInternalKeySeqno(key);
meta->largest_seqno = meta->smallest_seqno; meta->largest_seqno = meta->smallest_seqno;
MergeHelper merge(user_comparator, options.merge_operator, MergeHelper merge(user_comparator, options.merge_operator.get(),
options.info_log.get(), options.info_log.get(),
true /* internal key corruption is not ok */); true /* internal key corruption is not ok */);

@ -637,7 +637,6 @@ class Benchmark {
int key_size_; int key_size_;
int entries_per_batch_; int entries_per_batch_;
WriteOptions write_options_; WriteOptions write_options_;
std::shared_ptr<MergeOperator> merge_operator_;
long reads_; long reads_;
long writes_; long writes_;
long readwrites_; long readwrites_;
@ -779,7 +778,6 @@ class Benchmark {
value_size_(FLAGS_value_size), value_size_(FLAGS_value_size),
key_size_(FLAGS_key_size), key_size_(FLAGS_key_size),
entries_per_batch_(1), entries_per_batch_(1),
merge_operator_(nullptr),
reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads), reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes), writes_(FLAGS_writes < 0 ? FLAGS_num : FLAGS_writes),
readwrites_((FLAGS_writes < 0 && FLAGS_reads < 0)? FLAGS_num : readwrites_((FLAGS_writes < 0 && FLAGS_reads < 0)? FLAGS_num :
@ -1212,13 +1210,13 @@ class Benchmark {
options.bytes_per_sync = FLAGS_bytes_per_sync; options.bytes_per_sync = FLAGS_bytes_per_sync;
// merge operator options // merge operator options
merge_operator_ = MergeOperators::CreateFromStringId(FLAGS_merge_operator); options.merge_operator = MergeOperators::CreateFromStringId(
if (merge_operator_ == nullptr && !FLAGS_merge_operator.empty()) { FLAGS_merge_operator);
if (options.merge_operator == nullptr && !FLAGS_merge_operator.empty()) {
fprintf(stderr, "invalid merge operator: %s\n", fprintf(stderr, "invalid merge operator: %s\n",
FLAGS_merge_operator.c_str()); FLAGS_merge_operator.c_str());
exit(1); exit(1);
} }
options.merge_operator = merge_operator_.get();
Status s; Status s;
if(FLAGS_read_only) { if(FLAGS_read_only) {

@ -1777,7 +1777,7 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
SequenceNumber visible_in_snapshot = kMaxSequenceNumber; SequenceNumber visible_in_snapshot = kMaxSequenceNumber;
std::string compaction_filter_value; std::string compaction_filter_value;
std::vector<char> delete_key; // for compaction filter std::vector<char> delete_key; // for compaction filter
MergeHelper merge(user_comparator(), options_.merge_operator, MergeHelper merge(user_comparator(), options_.merge_operator.get(),
options_.info_log.get(), options_.info_log.get(),
false /* internal key corruption is expected */); false /* internal key corruption is expected */);
auto compaction_filter = options_.compaction_filter; auto compaction_filter = options_.compaction_filter;

@ -57,7 +57,7 @@ class DBIter: public Iterator {
env_(env), env_(env),
logger_(options.info_log), logger_(options.info_log),
user_comparator_(cmp), user_comparator_(cmp),
user_merge_operator_(options.merge_operator), user_merge_operator_(options.merge_operator.get()),
iter_(iter), iter_(iter),
sequence_(s), sequence_(s),
direction_(kForward), direction_(kForward),

@ -224,8 +224,6 @@ class DBTest {
}; };
int option_config_; int option_config_;
std::shared_ptr<MergeOperator> merge_operator_;
public: public:
std::string dbname_; std::string dbname_;
SpecialEnv* env_; SpecialEnv* env_;
@ -242,7 +240,6 @@ class DBTest {
}; };
DBTest() : option_config_(kDefault), DBTest() : option_config_(kDefault),
merge_operator_(MergeOperators::CreatePutOperator()),
env_(new SpecialEnv(Env::Default())) { env_(new SpecialEnv(Env::Default())) {
filter_policy_ = NewBloomFilterPolicy(10); filter_policy_ = NewBloomFilterPolicy(10);
dbname_ = test::TmpDir() + "/db_test"; dbname_ = test::TmpDir() + "/db_test";
@ -297,7 +294,7 @@ class DBTest {
Options options; Options options;
switch (option_config_) { switch (option_config_) {
case kMergePut: case kMergePut:
options.merge_operator = merge_operator_.get(); options.merge_operator = MergeOperators::CreatePutOperator();
break; break;
case kFilter: case kFilter:
options.filter_policy = filter_policy_; options.filter_policy = filter_policy_;

@ -139,7 +139,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s,
assert(operands != nullptr); assert(operands != nullptr);
bool merge_in_progress = s->IsMergeInProgress(); bool merge_in_progress = s->IsMergeInProgress();
auto merge_operator = options.merge_operator; auto merge_operator = options.merge_operator.get();
auto logger = options.info_log; auto logger = options.info_log;
std::string merge_result; std::string merge_result;

@ -16,14 +16,13 @@
using namespace std; using namespace std;
using namespace leveldb; using namespace leveldb;
auto mergeOperator = MergeOperators::CreateUInt64AddOperator();
std::shared_ptr<DB> OpenDb(const string& dbname, const bool ttl = false) { std::shared_ptr<DB> OpenDb(const string& dbname, const bool ttl = false) {
DB* db; DB* db;
StackableDB* sdb; StackableDB* sdb;
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.merge_operator = mergeOperator.get(); options.merge_operator = MergeOperators::CreateUInt64AddOperator();
Status s; Status s;
DestroyDB(dbname, Options()); DestroyDB(dbname, Options());
if (ttl) { if (ttl) {

@ -381,7 +381,7 @@ void Version::Get(const ReadOptions& options,
Slice user_key = k.user_key(); Slice user_key = k.user_key();
const Comparator* ucmp = vset_->icmp_.user_comparator(); const Comparator* ucmp = vset_->icmp_.user_comparator();
auto merge_operator = db_options.merge_operator; auto merge_operator = db_options.merge_operator.get();
auto logger = db_options.info_log; auto logger = db_options.info_log;
assert(status->ok() || status->IsMergeInProgress()); assert(status->ok() || status->IsMergeInProgress());

@ -85,7 +85,7 @@ struct Options {
// for the first time. It's necessary to specify a merge operator when // for the first time. It's necessary to specify a merge operator when
// openning the DB in this case. // openning the DB in this case.
// Default: nullptr // Default: nullptr
const MergeOperator* merge_operator; shared_ptr<MergeOperator> merge_operator;
// The client must provide compaction_filter_factory if it requires a new // The client must provide compaction_filter_factory if it requires a new
// compaction filter to be used for different compaction processes // compaction filter to be used for different compaction processes

@ -537,7 +537,6 @@ class StressTest {
FLAGS_test_batches_snapshots ? FLAGS_test_batches_snapshots ?
sizeof(long) : sizeof(long)-1)), sizeof(long) : sizeof(long)-1)),
db_(nullptr), db_(nullptr),
merge_operator_(MergeOperators::CreatePutOperator()),
num_times_reopened_(0) { num_times_reopened_(0) {
if (FLAGS_destroy_db_initially) { if (FLAGS_destroy_db_initially) {
std::vector<std::string> files; std::vector<std::string> files;
@ -553,7 +552,6 @@ class StressTest {
~StressTest() { ~StressTest() {
delete db_; delete db_;
merge_operator_ = nullptr;
delete filter_policy_; delete filter_policy_;
delete prefix_extractor_; delete prefix_extractor_;
} }
@ -1140,7 +1138,7 @@ class StressTest {
} }
if (FLAGS_use_merge_put) { if (FLAGS_use_merge_put) {
options.merge_operator = merge_operator_.get(); options.merge_operator = MergeOperators::CreatePutOperator();
} }
fprintf(stdout, "DB path: [%s]\n", FLAGS_db); fprintf(stdout, "DB path: [%s]\n", FLAGS_db);
@ -1188,7 +1186,6 @@ class StressTest {
const SliceTransform* prefix_extractor_; const SliceTransform* prefix_extractor_;
DB* db_; DB* db_;
StackableDB* sdb_; StackableDB* sdb_;
std::shared_ptr<MergeOperator> merge_operator_;
int num_times_reopened_; int num_times_reopened_;
}; };

@ -13,6 +13,8 @@
#include "leveldb/merge_operator.h" #include "leveldb/merge_operator.h"
#include "utilities/merge_operators.h" #include "utilities/merge_operators.h"
#include "utilities/merge_operators/string_append/stringappend.h" #include "utilities/merge_operators/string_append/stringappend.h"
#include "utilities/merge_operators/string_append/stringappend2.h"
#include "utilities/ttl/db_ttl.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "util/random.h" #include "util/random.h"
@ -24,21 +26,29 @@ namespace leveldb {
const std::string kDbName = "/tmp/mergetestdb"; const std::string kDbName = "/tmp/mergetestdb";
// OpenDb opens a (possibly new) rocksdb database with a StringAppendOperator // OpenDb opens a (possibly new) rocksdb database with a StringAppendOperator
std::shared_ptr<DB> OpenDb(StringAppendOperator* append_op) { std::shared_ptr<DB> OpenNormalDb(char delim_char) {
DB* db; DB* db;
Options options; Options options;
options.create_if_missing = true; options.create_if_missing = true;
options.merge_operator = append_op; options.merge_operator.reset(new StringAppendOperator(delim_char));
Status s = DB::Open(options, kDbName, &db); ASSERT_OK(DB::Open(options, kDbName, &db));
if (!s.ok()) { return std::shared_ptr<DB>(db);
std::cerr << s.ToString() << std::endl; }
assert(false);
} // Open a TtlDB with a non-associative StringAppendTESTOperator
return std::shared_ptr<DB>(db); std::shared_ptr<DB> OpenTtlDb(char delim_char) {
StackableDB* db;
Options options;
options.create_if_missing = true;
options.merge_operator.reset(new StringAppendTESTOperator(delim_char));
Status s;
db = new DBWithTTL(123456, options, kDbName, s, false);
ASSERT_OK(s);
return std::shared_ptr<DB>(db);
} }
/// StringLists represents a set of string-lists, each with a key-index. /// StringLists represents a set of string-lists, each with a key-index.
/// Supports Append(list,string) and Get(list) /// Supports Append(list, string) and Get(list)
class StringLists { class StringLists {
public: public:
@ -52,8 +62,8 @@ class StringLists {
// Append string val onto the list defined by key; return true on success // Append string val onto the list defined by key; return true on success
bool Append(const std::string& key, const std::string& val){ bool Append(const std::string& key, const std::string& val){
Slice valSlice(val.data(),val.size()); Slice valSlice(val.data(), val.size());
auto s = db_->Merge(merge_option_,key,valSlice); auto s = db_->Merge(merge_option_, key, valSlice);
if (s.ok()) { if (s.ok()) {
return true; return true;
@ -66,7 +76,6 @@ class StringLists {
// Returns the list of strings associated with key (or "" if does not exist) // Returns the list of strings associated with key (or "" if does not exist)
bool Get(const std::string& key, std::string* const result){ bool Get(const std::string& key, std::string* const result){
assert(result != NULL); // we should have a place to store the result assert(result != NULL); // we should have a place to store the result
auto s = db_->Get(get_option_, key, result); auto s = db_->Get(get_option_, key, result);
if (s.ok()) { if (s.ok()) {
@ -86,6 +95,7 @@ class StringLists {
return false; return false;
} }
private: private:
std::shared_ptr<DB> db_; std::shared_ptr<DB> db_;
WriteOptions merge_option_; WriteOptions merge_option_;
@ -93,23 +103,40 @@ class StringLists {
}; };
// The class for unit-testing
class StringAppendOperatorTest {
public:
StringAppendOperatorTest() {
DestroyDB(kDbName, Options()); // Start each test with a fresh DB
}
typedef std::shared_ptr<DB> (* OpenFuncPtr)(char);
// Allows user to open databases with different configurations.
// e.g.: Can open a DB or a TtlDB, etc.
static void SetOpenDbFunction(OpenFuncPtr func) {
OpenDb = func;
}
protected:
static OpenFuncPtr OpenDb;
};
StringAppendOperatorTest::OpenFuncPtr StringAppendOperatorTest::OpenDb = nullptr;
// THE TEST CASES BEGIN HERE // THE TEST CASES BEGIN HERE
class StringAppendOperatorTest { };
TEST(StringAppendOperatorTest, IteratorTest) { TEST(StringAppendOperatorTest, IteratorTest) {
DestroyDB(kDbName, Options()); // Start this test with a fresh DB auto db_ = OpenDb(',');
StringAppendOperator append_op(',');
auto db_ = OpenDb(&append_op);
StringLists slists(db_); StringLists slists(db_);
slists.Append("k1","v1"); slists.Append("k1", "v1");
slists.Append("k1","v2"); slists.Append("k1", "v2");
slists.Append("k1","v3"); slists.Append("k1", "v3");
slists.Append("k2","a1"); slists.Append("k2", "a1");
slists.Append("k2","a2"); slists.Append("k2", "a2");
slists.Append("k2","a3"); slists.Append("k2", "a3");
std::string res; std::string res;
std::unique_ptr<leveldb::Iterator> it(db_->NewIterator(ReadOptions())); std::unique_ptr<leveldb::Iterator> it(db_->NewIterator(ReadOptions()));
@ -165,7 +192,7 @@ TEST(StringAppendOperatorTest, IteratorTest) {
} }
} }
slists.Append("k3","g1"); slists.Append("k3", "g1");
it.reset(db_->NewIterator(ReadOptions())); it.reset(db_->NewIterator(ReadOptions()));
first = true; first = true;
@ -192,101 +219,86 @@ TEST(StringAppendOperatorTest, IteratorTest) {
} }
TEST(StringAppendOperatorTest,SimpleTest) { TEST(StringAppendOperatorTest, SimpleTest) {
DestroyDB(kDbName, Options()); // Start this test with a fresh DB auto db = OpenDb(',');
StringAppendOperator append_op(',');
auto db = OpenDb(&append_op);
StringLists slists(db); StringLists slists(db);
slists.Append("k1","v1"); slists.Append("k1", "v1");
slists.Append("k1","v2"); slists.Append("k1", "v2");
slists.Append("k1","v3"); slists.Append("k1", "v3");
std::string res; std::string res;
bool status = slists.Get("k1",&res); bool status = slists.Get("k1", &res);
ASSERT_TRUE(status); ASSERT_TRUE(status);
ASSERT_EQ(res,"v1,v2,v3"); ASSERT_EQ(res, "v1,v2,v3");
} }
TEST(StringAppendOperatorTest,SimpleDelimiterTest) { TEST(StringAppendOperatorTest, SimpleDelimiterTest) {
DestroyDB(kDbName, Options()); // Start this test with a fresh DB auto db = OpenDb('|');
StringAppendOperator append_op('|');
auto db = OpenDb(&append_op);
StringLists slists(db); StringLists slists(db);
slists.Append("k1","v1"); slists.Append("k1", "v1");
slists.Append("k1","v2"); slists.Append("k1", "v2");
slists.Append("k1","v3"); slists.Append("k1", "v3");
std::string res; std::string res;
slists.Get("k1",&res); slists.Get("k1", &res);
ASSERT_EQ(res,"v1|v2|v3"); ASSERT_EQ(res, "v1|v2|v3");
} }
TEST(StringAppendOperatorTest,OneValueNoDelimiterTest) { TEST(StringAppendOperatorTest, OneValueNoDelimiterTest) {
DestroyDB(kDbName, Options()); // Start this test with a fresh DB auto db = OpenDb('!');
StringAppendOperator append_op('!');
auto db = OpenDb(&append_op);
StringLists slists(db); StringLists slists(db);
slists.Append("random_key","single_val"); slists.Append("random_key", "single_val");
std::string res; std::string res;
slists.Get("random_key",&res); slists.Get("random_key", &res);
ASSERT_EQ(res,"single_val"); ASSERT_EQ(res, "single_val");
} }
TEST(StringAppendOperatorTest,VariousKeys) { TEST(StringAppendOperatorTest, VariousKeys) {
DestroyDB(kDbName, Options()); // Start this test with a fresh DB auto db = OpenDb('\n');
StringAppendOperator append_op('\n');
auto db = OpenDb(&append_op);
StringLists slists(db); StringLists slists(db);
slists.Append("c","asdasd"); slists.Append("c", "asdasd");
slists.Append("a","x"); slists.Append("a", "x");
slists.Append("b","y"); slists.Append("b", "y");
slists.Append("a","t"); slists.Append("a", "t");
slists.Append("a","r"); slists.Append("a", "r");
slists.Append("b","2"); slists.Append("b", "2");
slists.Append("c","asdasd"); slists.Append("c", "asdasd");
std::string a,b,c; std::string a, b, c;
bool sa,sb,sc; bool sa, sb, sc;
sa = slists.Get("a",&a); sa = slists.Get("a", &a);
sb = slists.Get("b",&b); sb = slists.Get("b", &b);
sc = slists.Get("c",&c); sc = slists.Get("c", &c);
ASSERT_TRUE(sa && sb && sc); // All three keys should have been found ASSERT_TRUE(sa && sb && sc); // All three keys should have been found
ASSERT_EQ(a,"x\nt\nr"); ASSERT_EQ(a, "x\nt\nr");
ASSERT_EQ(b,"y\n2"); ASSERT_EQ(b, "y\n2");
ASSERT_EQ(c,"asdasd\nasdasd"); ASSERT_EQ(c, "asdasd\nasdasd");
} }
// Generate semi random keys/words from a small distribution. // Generate semi random keys/words from a small distribution.
TEST(StringAppendOperatorTest,RandomMixGetAppend) { TEST(StringAppendOperatorTest, RandomMixGetAppend) {
DestroyDB(kDbName, Options()); // Start this test with a fresh DB auto db = OpenDb(' ');
StringAppendOperator append_op(' ');
auto db = OpenDb(&append_op);
StringLists slists(db); StringLists slists(db);
// Generate a list of random keys and values // Generate a list of random keys and values
const int kWordCount = 15; const int kWordCount = 15;
std::string words[] = {"sdasd","triejf","fnjsdfn","dfjisdfsf","342839", std::string words[] = {"sdasd", "triejf", "fnjsdfn", "dfjisdfsf", "342839",
"dsuha","mabuais","sadajsid","jf9834hf","2d9j89", "dsuha", "mabuais", "sadajsid", "jf9834hf", "2d9j89",
"dj9823jd","a","dk02ed2dh","$(jd4h984$(*", "mabz"}; "dj9823jd", "a", "dk02ed2dh", "$(jd4h984$(*", "mabz"};
const int kKeyCount = 6; const int kKeyCount = 6;
std::string keys[] = {"dhaiusdhu","denidw","daisda","keykey","muki", std::string keys[] = {"dhaiusdhu", "denidw", "daisda", "keykey", "muki",
"shzassdianmd"}; "shzassdianmd"};
// Will store a local copy of all data in order to verify correctness // Will store a local copy of all data in order to verify correctness
std::map<std::string,std::string> parallel_copy; std::map<std::string, std::string> parallel_copy;
// Generate a bunch of random queries (Append and Get)! // Generate a bunch of random queries (Append and Get)!
enum query_t { APPEND_OP, GET_OP, NUM_OPS }; enum query_t { APPEND_OP, GET_OP, NUM_OPS };
@ -299,14 +311,11 @@ TEST(StringAppendOperatorTest,RandomMixGetAppend) {
std::string key = keys[randomGen.Uniform((int)kKeyCount)]; std::string key = keys[randomGen.Uniform((int)kKeyCount)];
std::string word = words[randomGen.Uniform((int)kWordCount)]; std::string word = words[randomGen.Uniform((int)kWordCount)];
// Debug message.
//std::cout << (int)query << " " << key << " " << word << std::endl;
// Apply the query and any checks. // Apply the query and any checks.
if (query == APPEND_OP) { if (query == APPEND_OP) {
// Apply the rocksdb test-harness Append defined above // Apply the rocksdb test-harness Append defined above
slists.Append(key,word); //apply the rocksdb append slists.Append(key, word); //apply the rocksdb append
// Apply the similar "Append" to the parallel copy // Apply the similar "Append" to the parallel copy
if (parallel_copy[key].size() > 0) { if (parallel_copy[key].size() > 0) {
@ -318,32 +327,29 @@ TEST(StringAppendOperatorTest,RandomMixGetAppend) {
} else if (query == GET_OP) { } else if (query == GET_OP) {
// Assumes that a non-existent key just returns <empty> // Assumes that a non-existent key just returns <empty>
std::string res; std::string res;
slists.Get(key,&res); slists.Get(key, &res);
ASSERT_EQ(res,parallel_copy[key]); ASSERT_EQ(res, parallel_copy[key]);
} }
} }
} }
TEST(StringAppendOperatorTest,BIGRandomMixGetAppend) { TEST(StringAppendOperatorTest, BIGRandomMixGetAppend) {
DestroyDB(kDbName, Options()); // Start this test with a fresh DB auto db = OpenDb(' ');
StringAppendOperator append_op(' ');
auto db = OpenDb(&append_op);
StringLists slists(db); StringLists slists(db);
// Generate a list of random keys and values // Generate a list of random keys and values
const int kWordCount = 15; const int kWordCount = 15;
std::string words[] = {"sdasd","triejf","fnjsdfn","dfjisdfsf","342839", std::string words[] = {"sdasd", "triejf", "fnjsdfn", "dfjisdfsf", "342839",
"dsuha","mabuais","sadajsid","jf9834hf","2d9j89", "dsuha", "mabuais", "sadajsid", "jf9834hf", "2d9j89",
"dj9823jd","a","dk02ed2dh","$(jd4h984$(*", "mabz"}; "dj9823jd", "a", "dk02ed2dh", "$(jd4h984$(*", "mabz"};
const int kKeyCount = 6; const int kKeyCount = 6;
std::string keys[] = {"dhaiusdhu","denidw","daisda","keykey","muki", std::string keys[] = {"dhaiusdhu", "denidw", "daisda", "keykey", "muki",
"shzassdianmd"}; "shzassdianmd"};
// Will store a local copy of all data in order to verify correctness // Will store a local copy of all data in order to verify correctness
std::map<std::string,std::string> parallel_copy; std::map<std::string, std::string> parallel_copy;
// Generate a bunch of random queries (Append and Get)! // Generate a bunch of random queries (Append and Get)!
enum query_t { APPEND_OP, GET_OP, NUM_OPS }; enum query_t { APPEND_OP, GET_OP, NUM_OPS };
@ -356,14 +362,11 @@ TEST(StringAppendOperatorTest,BIGRandomMixGetAppend) {
std::string key = keys[randomGen.Uniform((int)kKeyCount)]; std::string key = keys[randomGen.Uniform((int)kKeyCount)];
std::string word = words[randomGen.Uniform((int)kWordCount)]; std::string word = words[randomGen.Uniform((int)kWordCount)];
// Debug message.
//std::cout << (int)query << " " << key << " " << word << std::endl;
//Apply the query and any checks. //Apply the query and any checks.
if (query == APPEND_OP) { if (query == APPEND_OP) {
// Apply the rocksdb test-harness Append defined above // Apply the rocksdb test-harness Append defined above
slists.Append(key,word); //apply the rocksdb append slists.Append(key, word); //apply the rocksdb append
// Apply the similar "Append" to the parallel copy // Apply the similar "Append" to the parallel copy
if (parallel_copy[key].size() > 0) { if (parallel_copy[key].size() > 0) {
@ -375,8 +378,8 @@ TEST(StringAppendOperatorTest,BIGRandomMixGetAppend) {
} else if (query == GET_OP) { } else if (query == GET_OP) {
// Assumes that a non-existent key just returns <empty> // Assumes that a non-existent key just returns <empty>
std::string res; std::string res;
slists.Get(key,&res); slists.Get(key, &res);
ASSERT_EQ(res,parallel_copy[key]); ASSERT_EQ(res, parallel_copy[key]);
} }
} }
@ -384,191 +387,179 @@ TEST(StringAppendOperatorTest,BIGRandomMixGetAppend) {
} }
TEST(StringAppendOperatorTest,PersistentVariousKeys) { TEST(StringAppendOperatorTest, PersistentVariousKeys) {
DestroyDB(kDbName, Options()); // Start this test with a fresh DB
// Perform the following operations in limited scope // Perform the following operations in limited scope
{ {
StringAppendOperator append_op('\n'); auto db = OpenDb('\n');
auto db = OpenDb(&append_op);
StringLists slists(db); StringLists slists(db);
slists.Append("c","asdasd"); slists.Append("c", "asdasd");
slists.Append("a","x"); slists.Append("a", "x");
slists.Append("b","y"); slists.Append("b", "y");
slists.Append("a","t"); slists.Append("a", "t");
slists.Append("a","r"); slists.Append("a", "r");
slists.Append("b","2"); slists.Append("b", "2");
slists.Append("c","asdasd"); slists.Append("c", "asdasd");
std::string a,b,c; std::string a, b, c;
slists.Get("a",&a); slists.Get("a", &a);
slists.Get("b",&b); slists.Get("b", &b);
slists.Get("c",&c); slists.Get("c", &c);
ASSERT_EQ(a,"x\nt\nr"); ASSERT_EQ(a, "x\nt\nr");
ASSERT_EQ(b,"y\n2"); ASSERT_EQ(b, "y\n2");
ASSERT_EQ(c,"asdasd\nasdasd"); ASSERT_EQ(c, "asdasd\nasdasd");
} }
// Reopen the database (the previous changes should persist / be remembered) // Reopen the database (the previous changes should persist / be remembered)
{ {
StringAppendOperator append_op('\n'); auto db = OpenDb('\n');
auto db = OpenDb(&append_op);
StringLists slists(db); StringLists slists(db);
slists.Append("c","bbnagnagsx"); slists.Append("c", "bbnagnagsx");
slists.Append("a","sa"); slists.Append("a", "sa");
slists.Append("b","df"); slists.Append("b", "df");
slists.Append("a","gh"); slists.Append("a", "gh");
slists.Append("a","jk"); slists.Append("a", "jk");
slists.Append("b","l;"); slists.Append("b", "l;");
slists.Append("c","rogosh"); slists.Append("c", "rogosh");
// The previous changes should be on disk (L0) // The previous changes should be on disk (L0)
// The most recent changes should be in memory (MemTable) // The most recent changes should be in memory (MemTable)
// Hence, this will test both Get() paths. // Hence, this will test both Get() paths.
std::string a,b,c; std::string a, b, c;
slists.Get("a",&a); slists.Get("a", &a);
slists.Get("b",&b); slists.Get("b", &b);
slists.Get("c",&c); slists.Get("c", &c);
ASSERT_EQ(a,"x\nt\nr\nsa\ngh\njk"); ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk");
ASSERT_EQ(b,"y\n2\ndf\nl;"); ASSERT_EQ(b, "y\n2\ndf\nl;");
ASSERT_EQ(c,"asdasd\nasdasd\nbbnagnagsx\nrogosh"); ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh");
} }
// Reopen the database (the previous changes should persist / be remembered) // Reopen the database (the previous changes should persist / be remembered)
{ {
StringAppendOperator append_op('\n'); auto db = OpenDb('\n');
auto db = OpenDb(&append_op);
StringLists slists(db); StringLists slists(db);
// All changes should be on disk. This will test VersionSet Get() // All changes should be on disk. This will test VersionSet Get()
std::string a,b,c; std::string a, b, c;
slists.Get("a",&a); slists.Get("a", &a);
slists.Get("b",&b); slists.Get("b", &b);
slists.Get("c",&c); slists.Get("c", &c);
ASSERT_EQ(a,"x\nt\nr\nsa\ngh\njk"); ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk");
ASSERT_EQ(b,"y\n2\ndf\nl;"); ASSERT_EQ(b, "y\n2\ndf\nl;");
ASSERT_EQ(c,"asdasd\nasdasd\nbbnagnagsx\nrogosh"); ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh");
} }
} }
TEST(StringAppendOperatorTest,PersistentFlushAndCompaction) { TEST(StringAppendOperatorTest, PersistentFlushAndCompaction) {
DestroyDB(kDbName, Options()); // Start this test with a fresh DB
StringAppendOperator append_op('\n');
// Perform the following operations in limited scope // Perform the following operations in limited scope
{ {
auto db = OpenDb(&append_op); auto db = OpenDb('\n');
StringLists slists(db); StringLists slists(db);
std::string a,b,c; std::string a, b, c;
bool success; bool success;
// Append, Flush, Get // Append, Flush, Get
slists.Append("c","asdasd"); slists.Append("c", "asdasd");
db->Flush(leveldb::FlushOptions()); db->Flush(leveldb::FlushOptions());
success = slists.Get("c",&c); success = slists.Get("c", &c);
ASSERT_TRUE(success); ASSERT_TRUE(success);
ASSERT_EQ(c,"asdasd"); ASSERT_EQ(c, "asdasd");
// Append, Flush, Append, Get // Append, Flush, Append, Get
slists.Append("a","x"); slists.Append("a", "x");
slists.Append("b","y"); slists.Append("b", "y");
db->Flush(leveldb::FlushOptions()); db->Flush(leveldb::FlushOptions());
slists.Append("a","t"); slists.Append("a", "t");
slists.Append("a","r"); slists.Append("a", "r");
slists.Append("b","2"); slists.Append("b", "2");
success = slists.Get("a",&a); success = slists.Get("a", &a);
assert(success == true); assert(success == true);
ASSERT_EQ(a,"x\nt\nr"); ASSERT_EQ(a, "x\nt\nr");
success = slists.Get("b",&b); success = slists.Get("b", &b);
assert(success == true); assert(success == true);
ASSERT_EQ(b,"y\n2"); ASSERT_EQ(b, "y\n2");
// Append, Get // Append, Get
success = slists.Append("c","asdasd"); success = slists.Append("c", "asdasd");
assert(success); assert(success);
success = slists.Append("b","monkey"); success = slists.Append("b", "monkey");
assert(success); assert(success);
// I omit the "assert(success)" checks here. // I omit the "assert(success)" checks here.
slists.Get("a",&a); slists.Get("a", &a);
slists.Get("b",&b); slists.Get("b", &b);
slists.Get("c",&c); slists.Get("c", &c);
ASSERT_EQ(a,"x\nt\nr"); ASSERT_EQ(a, "x\nt\nr");
ASSERT_EQ(b,"y\n2\nmonkey"); ASSERT_EQ(b, "y\n2\nmonkey");
ASSERT_EQ(c,"asdasd\nasdasd"); ASSERT_EQ(c, "asdasd\nasdasd");
} }
// Reopen the database (the previous changes should persist / be remembered) // Reopen the database (the previous changes should persist / be remembered)
{ {
auto db = OpenDb(&append_op); auto db = OpenDb('\n');
StringLists slists(db); StringLists slists(db);
std::string a,b,c; std::string a, b, c;
// Get (Quick check for persistence of previous database) // Get (Quick check for persistence of previous database)
slists.Get("a",&a); slists.Get("a", &a);
ASSERT_EQ(a,"x\nt\nr"); ASSERT_EQ(a, "x\nt\nr");
//Append, Compact, Get //Append, Compact, Get
slists.Append("c","bbnagnagsx"); slists.Append("c", "bbnagnagsx");
slists.Append("a","sa"); slists.Append("a", "sa");
slists.Append("b","df"); slists.Append("b", "df");
db->CompactRange(nullptr,nullptr); db->CompactRange(nullptr, nullptr);
slists.Get("a",&a); slists.Get("a", &a);
slists.Get("b",&b); slists.Get("b", &b);
slists.Get("c",&c); slists.Get("c", &c);
ASSERT_EQ(a,"x\nt\nr\nsa"); ASSERT_EQ(a, "x\nt\nr\nsa");
ASSERT_EQ(b,"y\n2\nmonkey\ndf"); ASSERT_EQ(b, "y\n2\nmonkey\ndf");
ASSERT_EQ(c,"asdasd\nasdasd\nbbnagnagsx"); ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx");
// Append, Get // Append, Get
slists.Append("a","gh"); slists.Append("a", "gh");
slists.Append("a","jk"); slists.Append("a", "jk");
slists.Append("b","l;"); slists.Append("b", "l;");
slists.Append("c","rogosh"); slists.Append("c", "rogosh");
slists.Get("a",&a); slists.Get("a", &a);
slists.Get("b",&b); slists.Get("b", &b);
slists.Get("c",&c); slists.Get("c", &c);
ASSERT_EQ(a,"x\nt\nr\nsa\ngh\njk"); ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk");
ASSERT_EQ(b,"y\n2\nmonkey\ndf\nl;"); ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;");
ASSERT_EQ(c,"asdasd\nasdasd\nbbnagnagsx\nrogosh"); ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh");
// Compact, Get // Compact, Get
db->CompactRange(nullptr,nullptr); db->CompactRange(nullptr, nullptr);
ASSERT_EQ(a,"x\nt\nr\nsa\ngh\njk"); ASSERT_EQ(a, "x\nt\nr\nsa\ngh\njk");
ASSERT_EQ(b,"y\n2\nmonkey\ndf\nl;"); ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;");
ASSERT_EQ(c,"asdasd\nasdasd\nbbnagnagsx\nrogosh"); ASSERT_EQ(c, "asdasd\nasdasd\nbbnagnagsx\nrogosh");
// Append, Flush, Compact, Get // Append, Flush, Compact, Get
slists.Append("b","afcg"); slists.Append("b", "afcg");
db->Flush(leveldb::FlushOptions()); db->Flush(leveldb::FlushOptions());
db->CompactRange(nullptr,nullptr); db->CompactRange(nullptr, nullptr);
slists.Get("b",&b); slists.Get("b", &b);
ASSERT_EQ(b,"y\n2\nmonkey\ndf\nl;\nafcg"); ASSERT_EQ(b, "y\n2\nmonkey\ndf\nl;\nafcg");
} }
} }
TEST(StringAppendOperatorTest,SimpleTestNullDelimiter) { TEST(StringAppendOperatorTest, SimpleTestNullDelimiter) {
DestroyDB(kDbName, Options()); // Start this test with a fresh DB auto db = OpenDb('\0');
StringAppendOperator append_op('\0');
auto db = OpenDb(&append_op);
StringLists slists(db); StringLists slists(db);
slists.Append("k1","v1"); slists.Append("k1", "v1");
slists.Append("k1","v2"); slists.Append("k1", "v2");
slists.Append("k1","v3"); slists.Append("k1", "v3");
std::string res; std::string res;
bool status = slists.Get("k1",&res); bool status = slists.Get("k1", &res);
ASSERT_TRUE(status); ASSERT_TRUE(status);
// Construct the desired string. Default constructor doesn't like '\0' chars. // Construct the desired string. Default constructor doesn't like '\0' chars.
@ -579,13 +570,25 @@ TEST(StringAppendOperatorTest,SimpleTestNullDelimiter) {
// Check that the leveldb result string matches the desired string // Check that the leveldb result string matches the desired string
assert(res.size() == checker.size()); assert(res.size() == checker.size());
ASSERT_EQ(res,checker); ASSERT_EQ(res, checker);
} }
} // namespace leveldb } // namespace leveldb
int main(int arc, char** argv) { int main(int arc, char** argv) {
leveldb::test::RunAllTests(); // Run with regular database
{
fprintf(stderr, "Running tests with regular db and operator.\n");
StringAppendOperatorTest::SetOpenDbFunction(&OpenNormalDb);
leveldb::test::RunAllTests();
}
// Run with TTL
{
fprintf(stderr, "Running tests with ttl db and generic operator.\n");
StringAppendOperatorTest::SetOpenDbFunction(&OpenTtlDb);
leveldb::test::RunAllTests();
}
return 0; return 0;
} }

@ -32,8 +32,8 @@ DBWithTTL::DBWithTTL(const int32_t ttl,
} }
if (options.merge_operator) { if (options.merge_operator) {
ttl_merge_op_.reset(new TtlMergeOperator(options.merge_operator)); options_to_open.merge_operator.reset(
options_to_open.merge_operator = ttl_merge_op_.get(); new TtlMergeOperator(options.merge_operator));
} }
if (read_only) { if (read_only) {

@ -112,7 +112,6 @@ class DBWithTTL : public StackableDB {
private: private:
DB* db_; DB* db_;
int32_t ttl_; int32_t ttl_;
unique_ptr<MergeOperator> ttl_merge_op_;
unique_ptr<CompactionFilter> ttl_comp_filter_; unique_ptr<CompactionFilter> ttl_comp_filter_;
}; };
@ -258,7 +257,7 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
class TtlMergeOperator : public MergeOperator { class TtlMergeOperator : public MergeOperator {
public: public:
explicit TtlMergeOperator(const MergeOperator* merge_op) explicit TtlMergeOperator(const std::shared_ptr<MergeOperator> merge_op)
: user_merge_op_(merge_op) { : user_merge_op_(merge_op) {
assert(merge_op); assert(merge_op);
} }
@ -356,7 +355,7 @@ class TtlMergeOperator : public MergeOperator {
} }
private: private:
const MergeOperator* user_merge_op_; std::shared_ptr<MergeOperator> user_merge_op_;
}; };
} }

Loading…
Cancel
Save