Summary: This diff includes an implementation of CompactionFilter that allows users to write CompactionFilter in Lua. With this ability, users can dynamically change compaction filter logic without requiring building the rocksdb binary and restarting the database. To compile, WITH_LUA_PATH must be specified to the base directory of lua. Closes https://github.com/facebook/rocksdb/pull/1478 Differential Revision: D4150138 Pulled By: yhchiang fbshipit-source-id: ed84222main
parent
760ef68a69
commit
647eafdc21
@ -0,0 +1,187 @@ |
||||
// Copyright (c) 2016, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#pragma once |
||||
|
||||
#if defined(LUA) && !defined(ROCKSDB_LITE) |
||||
// lua headers
|
||||
extern "C" { |
||||
#include <lauxlib.h> |
||||
#include <lua.h> |
||||
#include <lualib.h> |
||||
} |
||||
|
||||
#include <mutex> |
||||
#include <string> |
||||
#include <vector> |
||||
|
||||
#include "rocksdb/compaction_filter.h" |
||||
#include "rocksdb/env.h" |
||||
#include "rocksdb/slice.h" |
||||
#include "rocksdb/utilities/lua/rocks_lua_custom_library.h" |
||||
#include "rocksdb/utilities/lua/rocks_lua_util.h" |
||||
|
||||
namespace rocksdb { |
||||
namespace lua { |
||||
|
||||
struct RocksLuaCompactionFilterOptions { |
||||
// The lua script in string that implements all necessary CompactionFilter
|
||||
// virtual functions. The specified lua_script must implement the following
|
||||
// functions, which are Name and Filter, as described below.
|
||||
//
|
||||
// 0. The Name function simply returns a string representing the name of
|
||||
// the lua script. If there's any erorr in the Name function, an
|
||||
// empty string will be used.
|
||||
// --- Example
|
||||
// function Name()
|
||||
// return "DefaultLuaCompactionFilter"
|
||||
// end
|
||||
//
|
||||
//
|
||||
// 1. The script must contains a function called Filter, which implements
|
||||
// CompactionFilter::Filter() , takes three input arguments, and returns
|
||||
// three values as the following API:
|
||||
//
|
||||
// function Filter(level, key, existing_value)
|
||||
// ...
|
||||
// return is_filtered, is_changed, new_value
|
||||
// end
|
||||
//
|
||||
// Note that if ignore_value is set to true, then Filter should implement
|
||||
// the following API:
|
||||
//
|
||||
// function Filter(level, key)
|
||||
// ...
|
||||
// return is_filtered
|
||||
// end
|
||||
//
|
||||
// If there're any error in the Filter() function, then it will keep
|
||||
// the input key / value pair.
|
||||
//
|
||||
// -- Input
|
||||
// The function must take three arguments (integer, string, string),
|
||||
// which map to "level", "key", and "existing_value" passed from
|
||||
// RocksDB.
|
||||
//
|
||||
// -- Output
|
||||
// The function must return three values (boolean, boolean, string).
|
||||
// - is_filtered: if the first return value is true, then it indicates
|
||||
// the input key / value pair should be filtered.
|
||||
// - is_changed: if the second return value is true, then it indicates
|
||||
// the existing_value needs to be changed, and the resulting value
|
||||
// is stored in the third return value.
|
||||
// - new_value: if the second return value is true, then this third
|
||||
// return value stores the new value of the input key / value pair.
|
||||
//
|
||||
// -- Examples
|
||||
// -- a filter that keeps all key-value pairs
|
||||
// function Filter(level, key, existing_value)
|
||||
// return false, false, ""
|
||||
// end
|
||||
//
|
||||
// -- a filter that keeps all keys and change their values to "Rocks"
|
||||
// function Filter(level, key, existing_value)
|
||||
// return false, true, "Rocks"
|
||||
// end
|
||||
|
||||
std::string lua_script; |
||||
|
||||
// If set to true, then existing_value will not be passed to the Filter
|
||||
// function, and the Filter function only needs to return a single boolean
|
||||
// flag indicating whether to filter out this key or not.
|
||||
//
|
||||
// function Filter(level, key)
|
||||
// ...
|
||||
// return is_filtered
|
||||
// end
|
||||
bool ignore_value = false; |
||||
|
||||
// A boolean flag to determine whether to ignore snapshots.
|
||||
bool ignore_snapshots = false; |
||||
|
||||
// When specified a non-null pointer, the first "error_limit_per_filter"
|
||||
// errors of each CompactionFilter that is lua related will be included
|
||||
// in this log.
|
||||
std::shared_ptr<Logger> error_log; |
||||
|
||||
// The number of errors per CompactionFilter will be printed
|
||||
// to error_log.
|
||||
int error_limit_per_filter = 1; |
||||
|
||||
// A string to luaL_reg array map that allows the Lua CompactionFilter
|
||||
// to use custom C library. The string will be used as the library
|
||||
// name in Lua.
|
||||
std::vector<std::shared_ptr<RocksLuaCustomLibrary>> libraries; |
||||
|
||||
///////////////////////////////////////////////////////////////////////////
|
||||
// NOT YET SUPPORTED
|
||||
// The name of the Lua function in "lua_script" that implements
|
||||
// CompactionFilter::FilterMergeOperand(). The function must take
|
||||
// three input arguments (integer, string, string), which map to "level",
|
||||
// "key", and "operand" passed from the RocksDB. In addition, the
|
||||
// function must return a single boolean value, indicating whether
|
||||
// to filter the input key / operand.
|
||||
//
|
||||
// DEFAULT: the default implementation always returns false.
|
||||
// @see CompactionFilter::FilterMergeOperand
|
||||
}; |
||||
|
||||
class RocksLuaCompactionFilterFactory : public CompactionFilterFactory { |
||||
public: |
||||
explicit RocksLuaCompactionFilterFactory( |
||||
const RocksLuaCompactionFilterOptions opt); |
||||
|
||||
virtual ~RocksLuaCompactionFilterFactory() {} |
||||
|
||||
std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
||||
const CompactionFilter::Context& context) override; |
||||
|
||||
// Change the Lua script so that the next compaction after this
|
||||
// function call will use the new Lua script.
|
||||
void SetScript(const std::string& new_script); |
||||
|
||||
// Obtain the current Lua script
|
||||
std::string GetScript(); |
||||
|
||||
const char* Name() const override; |
||||
|
||||
private: |
||||
RocksLuaCompactionFilterOptions opt_; |
||||
std::string name_; |
||||
// A lock to protect "opt_" to make it dynamically changeable.
|
||||
std::mutex opt_mutex_; |
||||
}; |
||||
|
||||
// A wrapper class that invokes Lua script to perform CompactionFilter
|
||||
// functions.
|
||||
class RocksLuaCompactionFilter : public rocksdb::CompactionFilter { |
||||
public: |
||||
explicit RocksLuaCompactionFilter(const RocksLuaCompactionFilterOptions& opt) |
||||
: options_(opt), |
||||
lua_state_wrapper_(opt.lua_script, opt.libraries), |
||||
error_count_(0) {} |
||||
|
||||
virtual bool Filter(int level, const Slice& key, const Slice& existing_value, |
||||
std::string* new_value, |
||||
bool* value_changed) const override; |
||||
// Not yet supported
|
||||
virtual bool FilterMergeOperand(int level, const Slice& key, |
||||
const Slice& operand) const override { |
||||
return false; |
||||
} |
||||
virtual bool IgnoreSnapshots() const override; |
||||
virtual const char* Name() const override; |
||||
|
||||
protected: |
||||
void LogLuaError(const char* format, ...) const; |
||||
|
||||
RocksLuaCompactionFilterOptions options_; |
||||
LuaStateWrapper lua_state_wrapper_; |
||||
mutable int error_count_; |
||||
}; |
||||
|
||||
} // namespace lua
|
||||
} // namespace rocksdb
|
||||
#endif // defined(LUA) && !defined(ROCKSDB_LITE)
|
@ -0,0 +1,43 @@ |
||||
// Copyright (c) 2016, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#pragma once |
||||
#ifdef LUA |
||||
|
||||
// lua headers
|
||||
extern "C" { |
||||
#include <lauxlib.h> |
||||
#include <lua.h> |
||||
#include <lualib.h> |
||||
} |
||||
|
||||
namespace rocksdb { |
||||
namespace lua { |
||||
// A class that used to define custom C Library that is callable
|
||||
// from Lua script
|
||||
class RocksLuaCustomLibrary { |
||||
public: |
||||
virtual ~RocksLuaCustomLibrary() {} |
||||
// The name of the C library. This name will also be used as the table
|
||||
// (namespace) in Lua that contains the C library.
|
||||
virtual const char* Name() const = 0; |
||||
|
||||
// Returns a "static const struct luaL_Reg[]", which includes a list of
|
||||
// C functions. Note that the last entry of this static array must be
|
||||
// {nullptr, nullptr} as required by Lua.
|
||||
//
|
||||
// More details about how to implement Lua C libraries can be found
|
||||
// in the official Lua document http://www.lua.org/pil/26.2.html
|
||||
virtual const struct luaL_Reg* Lib() const = 0; |
||||
|
||||
// A function that will be called right after the library has been created
|
||||
// and pushed on the top of the lua_State. This custom setup function
|
||||
// allows developers to put additional table or constant values inside
|
||||
// the same table / namespace.
|
||||
virtual void CustomSetup(lua_State* L) const {} |
||||
}; |
||||
} // namespace lua
|
||||
} // namespace rocksdb
|
||||
#endif // LUA
|
@ -0,0 +1,55 @@ |
||||
// Copyright (c) 2016, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#pragma once |
||||
// lua headers
|
||||
extern "C" { |
||||
#include <lauxlib.h> |
||||
#include <lua.h> |
||||
#include <lualib.h> |
||||
} |
||||
|
||||
#ifdef LUA |
||||
#include <string> |
||||
#include <vector> |
||||
|
||||
#include "rocksdb/utilities/lua/rocks_lua_custom_library.h" |
||||
|
||||
namespace rocksdb { |
||||
namespace lua { |
||||
class LuaStateWrapper { |
||||
public: |
||||
explicit LuaStateWrapper(const std::string& lua_script) { |
||||
lua_state_ = luaL_newstate(); |
||||
Init(lua_script, {}); |
||||
} |
||||
LuaStateWrapper( |
||||
const std::string& lua_script, |
||||
const std::vector<std::shared_ptr<RocksLuaCustomLibrary>>& libraries) { |
||||
lua_state_ = luaL_newstate(); |
||||
Init(lua_script, libraries); |
||||
} |
||||
lua_State* GetLuaState() const { return lua_state_; } |
||||
~LuaStateWrapper() { lua_close(lua_state_); } |
||||
|
||||
private: |
||||
void Init( |
||||
const std::string& lua_script, |
||||
const std::vector<std::shared_ptr<RocksLuaCustomLibrary>>& libraries) { |
||||
if (lua_state_) { |
||||
luaL_openlibs(lua_state_); |
||||
for (const auto& library : libraries) { |
||||
luaL_openlib(lua_state_, library->Name(), library->Lib(), 0); |
||||
library->CustomSetup(lua_state_); |
||||
} |
||||
luaL_dostring(lua_state_, lua_script.c_str()); |
||||
} |
||||
} |
||||
|
||||
lua_State* lua_state_; |
||||
}; |
||||
} // namespace lua
|
||||
} // namespace rocksdb
|
||||
#endif // LUA
|
@ -0,0 +1,240 @@ |
||||
// Copyright (c) 2016, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#if defined(LUA) && !defined(ROCKSDB_LITE) |
||||
#include "rocksdb/utilities/lua/rocks_lua_compaction_filter.h" |
||||
|
||||
extern "C" { |
||||
#include <luaconf.h> |
||||
} |
||||
|
||||
#include "rocksdb/compaction_filter.h" |
||||
|
||||
namespace rocksdb { |
||||
namespace lua { |
||||
|
||||
const std::string kFilterFunctionName = "Filter"; |
||||
const std::string kNameFunctionName = "Name"; |
||||
|
||||
void RocksLuaCompactionFilter::LogLuaError(const char* format, ...) const { |
||||
if (options_.error_log.get() != nullptr && |
||||
error_count_ < options_.error_limit_per_filter) { |
||||
error_count_++; |
||||
|
||||
va_list ap; |
||||
va_start(ap, format); |
||||
options_.error_log->Logv(InfoLogLevel::ERROR_LEVEL, format, ap); |
||||
va_end(ap); |
||||
} |
||||
} |
||||
|
||||
bool RocksLuaCompactionFilter::Filter(int level, const Slice& key, |
||||
const Slice& existing_value, |
||||
std::string* new_value, |
||||
bool* value_changed) const { |
||||
auto* lua_state = lua_state_wrapper_.GetLuaState(); |
||||
// push the right function into the lua stack
|
||||
lua_getglobal(lua_state, kFilterFunctionName.c_str()); |
||||
|
||||
int error_no = 0; |
||||
int num_input_values; |
||||
int num_return_values; |
||||
if (options_.ignore_value == false) { |
||||
// push input arguments into the lua stack
|
||||
lua_pushnumber(lua_state, level); |
||||
lua_pushlstring(lua_state, key.data(), key.size()); |
||||
lua_pushlstring(lua_state, existing_value.data(), existing_value.size()); |
||||
num_input_values = 3; |
||||
num_return_values = 3; |
||||
} else { |
||||
// If ignore_value is set to true, then we only put two arguments
|
||||
// and expect one return value
|
||||
lua_pushnumber(lua_state, level); |
||||
lua_pushlstring(lua_state, key.data(), key.size()); |
||||
num_input_values = 2; |
||||
num_return_values = 1; |
||||
} |
||||
|
||||
// perform the lua call
|
||||
if ((error_no = |
||||
lua_pcall(lua_state, num_input_values, num_return_values, 0)) != 0) { |
||||
LogLuaError("[Lua] Error(%d) in Filter function --- %s", error_no, |
||||
lua_tostring(lua_state, -1)); |
||||
// pops out the lua error from stack
|
||||
lua_pop(lua_state, 1); |
||||
return false; |
||||
} |
||||
|
||||
// As lua_pcall went successfully, it can be guaranteed that the top
|
||||
// three elements in the Lua stack are the three returned values.
|
||||
|
||||
bool has_error = false; |
||||
const int kIndexIsFiltered = -num_return_values; |
||||
const int kIndexValueChanged = -num_return_values + 1; |
||||
const int kIndexNewValue = -num_return_values + 2; |
||||
|
||||
// check the types of three return values
|
||||
// is_filtered
|
||||
if (!lua_isboolean(lua_state, kIndexIsFiltered)) { |
||||
LogLuaError( |
||||
"[Lua] Error in Filter function -- " |
||||
"1st return value (is_filtered) is not a boolean " |
||||
"while a boolean is expected."); |
||||
has_error = true; |
||||
} |
||||
|
||||
if (options_.ignore_value == false) { |
||||
// value_changed
|
||||
if (!lua_isboolean(lua_state, kIndexValueChanged)) { |
||||
LogLuaError( |
||||
"[Lua] Error in Filter function -- " |
||||
"2nd return value (value_changed) is not a boolean " |
||||
"while a boolean is expected."); |
||||
has_error = true; |
||||
} |
||||
// new_value
|
||||
if (!lua_isstring(lua_state, kIndexNewValue)) { |
||||
LogLuaError( |
||||
"[Lua] Error in Filter function -- " |
||||
"3rd return value (new_value) is not a string " |
||||
"while a string is expected."); |
||||
has_error = true; |
||||
} |
||||
} |
||||
|
||||
if (has_error) { |
||||
lua_pop(lua_state, num_return_values); |
||||
return false; |
||||
} |
||||
|
||||
// Fetch the return values
|
||||
bool is_filtered = false; |
||||
if (!has_error) { |
||||
is_filtered = lua_toboolean(lua_state, kIndexIsFiltered); |
||||
if (options_.ignore_value == false) { |
||||
*value_changed = lua_toboolean(lua_state, kIndexValueChanged); |
||||
if (*value_changed) { |
||||
const char* new_value_buf = lua_tostring(lua_state, kIndexNewValue); |
||||
const size_t new_value_size = lua_strlen(lua_state, kIndexNewValue); |
||||
// Note that any string that lua_tostring returns always has a zero at
|
||||
// its end, bu/t it can have other zeros inside it
|
||||
assert(new_value_buf[new_value_size] == '\0'); |
||||
assert(strlen(new_value_buf) <= new_value_size); |
||||
new_value->assign(new_value_buf, new_value_size); |
||||
} |
||||
} else { |
||||
*value_changed = false; |
||||
} |
||||
} |
||||
// pops the three return values.
|
||||
lua_pop(lua_state, num_return_values); |
||||
return is_filtered; |
||||
} |
||||
|
||||
const char* RocksLuaCompactionFilter::Name() const { |
||||
std::string name = ""; |
||||
auto* lua_state = lua_state_wrapper_.GetLuaState(); |
||||
// push the right function into the lua stack
|
||||
lua_getglobal(lua_state, kNameFunctionName.c_str()); |
||||
|
||||
// perform the call (0 arguments, 1 result)
|
||||
int error_no; |
||||
if ((error_no = lua_pcall(lua_state, 0, 1, 0)) != 0) { |
||||
LogLuaError("[Lua] Error(%d) in Name function --- %s", error_no, |
||||
lua_tostring(lua_state, -1)); |
||||
// pops out the lua error from stack
|
||||
lua_pop(lua_state, 1); |
||||
return name.c_str(); |
||||
} |
||||
|
||||
// check the return value
|
||||
if (!lua_isstring(lua_state, -1)) { |
||||
LogLuaError( |
||||
"[Lua] Error in Name function -- " |
||||
"return value is not a string while string is expected"); |
||||
} else { |
||||
const char* name_buf = lua_tostring(lua_state, -1); |
||||
const size_t name_size = lua_strlen(lua_state, -1); |
||||
assert(name_buf[name_size] == '\0'); |
||||
assert(strlen(name_buf) <= name_size); |
||||
name = name_buf; |
||||
} |
||||
lua_pop(lua_state, 1); |
||||
return name.c_str(); |
||||
} |
||||
|
||||
/* Not yet supported
|
||||
bool RocksLuaCompactionFilter::FilterMergeOperand( |
||||
int level, const Slice& key, const Slice& operand) const { |
||||
auto* lua_state = lua_state_wrapper_.GetLuaState(); |
||||
// push the right function into the lua stack
|
||||
lua_getglobal(lua_state, "FilterMergeOperand"); |
||||
|
||||
// push input arguments into the lua stack
|
||||
lua_pushnumber(lua_state, level); |
||||
lua_pushlstring(lua_state, key.data(), key.size()); |
||||
lua_pushlstring(lua_state, operand.data(), operand.size()); |
||||
|
||||
// perform the call (3 arguments, 1 result)
|
||||
int error_no; |
||||
if ((error_no = lua_pcall(lua_state, 3, 1, 0)) != 0) { |
||||
LogLuaError("[Lua] Error(%d) in FilterMergeOperand function --- %s", |
||||
error_no, lua_tostring(lua_state, -1)); |
||||
// pops out the lua error from stack
|
||||
lua_pop(lua_state, 1); |
||||
return false; |
||||
} |
||||
|
||||
bool is_filtered = false; |
||||
// check the return value
|
||||
if (!lua_isboolean(lua_state, -1)) { |
||||
LogLuaError("[Lua] Error in FilterMergeOperand function -- " |
||||
"return value is not a boolean while boolean is expected"); |
||||
} else { |
||||
is_filtered = lua_toboolean(lua_state, -1); |
||||
} |
||||
|
||||
lua_pop(lua_state, 1); |
||||
|
||||
return is_filtered; |
||||
} |
||||
*/ |
||||
|
||||
bool RocksLuaCompactionFilter::IgnoreSnapshots() const { |
||||
return options_.ignore_snapshots; |
||||
} |
||||
|
||||
RocksLuaCompactionFilterFactory::RocksLuaCompactionFilterFactory( |
||||
const RocksLuaCompactionFilterOptions opt) |
||||
: opt_(opt) { |
||||
auto filter = CreateCompactionFilter(CompactionFilter::Context()); |
||||
name_ = std::string("RocksLuaCompactionFilterFactory::") + |
||||
std::string(filter->Name()); |
||||
} |
||||
|
||||
std::unique_ptr<CompactionFilter> |
||||
RocksLuaCompactionFilterFactory::CreateCompactionFilter( |
||||
const CompactionFilter::Context& context) { |
||||
std::lock_guard<std::mutex> lock(opt_mutex_); |
||||
return std::unique_ptr<CompactionFilter>(new RocksLuaCompactionFilter(opt_)); |
||||
} |
||||
|
||||
std::string RocksLuaCompactionFilterFactory::GetScript() { |
||||
std::lock_guard<std::mutex> lock(opt_mutex_); |
||||
return opt_.lua_script; |
||||
} |
||||
|
||||
void RocksLuaCompactionFilterFactory::SetScript(const std::string& new_script) { |
||||
std::lock_guard<std::mutex> lock(opt_mutex_); |
||||
opt_.lua_script = new_script; |
||||
} |
||||
|
||||
const char* RocksLuaCompactionFilterFactory::Name() const { |
||||
return name_.c_str(); |
||||
} |
||||
|
||||
} // namespace lua
|
||||
} // namespace rocksdb
|
||||
#endif // defined(LUA) && !defined(ROCKSDB_LITE)
|
@ -0,0 +1,497 @@ |
||||
// Copyright (c) 2016, Facebook, Inc. All rights reserved.
|
||||
// This source code is licensed under the BSD-style license found in the
|
||||
// LICENSE file in the root directory of this source tree. An additional grant
|
||||
// of patent rights can be found in the PATENTS file in the same directory.
|
||||
|
||||
#include <stdio.h> |
||||
|
||||
#if !defined(ROCKSDB_LITE) |
||||
|
||||
#if defined(LUA) |
||||
|
||||
#include <string> |
||||
|
||||
#include "db/db_test_util.h" |
||||
#include "port/stack_trace.h" |
||||
#include "rocksdb/compaction_filter.h" |
||||
#include "rocksdb/db.h" |
||||
#include "rocksdb/utilities/lua/rocks_lua_compaction_filter.h" |
||||
#include "util/testharness.h" |
||||
|
||||
namespace rocksdb { |
||||
|
||||
class StopOnErrorLogger : public Logger { |
||||
public: |
||||
using Logger::Logv; |
||||
virtual void Logv(const char* format, va_list ap) override { |
||||
vfprintf(stderr, format, ap); |
||||
fprintf(stderr, "\n"); |
||||
ASSERT_TRUE(false); |
||||
} |
||||
}; |
||||
|
||||
|
||||
class RocksLuaTest : public testing::Test { |
||||
public: |
||||
RocksLuaTest() : rnd_(301) { |
||||
temp_dir_ = test::TmpDir(Env::Default()); |
||||
db_ = nullptr; |
||||
} |
||||
|
||||
std::string RandomString(int len) { |
||||
std::string res; |
||||
for (int i = 0; i < len; ++i) { |
||||
res += rnd_.Uniform(26) + 'a'; |
||||
} |
||||
return res; |
||||
} |
||||
|
||||
void CreateDBWithLuaCompactionFilter( |
||||
const lua::RocksLuaCompactionFilterOptions& lua_opt, |
||||
const std::string& db_path, |
||||
std::unordered_map<std::string, std::string>* kvs, |
||||
const int kNumFlushes = 5, |
||||
std::shared_ptr<rocksdb::lua::RocksLuaCompactionFilterFactory>* |
||||
output_factory = nullptr) { |
||||
const int kKeySize = 10; |
||||
const int kValueSize = 50; |
||||
const int kKeysPerFlush = 2; |
||||
auto factory = |
||||
std::make_shared<rocksdb::lua::RocksLuaCompactionFilterFactory>( |
||||
lua_opt); |
||||
if (output_factory != nullptr) { |
||||
*output_factory = factory; |
||||
} |
||||
|
||||
options_ = Options(); |
||||
options_.create_if_missing = true; |
||||
options_.compaction_filter_factory = factory; |
||||
options_.max_bytes_for_level_base = |
||||
(kKeySize + kValueSize) * kKeysPerFlush * 2; |
||||
options_.max_bytes_for_level_multiplier = 2; |
||||
options_.target_file_size_base = (kKeySize + kValueSize) * kKeysPerFlush; |
||||
options_.level0_file_num_compaction_trigger = 2; |
||||
DestroyDB(db_path, options_); |
||||
ASSERT_OK(DB::Open(options_, db_path, &db_)); |
||||
|
||||
for (int f = 0; f < kNumFlushes; ++f) { |
||||
for (int i = 0; i < kKeysPerFlush; ++i) { |
||||
std::string key = RandomString(kKeySize); |
||||
std::string value = RandomString(kValueSize); |
||||
kvs->insert({key, value}); |
||||
ASSERT_OK(db_->Put(WriteOptions(), key, value)); |
||||
} |
||||
db_->Flush(FlushOptions()); |
||||
} |
||||
} |
||||
|
||||
~RocksLuaTest() { |
||||
if (db_) { |
||||
delete db_; |
||||
} |
||||
} |
||||
std::string temp_dir_; |
||||
DB* db_; |
||||
Random rnd_; |
||||
Options options_; |
||||
}; |
||||
|
||||
TEST_F(RocksLuaTest, Default) { |
||||
// If nothing is set in the LuaCompactionFilterOptions, then
|
||||
// RocksDB will keep all the key / value pairs, but it will also
|
||||
// print our error log indicating failure.
|
||||
std::string db_path = temp_dir_ + "/rocks_lua_test"; |
||||
|
||||
lua::RocksLuaCompactionFilterOptions lua_opt; |
||||
|
||||
std::unordered_map<std::string, std::string> kvs; |
||||
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs); |
||||
|
||||
for (auto const& entry : kvs) { |
||||
std::string value; |
||||
ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value)); |
||||
ASSERT_EQ(value, entry.second); |
||||
} |
||||
} |
||||
|
||||
TEST_F(RocksLuaTest, KeepsAll) { |
||||
std::string db_path = temp_dir_ + "/rocks_lua_test"; |
||||
|
||||
lua::RocksLuaCompactionFilterOptions lua_opt; |
||||
lua_opt.error_log = std::make_shared<StopOnErrorLogger>(); |
||||
// keeps all the key value pairs
|
||||
lua_opt.lua_script = |
||||
"function Filter(level, key, existing_value)\n" |
||||
" return false, false, \"\"\n" |
||||
"end\n" |
||||
"\n" |
||||
"function FilterMergeOperand(level, key, operand)\n" |
||||
" return false\n" |
||||
"end\n" |
||||
"function Name()\n" |
||||
" return \"KeepsAll\"\n" |
||||
"end\n" |
||||
"\n"; |
||||
|
||||
std::unordered_map<std::string, std::string> kvs; |
||||
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs); |
||||
|
||||
for (auto const& entry : kvs) { |
||||
std::string value; |
||||
ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value)); |
||||
ASSERT_EQ(value, entry.second); |
||||
} |
||||
} |
||||
|
||||
TEST_F(RocksLuaTest, GetName) { |
||||
std::string db_path = temp_dir_ + "/rocks_lua_test"; |
||||
|
||||
lua::RocksLuaCompactionFilterOptions lua_opt; |
||||
lua_opt.error_log = std::make_shared<StopOnErrorLogger>(); |
||||
const std::string kScriptName = "SimpleLuaCompactionFilter"; |
||||
lua_opt.lua_script = |
||||
std::string( |
||||
"function Filter(level, key, existing_value)\n" |
||||
" return false, false, \"\"\n" |
||||
"end\n" |
||||
"\n" |
||||
"function FilterMergeOperand(level, key, operand)\n" |
||||
" return false\n" |
||||
"end\n" |
||||
"function Name()\n" |
||||
" return \"") + kScriptName + "\"\n" |
||||
"end\n" |
||||
"\n"; |
||||
|
||||
std::shared_ptr<CompactionFilterFactory> factory = |
||||
std::make_shared<lua::RocksLuaCompactionFilterFactory>(lua_opt); |
||||
std::string factory_name(factory->Name()); |
||||
ASSERT_NE(factory_name.find(kScriptName), std::string::npos); |
||||
} |
||||
|
||||
TEST_F(RocksLuaTest, RemovesAll) { |
||||
std::string db_path = temp_dir_ + "/rocks_lua_test"; |
||||
|
||||
lua::RocksLuaCompactionFilterOptions lua_opt; |
||||
lua_opt.error_log = std::make_shared<StopOnErrorLogger>(); |
||||
// removes all the key value pairs
|
||||
lua_opt.lua_script = |
||||
"function Filter(level, key, existing_value)\n" |
||||
" return true, false, \"\"\n" |
||||
"end\n" |
||||
"\n" |
||||
"function FilterMergeOperand(level, key, operand)\n" |
||||
" return false\n" |
||||
"end\n" |
||||
"function Name()\n" |
||||
" return \"RemovesAll\"\n" |
||||
"end\n" |
||||
"\n"; |
||||
|
||||
std::unordered_map<std::string, std::string> kvs; |
||||
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs); |
||||
// Issue full compaction and expect nothing is in the DB.
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
||||
|
||||
for (auto const& entry : kvs) { |
||||
std::string value; |
||||
auto s = db_->Get(ReadOptions(), entry.first, &value); |
||||
ASSERT_TRUE(s.IsNotFound()); |
||||
} |
||||
} |
||||
|
||||
TEST_F(RocksLuaTest, FilterByKey) { |
||||
std::string db_path = temp_dir_ + "/rocks_lua_test"; |
||||
|
||||
lua::RocksLuaCompactionFilterOptions lua_opt; |
||||
lua_opt.error_log = std::make_shared<StopOnErrorLogger>(); |
||||
// removes all keys whose initial is less than 'r'
|
||||
lua_opt.lua_script = |
||||
"function Filter(level, key, existing_value)\n" |
||||
" if key:sub(1,1) < 'r' then\n" |
||||
" return true, false, \"\"\n" |
||||
" end\n" |
||||
" return false, false, \"\"\n" |
||||
"end\n" |
||||
"\n" |
||||
"function FilterMergeOperand(level, key, operand)\n" |
||||
" return false\n" |
||||
"end\n" |
||||
"function Name()\n" |
||||
" return \"KeepsAll\"\n" |
||||
"end\n"; |
||||
|
||||
std::unordered_map<std::string, std::string> kvs; |
||||
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs); |
||||
// Issue full compaction and expect nothing is in the DB.
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
||||
|
||||
for (auto const& entry : kvs) { |
||||
std::string value; |
||||
auto s = db_->Get(ReadOptions(), entry.first, &value); |
||||
if (entry.first[0] < 'r') { |
||||
ASSERT_TRUE(s.IsNotFound()); |
||||
} else { |
||||
ASSERT_TRUE(s.ok()); |
||||
ASSERT_TRUE(value == entry.second); |
||||
} |
||||
} |
||||
} |
||||
|
||||
TEST_F(RocksLuaTest, FilterByValue) { |
||||
std::string db_path = temp_dir_ + "/rocks_lua_test"; |
||||
|
||||
lua::RocksLuaCompactionFilterOptions lua_opt; |
||||
lua_opt.error_log = std::make_shared<StopOnErrorLogger>(); |
||||
// removes all values whose initial is less than 'r'
|
||||
lua_opt.lua_script = |
||||
"function Filter(level, key, existing_value)\n" |
||||
" if existing_value:sub(1,1) < 'r' then\n" |
||||
" return true, false, \"\"\n" |
||||
" end\n" |
||||
" return false, false, \"\"\n" |
||||
"end\n" |
||||
"\n" |
||||
"function FilterMergeOperand(level, key, operand)\n" |
||||
" return false\n" |
||||
"end\n" |
||||
"function Name()\n" |
||||
" return \"FilterByValue\"\n" |
||||
"end\n" |
||||
"\n"; |
||||
|
||||
std::unordered_map<std::string, std::string> kvs; |
||||
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs); |
||||
// Issue full compaction and expect nothing is in the DB.
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
||||
|
||||
for (auto const& entry : kvs) { |
||||
std::string value; |
||||
auto s = db_->Get(ReadOptions(), entry.first, &value); |
||||
if (entry.second[0] < 'r') { |
||||
ASSERT_TRUE(s.IsNotFound()); |
||||
} else { |
||||
ASSERT_TRUE(s.ok()); |
||||
ASSERT_EQ(value, entry.second); |
||||
} |
||||
} |
||||
} |
||||
|
||||
TEST_F(RocksLuaTest, ChangeValue) { |
||||
std::string db_path = temp_dir_ + "/rocks_lua_test"; |
||||
|
||||
lua::RocksLuaCompactionFilterOptions lua_opt; |
||||
lua_opt.error_log = std::make_shared<StopOnErrorLogger>(); |
||||
// Replace all values by their reversed key
|
||||
lua_opt.lua_script = |
||||
"function Filter(level, key, existing_value)\n" |
||||
" return false, true, key:reverse()\n" |
||||
"end\n" |
||||
"\n" |
||||
"function FilterMergeOperand(level, key, operand)\n" |
||||
" return false\n" |
||||
"end\n" |
||||
"function Name()\n" |
||||
" return \"ChangeValue\"\n" |
||||
"end\n" |
||||
"\n"; |
||||
|
||||
std::unordered_map<std::string, std::string> kvs; |
||||
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs); |
||||
// Issue full compaction and expect nothing is in the DB.
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
||||
|
||||
for (auto const& entry : kvs) { |
||||
std::string value; |
||||
ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value)); |
||||
std::string new_value = entry.first; |
||||
std::reverse(new_value.begin(), new_value.end()); |
||||
ASSERT_EQ(value, new_value); |
||||
} |
||||
} |
||||
|
||||
TEST_F(RocksLuaTest, ConditionallyChangeAndFilterValue) { |
||||
std::string db_path = temp_dir_ + "/rocks_lua_test"; |
||||
|
||||
lua::RocksLuaCompactionFilterOptions lua_opt; |
||||
lua_opt.error_log = std::make_shared<StopOnErrorLogger>(); |
||||
// Performs the following logic:
|
||||
// If key[0] < 'h' --> replace value by reverse key
|
||||
// If key[0] >= 'r' --> keep the original key value
|
||||
// Otherwise, filter the key value
|
||||
lua_opt.lua_script = |
||||
"function Filter(level, key, existing_value)\n" |
||||
" if key:sub(1,1) < 'h' then\n" |
||||
" return false, true, key:reverse()\n" |
||||
" elseif key:sub(1,1) < 'r' then\n" |
||||
" return true, false, \"\"\n" |
||||
" end\n" |
||||
" return false, false, \"\"\n" |
||||
"end\n" |
||||
"function Name()\n" |
||||
" return \"ConditionallyChangeAndFilterValue\"\n" |
||||
"end\n" |
||||
"\n"; |
||||
|
||||
std::unordered_map<std::string, std::string> kvs; |
||||
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs); |
||||
// Issue full compaction and expect nothing is in the DB.
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
||||
|
||||
for (auto const& entry : kvs) { |
||||
std::string value; |
||||
auto s = db_->Get(ReadOptions(), entry.first, &value); |
||||
if (entry.first[0] < 'h') { |
||||
ASSERT_TRUE(s.ok()); |
||||
std::string new_value = entry.first; |
||||
std::reverse(new_value.begin(), new_value.end()); |
||||
ASSERT_EQ(value, new_value); |
||||
} else if (entry.first[0] < 'r') { |
||||
ASSERT_TRUE(s.IsNotFound()); |
||||
} else { |
||||
ASSERT_TRUE(s.ok()); |
||||
ASSERT_EQ(value, entry.second); |
||||
} |
||||
} |
||||
} |
||||
|
||||
TEST_F(RocksLuaTest, DynamicChangeScript) { |
||||
std::string db_path = temp_dir_ + "/rocks_lua_test"; |
||||
|
||||
lua::RocksLuaCompactionFilterOptions lua_opt; |
||||
lua_opt.error_log = std::make_shared<StopOnErrorLogger>(); |
||||
// keeps all the key value pairs
|
||||
lua_opt.lua_script = |
||||
"function Filter(level, key, existing_value)\n" |
||||
" return false, false, \"\"\n" |
||||
"end\n" |
||||
"\n" |
||||
"function FilterMergeOperand(level, key, operand)\n" |
||||
" return false\n" |
||||
"end\n" |
||||
"function Name()\n" |
||||
" return \"KeepsAll\"\n" |
||||
"end\n" |
||||
"\n"; |
||||
|
||||
std::unordered_map<std::string, std::string> kvs; |
||||
std::shared_ptr<rocksdb::lua::RocksLuaCompactionFilterFactory> factory; |
||||
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs, 30, &factory); |
||||
uint64_t count = 0; |
||||
ASSERT_TRUE(db_->GetIntProperty( |
||||
rocksdb::DB::Properties::kNumEntriesActiveMemTable, &count)); |
||||
ASSERT_EQ(count, 0); |
||||
ASSERT_TRUE(db_->GetIntProperty( |
||||
rocksdb::DB::Properties::kNumEntriesImmMemTables, &count)); |
||||
ASSERT_EQ(count, 0); |
||||
|
||||
CompactRangeOptions cr_opt; |
||||
cr_opt.bottommost_level_compaction = |
||||
rocksdb::BottommostLevelCompaction::kForce; |
||||
|
||||
// Issue full compaction and expect everything is in the DB.
|
||||
ASSERT_OK(db_->CompactRange(cr_opt, nullptr, nullptr)); |
||||
|
||||
for (auto const& entry : kvs) { |
||||
std::string value; |
||||
ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value)); |
||||
ASSERT_EQ(value, entry.second); |
||||
} |
||||
|
||||
// change the lua script to removes all the key value pairs
|
||||
factory->SetScript( |
||||
"function Filter(level, key, existing_value)\n" |
||||
" return true, false, \"\"\n" |
||||
"end\n" |
||||
"\n" |
||||
"function FilterMergeOperand(level, key, operand)\n" |
||||
" return false\n" |
||||
"end\n" |
||||
"function Name()\n" |
||||
" return \"RemovesAll\"\n" |
||||
"end\n" |
||||
"\n"); |
||||
{ |
||||
std::string key = "another-key"; |
||||
std::string value = "another-value"; |
||||
kvs.insert({key, value}); |
||||
ASSERT_OK(db_->Put(WriteOptions(), key, value)); |
||||
db_->Flush(FlushOptions()); |
||||
} |
||||
|
||||
cr_opt.change_level = true; |
||||
cr_opt.target_level = 5; |
||||
// Issue full compaction and expect nothing is in the DB.
|
||||
ASSERT_OK(db_->CompactRange(cr_opt, nullptr, nullptr)); |
||||
|
||||
for (auto const& entry : kvs) { |
||||
std::string value; |
||||
auto s = db_->Get(ReadOptions(), entry.first, &value); |
||||
ASSERT_TRUE(s.IsNotFound()); |
||||
} |
||||
} |
||||
|
||||
TEST_F(RocksLuaTest, LuaConditionalTypeError) { |
||||
std::string db_path = temp_dir_ + "/rocks_lua_test"; |
||||
|
||||
lua::RocksLuaCompactionFilterOptions lua_opt; |
||||
// Filter() error when input key's initial >= 'r'
|
||||
lua_opt.lua_script = |
||||
"function Filter(level, key, existing_value)\n" |
||||
" if existing_value:sub(1,1) >= 'r' then\n" |
||||
" return true, 2, \"\" -- incorrect type of 2nd return value\n" |
||||
" end\n" |
||||
" return true, false, \"\"\n" |
||||
"end\n" |
||||
"\n" |
||||
"function FilterMergeOperand(level, key, operand)\n" |
||||
" return false\n" |
||||
"end\n" |
||||
"function Name()\n" |
||||
" return \"BuggyCode\"\n" |
||||
"end\n" |
||||
"\n"; |
||||
|
||||
std::unordered_map<std::string, std::string> kvs; |
||||
// Create DB with 10 files
|
||||
CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs, 10); |
||||
|
||||
// Issue full compaction and expect all keys which initial is < 'r'
|
||||
// will be deleted as we keep the key value when we hit an error.
|
||||
ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
||||
|
||||
for (auto const& entry : kvs) { |
||||
std::string value; |
||||
auto s = db_->Get(ReadOptions(), entry.first, &value); |
||||
if (entry.second[0] < 'r') { |
||||
ASSERT_TRUE(s.IsNotFound()); |
||||
} else { |
||||
ASSERT_TRUE(s.ok()); |
||||
ASSERT_EQ(value, entry.second); |
||||
} |
||||
} |
||||
} |
||||
|
||||
} // namespace rocksdb
|
||||
|
||||
int main(int argc, char** argv) { |
||||
rocksdb::port::InstallStackTraceHandler(); |
||||
::testing::InitGoogleTest(&argc, argv); |
||||
return RUN_ALL_TESTS(); |
||||
} |
||||
|
||||
#else |
||||
|
||||
int main(int argc, char** argv) { |
||||
printf("LUA_PATH is not set. Ignoring the test.\n"); |
||||
} |
||||
|
||||
#endif // defined(LUA)
|
||||
|
||||
#else |
||||
|
||||
int main(int argc, char** argv) { |
||||
printf("Lua is not supported in RocksDBLite. Ignoring the test.\n"); |
||||
} |
||||
|
||||
#endif // !defined(ROCKSDB_LITE)
|
Loading…
Reference in new issue