Remove Lua compaction filter from RocksDB main repo (#4971)

Summary:
as title. For people who continue to need Lua compaction filter, you
can copy the include/rocksdb/utilities/rocks_lua/lua_compaction_filter.h and
utilities/lua/rocks_lua_compaction_filter.cc to your own codebase.
Pull Request resolved: https://github.com/facebook/rocksdb/pull/4971

Differential Revision: D14047468

Pulled By: riversand963

fbshipit-source-id: 9ad1a6484a7c94e478f1e108127a3184e4069f70
main
Yanqin Jin 5 years ago committed by Facebook Github Bot
parent a69d4deefb
commit 5af9446ee6
  1. 2
      CMakeLists.txt
  2. 1
      HISTORY.md
  3. 4
      Makefile
  4. 1
      TARGETS
  5. 189
      include/rocksdb/utilities/lua/rocks_lua_compaction_filter.h
  6. 2
      src.mk
  7. 242
      utilities/lua/rocks_lua_compaction_filter.cc
  8. 498
      utilities/lua/rocks_lua_test.cc

@ -647,7 +647,6 @@ set(SOURCES
utilities/env_mirror.cc
utilities/env_timed.cc
utilities/leveldb_options/leveldb_options.cc
utilities/lua/rocks_lua_compaction_filter.cc
utilities/memory/memory_util.cc
utilities/merge_operators/bytesxor.cc
utilities/merge_operators/max.cc
@ -964,7 +963,6 @@ if(WITH_TESTS)
utilities/cassandra/cassandra_row_merge_test.cc
utilities/cassandra/cassandra_serialize_test.cc
utilities/checkpoint/checkpoint_test.cc
utilities/lua/rocks_lua_test.cc
utilities/memory/memory_test.cc
utilities/merge_operators/string_append/stringappend_test.cc
utilities/object_registry_test.cc

@ -25,6 +25,7 @@
* Remove CuckooHash memtable.
* The counter stat `number.block.not_compressed` now also counts blocks not compressed due to poor compression ratio.
* Support SST file ingestion across multiple column families via DB::IngestExternalFiles. See the function's comment about atomicity.
* Remove Lua compaction filter.
### Bug Fixes
* Fix a deadlock caused by compaction and file ingestion waiting for each other in the event of write stalls.

@ -535,7 +535,6 @@ TESTS = \
ldb_cmd_test \
persistent_cache_test \
statistics_test \
lua_test \
lru_cache_test \
object_registry_test \
repair_test \
@ -1537,9 +1536,6 @@ statistics_test: monitoring/statistics_test.o $(LIBOBJECTS) $(TESTHARNESS)
lru_cache_test: cache/lru_cache_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
lua_test: utilities/lua/rocks_lua_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)
range_del_aggregator_test: db/range_del_aggregator_test.o db/db_test_util.o $(LIBOBJECTS) $(TESTHARNESS)
$(AM_LINK)

@ -260,7 +260,6 @@ cpp_library(
"utilities/env_mirror.cc",
"utilities/env_timed.cc",
"utilities/leveldb_options/leveldb_options.cc",
"utilities/lua/rocks_lua_compaction_filter.cc",
"utilities/memory/memory_util.cc",
"utilities/merge_operators/bytesxor.cc",
"utilities/merge_operators/max.cc",

@ -1,189 +0,0 @@
// Copyright (c) 2016, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once
#if defined(LUA) && !defined(ROCKSDB_LITE)
// lua headers
extern "C" {
#include <lauxlib.h>
#include <lua.h>
#include <lualib.h>
}
#include <mutex>
#include <string>
#include <vector>
#include "rocksdb/compaction_filter.h"
#include "rocksdb/env.h"
#include "rocksdb/slice.h"
#include "rocksdb/utilities/lua/rocks_lua_custom_library.h"
#include "rocksdb/utilities/lua/rocks_lua_util.h"
namespace rocksdb {
namespace lua {
struct RocksLuaCompactionFilterOptions {
// The lua script in string that implements all necessary CompactionFilter
// virtual functions. The specified lua_script must implement the following
// functions, which are Name and Filter, as described below.
//
// 0. The Name function simply returns a string representing the name of
// the lua script. If there's any erorr in the Name function, an
// empty string will be used.
// --- Example
// function Name()
// return "DefaultLuaCompactionFilter"
// end
//
//
// 1. The script must contains a function called Filter, which implements
// CompactionFilter::Filter() , takes three input arguments, and returns
// three values as the following API:
//
// function Filter(level, key, existing_value)
// ...
// return is_filtered, is_changed, new_value
// end
//
// Note that if ignore_value is set to true, then Filter should implement
// the following API:
//
// function Filter(level, key)
// ...
// return is_filtered
// end
//
// If there're any error in the Filter() function, then it will keep
// the input key / value pair.
//
// -- Input
// The function must take three arguments (integer, string, string),
// which map to "level", "key", and "existing_value" passed from
// RocksDB.
//
// -- Output
// The function must return three values (boolean, boolean, string).
// - is_filtered: if the first return value is true, then it indicates
// the input key / value pair should be filtered.
// - is_changed: if the second return value is true, then it indicates
// the existing_value needs to be changed, and the resulting value
// is stored in the third return value.
// - new_value: if the second return value is true, then this third
// return value stores the new value of the input key / value pair.
//
// -- Examples
// -- a filter that keeps all key-value pairs
// function Filter(level, key, existing_value)
// return false, false, ""
// end
//
// -- a filter that keeps all keys and change their values to "Rocks"
// function Filter(level, key, existing_value)
// return false, true, "Rocks"
// end
std::string lua_script;
// If set to true, then existing_value will not be passed to the Filter
// function, and the Filter function only needs to return a single boolean
// flag indicating whether to filter out this key or not.
//
// function Filter(level, key)
// ...
// return is_filtered
// end
bool ignore_value = false;
// A boolean flag to determine whether to ignore snapshots.
bool ignore_snapshots = true;
// When specified a non-null pointer, the first "error_limit_per_filter"
// errors of each CompactionFilter that is lua related will be included
// in this log.
std::shared_ptr<Logger> error_log;
// The number of errors per CompactionFilter will be printed
// to error_log.
int error_limit_per_filter = 1;
// A string to luaL_reg array map that allows the Lua CompactionFilter
// to use custom C library. The string will be used as the library
// name in Lua.
std::vector<std::shared_ptr<RocksLuaCustomLibrary>> libraries;
///////////////////////////////////////////////////////////////////////////
// NOT YET SUPPORTED
// The name of the Lua function in "lua_script" that implements
// CompactionFilter::FilterMergeOperand(). The function must take
// three input arguments (integer, string, string), which map to "level",
// "key", and "operand" passed from the RocksDB. In addition, the
// function must return a single boolean value, indicating whether
// to filter the input key / operand.
//
// DEFAULT: the default implementation always returns false.
// @see CompactionFilter::FilterMergeOperand
};
class RocksLuaCompactionFilterFactory : public CompactionFilterFactory {
public:
explicit RocksLuaCompactionFilterFactory(
const RocksLuaCompactionFilterOptions opt);
virtual ~RocksLuaCompactionFilterFactory() {}
std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override;
// Change the Lua script so that the next compaction after this
// function call will use the new Lua script.
void SetScript(const std::string& new_script);
// Obtain the current Lua script
std::string GetScript();
const char* Name() const override;
private:
RocksLuaCompactionFilterOptions opt_;
std::string name_;
// A lock to protect "opt_" to make it dynamically changeable.
std::mutex opt_mutex_;
};
// A wrapper class that invokes Lua script to perform CompactionFilter
// functions.
class RocksLuaCompactionFilter : public rocksdb::CompactionFilter {
public:
explicit RocksLuaCompactionFilter(const RocksLuaCompactionFilterOptions& opt)
: options_(opt),
lua_state_wrapper_(opt.lua_script, opt.libraries),
error_count_(0),
name_("") {}
virtual bool Filter(int level, const Slice& key, const Slice& existing_value,
std::string* new_value,
bool* value_changed) const override;
// Not yet supported
virtual bool FilterMergeOperand(int /*level*/, const Slice& /*key*/,
const Slice& /*operand*/) const override {
return false;
}
virtual bool IgnoreSnapshots() const override;
virtual const char* Name() const override;
protected:
void LogLuaError(const char* format, ...) const;
RocksLuaCompactionFilterOptions options_;
LuaStateWrapper lua_state_wrapper_;
mutable int error_count_;
mutable std::string name_;
};
} // namespace lua
} // namespace rocksdb
#endif // defined(LUA) && !defined(ROCKSDB_LITE)

@ -180,7 +180,6 @@ LIB_SOURCES = \
utilities/env_mirror.cc \
utilities/env_timed.cc \
utilities/leveldb_options/leveldb_options.cc \
utilities/lua/rocks_lua_compaction_filter.cc \
utilities/memory/memory_util.cc \
utilities/merge_operators/max.cc \
utilities/merge_operators/put.cc \
@ -383,7 +382,6 @@ MAIN_SOURCES = \
utilities/cassandra/cassandra_row_merge_test.cc \
utilities/cassandra/cassandra_serialize_test.cc \
utilities/checkpoint/checkpoint_test.cc \
utilities/lua/rocks_lua_test.cc \
utilities/memory/memory_test.cc \
utilities/merge_operators/string_append/stringappend_test.cc \
utilities/object_registry_test.cc \

@ -1,242 +0,0 @@
// Copyright (c) 2016, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#if defined(LUA) && !defined(ROCKSDB_LITE)
#include "rocksdb/utilities/lua/rocks_lua_compaction_filter.h"
extern "C" {
#include <luaconf.h>
}
#include "rocksdb/compaction_filter.h"
namespace rocksdb {
namespace lua {
const std::string kFilterFunctionName = "Filter";
const std::string kNameFunctionName = "Name";
void RocksLuaCompactionFilter::LogLuaError(const char* format, ...) const {
if (options_.error_log.get() != nullptr &&
error_count_ < options_.error_limit_per_filter) {
error_count_++;
va_list ap;
va_start(ap, format);
options_.error_log->Logv(InfoLogLevel::ERROR_LEVEL, format, ap);
va_end(ap);
}
}
bool RocksLuaCompactionFilter::Filter(int level, const Slice& key,
const Slice& existing_value,
std::string* new_value,
bool* value_changed) const {
auto* lua_state = lua_state_wrapper_.GetLuaState();
// push the right function into the lua stack
lua_getglobal(lua_state, kFilterFunctionName.c_str());
int error_no = 0;
int num_input_values;
int num_return_values;
if (options_.ignore_value == false) {
// push input arguments into the lua stack
lua_pushnumber(lua_state, level);
lua_pushlstring(lua_state, key.data(), key.size());
lua_pushlstring(lua_state, existing_value.data(), existing_value.size());
num_input_values = 3;
num_return_values = 3;
} else {
// If ignore_value is set to true, then we only put two arguments
// and expect one return value
lua_pushnumber(lua_state, level);
lua_pushlstring(lua_state, key.data(), key.size());
num_input_values = 2;
num_return_values = 1;
}
// perform the lua call
if ((error_no =
lua_pcall(lua_state, num_input_values, num_return_values, 0)) != 0) {
LogLuaError("[Lua] Error(%d) in Filter function --- %s", error_no,
lua_tostring(lua_state, -1));
// pops out the lua error from stack
lua_pop(lua_state, 1);
return false;
}
// As lua_pcall went successfully, it can be guaranteed that the top
// three elements in the Lua stack are the three returned values.
bool has_error = false;
const int kIndexIsFiltered = -num_return_values;
const int kIndexValueChanged = -num_return_values + 1;
const int kIndexNewValue = -num_return_values + 2;
// check the types of three return values
// is_filtered
if (!lua_isboolean(lua_state, kIndexIsFiltered)) {
LogLuaError(
"[Lua] Error in Filter function -- "
"1st return value (is_filtered) is not a boolean "
"while a boolean is expected.");
has_error = true;
}
if (options_.ignore_value == false) {
// value_changed
if (!lua_isboolean(lua_state, kIndexValueChanged)) {
LogLuaError(
"[Lua] Error in Filter function -- "
"2nd return value (value_changed) is not a boolean "
"while a boolean is expected.");
has_error = true;
}
// new_value
if (!lua_isstring(lua_state, kIndexNewValue)) {
LogLuaError(
"[Lua] Error in Filter function -- "
"3rd return value (new_value) is not a string "
"while a string is expected.");
has_error = true;
}
}
if (has_error) {
lua_pop(lua_state, num_return_values);
return false;
}
// Fetch the return values
bool is_filtered = false;
if (!has_error) {
is_filtered = lua_toboolean(lua_state, kIndexIsFiltered);
if (options_.ignore_value == false) {
*value_changed = lua_toboolean(lua_state, kIndexValueChanged);
if (*value_changed) {
const char* new_value_buf = lua_tostring(lua_state, kIndexNewValue);
const size_t new_value_size = lua_strlen(lua_state, kIndexNewValue);
// Note that any string that lua_tostring returns always has a zero at
// its end, bu/t it can have other zeros inside it
assert(new_value_buf[new_value_size] == '\0');
assert(strlen(new_value_buf) <= new_value_size);
new_value->assign(new_value_buf, new_value_size);
}
} else {
*value_changed = false;
}
}
// pops the three return values.
lua_pop(lua_state, num_return_values);
return is_filtered;
}
const char* RocksLuaCompactionFilter::Name() const {
if (name_ != "") {
return name_.c_str();
}
auto* lua_state = lua_state_wrapper_.GetLuaState();
// push the right function into the lua stack
lua_getglobal(lua_state, kNameFunctionName.c_str());
// perform the call (0 arguments, 1 result)
int error_no;
if ((error_no = lua_pcall(lua_state, 0, 1, 0)) != 0) {
LogLuaError("[Lua] Error(%d) in Name function --- %s", error_no,
lua_tostring(lua_state, -1));
// pops out the lua error from stack
lua_pop(lua_state, 1);
return name_.c_str();
}
// check the return value
if (!lua_isstring(lua_state, -1)) {
LogLuaError(
"[Lua] Error in Name function -- "
"return value is not a string while string is expected");
} else {
const char* name_buf = lua_tostring(lua_state, -1);
const size_t name_size __attribute__((__unused__)) = lua_strlen(lua_state, -1);
assert(name_buf[name_size] == '\0');
assert(strlen(name_buf) <= name_size);
name_ = name_buf;
}
lua_pop(lua_state, 1);
return name_.c_str();
}
/* Not yet supported
bool RocksLuaCompactionFilter::FilterMergeOperand(
int level, const Slice& key, const Slice& operand) const {
auto* lua_state = lua_state_wrapper_.GetLuaState();
// push the right function into the lua stack
lua_getglobal(lua_state, "FilterMergeOperand");
// push input arguments into the lua stack
lua_pushnumber(lua_state, level);
lua_pushlstring(lua_state, key.data(), key.size());
lua_pushlstring(lua_state, operand.data(), operand.size());
// perform the call (3 arguments, 1 result)
int error_no;
if ((error_no = lua_pcall(lua_state, 3, 1, 0)) != 0) {
LogLuaError("[Lua] Error(%d) in FilterMergeOperand function --- %s",
error_no, lua_tostring(lua_state, -1));
// pops out the lua error from stack
lua_pop(lua_state, 1);
return false;
}
bool is_filtered = false;
// check the return value
if (!lua_isboolean(lua_state, -1)) {
LogLuaError("[Lua] Error in FilterMergeOperand function -- "
"return value is not a boolean while boolean is expected");
} else {
is_filtered = lua_toboolean(lua_state, -1);
}
lua_pop(lua_state, 1);
return is_filtered;
}
*/
bool RocksLuaCompactionFilter::IgnoreSnapshots() const {
return options_.ignore_snapshots;
}
RocksLuaCompactionFilterFactory::RocksLuaCompactionFilterFactory(
const RocksLuaCompactionFilterOptions opt)
: opt_(opt) {
auto filter = CreateCompactionFilter(CompactionFilter::Context());
name_ = std::string("RocksLuaCompactionFilterFactory::") +
std::string(filter->Name());
}
std::unique_ptr<CompactionFilter>
RocksLuaCompactionFilterFactory::CreateCompactionFilter(
const CompactionFilter::Context& /*context*/) {
std::lock_guard<std::mutex> lock(opt_mutex_);
return std::unique_ptr<CompactionFilter>(new RocksLuaCompactionFilter(opt_));
}
std::string RocksLuaCompactionFilterFactory::GetScript() {
std::lock_guard<std::mutex> lock(opt_mutex_);
return opt_.lua_script;
}
void RocksLuaCompactionFilterFactory::SetScript(const std::string& new_script) {
std::lock_guard<std::mutex> lock(opt_mutex_);
opt_.lua_script = new_script;
}
const char* RocksLuaCompactionFilterFactory::Name() const {
return name_.c_str();
}
} // namespace lua
} // namespace rocksdb
#endif // defined(LUA) && !defined(ROCKSDB_LITE)

@ -1,498 +0,0 @@
// Copyright (c) 2016, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#include <stdio.h>
#if !defined(ROCKSDB_LITE)
#if defined(LUA)
#include <string>
#include "db/db_test_util.h"
#include "port/stack_trace.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
#include "rocksdb/utilities/lua/rocks_lua_compaction_filter.h"
#include "util/testharness.h"
namespace rocksdb {
class StopOnErrorLogger : public Logger {
public:
using Logger::Logv;
virtual void Logv(const char* format, va_list ap) override {
vfprintf(stderr, format, ap);
fprintf(stderr, "\n");
FAIL();
}
};
class RocksLuaTest : public testing::Test {
public:
RocksLuaTest() : rnd_(301) {
temp_dir_ = test::TmpDir(Env::Default());
db_ = nullptr;
}
std::string RandomString(int len) {
std::string res;
for (int i = 0; i < len; ++i) {
res += rnd_.Uniform(26) + 'a';
}
return res;
}
void CreateDBWithLuaCompactionFilter(
const lua::RocksLuaCompactionFilterOptions& lua_opt,
const std::string& db_path,
std::unordered_map<std::string, std::string>* kvs,
const int kNumFlushes = 5,
std::shared_ptr<rocksdb::lua::RocksLuaCompactionFilterFactory>*
output_factory = nullptr) {
const int kKeySize = 10;
const int kValueSize = 50;
const int kKeysPerFlush = 2;
auto factory =
std::make_shared<rocksdb::lua::RocksLuaCompactionFilterFactory>(
lua_opt);
if (output_factory != nullptr) {
*output_factory = factory;
}
options_ = Options();
options_.create_if_missing = true;
options_.compaction_filter_factory = factory;
options_.disable_auto_compactions = true;
options_.max_bytes_for_level_base =
(kKeySize + kValueSize) * kKeysPerFlush * 2;
options_.max_bytes_for_level_multiplier = 2;
options_.target_file_size_base = (kKeySize + kValueSize) * kKeysPerFlush;
options_.level0_file_num_compaction_trigger = 2;
DestroyDB(db_path, options_);
ASSERT_OK(DB::Open(options_, db_path, &db_));
for (int f = 0; f < kNumFlushes; ++f) {
for (int i = 0; i < kKeysPerFlush; ++i) {
std::string key = RandomString(kKeySize);
std::string value = RandomString(kValueSize);
kvs->insert({key, value});
ASSERT_OK(db_->Put(WriteOptions(), key, value));
}
db_->Flush(FlushOptions());
}
}
~RocksLuaTest() {
if (db_) {
delete db_;
}
}
std::string temp_dir_;
DB* db_;
Random rnd_;
Options options_;
};
TEST_F(RocksLuaTest, Default) {
// If nothing is set in the LuaCompactionFilterOptions, then
// RocksDB will keep all the key / value pairs, but it will also
// print our error log indicating failure.
std::string db_path = test::PerThreadDBPath(temp_dir_, "rocks_lua_test");
lua::RocksLuaCompactionFilterOptions lua_opt;
std::unordered_map<std::string, std::string> kvs;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
for (auto const& entry : kvs) {
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value));
ASSERT_EQ(value, entry.second);
}
}
TEST_F(RocksLuaTest, KeepsAll) {
std::string db_path = test::PerThreadDBPath(temp_dir_, "rocks_lua_test");
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
// keeps all the key value pairs
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" return false, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"KeepsAll\"\n"
"end\n"
"\n";
std::unordered_map<std::string, std::string> kvs;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
for (auto const& entry : kvs) {
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value));
ASSERT_EQ(value, entry.second);
}
}
TEST_F(RocksLuaTest, GetName) {
std::string db_path = test::PerThreadDBPath(temp_dir_, "rocks_lua_test");
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
const std::string kScriptName = "SimpleLuaCompactionFilter";
lua_opt.lua_script =
std::string(
"function Filter(level, key, existing_value)\n"
" return false, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"") + kScriptName + "\"\n"
"end\n"
"\n";
std::shared_ptr<CompactionFilterFactory> factory =
std::make_shared<lua::RocksLuaCompactionFilterFactory>(lua_opt);
std::string factory_name(factory->Name());
ASSERT_NE(factory_name.find(kScriptName), std::string::npos);
}
TEST_F(RocksLuaTest, RemovesAll) {
std::string db_path = test::PerThreadDBPath(temp_dir_, "rocks_lua_test");
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
// removes all the key value pairs
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" return true, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"RemovesAll\"\n"
"end\n"
"\n";
std::unordered_map<std::string, std::string> kvs;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
// Issue full compaction and expect nothing is in the DB.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
auto s = db_->Get(ReadOptions(), entry.first, &value);
ASSERT_TRUE(s.IsNotFound());
}
}
TEST_F(RocksLuaTest, FilterByKey) {
std::string db_path = test::PerThreadDBPath(temp_dir_, "rocks_lua_test");
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
// removes all keys whose initial is less than 'r'
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" if key:sub(1,1) < 'r' then\n"
" return true, false, \"\"\n"
" end\n"
" return false, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"KeepsAll\"\n"
"end\n";
std::unordered_map<std::string, std::string> kvs;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
// Issue full compaction and expect nothing is in the DB.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
auto s = db_->Get(ReadOptions(), entry.first, &value);
if (entry.first[0] < 'r') {
ASSERT_TRUE(s.IsNotFound());
} else {
ASSERT_TRUE(s.ok());
ASSERT_TRUE(value == entry.second);
}
}
}
TEST_F(RocksLuaTest, FilterByValue) {
std::string db_path = test::PerThreadDBPath(temp_dir_, "rocks_lua_test");
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
// removes all values whose initial is less than 'r'
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" if existing_value:sub(1,1) < 'r' then\n"
" return true, false, \"\"\n"
" end\n"
" return false, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"FilterByValue\"\n"
"end\n"
"\n";
std::unordered_map<std::string, std::string> kvs;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
// Issue full compaction and expect nothing is in the DB.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
auto s = db_->Get(ReadOptions(), entry.first, &value);
if (entry.second[0] < 'r') {
ASSERT_TRUE(s.IsNotFound());
} else {
ASSERT_TRUE(s.ok());
ASSERT_EQ(value, entry.second);
}
}
}
TEST_F(RocksLuaTest, ChangeValue) {
std::string db_path = test::PerThreadDBPath(temp_dir_, "rocks_lua_test");
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
// Replace all values by their reversed key
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" return false, true, key:reverse()\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"ChangeValue\"\n"
"end\n"
"\n";
std::unordered_map<std::string, std::string> kvs;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
// Issue full compaction and expect nothing is in the DB.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value));
std::string new_value = entry.first;
std::reverse(new_value.begin(), new_value.end());
ASSERT_EQ(value, new_value);
}
}
TEST_F(RocksLuaTest, ConditionallyChangeAndFilterValue) {
std::string db_path = test::PerThreadDBPath(temp_dir_, "rocks_lua_test");
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
// Performs the following logic:
// If key[0] < 'h' --> replace value by reverse key
// If key[0] >= 'r' --> keep the original key value
// Otherwise, filter the key value
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" if key:sub(1,1) < 'h' then\n"
" return false, true, key:reverse()\n"
" elseif key:sub(1,1) < 'r' then\n"
" return true, false, \"\"\n"
" end\n"
" return false, false, \"\"\n"
"end\n"
"function Name()\n"
" return \"ConditionallyChangeAndFilterValue\"\n"
"end\n"
"\n";
std::unordered_map<std::string, std::string> kvs;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
// Issue full compaction and expect nothing is in the DB.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
auto s = db_->Get(ReadOptions(), entry.first, &value);
if (entry.first[0] < 'h') {
ASSERT_TRUE(s.ok());
std::string new_value = entry.first;
std::reverse(new_value.begin(), new_value.end());
ASSERT_EQ(value, new_value);
} else if (entry.first[0] < 'r') {
ASSERT_TRUE(s.IsNotFound());
} else {
ASSERT_TRUE(s.ok());
ASSERT_EQ(value, entry.second);
}
}
}
TEST_F(RocksLuaTest, DynamicChangeScript) {
std::string db_path = test::PerThreadDBPath(temp_dir_, "rocks_lua_test");
lua::RocksLuaCompactionFilterOptions lua_opt;
lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
// keeps all the key value pairs
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" return false, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"KeepsAll\"\n"
"end\n"
"\n";
std::unordered_map<std::string, std::string> kvs;
std::shared_ptr<rocksdb::lua::RocksLuaCompactionFilterFactory> factory;
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs, 30, &factory);
uint64_t count = 0;
ASSERT_TRUE(db_->GetIntProperty(
rocksdb::DB::Properties::kNumEntriesActiveMemTable, &count));
ASSERT_EQ(count, 0);
ASSERT_TRUE(db_->GetIntProperty(
rocksdb::DB::Properties::kNumEntriesImmMemTables, &count));
ASSERT_EQ(count, 0);
CompactRangeOptions cr_opt;
cr_opt.bottommost_level_compaction =
rocksdb::BottommostLevelCompaction::kForce;
// Issue full compaction and expect everything is in the DB.
ASSERT_OK(db_->CompactRange(cr_opt, nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value));
ASSERT_EQ(value, entry.second);
}
// change the lua script to removes all the key value pairs
factory->SetScript(
"function Filter(level, key, existing_value)\n"
" return true, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"RemovesAll\"\n"
"end\n"
"\n");
{
std::string key = "another-key";
std::string value = "another-value";
kvs.insert({key, value});
ASSERT_OK(db_->Put(WriteOptions(), key, value));
db_->Flush(FlushOptions());
}
cr_opt.change_level = true;
cr_opt.target_level = 5;
// Issue full compaction and expect nothing is in the DB.
ASSERT_OK(db_->CompactRange(cr_opt, nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
auto s = db_->Get(ReadOptions(), entry.first, &value);
ASSERT_TRUE(s.IsNotFound());
}
}
TEST_F(RocksLuaTest, LuaConditionalTypeError) {
std::string db_path = test::PerThreadDBPath(temp_dir_, "rocks_lua_test");
lua::RocksLuaCompactionFilterOptions lua_opt;
// Filter() error when input key's initial >= 'r'
lua_opt.lua_script =
"function Filter(level, key, existing_value)\n"
" if existing_value:sub(1,1) >= 'r' then\n"
" return true, 2, \"\" -- incorrect type of 2nd return value\n"
" end\n"
" return true, false, \"\"\n"
"end\n"
"\n"
"function FilterMergeOperand(level, key, operand)\n"
" return false\n"
"end\n"
"function Name()\n"
" return \"BuggyCode\"\n"
"end\n"
"\n";
std::unordered_map<std::string, std::string> kvs;
// Create DB with 10 files
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs, 10);
// Issue full compaction and expect all keys which initial is < 'r'
// will be deleted as we keep the key value when we hit an error.
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
for (auto const& entry : kvs) {
std::string value;
auto s = db_->Get(ReadOptions(), entry.first, &value);
if (entry.second[0] < 'r') {
ASSERT_TRUE(s.IsNotFound());
} else {
ASSERT_TRUE(s.ok());
ASSERT_EQ(value, entry.second);
}
}
}
} // namespace rocksdb
int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}
#else
int main(int /*argc*/, char** /*argv*/) {
printf("LUA_PATH is not set. Ignoring the test.\n");
}
#endif // defined(LUA)
#else
int main(int /*argc*/, char** /*argv*/) {
printf("Lua is not supported in RocksDBLite. Ignoring the test.\n");
}
#endif // !defined(ROCKSDB_LITE)
Loading…
Cancel
Save