From 21082fa13cd8fc12efe07ff3df949826ce81edd4 Mon Sep 17 00:00:00 2001 From: heyongqiang Date: Tue, 21 Aug 2012 17:33:46 -0700 Subject: [PATCH 1/3] regression for trigger compaction logic Summary: as subject Test Plan: manually run db_bench confirmed Reviewers: dhruba Differential Revision: https://reviews.facebook.net/D4809 --- db/db_bench.cc | 6 +++--- db/version_set.cc | 15 ++++++++------- util/options.cc | 2 +- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 08f8ba61f..2ee030c90 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -146,8 +146,8 @@ static int FLAGS_max_bytes_for_level_multiplier = 10; // Number of files in level-0 that will trigger put stop. static int FLAGS_level0_stop_writes_trigger = 12; -// Nulber of files in level-0 that will slow down writes. -static int FLAGS_level0_slowdown_writes_trigger = 4; +// Number of files in level-0 that will slow down writes. +static int FLAGS_level0_slowdown_writes_trigger = 8; // posix or hdfs environment static leveldb::Env* FLAGS_env = leveldb::Env::Default(); @@ -1071,7 +1071,7 @@ int main(int argc, char** argv) { (n == 0 || n == 1)) { FLAGS_disable_data_sync = n; } else if (sscanf(argv[i], "--disable_wal=%d%c", &n, &junk) == 1 && - (n == 0 || n == 1)) { + (n == 0 || n == 1)) { FLAGS_disable_wal = n; } else if (sscanf(argv[i], "--hdfs=%s", &hdfsname) == 1) { FLAGS_env = new leveldb::HdfsEnv(hdfsname); diff --git a/db/version_set.cc b/db/version_set.cc index a6c596a13..78b82343d 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -702,15 +702,16 @@ VersionSet::VersionSet(const std::string& dbname, compact_pointer_ = new std::string[options_->num_levels]; max_file_size_ = new uint64_t[options_->num_levels]; level_max_bytes_ = new uint64_t[options->num_levels]; - max_file_size_[0] = options_->target_file_size_base; - level_max_bytes_[0] = options_->max_bytes_for_level_base; int target_file_size_multiplier = options_->target_file_size_multiplier; int max_bytes_multiplier = options_->max_bytes_for_level_multiplier; - int i = 1; - while (i < options_->num_levels) { - max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier; - level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier; - i++; + for (int i = 0; i < options_->num_levels; i++) { + if (i > 1) { + max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier; + level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier; + } else { + max_file_size_[i] = options_->target_file_size_base; + level_max_bytes_[i] = options_->max_bytes_for_level_base; + } } AppendVersion(new Version(this)); } diff --git a/util/options.cc b/util/options.cc index 2a6ad4387..43f93a9cd 100644 --- a/util/options.cc +++ b/util/options.cc @@ -28,7 +28,7 @@ Options::Options() level0_stop_writes_trigger(12), max_mem_compaction_level(2), target_file_size_base(2 * 1048576), - target_file_size_multiplier(10), + target_file_size_multiplier(1), max_bytes_for_level_base(10 * 1048576), max_bytes_for_level_multiplier(10), expanded_compaction_factor(25), From e5a7c8e58080cc2189bf52e11e6980e85328511c Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Wed, 22 Aug 2012 11:43:53 -0700 Subject: [PATCH 2/3] Log the open-options to the LOG. Summary: Log the open-options to the LOG. Use options_ instead of options because SanitizeOptions could modify the max_file_open limit. Test Plan: num db_bench Reviewers: heyongqiang Reviewed By: heyongqiang Differential Revision: https://reviews.facebook.net/D4833 --- db/db_impl.cc | 4 +++- include/leveldb/options.h | 2 ++ util/options.cc | 22 ++++++++++++++++++++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/db/db_impl.cc b/db/db_impl.cc index a133281ff..0c709255e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -141,12 +141,14 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) stats_ = new CompactionStats[options.num_levels]; // Reserve ten files or so for other uses and give the rest to TableCache. - const int table_cache_size = options.max_open_files - 10; + const int table_cache_size = options_.max_open_files - 10; table_cache_ = new TableCache(dbname_, &options_, table_cache_size); versions_ = new VersionSet(dbname_, &options_, table_cache_, &internal_comparator_); + options_.Dump(options_.info_log); + #ifdef USE_SCRIBE logger_ = new ScribeLogger("localhost", 1456); #endif diff --git a/include/leveldb/options.h b/include/leveldb/options.h index f8aa52ed9..a271905fc 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -206,6 +206,8 @@ struct Options { // Create an Options object with default values for all fields. Options(); + + void Dump(Logger * log) const; }; // Options that control read operations diff --git a/util/options.cc b/util/options.cc index 43f93a9cd..c9306387d 100644 --- a/util/options.cc +++ b/util/options.cc @@ -6,6 +6,7 @@ #include "leveldb/comparator.h" #include "leveldb/env.h" +#include "leveldb/filter_policy.h" namespace leveldb { @@ -39,4 +40,25 @@ Options::Options() db_stats_log_interval(1800) { } +void +Options::Dump( + Logger * log) const +{ + Log(log," Options.comparator: %s", comparator->Name()); + Log(log," Options.create_if_missing: %d", create_if_missing); + Log(log," Options.error_if_exists: %d", error_if_exists); + Log(log," Options.paranoid_checks: %d", paranoid_checks); + Log(log," Options.env: %p", env); + Log(log," Options.info_log: %p", info_log); + Log(log," Options.write_buffer_size: %zd", write_buffer_size); + Log(log," Options.max_open_files: %d", max_open_files); + Log(log," Options.block_cache: %p", block_cache); + Log(log," Options.block_size: %zd", block_size); + Log(log,"Options.block_restart_interval: %d", block_restart_interval); + Log(log," Options.compression: %d", compression); + Log(log," Options.filter_policy: %s", filter_policy == NULL ? "NULL" : filter_policy->Name()); + +} // Options::Dump + + } // namespace leveldb From 2b443ba887d5a96c5f78462e55ff1e4a4d22fc65 Mon Sep 17 00:00:00 2001 From: Dhruba Borthakur Date: Tue, 14 Aug 2012 14:52:48 -0700 Subject: [PATCH 3/3] Titile: a command line shell to read/write data from a leveldb thrift server Summary: implemented a commond line shell to talk with leveldb thrift server, which is based on state pattern and can be easily extended. Test Plan: build and run Reviewers: dhruba, zshao, heyongqiang Differential Revision: https://reviews.facebook.net/D4713 --- Makefile | 16 +- tools/shell/DBClientProxy.cpp | 271 +++++++++++++++++++++++++ tools/shell/DBClientProxy.h | 64 ++++++ tools/shell/LeveldbShell.cpp | 8 + tools/shell/ShellContext.cpp | 104 ++++++++++ tools/shell/ShellContext.h | 51 +++++ tools/shell/ShellState.cpp | 139 +++++++++++++ tools/shell/ShellState.h | 87 ++++++++ tools/shell/test/DBClientProxyTest.cpp | 182 +++++++++++++++++ 9 files changed, 918 insertions(+), 4 deletions(-) create mode 100644 tools/shell/DBClientProxy.cpp create mode 100644 tools/shell/DBClientProxy.h create mode 100644 tools/shell/LeveldbShell.cpp create mode 100644 tools/shell/ShellContext.cpp create mode 100644 tools/shell/ShellContext.h create mode 100644 tools/shell/ShellState.cpp create mode 100644 tools/shell/ShellState.h create mode 100644 tools/shell/test/DBClientProxyTest.cpp diff --git a/Makefile b/Makefile index 087634024..e30a19826 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ $(shell ./build_detect_platform build_config.mk) # this file is generated by the previous line to set build flags and sources include build_config.mk -CFLAGS += -I. -I./include $(PLATFORM_CCFLAGS) $(OPT) +CFLAGS += -I. -I./include $(PLATFORM_CCFLAGS) $(OPT) CXXFLAGS += -I. -I./include $(PLATFORM_CXXFLAGS) $(OPT) LDFLAGS += $(PLATFORM_LDFLAGS) @@ -33,6 +33,9 @@ MEMENVOBJECTS = $(MEMENV_SOURCES:.cc=.o) TESTUTIL = ./util/testutil.o TESTHARNESS = ./util/testharness.o $(TESTUTIL) +TOOLS = \ + leveldb_shell + TESTS = \ arena_test \ bloom_test \ @@ -52,8 +55,7 @@ TESTS = \ table_test \ version_edit_test \ version_set_test \ - write_batch_test \ - leveldb_server_test + write_batch_test PROGRAMS = db_bench $(TESTS) BENCHMARKS = db_bench_sqlite3 db_bench_tree_db @@ -80,7 +82,7 @@ $(SHARED1): $(SHARED3) ln -fs $(SHARED3) $(SHARED1) endif -all: $(SHARED) $(LIBRARY) $(THRIFTSERVER) +all: $(SHARED) $(LIBRARY) $(THRIFTSERVER) $(TOOLS) check: all $(PROGRAMS) $(TESTS) for t in $(TESTS); do echo "***** Running $$t"; ./$$t || exit 1; done @@ -93,6 +95,9 @@ $(LIBRARY): $(LIBOBJECTS) rm -f $@ $(AR) -rs $@ $(LIBOBJECTS) +leveldb_shell: tools/shell/ShellContext.o tools/shell/ShellState.o tools/shell/LeveldbShell.o tools/shell/DBClientProxy.o tools/shell/ShellContext.h tools/shell/ShellState.h tools/shell/DBClientProxy.h $(LIBOBJECTS) + $(CXX) tools/shell/ShellContext.o tools/shell/ShellState.o tools/shell/LeveldbShell.o tools/shell/DBClientProxy.o $(LIBOBJECTS) -o $@ $(LDFLAGS) + db_bench: db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) $(CXX) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) @@ -169,6 +174,9 @@ leveldb_server: thrift/server.o $(LIBRARY) leveldb_server_test: thrift/test/simpletest.o $(LIBRARY) $(CXX) thrift/test/simpletest.o $(LIBRARY) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) +DBClientProxy_test: tools/shell/test/DBClientProxyTest.o tools/shell/DBClientProxy.o $(LIBRARY) + $(CXX) tools/shell/test/DBClientProxyTest.o tools/shell/DBClientProxy.o $(LIBRARY) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) + ifeq ($(PLATFORM), IOS) # For iOS, create universal object files to be used on both the simulator and # a device. diff --git a/tools/shell/DBClientProxy.cpp b/tools/shell/DBClientProxy.cpp new file mode 100644 index 000000000..a1ec7b854 --- /dev/null +++ b/tools/shell/DBClientProxy.cpp @@ -0,0 +1,271 @@ + +#include + +#include "DBClientProxy.h" + + +#include "thrift/lib/cpp/protocol/TBinaryProtocol.h" +#include "thrift/lib/cpp/transport/TSocket.h" +#include "thrift/lib/cpp/transport/TTransportUtils.h" + + + +using namespace std; +using namespace boost; +using namespace Tleveldb; +using namespace apache::thrift::protocol; +using namespace apache::thrift::transport; + +namespace leveldb { + +DBClientProxy::DBClientProxy(const string & host, int port) : + host_(host), + port_(port), + dbToHandle_(), + dbClient_() { +} + +DBClientProxy::~DBClientProxy() { + cleanUp(); +} + + +void DBClientProxy::connect(void) { + cleanUp(); + printf("Connecting to %s:%d\n", host_.c_str(), port_); + try { + boost::shared_ptr socket(new TSocket(host_, port_)); + boost::shared_ptr transport(new TBufferedTransport(socket)); + boost::shared_ptr protocol(new TBinaryProtocol(transport)); + dbClient_.reset(new DBClient(protocol)); + + transport->open(); + } catch (const std::exception & e) { + dbClient_.reset(); + throw; + } +} + +void DBClientProxy::cleanUp(void) { + if(dbClient_.get()) { + for(map::iterator itor = dbToHandle_.begin(); + itor != dbToHandle_.end(); + ++itor) { + dbClient_->Close(itor->second, itor->first); + } + dbClient_.reset(); + } + dbToHandle_.clear(); +} + +void DBClientProxy::open(const string & db) { + if(!dbClient_.get()) { + printf("please connect() first\n"); + return; + } + + // printf("opening database : %s\n", db.c_str()); + // we use default DBOptions here + DBOptions opt; + DBHandle handle; + try { + dbClient_->Open(handle, db, opt); + } catch (const LeveldbException & e) { + printf("%s\n", e.message.c_str()); + if(kIOError == e.errorCode) { + printf("no such database : %s\n", db.c_str()); + return; + }else { + printf("Unknown error : %d\n", e.errorCode); + return; + } + } + + dbToHandle_[db] = handle; +} + + +bool DBClientProxy::create(const string & db) { + if(!dbClient_.get()) { + printf("please connect() first\n"); + return false; + } + + printf("creating database : %s\n", db.c_str()); + DBOptions opt; + opt.create_if_missing = true; + opt.error_if_exists = true; + DBHandle handle; + try { + dbClient_->Open(handle, db, opt); + }catch (const LeveldbException & e) { + printf("%s\n", e.message.c_str()); + printf("error code : %d\n", e.errorCode); + if(kNotFound == e.errorCode) { + printf("no such database : %s\n", db.c_str()); + return false;; + } else { + printf("Unknown error : %d\n", e.errorCode); + return false; + } + } + + dbToHandle_[db] = handle; + return true; +} + + +map::iterator +DBClientProxy::getHandle(const string & db) { + map::iterator itor = dbToHandle_.find(db); + if(dbToHandle_.end() == itor) { + open(db); + itor = dbToHandle_.find(db); + } + + return itor; +} + + +bool DBClientProxy::get(const string & db, + const string & key, + string & value) { + if(!dbClient_.get()) { + printf("please connect() first\n"); + return false; + } + + map::iterator itor = getHandle(db); + if(dbToHandle_.end() == itor) { + return false; + } + + ResultItem ret; + Slice k; + k.data = key; + k.size = key.size(); + // we use default values of options here + ReadOptions opt; + dbClient_->Get(ret, + itor->second, + k, + opt); + if(kOk == ret.status) { + value = ret.value.data; + return true; + } else if(kNotFound == ret.status) { + printf("no such key : %s\n", key.c_str()); + return false; + } else { + printf("get data error : %d\n", ret.status); + return false; + } +} + + + +bool DBClientProxy::put(const string & db, + const string & key, + const string & value) { + if(!dbClient_.get()) { + printf("please connect() first\n"); + return false; + } + + map::iterator itor = getHandle(db); + if(dbToHandle_.end() == itor) { + return false; + } + + kv temp; + temp.key.data = key; + temp.key.size = key.size(); + temp.value.data = value; + temp.value.size = value.size(); + WriteOptions opt; + opt.sync = true; + Code code; + code = dbClient_->Put(itor->second, + temp, + opt); + + + if(kOk == code) { + // printf("set value finished\n"); + return true; + } else { + printf("put data error : %d\n", code); + return false; + } +} + +bool DBClientProxy::scan(const string & db, + const string & start_key, + const string & end_key, + const string & limit, + vector > & kvs) { + if(!dbClient_.get()) { + printf("please connect() first\n"); + return false; + } + + int limitInt = -1; + limitInt = atoi(limit.c_str()); + if(limitInt <= 0) { + printf("Error while parse limit : %s\n", limit.c_str()); + return false; + } + + if(start_key > end_key) { + printf("empty range.\n"); + return false; + } + + map::iterator itor = getHandle(db); + if(dbToHandle_.end() == itor) { + return false; + } + + ResultIterator ret; + // we use the default values of options here + ReadOptions opt; + Slice k; + k.data = start_key; + k.size = start_key.size(); + dbClient_->NewIterator(ret, + itor->second, + opt, + seekToKey, + k); + Iterator it; + if(kOk == ret.status) { + it = ret.iterator; + } else { + printf("get iterator error : %d\n", ret.status); + return false; + } + + int idx = 0; + string ck = start_key; + while(idx < limitInt && ck < end_key) { + ResultPair retPair; + dbClient_->GetNext(retPair, itor->second, it); + if(kOk == retPair.status) { + ++idx; + ck = retPair.keyvalue.key.data; + if (ck < end_key) { + kvs.push_back(make_pair(retPair.keyvalue.key.data, + retPair.keyvalue.value.data)); + } + } else if(kEnd == retPair.status) { + printf("not enough values\n"); + return true; + } else { + printf("GetNext() error : %d\n", retPair.status); + return false; + } + } + return true; +} + +} // namespace diff --git a/tools/shell/DBClientProxy.h b/tools/shell/DBClientProxy.h new file mode 100644 index 000000000..3be7d1a73 --- /dev/null +++ b/tools/shell/DBClientProxy.h @@ -0,0 +1,64 @@ + +#ifndef TOOLS_SHELL_DBCLIENTPROXY +#define TOOLS_SHELL_DBCLIENTPROXY + +#include +#include +#include +#include +#include + +#include "DB.h" + +/* + * class DBClientProxy maintains: + * 1. a connection to leveldb service + * 2. a map from db names to opened db handles + * + * it's client codes' responsibility to catch all possible exceptions. + */ + +namespace leveldb { + +class DBClientProxy : private boost::noncopyable { + public: + // connect to host_:port_ + void connect(void); + + // return true on success, false otherwise + bool get(const std::string & db, + const std::string & key, + std::string & value); + + // return true on success, false otherwise + bool put(const std::string & db, + const std::string & key, + const std::string & value); + + // return true on success, false otherwise + bool scan(const std::string & db, + const std::string & start_key, + const std::string & end_key, + const std::string & limit, + std::vector > & kvs); + + // return true on success, false otherwise + bool create(const std::string & db); + + DBClientProxy(const std::string & host, int port); + ~DBClientProxy(); + + private: + // some internal help functions + void cleanUp(void); + void open(const std::string & db); + std::map::iterator getHandle(const std::string & db); + + const std::string host_; + const int port_; + std::map dbToHandle_; + boost::shared_ptr dbClient_; +}; + +} // namespace +#endif diff --git a/tools/shell/LeveldbShell.cpp b/tools/shell/LeveldbShell.cpp new file mode 100644 index 000000000..e6274d3bf --- /dev/null +++ b/tools/shell/LeveldbShell.cpp @@ -0,0 +1,8 @@ + + +#include "ShellContext.h" + +int main(int argc, char ** argv) { + ShellContext c(argc, argv); + c.run(); +} diff --git a/tools/shell/ShellContext.cpp b/tools/shell/ShellContext.cpp new file mode 100644 index 000000000..36a07e544 --- /dev/null +++ b/tools/shell/ShellContext.cpp @@ -0,0 +1,104 @@ + +#include +#include + +#include "ShellContext.h" +#include "ShellState.h" + + + +#include "thrift/lib/cpp/protocol/TBinaryProtocol.h" +#include "thrift/lib/cpp/transport/TSocket.h" +#include "thrift/lib/cpp/transport/TTransportUtils.h" + + + +using namespace std; +using namespace boost; +using namespace Tleveldb; +using namespace leveldb; +using namespace apache::thrift::protocol; +using namespace apache::thrift::transport; + +void ShellContext::changeState(ShellState * pState) { + pShellState_ = pState; +} + +void ShellContext::stop(void) { + exit_ = true; +} + +bool ShellContext::ParseInput(void) { + if(argc_ != 3) { + printf("leveldb_shell host port\n"); + return false; + } + + port_ = atoi(argv_[2]); + if(port_ <= 0) { + printf("Error while parse port : %s\n", argv_[2]); + return false; + } + + clientProxy_.reset(new DBClientProxy(argv_[1], port_)); + if(!clientProxy_.get()) { + return false; + } else { + return true; + } +} + +void ShellContext::connect(void) { + clientProxy_->connect(); +} + +void ShellContext::create(const string & db) { + if (clientProxy_->create(db)) { + printf("%s created\n", db.c_str()); + } +} + +void ShellContext::get(const string & db, + const string & key) { + string v; + if (clientProxy_->get(db, key, v)) { + printf("%s\n", v.c_str()); + } +} + +void ShellContext::put(const string & db, + const string & key, + const string & value) { + if (clientProxy_->put(db, key, value)) { + printf("(%s, %s) has been set\n", key.c_str(), value.c_str()); + } +} + +void ShellContext::scan(const string & db, + const string & start_key, + const string & end_key, + const string & limit) { + vector > kvs; + if (clientProxy_->scan(db, start_key, end_key, limit, kvs)) { + for(unsigned int i = 0; i < kvs.size(); ++i) { + printf("%d (%s, %s)\n", i, kvs[i].first.c_str(), kvs[i].second.c_str()); + } + } +} + +void ShellContext::run(void) { + while(!exit_) { + pShellState_->run(this); + } +} + +ShellContext::ShellContext(int argc, char ** argv) : + pShellState_(ShellStateStart::getInstance()), + exit_(false), + argc_(argc), + argv_(argv), + port_(-1), + clientProxy_() { +} + + diff --git a/tools/shell/ShellContext.h b/tools/shell/ShellContext.h new file mode 100644 index 000000000..95805ad0c --- /dev/null +++ b/tools/shell/ShellContext.h @@ -0,0 +1,51 @@ +#ifndef TOOLS_SHELL_SHELLCONTEXT +#define TOOLS_SHELL_SHELLCONTEXT + +#include +#include +#include +#include + +#include "DB.h" +#include "DBClientProxy.h" + +class ShellState; + +class ShellContext : private boost::noncopyable { + public: + void changeState(ShellState * pState); + + void stop(void); + + bool ParseInput(void); + + void connect(void); + + void get(const std::string & db, + const std::string & key); + + void put(const std::string & db, + const std::string & key, + const std::string & value); + + void scan(const std::string & db, + const std::string & start_key, + const std::string & end_key, + const std::string & limit); + + void create(const std::string & db); + + void run(void); + + ShellContext(int argc, char ** argv); + + private: + ShellState * pShellState_; + bool exit_; + int argc_; + char ** argv_; + int port_; + boost::shared_ptr clientProxy_; +}; + +#endif diff --git a/tools/shell/ShellState.cpp b/tools/shell/ShellState.cpp new file mode 100644 index 000000000..057a337aa --- /dev/null +++ b/tools/shell/ShellState.cpp @@ -0,0 +1,139 @@ +#include +#include +#include +#include + +#include "ShellState.h" +#include "ShellContext.h" +#include "transport/TTransportException.h" + +using namespace std; + +using namespace apache::thrift::transport; + +const char * PMT = ">> "; + + +void ShellStateStart::run(ShellContext * c) { + if(!c->ParseInput()) { + c->changeState(ShellStateStop::getInstance()); + } else { + c->changeState(ShellStateConnecting::getInstance()); + } +} + + +void ShellStateStop::run(ShellContext * c) { + c->stop(); +} + +void ShellStateConnecting::run(ShellContext * c) { + try { + c->connect(); + } catch (const TTransportException & e) { + cout << e.what() << endl; + c->changeState(ShellStateStop::getInstance()); + return; + } + + c->changeState(ShellStateConnected::getInstance()); +} + +void ShellStateConnected::unknownCmd(void) { + cout << "Unknown command!" << endl; + cout << "Use help to list all available commands" << endl; +} + +void ShellStateConnected::helpMsg(void) { + cout << "Currently supported commands:" << endl; + cout << "create db" << endl; + cout << "get db key" << endl; + cout << "scan db start_key end_key limit" << endl; + cout << "put db key value" << endl; + cout << "exit/quit" << endl; +} + +void ShellStateConnected::handleConError(ShellContext * c) { + cout << "Connection down" << endl; + cout << "Reconnect ? (y/n) :" << endl; + string s; + while(getline(cin, s)) { + if("y" == s) { + c->changeState(ShellStateConnecting::getInstance()); + break; + } else if("n" == s) { + c->changeState(ShellStateStop::getInstance()); + break; + } else { + cout << "Reconnect ? (y/n) :" << endl; + } + } +} + +void ShellStateConnected::run(ShellContext * c) { + string line; + cout << PMT; + getline(cin, line); + istringstream is(line); + vector params; + string param; + while(is >> param) { + params.push_back(param); + } + + // empty input line + if(params.empty()) + return; + + if("quit" == params[0] || "exit" == params[0]) { + c->changeState(ShellStateStop::getInstance()); + } else if("get" == params[0]) { + if(params.size() == 3) { + try { + c->get(params[1], params[2]); + } catch (const TTransportException & e) { + cout << e.what() << endl; + handleConError(c); + } + } else { + unknownCmd(); + } + } else if("create" == params[0]) { + if(params.size() == 2) { + try { + c->create(params[1]); + } catch (const TTransportException & e) { + cout << e.what() << endl; + handleConError(c); + } + } else { + unknownCmd(); + } + }else if("put" == params[0]) { + if(params.size() == 4) { + try { + c->put(params[1], params[2], params[3]); + } catch (const TTransportException & e) { + cout << e.what() << endl; + handleConError(c); + } + } else { + unknownCmd(); + } + } else if("scan" == params[0]) { + if(params.size() == 5) { + try { + c->scan(params[1], params[2], params[3], params[4]); + } catch (const TTransportException & e) { + cout << e.what() << endl; + handleConError(c); + } + } else { + unknownCmd(); + } + } else if("help" == params[0]) { + helpMsg(); + } else { + unknownCmd(); + } +} diff --git a/tools/shell/ShellState.h b/tools/shell/ShellState.h new file mode 100644 index 000000000..6167d55b6 --- /dev/null +++ b/tools/shell/ShellState.h @@ -0,0 +1,87 @@ + +#ifndef TOOLS_SHELL_SHELLSTATE +#define TOOLS_SHELL_SHELLSTATE + +class ShellContext; + +/* + * Currently, there are four types of state in total + * 1. start state: the first state the program enters + * 2. connecting state: the program try to connnect to a leveldb server, whose + * previous states could be "start" or "connected" states + * 3. connected states: the program has already connected to a server, and is + * processing user commands + * 4. stop state: the last state the program enters, do some cleanning up things + */ + +class ShellState { + public: + virtual void run(ShellContext *) = 0; + virtual ~ShellState() {} +}; + + +class ShellStateStart : public ShellState { + public: + static ShellStateStart * getInstance(void) { + static ShellStateStart instance; + return &instance; + } + + virtual void run(ShellContext *); + + private: + ShellStateStart() {} + virtual ~ShellStateStart() {} +}; + +class ShellStateStop : public ShellState { + public: + static ShellStateStop * getInstance(void) { + static ShellStateStop instance; + return &instance; + } + + virtual void run(ShellContext *); + + private: + ShellStateStop() {} + virtual ~ShellStateStop() {} + +}; + +class ShellStateConnecting : public ShellState { + public: + static ShellStateConnecting * getInstance(void) { + static ShellStateConnecting instance; + return &instance; + } + + virtual void run(ShellContext *); + + private: + ShellStateConnecting() {} + virtual ~ShellStateConnecting() {} + +}; + +class ShellStateConnected : public ShellState { + public: + static ShellStateConnected * getInstance(void) { + static ShellStateConnected instance; + return &instance; + } + + virtual void run(ShellContext *); + + private: + ShellStateConnected() {} + virtual ~ShellStateConnected() {} + + void unknownCmd(); + void handleConError(ShellContext *); + void helpMsg(); +}; + +#endif + diff --git a/tools/shell/test/DBClientProxyTest.cpp b/tools/shell/test/DBClientProxyTest.cpp new file mode 100644 index 000000000..27e1c8563 --- /dev/null +++ b/tools/shell/test/DBClientProxyTest.cpp @@ -0,0 +1,182 @@ +/** + * Tests for DBClientProxy class for leveldb + * @author Bo Liu (newpoo.liu@gmail.com) + * Copyright 2012 Facebook + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "server_options.h" + + +#include "../DBClientProxy.h" +using namespace leveldb; + + +using namespace apache::thrift; +using namespace apache::thrift::protocol; +using namespace apache::thrift::transport; +using boost::shared_ptr; +using namespace Tleveldb; +using namespace std; + + + +extern "C" void startServer(int argc, char**argv); +extern "C" void stopServer(int port); +extern ServerOptions server_options; + +static const string db1("db1"); + + +static void testDBClientProxy(DBClientProxy & dbcp) { + bool flag; + const int NOK = 100; + const int BUFSIZE = 16; + int testcase = 0; + + vector keys, values; + vector > kvs, correctKvs; + string k, v; + + for(int i = 0; i < NOK; ++i) { + char bufKey[BUFSIZE]; + char bufValue[BUFSIZE]; + snprintf(bufKey, BUFSIZE, "key%d", i); + snprintf(bufValue, BUFSIZE, "value%d", i); + keys.push_back(bufKey); + values.push_back(bufValue); + correctKvs.push_back((make_pair(string(bufKey), string(bufValue)))); + } + + sort(correctKvs.begin(), correctKvs.end()); + + + // can not do get(), put(), scan() or create() before connected. + flag = dbcp.get(db1, keys[0], v); + ASSERT_TRUE(false == flag); + printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase); + flag = dbcp.put(db1, keys[0], keys[1]); + ASSERT_TRUE(false == flag); + printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase); + flag = dbcp.scan(db1, "a", "w", "100", kvs); + ASSERT_TRUE(false == flag); + printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase); + flag = dbcp.create(db1); + ASSERT_TRUE(false == flag); + printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase); + + dbcp.connect(); + + // create a database + flag = dbcp.create(db1); + ASSERT_TRUE(true == flag); + printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase); + + // no such key + flag = dbcp.get(db1, keys[0], v); + ASSERT_TRUE(false == flag); + printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase); + + + // scan() success with empty returned key-value pairs + kvs.clear(); + flag = dbcp.scan(db1, "a", "w", "100", kvs); + ASSERT_TRUE(true == flag); + ASSERT_TRUE(kvs.empty()); + printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase); + + + // put() + for(int i = 0; i < NOK; ++i) { + flag = dbcp.put(db1, keys[i], values[i]); + ASSERT_TRUE(true == flag); + } + printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase); + + + // scan all of key-value pairs + kvs.clear(); + flag = dbcp.scan(db1, "a", "w", "100", kvs); + ASSERT_TRUE(true == flag); + ASSERT_TRUE(kvs == correctKvs); + printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase); + + + // scan the first 20 key-value pairs + { + kvs.clear(); + flag = dbcp.scan(db1, "a", "w", "20", kvs); + ASSERT_TRUE(true == flag); + vector > tkvs(correctKvs.begin(), correctKvs.begin() + 20); + ASSERT_TRUE(kvs == tkvs); + printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase); + } + + // scan key[10] to key[50] + { + kvs.clear(); + flag = dbcp.scan(db1, correctKvs[10].first, correctKvs[50].first, "100", kvs); + ASSERT_TRUE(true == flag); + + vector > tkvs(correctKvs.begin() + 10, correctKvs.begin() + 50); + ASSERT_TRUE(kvs == tkvs); + printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase); + } + + // scan "key10" to "key40" by limit constraint + { + kvs.clear(); + flag = dbcp.scan(db1, correctKvs[10].first.c_str(), "w", "30", kvs); + ASSERT_TRUE(true == flag); + vector > tkvs(correctKvs.begin() + 10, correctKvs.begin() + 40); + ASSERT_TRUE(kvs == tkvs); + printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase); + } + + + // get() + flag = dbcp.get(db1, "unknownKey", v); + ASSERT_TRUE(false == flag); + printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase); + + flag = dbcp.get(db1, keys[0], v); + ASSERT_TRUE(true == flag); + ASSERT_TRUE(v == values[0]); + printf("\033[01;40;32mTEST CASE %d passed\033[01;40;37m\n", ++testcase); +} + + + +static void cleanupDir(std::string dir) { + // remove old data, if any + char* cleanup = new char[100]; + snprintf(cleanup, 100, "rm -rf %s", dir.c_str()); + system(cleanup); +} + +int main(int argc, char **argv) { + // create a server + startServer(argc, argv); + printf("Server thread created.\n"); + + // give some time to the server to initialize itself + while (server_options.getPort() == 0) { + sleep(1); + } + + cleanupDir(server_options.getDataDirectory(db1)); + + DBClientProxy dbcp("localhost", server_options.getPort()); + testDBClientProxy(dbcp); +} +