Share a single cache for all the DBs served by this server.

Summary:

Test Plan:

Reviewers:

CC:

Task ID: #

Blame Rev:
main
Dhruba Borthakur 13 years ago
parent bfaa1bd26c
commit d5503208cf
  1. 2
      build_detect_platform
  2. 3
      thrift/if/leveldb.thrift
  3. 10
      thrift/openhandles.h
  4. 10
      thrift/server.cpp
  5. 11
      thrift/server_options.cpp
  6. 212
      thrift/server_options.h
  7. 50
      thrift/server_utils.cpp
  8. 36
      thrift/test/simpletest.cpp

@ -102,7 +102,7 @@ esac
# prune take effect.
DIRS="util db table"
if test "$USE_THRIFT"; then
DIRS+=" thrift/gen-cpp thrift/server_utils.cpp"
DIRS+=" thrift/gen-cpp thrift/server_utils.cpp thrift/server_options.cpp"
THRIFTSERVER=leveldb_server
fi
set -f # temporarily disable globbing so that our patterns aren't expanded

@ -121,7 +121,8 @@ exception LeveldbException {
// The Database service
service DB {
// opens the database
// opens the database. The database name cannot have "/"
// in its name.
DBHandle Open(1:Text dbname, 2:DBOptions dboptions)
throws (1:LeveldbException se),

@ -4,6 +4,9 @@
* Copyright 2012 Facebook
*/
#ifndef THRIFT_LEVELDB_SERVER_H_
#define THRIFT_LEVELDB_SERVER_H_
#include <unordered_map>
#include <atomic>
#include "DB.h"
@ -142,15 +145,15 @@ class OpenHandles {
// Inserts a new database into the list.
// If the database is already open, increase refcount.
// If the database is not already open, open and insert into list.
int64_t add(leveldb::Options& options, Text dbname) {
int64_t add(leveldb::Options& options, Text dbname, std::string dbdir) {
struct onehandle* found = head_[dbname];
if (found == NULL) {
found = new onehandle;
found->name = dbname;
found->uniqueid = dbnum_++;
fprintf(stderr, "openhandle.add: Opening leveldb directory %s\n",
fprintf(stderr, "openhandle.add: Opening leveldb DB %s\n",
dbname.c_str());
leveldb::Status status = leveldb::DB::Open(options, dbname, &found->onedb);
leveldb::Status status = leveldb::DB::Open(options, dbdir, &found->onedb);
if (!status.ok()) {
LeveldbException e;
e.errorCode = Code::kIOError;
@ -223,5 +226,6 @@ class OpenHandles {
}
};
#endif // THRIFT_LEVELDB_SERVER_H_

@ -13,6 +13,7 @@
#include <transport/TBufferTransports.h>
#include <leveldb_types.h>
#include "openhandles.h"
#include "server_options.h"
#include "leveldb/db.h"
#include "leveldb/write_batch.h"
@ -24,23 +25,22 @@ using namespace apache::thrift::server;
using namespace Tleveldb;
using boost::shared_ptr;
extern "C" void startServer(int port);
extern "C" void startServer(int argc, char** argv);
extern "C" void stopServer(int port);
static int port = 9090;
extern ServerOptions server_options;
void signal_handler(int sig) {
switch (sig) {
case SIGINT:
fprintf(stderr, "Received SIGINT, stopping leveldb server");
stopServer(port);
stopServer(server_options.getPort());
break;
}
}
int main(int argc, char **argv) {
signal(SIGINT, signal_handler);
startServer(port);
startServer(argc, argv);
return 0;
}

@ -0,0 +1,11 @@
/**
* Options for the Thrift leveldb server.
* @author Dhruba Borthakur (dhruba@gmail.com)
* Copyright 2012 Facebook
**/
#include <DB.h>
#include "server_options.h"
const std::string ServerOptions::DEFAULT_HOST = "hostname";
const std::string ServerOptions::DEFAULT_ROOTDIR = "/tmp/ldb/";

@ -0,0 +1,212 @@
/**
* Options for the Thrift leveldb server.
* @author Dhruba Borthakur (dhruba@gmail.com)
* Copyright 2012 Facebook
*/
#ifndef THRIFT_LEVELDB_SERVER_OPTIONS_
#define THRIFT_LEVELDB_SERVER_OPTIONS_
#include <sys/stat.h>
#include <sys/types.h>
#include "leveldb/db.h"
#include "leveldb/cache.h"
//
// These are configuration options for the entire server.
//
class ServerOptions {
private:
int num_threads_; // number of thrift server threads
int cache_numshardbits_; // cache shards
long cache_size_; // cache size in bytes
int port_; // port number
std::string hostname_; // host name of this machine
std::string rootdir_; // root directory of all DBs
leveldb::Cache* cache_; // the block cache
// Number of concurrent threads to run.
const static int DEFAULT_threads = 1;
// Number of bytes to use as a cache of uncompressed data.
// Default setting of 100 MB
const static long DEFAULT_cache_size = 100 * 1024 * 1024;
// Number of shards for the block cache is 2 ** DEFAULT_cache_numshardbits.
// Negative means use default settings. This is applied only
// if DEFAULT_cache_size is non-negative.
const static int DEFAULT_cache_numshardbits = 6;
// default port
const static int DEFAULT_PORT = 6666;
// default machine name
const static std::string DEFAULT_HOST;
// default directory where the server stores all its data
const static std::string DEFAULT_ROOTDIR;
public:
ServerOptions() : num_threads_(DEFAULT_threads),
cache_numshardbits_(DEFAULT_cache_numshardbits),
cache_size_(DEFAULT_cache_size),
port_(DEFAULT_PORT),
hostname_(DEFAULT_HOST),
rootdir_(DEFAULT_ROOTDIR + DEFAULT_HOST),
cache_(NULL) {
char buf[100];
if (gethostname(buf, sizeof(buf)) == 0) {
hostname_ = buf;
}
}
//
// Returns succes if all command line options are parsed successfully,
// otherwise returns false.
bool parseOptions(int argc, char** argv) {
int n;
long l;
char junk;
char* cports = NULL;
for (int i = 1; i < argc; i++) {
if (sscanf(argv[i], "--port=%d%c", &n, &junk) == 1) {
port_ = n;
} else if (sscanf(argv[i], "--threads=%d%c", &n, &junk) == 1) {
num_threads_ = n;
} else if (sscanf(argv[i], "--cache_size=%ld%c", &n, &junk) == 1) {
cache_size_ = n;
} else if (sscanf(argv[i], "--cache_numshardbits=%d%c", &n, &junk) == 1) {
cache_numshardbits_ = n;
} else if (strncmp(argv[i], "--hostname=", 10) == 0) {
hostname_ = argv[i] + 10;
} else if (strncmp(argv[i], "--rootdir=", 9) == 0) {
rootdir_ = argv[i] + 9;
} else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
return false;
}
}
return true;
}
// Create the directory format on disk.
// Returns true on success, false on failure
bool createDirectories() {
mode_t mode = 0755;
const char* dir = getRootDirectory().c_str();
if (mkpath(dir, mode) < 0) {
fprintf(stderr, "Unable to create root directory %s\n", dir);
return false;
}
dir = getDataDirectory().c_str();;
if (mkpath(dir, mode) < 0) {
fprintf(stderr, "Unable to create data directory %s\n", dir);
return false;
}
dir = getConfigDirectory().c_str();;
if (mkpath(dir, mode) < 0) {
fprintf(stderr, "Unable to create config directory %s\n", dir);
return false;
}
return true;
}
// create a cache instance that is shared by all DBs served by this server
void createCache() {
if (cache_numshardbits_ >= 1) {
cache_ = leveldb::NewLRUCache(cache_size_, cache_numshardbits_);
} else {
cache_ = leveldb::NewLRUCache(cache_size_);
}
}
// Returns the server port
int getPort() {
return port_;
}
// Returns the cache
leveldb::Cache* getCache() {
return cache_;
}
// Returns the configured number of server threads
int getNumThreads() {
return num_threads_;
}
// Returns the root directory where the server is rooted.
// The hostname is appended to the rootdir to arrive at the directory name.
std::string getRootDirectory() {
return rootdir_ + "/" + hostname_;
}
// Returns the directory where the server stores all users's DBs.
std::string getDataDirectory() {
return getRootDirectory() + "/userdata/";
}
// Returns the directory where the server stores all its configurations
std::string getConfigDirectory() {
return getRootDirectory() + "/config/";
}
// Returns the data directory for the specified DB
std::string getDataDirectory(const std::string& dbname) {
return getDataDirectory() + dbname;
}
// Returns true if the DB name is valid, otherwise return false
bool isValidName(const std::string& dbname) {
// The DB name cannot have '/' in the name
if (dbname.find('/') < dbname.size()) {
return false;
}
return true;
}
private:
static int do_mkdir(const char *path, mode_t mode) {
struct stat st;
int status = 0;
if (stat(path, &st) != 0) {
if (mkdir(path, mode) != 0) {
status = -1;
}
} else if (!S_ISDIR(st.st_mode)) {
errno = ENOTDIR;
status = -1;
}
return(status);
}
// mkpath - ensure all directories in path exist
static int mkpath(const char *path, mode_t mode)
{
char *pp;
char *sp;
int status;
char *newpath = strdup(path);
status = 0;
pp = newpath;
while (status == 0 && (sp = strchr(pp, '/')) != 0) {
if (sp != pp) {
/* Neither root nor double slash in path */
*sp = '\0';
status = do_mkdir(newpath, mode);
*sp = '/';
}
pp = sp + 1;
}
if (status == 0) {
status = do_mkdir(path, mode);
}
free(newpath);
return (status);
}
};
#endif // THRIFT_LEVELDB_SERVER_OPTIONS_

@ -17,6 +17,7 @@
#include <util/TEventServerCreator.h>
#include <leveldb_types.h>
#include "openhandles.h"
#include "server_options.h"
#include "leveldb/db.h"
#include "leveldb/write_batch.h"
@ -30,11 +31,14 @@ using namespace apache::thrift::async;
using namespace Tleveldb;
using boost::shared_ptr;
extern "C" void startServer(int port);
extern "C" void startServer(int argc, char** argv);
extern "C" void stopServer(int port);
static boost::shared_ptr<TServer> tServer;
// The global object that stores the default configuration of the server
ServerOptions server_options;
class DBHandler : virtual public DBIf {
public:
DBHandler() {
@ -43,9 +47,21 @@ class DBHandler : virtual public DBIf {
void Open(DBHandle& _return, const Text& dbname,
const DBOptions& dboptions) {
printf("Open\n");
printf("Open %s\n", dbname.c_str());
if (!server_options.isValidName(dbname)) {
LeveldbException e;
e.errorCode = Code::kInvalidArgument;
e.message = "Bad DB name";
fprintf(stderr, "Bad DB name %s\n", dbname.c_str());
throw e;
}
std::string dbdir = server_options.getDataDirectory(dbname);
leveldb::Options options;
leveldb::DB* db;
// fill up per-server options
options.block_cache = server_options.getCache();
// fill up per-DB options
options.create_if_missing = dboptions.create_if_missing;
options.error_if_exists = dboptions.error_if_exists;
options.write_buffer_size = dboptions.write_buffer_size;
@ -57,7 +73,7 @@ class DBHandler : virtual public DBIf {
} else if (dboptions.compression == kSnappyCompression) {
options.compression = leveldb::kSnappyCompression;
}
int64_t session = openHandles->add(options, dbname);
int64_t session = openHandles->add(options, dbname, dbdir);
_return.dbname = dbname;
_return.handleid = session;
}
@ -280,6 +296,14 @@ class DBHandler : virtual public DBIf {
return;
}
// if iterator has encountered any corruption
if (!it->status().ok()) {
thishandle->removeIterator(iterator.iteratorid);
delete it; // cleanup
_return.status = Code::kIOError; // error in data
return;
}
// find current key-value
leveldb::Slice key = it->key();
leveldb::Slice value = it->value();
@ -372,7 +396,21 @@ class DBHandler : virtual public DBIf {
};
// Starts a very simple thrift server
void startServer(int port) {
void startServer(int argc, char** argv) {
// process command line options
if (!server_options.parseOptions(argc, argv)) {
exit(1);
}
// create directories for server
if (!server_options.createDirectories()) {
exit(1);
}
// create the server's block cache
server_options.createCache();
int port = server_options.getPort();
shared_ptr<DBHandler> handler(new DBHandler());
shared_ptr<TProcessor> processor(new DBProcessor(handler));
shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
@ -380,6 +418,8 @@ void startServer(int port) {
shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());
TSimpleServer tServer(processor, serverTransport, transportFactory, protocolFactory);
fprintf(stderr, "Server started on port %d\n", port);
tServer.serve();
}

@ -8,6 +8,7 @@
#include <transport/TBufferTransports.h>
#include <DB.h>
#include <leveldb_types.h>
#include "server_options.h"
using namespace apache::thrift;
using namespace apache::thrift::protocol;
@ -15,14 +16,24 @@ using namespace apache::thrift::transport;
using boost::shared_ptr;
using namespace Tleveldb;
extern "C" void startServer(int port);
extern "C" void startServer(int argc, char**argv);
extern "C" void stopServer(int port);
extern ServerOptions server_options;
static DBHandle dbhandle;
static DBClient* dbclient;
static const Text dbname = "/tmp/leveldb/test";
static const int myport = 9091;
static const Text dbname = "test";
static pthread_t serverThread;
static int ARGC;
static char** ARGV;
static void cleanupDir(std::string dir) {
// remove old data, if any
int ret = unlink(dir.c_str());
char* cleanup = new char[100];
snprintf(cleanup, 100, "rm -rf %s", dir.c_str());
system(cleanup);
}
static void createDatabase() {
DBOptions options;
@ -34,6 +45,8 @@ static void createDatabase() {
options.block_restart_interval = 16;
options.compression = kSnappyCompression;
cleanupDir(server_options.getDataDirectory(dbname));
// create the database
dbclient->Open(dbhandle, dbname, options);
}
@ -185,27 +198,26 @@ static void testClient(int port) {
static void* startTestServer(void *arg) {
printf("Server starting on port %d...\n", myport);
startServer(myport);
printf("Server test server\n");
startServer(ARGC, ARGV);
}
int main(int argc, char **argv) {
// remove old data, if any
int ret = unlink(dbname.c_str());
char* cleanup = new char[100];
snprintf(cleanup, 100, "rm -rf %s", dbname.c_str());
system(cleanup);
ARGC = argc;
ARGV = argv;
// create a server
int rc = pthread_create(&serverThread, NULL, startTestServer, NULL);
printf("Server thread created.\n");
// give some time to the server to initialize itself
sleep(1);
while (server_options.getPort() == 0) {
sleep(1);
}
// test client
testClient(myport);
testClient(server_options.getPort());
return 0;
}

Loading…
Cancel
Save