fork of https://github.com/oxigraph/rocksdb and https://github.com/facebook/rocksdb for nextgraph and oxigraph
				
			
			
		
			You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							498 lines
						
					
					
						
							15 KiB
						
					
					
				
			
		
		
	
	
							498 lines
						
					
					
						
							15 KiB
						
					
					
				| //  Copyright (c) 2016, Facebook, Inc.  All rights reserved.
 | |
| //  This source code is licensed under both the GPLv2 (found in the
 | |
| //  COPYING file in the root directory) and Apache 2.0 License
 | |
| //  (found in the LICENSE.Apache file in the root directory).
 | |
| 
 | |
| #include <stdio.h>
 | |
| 
 | |
| #if !defined(ROCKSDB_LITE)
 | |
| 
 | |
| #if defined(LUA)
 | |
| 
 | |
| #include <string>
 | |
| 
 | |
| #include "db/db_test_util.h"
 | |
| #include "port/stack_trace.h"
 | |
| #include "rocksdb/compaction_filter.h"
 | |
| #include "rocksdb/db.h"
 | |
| #include "rocksdb/utilities/lua/rocks_lua_compaction_filter.h"
 | |
| #include "util/testharness.h"
 | |
| 
 | |
| namespace rocksdb {
 | |
| 
 | |
| class StopOnErrorLogger : public Logger {
 | |
|  public:
 | |
|   using Logger::Logv;
 | |
|   virtual void Logv(const char* format, va_list ap) override {
 | |
|     vfprintf(stderr, format, ap);
 | |
|     fprintf(stderr, "\n");
 | |
|     FAIL();
 | |
|   }
 | |
| };
 | |
| 
 | |
| 
 | |
| class RocksLuaTest : public testing::Test {
 | |
|  public:
 | |
|   RocksLuaTest() : rnd_(301) {
 | |
|     temp_dir_ = test::TmpDir(Env::Default());
 | |
|     db_ = nullptr;
 | |
|   }
 | |
| 
 | |
|   std::string RandomString(int len) {
 | |
|     std::string res;
 | |
|     for (int i = 0; i < len; ++i) {
 | |
|       res += rnd_.Uniform(26) + 'a';
 | |
|     }
 | |
|     return res;
 | |
|   }
 | |
| 
 | |
|   void CreateDBWithLuaCompactionFilter(
 | |
|       const lua::RocksLuaCompactionFilterOptions& lua_opt,
 | |
|       const std::string& db_path,
 | |
|       std::unordered_map<std::string, std::string>* kvs,
 | |
|       const int kNumFlushes = 5,
 | |
|       std::shared_ptr<rocksdb::lua::RocksLuaCompactionFilterFactory>*
 | |
|           output_factory = nullptr) {
 | |
|     const int kKeySize = 10;
 | |
|     const int kValueSize = 50;
 | |
|     const int kKeysPerFlush = 2;
 | |
|     auto factory =
 | |
|         std::make_shared<rocksdb::lua::RocksLuaCompactionFilterFactory>(
 | |
|             lua_opt);
 | |
|     if (output_factory != nullptr) {
 | |
|       *output_factory = factory;
 | |
|     }
 | |
| 
 | |
|     options_ = Options();
 | |
|     options_.create_if_missing = true;
 | |
|     options_.compaction_filter_factory = factory;
 | |
|     options_.disable_auto_compactions = true;
 | |
|     options_.max_bytes_for_level_base =
 | |
|         (kKeySize + kValueSize) * kKeysPerFlush * 2;
 | |
|     options_.max_bytes_for_level_multiplier = 2;
 | |
|     options_.target_file_size_base = (kKeySize + kValueSize) * kKeysPerFlush;
 | |
|     options_.level0_file_num_compaction_trigger = 2;
 | |
|     DestroyDB(db_path, options_);
 | |
|     ASSERT_OK(DB::Open(options_, db_path, &db_));
 | |
| 
 | |
|     for (int f = 0; f < kNumFlushes; ++f) {
 | |
|       for (int i = 0; i < kKeysPerFlush; ++i) {
 | |
|         std::string key = RandomString(kKeySize);
 | |
|         std::string value = RandomString(kValueSize);
 | |
|         kvs->insert({key, value});
 | |
|         ASSERT_OK(db_->Put(WriteOptions(), key, value));
 | |
|       }
 | |
|       db_->Flush(FlushOptions());
 | |
|     }
 | |
|   }
 | |
| 
 | |
|   ~RocksLuaTest() {
 | |
|     if (db_) {
 | |
|       delete db_;
 | |
|     }
 | |
|   }
 | |
|   std::string temp_dir_;
 | |
|   DB* db_;
 | |
|   Random rnd_;
 | |
|   Options options_;
 | |
| };
 | |
| 
 | |
| TEST_F(RocksLuaTest, Default) {
 | |
|   // If nothing is set in the LuaCompactionFilterOptions, then
 | |
|   // RocksDB will keep all the key / value pairs, but it will also
 | |
|   // print our error log indicating failure.
 | |
|   std::string db_path = temp_dir_ + "/rocks_lua_test";
 | |
| 
 | |
|   lua::RocksLuaCompactionFilterOptions lua_opt;
 | |
| 
 | |
|   std::unordered_map<std::string, std::string> kvs;
 | |
|   CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
 | |
| 
 | |
|   for (auto const& entry : kvs) {
 | |
|     std::string value;
 | |
|     ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value));
 | |
|     ASSERT_EQ(value, entry.second);
 | |
|   }
 | |
| }
 | |
| 
 | |
| TEST_F(RocksLuaTest, KeepsAll) {
 | |
|   std::string db_path = temp_dir_ + "/rocks_lua_test";
 | |
| 
 | |
|   lua::RocksLuaCompactionFilterOptions lua_opt;
 | |
|   lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
 | |
|   // keeps all the key value pairs
 | |
|   lua_opt.lua_script =
 | |
|       "function Filter(level, key, existing_value)\n"
 | |
|       "  return false, false, \"\"\n"
 | |
|       "end\n"
 | |
|       "\n"
 | |
|       "function FilterMergeOperand(level, key, operand)\n"
 | |
|       "  return false\n"
 | |
|       "end\n"
 | |
|       "function Name()\n"
 | |
|       "  return \"KeepsAll\"\n"
 | |
|       "end\n"
 | |
|       "\n";
 | |
| 
 | |
|   std::unordered_map<std::string, std::string> kvs;
 | |
|   CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
 | |
| 
 | |
|   for (auto const& entry : kvs) {
 | |
|     std::string value;
 | |
|     ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value));
 | |
|     ASSERT_EQ(value, entry.second);
 | |
|   }
 | |
| }
 | |
| 
 | |
| TEST_F(RocksLuaTest, GetName) {
 | |
|   std::string db_path = temp_dir_ + "/rocks_lua_test";
 | |
| 
 | |
|   lua::RocksLuaCompactionFilterOptions lua_opt;
 | |
|   lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
 | |
|   const std::string kScriptName = "SimpleLuaCompactionFilter";
 | |
|   lua_opt.lua_script =
 | |
|       std::string(
 | |
|           "function Filter(level, key, existing_value)\n"
 | |
|           "  return false, false, \"\"\n"
 | |
|           "end\n"
 | |
|           "\n"
 | |
|           "function FilterMergeOperand(level, key, operand)\n"
 | |
|           "  return false\n"
 | |
|           "end\n"
 | |
|           "function Name()\n"
 | |
|           "  return \"") + kScriptName + "\"\n"
 | |
|       "end\n"
 | |
|       "\n";
 | |
| 
 | |
|   std::shared_ptr<CompactionFilterFactory> factory =
 | |
|       std::make_shared<lua::RocksLuaCompactionFilterFactory>(lua_opt);
 | |
|   std::string factory_name(factory->Name());
 | |
|   ASSERT_NE(factory_name.find(kScriptName), std::string::npos);
 | |
| }
 | |
| 
 | |
| TEST_F(RocksLuaTest, RemovesAll) {
 | |
|   std::string db_path = temp_dir_ + "/rocks_lua_test";
 | |
| 
 | |
|   lua::RocksLuaCompactionFilterOptions lua_opt;
 | |
|   lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
 | |
|   // removes all the key value pairs
 | |
|   lua_opt.lua_script =
 | |
|       "function Filter(level, key, existing_value)\n"
 | |
|       "  return true, false, \"\"\n"
 | |
|       "end\n"
 | |
|       "\n"
 | |
|       "function FilterMergeOperand(level, key, operand)\n"
 | |
|       "  return false\n"
 | |
|       "end\n"
 | |
|       "function Name()\n"
 | |
|       "  return \"RemovesAll\"\n"
 | |
|       "end\n"
 | |
|       "\n";
 | |
| 
 | |
|   std::unordered_map<std::string, std::string> kvs;
 | |
|   CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
 | |
|   // Issue full compaction and expect nothing is in the DB.
 | |
|   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 | |
| 
 | |
|   for (auto const& entry : kvs) {
 | |
|     std::string value;
 | |
|     auto s = db_->Get(ReadOptions(), entry.first, &value);
 | |
|     ASSERT_TRUE(s.IsNotFound());
 | |
|   }
 | |
| }
 | |
| 
 | |
| TEST_F(RocksLuaTest, FilterByKey) {
 | |
|   std::string db_path = temp_dir_ + "/rocks_lua_test";
 | |
| 
 | |
|   lua::RocksLuaCompactionFilterOptions lua_opt;
 | |
|   lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
 | |
|   // removes all keys whose initial is less than 'r'
 | |
|   lua_opt.lua_script =
 | |
|       "function Filter(level, key, existing_value)\n"
 | |
|       "  if key:sub(1,1) < 'r' then\n"
 | |
|       "    return true, false, \"\"\n"
 | |
|       "  end\n"
 | |
|       "  return false, false, \"\"\n"
 | |
|       "end\n"
 | |
|       "\n"
 | |
|       "function FilterMergeOperand(level, key, operand)\n"
 | |
|       "  return false\n"
 | |
|       "end\n"
 | |
|       "function Name()\n"
 | |
|       "  return \"KeepsAll\"\n"
 | |
|       "end\n";
 | |
| 
 | |
|   std::unordered_map<std::string, std::string> kvs;
 | |
|   CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
 | |
|   // Issue full compaction and expect nothing is in the DB.
 | |
|   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 | |
| 
 | |
|   for (auto const& entry : kvs) {
 | |
|     std::string value;
 | |
|     auto s = db_->Get(ReadOptions(), entry.first, &value);
 | |
|     if (entry.first[0] < 'r') {
 | |
|       ASSERT_TRUE(s.IsNotFound());
 | |
|     } else {
 | |
|       ASSERT_TRUE(s.ok());
 | |
|       ASSERT_TRUE(value == entry.second);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| TEST_F(RocksLuaTest, FilterByValue) {
 | |
|   std::string db_path = temp_dir_ + "/rocks_lua_test";
 | |
| 
 | |
|   lua::RocksLuaCompactionFilterOptions lua_opt;
 | |
|   lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
 | |
|   // removes all values whose initial is less than 'r'
 | |
|   lua_opt.lua_script =
 | |
|       "function Filter(level, key, existing_value)\n"
 | |
|       "  if existing_value:sub(1,1) < 'r' then\n"
 | |
|       "    return true, false, \"\"\n"
 | |
|       "  end\n"
 | |
|       "  return false, false, \"\"\n"
 | |
|       "end\n"
 | |
|       "\n"
 | |
|       "function FilterMergeOperand(level, key, operand)\n"
 | |
|       "  return false\n"
 | |
|       "end\n"
 | |
|       "function Name()\n"
 | |
|       "  return \"FilterByValue\"\n"
 | |
|       "end\n"
 | |
|       "\n";
 | |
| 
 | |
|   std::unordered_map<std::string, std::string> kvs;
 | |
|   CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
 | |
|   // Issue full compaction and expect nothing is in the DB.
 | |
|   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 | |
| 
 | |
|   for (auto const& entry : kvs) {
 | |
|     std::string value;
 | |
|     auto s = db_->Get(ReadOptions(), entry.first, &value);
 | |
|     if (entry.second[0] < 'r') {
 | |
|       ASSERT_TRUE(s.IsNotFound());
 | |
|     } else {
 | |
|       ASSERT_TRUE(s.ok());
 | |
|       ASSERT_EQ(value, entry.second);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| TEST_F(RocksLuaTest, ChangeValue) {
 | |
|   std::string db_path = temp_dir_ + "/rocks_lua_test";
 | |
| 
 | |
|   lua::RocksLuaCompactionFilterOptions lua_opt;
 | |
|   lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
 | |
|   // Replace all values by their reversed key
 | |
|   lua_opt.lua_script =
 | |
|       "function Filter(level, key, existing_value)\n"
 | |
|       "  return false, true, key:reverse()\n"
 | |
|       "end\n"
 | |
|       "\n"
 | |
|       "function FilterMergeOperand(level, key, operand)\n"
 | |
|       "  return false\n"
 | |
|       "end\n"
 | |
|       "function Name()\n"
 | |
|       "  return \"ChangeValue\"\n"
 | |
|       "end\n"
 | |
|       "\n";
 | |
| 
 | |
|   std::unordered_map<std::string, std::string> kvs;
 | |
|   CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
 | |
|   // Issue full compaction and expect nothing is in the DB.
 | |
|   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 | |
| 
 | |
|   for (auto const& entry : kvs) {
 | |
|     std::string value;
 | |
|     ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value));
 | |
|     std::string new_value = entry.first;
 | |
|     std::reverse(new_value.begin(), new_value.end());
 | |
|     ASSERT_EQ(value, new_value);
 | |
|   }
 | |
| }
 | |
| 
 | |
| TEST_F(RocksLuaTest, ConditionallyChangeAndFilterValue) {
 | |
|   std::string db_path = temp_dir_ + "/rocks_lua_test";
 | |
| 
 | |
|   lua::RocksLuaCompactionFilterOptions lua_opt;
 | |
|   lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
 | |
|   // Performs the following logic:
 | |
|   // If key[0] < 'h' --> replace value by reverse key
 | |
|   // If key[0] >= 'r' --> keep the original key value
 | |
|   // Otherwise, filter the key value
 | |
|   lua_opt.lua_script =
 | |
|       "function Filter(level, key, existing_value)\n"
 | |
|       "  if key:sub(1,1) < 'h' then\n"
 | |
|       "    return false, true, key:reverse()\n"
 | |
|       "  elseif key:sub(1,1) < 'r' then\n"
 | |
|       "    return true, false, \"\"\n"
 | |
|       "  end\n"
 | |
|       "  return false, false, \"\"\n"
 | |
|       "end\n"
 | |
|       "function Name()\n"
 | |
|       "  return \"ConditionallyChangeAndFilterValue\"\n"
 | |
|       "end\n"
 | |
|       "\n";
 | |
| 
 | |
|   std::unordered_map<std::string, std::string> kvs;
 | |
|   CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs);
 | |
|   // Issue full compaction and expect nothing is in the DB.
 | |
|   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 | |
| 
 | |
|   for (auto const& entry : kvs) {
 | |
|     std::string value;
 | |
|     auto s = db_->Get(ReadOptions(), entry.first, &value);
 | |
|     if (entry.first[0] < 'h') {
 | |
|       ASSERT_TRUE(s.ok());
 | |
|       std::string new_value = entry.first;
 | |
|       std::reverse(new_value.begin(), new_value.end());
 | |
|       ASSERT_EQ(value, new_value);
 | |
|     } else if (entry.first[0] < 'r') {
 | |
|       ASSERT_TRUE(s.IsNotFound());
 | |
|     } else {
 | |
|       ASSERT_TRUE(s.ok());
 | |
|       ASSERT_EQ(value, entry.second);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| TEST_F(RocksLuaTest, DynamicChangeScript) {
 | |
|   std::string db_path = temp_dir_ + "/rocks_lua_test";
 | |
| 
 | |
|   lua::RocksLuaCompactionFilterOptions lua_opt;
 | |
|   lua_opt.error_log = std::make_shared<StopOnErrorLogger>();
 | |
|   // keeps all the key value pairs
 | |
|   lua_opt.lua_script =
 | |
|       "function Filter(level, key, existing_value)\n"
 | |
|       "  return false, false, \"\"\n"
 | |
|       "end\n"
 | |
|       "\n"
 | |
|       "function FilterMergeOperand(level, key, operand)\n"
 | |
|       "  return false\n"
 | |
|       "end\n"
 | |
|       "function Name()\n"
 | |
|       "  return \"KeepsAll\"\n"
 | |
|       "end\n"
 | |
|       "\n";
 | |
| 
 | |
|   std::unordered_map<std::string, std::string> kvs;
 | |
|   std::shared_ptr<rocksdb::lua::RocksLuaCompactionFilterFactory> factory;
 | |
|   CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs, 30, &factory);
 | |
|   uint64_t count = 0;
 | |
|   ASSERT_TRUE(db_->GetIntProperty(
 | |
|       rocksdb::DB::Properties::kNumEntriesActiveMemTable, &count));
 | |
|   ASSERT_EQ(count, 0);
 | |
|   ASSERT_TRUE(db_->GetIntProperty(
 | |
|       rocksdb::DB::Properties::kNumEntriesImmMemTables, &count));
 | |
|   ASSERT_EQ(count, 0);
 | |
| 
 | |
|   CompactRangeOptions cr_opt;
 | |
|   cr_opt.bottommost_level_compaction =
 | |
|       rocksdb::BottommostLevelCompaction::kForce;
 | |
| 
 | |
|   // Issue full compaction and expect everything is in the DB.
 | |
|   ASSERT_OK(db_->CompactRange(cr_opt, nullptr, nullptr));
 | |
| 
 | |
|   for (auto const& entry : kvs) {
 | |
|     std::string value;
 | |
|     ASSERT_OK(db_->Get(ReadOptions(), entry.first, &value));
 | |
|     ASSERT_EQ(value, entry.second);
 | |
|   }
 | |
| 
 | |
|   // change the lua script to removes all the key value pairs
 | |
|   factory->SetScript(
 | |
|       "function Filter(level, key, existing_value)\n"
 | |
|       "  return true, false, \"\"\n"
 | |
|       "end\n"
 | |
|       "\n"
 | |
|       "function FilterMergeOperand(level, key, operand)\n"
 | |
|       "  return false\n"
 | |
|       "end\n"
 | |
|       "function Name()\n"
 | |
|       "  return \"RemovesAll\"\n"
 | |
|       "end\n"
 | |
|       "\n");
 | |
|   {
 | |
|     std::string key = "another-key";
 | |
|     std::string value = "another-value";
 | |
|     kvs.insert({key, value});
 | |
|     ASSERT_OK(db_->Put(WriteOptions(), key, value));
 | |
|     db_->Flush(FlushOptions());
 | |
|   }
 | |
| 
 | |
|   cr_opt.change_level = true;
 | |
|   cr_opt.target_level = 5;
 | |
|   // Issue full compaction and expect nothing is in the DB.
 | |
|   ASSERT_OK(db_->CompactRange(cr_opt, nullptr, nullptr));
 | |
| 
 | |
|   for (auto const& entry : kvs) {
 | |
|     std::string value;
 | |
|     auto s = db_->Get(ReadOptions(), entry.first, &value);
 | |
|     ASSERT_TRUE(s.IsNotFound());
 | |
|   }
 | |
| }
 | |
| 
 | |
| TEST_F(RocksLuaTest, LuaConditionalTypeError) {
 | |
|   std::string db_path = temp_dir_ + "/rocks_lua_test";
 | |
| 
 | |
|   lua::RocksLuaCompactionFilterOptions lua_opt;
 | |
|   // Filter() error when input key's initial >= 'r'
 | |
|   lua_opt.lua_script =
 | |
|       "function Filter(level, key, existing_value)\n"
 | |
|       "  if existing_value:sub(1,1) >= 'r' then\n"
 | |
|       "    return true, 2, \"\" -- incorrect type of 2nd return value\n"
 | |
|       "  end\n"
 | |
|       "  return true, false, \"\"\n"
 | |
|       "end\n"
 | |
|       "\n"
 | |
|       "function FilterMergeOperand(level, key, operand)\n"
 | |
|       "  return false\n"
 | |
|       "end\n"
 | |
|       "function Name()\n"
 | |
|       "  return \"BuggyCode\"\n"
 | |
|       "end\n"
 | |
|       "\n";
 | |
| 
 | |
|   std::unordered_map<std::string, std::string> kvs;
 | |
|   // Create DB with 10 files
 | |
|   CreateDBWithLuaCompactionFilter(lua_opt, db_path, &kvs, 10);
 | |
| 
 | |
|   // Issue full compaction and expect all keys which initial is < 'r'
 | |
|   // will be deleted as we keep the key value when we hit an error.
 | |
|   ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr));
 | |
| 
 | |
|   for (auto const& entry : kvs) {
 | |
|     std::string value;
 | |
|     auto s = db_->Get(ReadOptions(), entry.first, &value);
 | |
|     if (entry.second[0] < 'r') {
 | |
|       ASSERT_TRUE(s.IsNotFound());
 | |
|     } else {
 | |
|       ASSERT_TRUE(s.ok());
 | |
|       ASSERT_EQ(value, entry.second);
 | |
|     }
 | |
|   }
 | |
| }
 | |
| 
 | |
| }  // namespace rocksdb
 | |
| 
 | |
| int main(int argc, char** argv) {
 | |
|   rocksdb::port::InstallStackTraceHandler();
 | |
|   ::testing::InitGoogleTest(&argc, argv);
 | |
|   return RUN_ALL_TESTS();
 | |
| }
 | |
| 
 | |
| #else
 | |
| 
 | |
// Stub entry point used when LUA is not defined: prints a notice and exits
// successfully without running any test.
int main(int /*argc*/, char** /*argv*/) {
  printf("LUA_PATH is not set.  Ignoring the test.\n");
  return 0;
}
 | |
| 
 | |
| #endif  // defined(LUA)
 | |
| 
 | |
| #else
 | |
| 
 | |
// Stub entry point used when ROCKSDB_LITE is defined: prints a notice and
// exits successfully without running any test.
int main(int /*argc*/, char** /*argv*/) {
  printf("Lua is not supported in RocksDBLite.  Ignoring the test.\n");
  return 0;
}
 | |
| 
 | |
| #endif  // !defined(ROCKSDB_LITE)
 | |
| 
 |