Merge remote-tracking branch 'origin/master' into task1181723

Conflicts:
	Makefile
main
bol 13 years ago
commit 89f0c25360
  1. 35
      Makefile
  2. 14
      build_detect_platform
  3. 25
      build_detect_version
  4. 2
      db/builder.cc
  5. 61
      db/db_bench.cc
  6. 105
      db/db_impl.cc
  7. 18
      db/db_impl.h
  8. 72
      db/db_stats_logger.cc
  9. 27
      db/db_test.cc
  10. 18
      db/filename.cc
  11. 2
      db/filename.h
  12. 176
      db/version_set.cc
  13. 9
      db/version_set.h
  14. 7
      fbcode.sh
  15. 22
      hdfs/env_hdfs.h
  16. 21
      include/leveldb/env.h
  17. 26
      include/leveldb/options.h
  18. 1012
      scribe/if/gen-cpp/scribe.cpp
  19. 593
      scribe/if/gen-cpp/scribe.h
  20. 17
      scribe/if/gen-cpp/scribe_constants.cpp
  21. 25
      scribe/if/gen-cpp/scribe_constants.h
  22. 513
      scribe/if/gen-cpp/scribe_types.cpp
  23. 247
      scribe/if/gen-cpp/scribe_types.h
  24. 82
      scribe/if/scribe.thrift
  25. 90
      scribe/scribe_logger.cc
  26. 71
      scribe/scribe_logger.h
  27. 2
      thrift/README
  28. BIN
      thrift/bin/thrift
  29. 209
      thrift/gen-cpp/leveldb_types.cpp
  30. 70
      thrift/gen-cpp/leveldb_types.h
  31. 15
      thrift/if/leveldb.thrift
  32. 8
      thrift/server_options.h
  33. 23
      thrift/server_utils.cpp
  34. 71
      tools/manifest_dump.cc
  35. 14
      util/build_version.h
  36. 79
      util/env_posix.cc
  37. 57
      util/filelock_test.cc
  38. 27
      util/options.cc
  39. 23
      util/stats_logger.h

@ -33,8 +33,6 @@ MEMENVOBJECTS = $(MEMENV_SOURCES:.cc=.o)
TESTUTIL = ./util/testutil.o TESTUTIL = ./util/testutil.o
TESTHARNESS = ./util/testharness.o $(TESTUTIL) TESTHARNESS = ./util/testharness.o $(TESTUTIL)
TOOLS = \
leveldb_shell
TESTS = \ TESTS = \
arena_test \ arena_test \
@ -55,10 +53,17 @@ TESTS = \
table_test \ table_test \
version_edit_test \ version_edit_test \
version_set_test \ version_set_test \
write_batch_test write_batch_test \
filelock_test
TOOLS = \
manifest_dump \
leveldb_shell
PROGRAMS = db_bench $(TESTS) PROGRAMS = db_bench $(TESTS) $(TOOLS)
BENCHMARKS = db_bench_sqlite3 db_bench_tree_db BENCHMARKS = db_bench_sqlite3 db_bench_tree_db
VERSIONFILE=util/build_version.cc
LIBRARY = libleveldb.a LIBRARY = libleveldb.a
MEMENVLIBRARY = libmemenv.a MEMENVLIBRARY = libmemenv.a
@ -82,21 +87,20 @@ $(SHARED1): $(SHARED3)
ln -fs $(SHARED3) $(SHARED1) ln -fs $(SHARED3) $(SHARED1)
endif endif
all: $(SHARED) $(LIBRARY) $(THRIFTSERVER) $(TOOLS)
check: all $(PROGRAMS) $(TESTS) all: $(VERSIONFILE) $(SHARED) $(LIBRARY) $(THRIFTSERVER) $(TOOLS)
check: all $(PROGRAMS) $(TESTS) $(TOOLS)
for t in $(TESTS); do echo "***** Running $$t"; ./$$t || exit 1; done for t in $(TESTS); do echo "***** Running $$t"; ./$$t || exit 1; done
clean: clean:
-rm -f $(PROGRAMS) $(BENCHMARKS) $(LIBRARY) $(SHARED) $(MEMENVLIBRARY) $(THRIFTSERVER) */*.o */*/*.o ios-x86/*/*.o ios-arm/*/*.o build_config.mk -rm -f $(PROGRAMS) $(BENCHMARKS) $(LIBRARY) $(SHARED) $(MEMENVLIBRARY) $(THRIFTSERVER) */*.o */*/*.o ios-x86/*/*.o ios-arm/*/*.o build_config.mk $(VERSIONFILE)
-rm -rf ios-x86/* ios-arm/* -rm -rf ios-x86/* ios-arm/*
$(LIBRARY): $(LIBOBJECTS) $(LIBRARY): $(LIBOBJECTS)
rm -f $@ rm -f $@
$(AR) -rs $@ $(LIBOBJECTS) $(AR) -rs $@ $(LIBOBJECTS)
leveldb_shell: tools/shell/ShellContext.o tools/shell/ShellState.o tools/shell/LeveldbShell.o tools/shell/DBClientProxy.o tools/shell/ShellContext.h tools/shell/ShellState.h tools/shell/DBClientProxy.h $(LIBOBJECTS)
$(CXX) tools/shell/ShellContext.o tools/shell/ShellState.o tools/shell/LeveldbShell.o tools/shell/DBClientProxy.o $(LIBOBJECTS) -o $@ $(LDFLAGS)
db_bench: db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) db_bench: db/db_bench.o $(LIBOBJECTS) $(TESTUTIL)
$(CXX) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(CXX) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)
@ -174,9 +178,22 @@ leveldb_server: thrift/server.o $(LIBRARY)
leveldb_server_test: thrift/test/simpletest.o $(LIBRARY) leveldb_server_test: thrift/test/simpletest.o $(LIBRARY)
$(CXX) thrift/test/simpletest.o $(LIBRARY) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(CXX) thrift/test/simpletest.o $(LIBRARY) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)
leveldb_shell: tools/shell/ShellContext.o tools/shell/ShellState.o tools/shell/LeveldbShell.o tools/shell/DBClientProxy.o tools/shell/ShellContext.h tools/shell/ShellState.h tools/shell/DBClientProxy.h $(LIBOBJECTS)
$(CXX) tools/shell/ShellContext.o tools/shell/ShellState.o tools/shell/LeveldbShell.o tools/shell/DBClientProxy.o $(LIBOBJECTS) -o $@ $(LDFLAGS)
DBClientProxy_test: tools/shell/test/DBClientProxyTest.o tools/shell/DBClientProxy.o $(LIBRARY) DBClientProxy_test: tools/shell/test/DBClientProxyTest.o tools/shell/DBClientProxy.o $(LIBRARY)
$(CXX) tools/shell/test/DBClientProxyTest.o tools/shell/DBClientProxy.o $(LIBRARY) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS) $(CXX) tools/shell/test/DBClientProxyTest.o tools/shell/DBClientProxy.o $(LIBRARY) $(EXEC_LDFLAGS) -o $@ $(LDFLAGS)
manifest_dump: tools/manifest_dump.o $(LIBOBJECTS)
$(CXX) tools/manifest_dump.o $(LIBOBJECTS) -o $@ $(LDFLAGS)
filelock_test: util/filelock_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) util/filelock_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LDFLAGS)
# recreate the version file with the latest git revision
$(VERSIONFILE): build_detect_version
$(shell ./build_detect_platform build_config.mk)
ifeq ($(PLATFORM), IOS) ifeq ($(PLATFORM), IOS)
# For iOS, create universal object files to be used on both the simulator and # For iOS, create universal object files to be used on both the simulator and
# a device. # a device.

@ -96,15 +96,22 @@ case "$TARGET_OS" in
exit 1 exit 1
esac esac
./build_detect_version
# We want to make a list of all cc files within util, db, table, and helpers # We want to make a list of all cc files within util, db, table, and helpers
# except for the test and benchmark files. By default, find will output a list # except for the test and benchmark files. By default, find will output a list
# of all files matching either rule, so we need to append -print to make the # of all files matching either rule, so we need to append -print to make the
# prune take effect. # prune take effect.
DIRS="util db table" DIRS="util db table"
if test "$USE_THRIFT"; then if test "$USE_THRIFT"; then
DIRS+=" thrift/gen-cpp thrift/server_utils.cpp " DIRS+=" thrift/server_utils.cpp thrift/gen-cpp "
THRIFTSERVER=leveldb_server THRIFTSERVER=leveldb_server
fi fi
if test "$USE_SCRIBE"; then
DIRS+=" scribe "
fi
set -f # temporarily disable globbing so that our patterns aren't expanded set -f # temporarily disable globbing so that our patterns aren't expanded
PRUNE_TEST="-name *test*.cc -prune" PRUNE_TEST="-name *test*.cc -prune"
PRUNE_BENCH="-name *_bench.cc -prune" PRUNE_BENCH="-name *_bench.cc -prune"
@ -197,6 +204,11 @@ if test "$USE_THRIFT"; then
PLATFORM_LDFLAGS+=$THRIFT_LDFLAGS PLATFORM_LDFLAGS+=$THRIFT_LDFLAGS
fi fi
#shall we build with scribe
if test "$USE_SCRIBE"; then
COMMON_FLAGS="$COMMON_FLAGS -I./thrift/lib/ -DUSE_SCRIBE"
fi
PLATFORM_CCFLAGS="$PLATFORM_CCFLAGS $COMMON_FLAGS" PLATFORM_CCFLAGS="$PLATFORM_CCFLAGS $COMMON_FLAGS"
PLATFORM_CXXFLAGS="$PLATFORM_CXXFLAGS $COMMON_FLAGS" PLATFORM_CXXFLAGS="$PLATFORM_CXXFLAGS $COMMON_FLAGS"

@ -0,0 +1,25 @@
#!/bin/sh
#
# Record the version of the source that we are compiling.
# We keep a record of the git revision in util/version.cc. This source file
# is then built as a regular source file as part of the compilation process.
# One can run "strings executable_filename | grep _build_" to find the version of
# the source that we used to build the executable file.
#
# create git version file
VFILE=util/build_version.cc
# check to see if git is in the path
which git > /dev/null
if [ "$?" = 0 ]; then
git rev-parse HEAD | awk ' BEGIN {print "#include \"build_version.h\""} {print "const char * leveldb_build_git_sha = \"leveldb_build_git_sha:" $0"\";"} END {}' > ${VFILE}
else
echo "git not found"| awk ' BEGIN {print "#include \"build_version.h\""} {print "const char * leveldb_build_git_sha = \"leveldb_build_git_sha:git not found\";"} END {}' > ${VFILE}
fi
date | awk 'BEGIN {} {print "const char * leveldb_build_git_datetime = \"leveldb_build_git_datetime:"$0"\";"} END {} ' >> ${VFILE}
echo "const char * leveldb_build_compile_date = __DATE__;" >> ${VFILE}
echo "const char * leveldb_build_compile_time = __TIME__;" >> ${VFILE}

@ -53,7 +53,7 @@ Status BuildTable(const std::string& dbname,
delete builder; delete builder;
// Finish and check for file errors // Finish and check for file errors
if (s.ok()) { if (s.ok() && !options.disableDataSync) {
s = file->Sync(); s = file->Sync();
} }
if (s.ok()) { if (s.ok()) {

@ -120,9 +120,35 @@ static class leveldb::DBStatistics* dbstats = NULL;
// Number of write operations to do. If negative, do FLAGS_num reads. // Number of write operations to do. If negative, do FLAGS_num reads.
static long FLAGS_writes = -1; static long FLAGS_writes = -1;
// These default values might change if the hardcoded
// Sync all writes to disk // Sync all writes to disk
static bool FLAGS_sync = false; static bool FLAGS_sync = false;
// If true, do not wait until data is synced to disk.
static bool FLAGS_disable_data_sync = false;
// If true, do not write WAL for write.
static bool FLAGS_disable_wal = false;
// Target level-0 file size for compaction
static int FLAGS_target_file_size_base = 2 * 1048576;
// A multiplier to compute targe level-N file size
static int FLAGS_target_file_size_multiplier = 1;
// Max bytes for level-0
static int FLAGS_max_bytes_for_level_base = 10 * 1048576;
// A multiplier to compute max bytes for level-N
static int FLAGS_max_bytes_for_level_multiplier = 10;
// Number of files in level-0 that will trigger put stop.
static int FLAGS_level0_stop_writes_trigger = 12;
// Number of files in level-0 that will slow down writes.
static int FLAGS_level0_slowdown_writes_trigger = 8;
// posix or hdfs environment // posix or hdfs environment
static leveldb::Env* FLAGS_env = leveldb::Env::Default(); static leveldb::Env* FLAGS_env = leveldb::Env::Default();
@ -485,6 +511,8 @@ class Benchmark {
write_options_.sync = true; write_options_.sync = true;
} }
write_options_.disableWAL = FLAGS_disable_wal;
void (Benchmark::*method)(ThreadState*) = NULL; void (Benchmark::*method)(ThreadState*) = NULL;
bool fresh_db = false; bool fresh_db = false;
int num_threads = FLAGS_threads; int num_threads = FLAGS_threads;
@ -745,6 +773,15 @@ class Benchmark {
options.max_open_files = FLAGS_open_files; options.max_open_files = FLAGS_open_files;
options.statistics = dbstats; options.statistics = dbstats;
options.env = FLAGS_env; options.env = FLAGS_env;
options.disableDataSync = FLAGS_disable_data_sync;
options.target_file_size_base = FLAGS_target_file_size_base;
options.target_file_size_multiplier = FLAGS_target_file_size_multiplier;
options.max_bytes_for_level_base = FLAGS_max_bytes_for_level_base;
options.max_bytes_for_level_multiplier =
FLAGS_max_bytes_for_level_multiplier;
options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
options.level0_slowdown_writes_trigger =
FLAGS_level0_slowdown_writes_trigger;
Status s = DB::Open(options, FLAGS_db, &db_); Status s = DB::Open(options, FLAGS_db, &db_);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str()); fprintf(stderr, "open error: %s\n", s.ToString().c_str());
@ -1030,8 +1067,32 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--sync=%d%c", &n, &junk) == 1 && } else if (sscanf(argv[i], "--sync=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) { (n == 0 || n == 1)) {
FLAGS_sync = n; FLAGS_sync = n;
} else if (sscanf(argv[i], "--disable_data_sync=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_disable_data_sync = n;
} else if (sscanf(argv[i], "--disable_wal=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_disable_wal = n;
} else if (sscanf(argv[i], "--hdfs=%s", &hdfsname) == 1) { } else if (sscanf(argv[i], "--hdfs=%s", &hdfsname) == 1) {
FLAGS_env = new leveldb::HdfsEnv(hdfsname); FLAGS_env = new leveldb::HdfsEnv(hdfsname);
} else if (sscanf(argv[i], "--target_file_size_base=%d%c",
&n, &junk) == 1) {
FLAGS_target_file_size_base = n;
} else if ( sscanf(argv[i], "--target_file_size_multiplier=%d%c",
&n, &junk) == 1) {
FLAGS_target_file_size_multiplier = n;
} else if (
sscanf(argv[i], "--max_bytes_for_level_base=%d%c", &n, &junk) == 1) {
FLAGS_max_bytes_for_level_base = n;
} else if (sscanf(argv[i], "--max_bytes_for_level_multiplier=%d%c",
&n, &junk) == 1) {
FLAGS_max_bytes_for_level_multiplier = n;
} else if (sscanf(argv[i],"--level0_stop_writes_trigger=%d%c",
&n, &junk) == 1) {
FLAGS_level0_stop_writes_trigger = n;
} else if (sscanf(argv[i],"--level0_slowdown_writes_trigger=%d%c",
&n, &junk) == 1) {
FLAGS_level0_slowdown_writes_trigger = n;
} else { } else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]); fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
exit(1); exit(1);

@ -10,6 +10,7 @@
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <vector> #include <vector>
#include <algorithm>
#include "db/builder.h" #include "db/builder.h"
#include "db/db_iter.h" #include "db/db_iter.h"
#include "db/dbformat.h" #include "db/dbformat.h"
@ -32,6 +33,7 @@
#include "util/coding.h" #include "util/coding.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/mutexlock.h" #include "util/mutexlock.h"
#include "util/build_version.h"
namespace leveldb { namespace leveldb {
@ -99,7 +101,8 @@ Options SanitizeOptions(const std::string& dbname,
if (result.info_log == NULL) { if (result.info_log == NULL) {
// Open a log file in the same directory as the db // Open a log file in the same directory as the db
src.env->CreateDir(dbname); // In case it does not exist src.env->CreateDir(dbname); // In case it does not exist
src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname)); src.env->RenameFile(InfoLogFileName(dbname),
OldInfoLogFileName(dbname, src.env->NowMicros()));
Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log); Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log);
if (!s.ok()) { if (!s.ok()) {
// No place suitable for logging // No place suitable for logging
@ -131,17 +134,34 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
log_(NULL), log_(NULL),
tmp_batch_(new WriteBatch), tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false), bg_compaction_scheduled_(false),
manual_compaction_(NULL) { manual_compaction_(NULL),
logger_(NULL) {
mem_->Ref(); mem_->Ref();
has_imm_.Release_Store(NULL); has_imm_.Release_Store(NULL);
stats_ = new CompactionStats[options.num_levels]; stats_ = new CompactionStats[options.num_levels];
// Reserve ten files or so for other uses and give the rest to TableCache. // Reserve ten files or so for other uses and give the rest to TableCache.
const int table_cache_size = options.max_open_files - 10; const int table_cache_size = options_.max_open_files - 10;
table_cache_ = new TableCache(dbname_, &options_, table_cache_size); table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
versions_ = new VersionSet(dbname_, &options_, table_cache_, versions_ = new VersionSet(dbname_, &options_, table_cache_,
&internal_comparator_); &internal_comparator_);
options_.Dump(options_.info_log);
#ifdef USE_SCRIBE
logger_ = new ScribeLogger("localhost", 1456);
#endif
char name[100];
Status st = env_->GetHostName(name, 100);
if(st.ok()) {
host_name_ = name;
} else {
Log(options_.info_log, "Can't get hostname, use localhost as host name.");
host_name_ = "localhost";
}
last_log_ts = 0;
} }
DBImpl::~DBImpl() { DBImpl::~DBImpl() {
@ -224,6 +244,7 @@ void DBImpl::DeleteObsoleteFiles() {
env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose
uint64_t number; uint64_t number;
FileType type; FileType type;
std::vector<uint64_t> old_log_files_ts;
for (size_t i = 0; i < filenames.size(); i++) { for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) { if (ParseFileName(filenames[i], &number, &type)) {
bool keep = true; bool keep = true;
@ -245,9 +266,14 @@ void DBImpl::DeleteObsoleteFiles() {
// be recorded in pending_outputs_, which is inserted into "live" // be recorded in pending_outputs_, which is inserted into "live"
keep = (live.find(number) != live.end()); keep = (live.find(number) != live.end());
break; break;
case kInfoLogFile:
keep = true;
if (number != 0) {
old_log_files_ts.push_back(number);
}
break;
case kCurrentFile: case kCurrentFile:
case kDBLockFile: case kDBLockFile:
case kInfoLogFile:
keep = true; keep = true;
break; break;
} }
@ -263,6 +289,20 @@ void DBImpl::DeleteObsoleteFiles() {
} }
} }
} }
// Delete old log files.
int old_log_file_count = old_log_files_ts.size();
if (old_log_file_count >= KEEP_LOG_FILE_NUM) {
std::sort(old_log_files_ts.begin(), old_log_files_ts.end());
for (int i = 0; i >= (old_log_file_count - KEEP_LOG_FILE_NUM); i++) {
uint64_t ts = old_log_files_ts.at(i);
std::string to_delete = OldInfoLogFileName(dbname_, ts);
Log(options_.info_log, "Delete type=%d #%lld\n",
int(kInfoLogFile),
static_cast<unsigned long long>(ts));
env_->DeleteFile(dbname_ + "/" + to_delete);
}
}
} }
Status DBImpl::Recover(VersionEdit* edit) { Status DBImpl::Recover(VersionEdit* edit) {
@ -514,6 +554,7 @@ Status DBImpl::CompactMemTable() {
imm_ = NULL; imm_ = NULL;
has_imm_.Release_Store(NULL); has_imm_.Release_Store(NULL);
DeleteObsoleteFiles(); DeleteObsoleteFiles();
MaybeScheduleLogDBDeployStats();
} }
return s; return s;
@ -537,15 +578,15 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
} }
int DBImpl::NumberLevels() { int DBImpl::NumberLevels() {
return options_.num_levels; return options_.num_levels;
} }
int DBImpl::MaxMemCompactionLevel() { int DBImpl::MaxMemCompactionLevel() {
return options_.max_mem_compaction_level; return options_.max_mem_compaction_level;
} }
int DBImpl::Level0StopWriteTrigger() { int DBImpl::Level0StopWriteTrigger() {
return options_.level0_stop_writes_trigger; return options_.level0_stop_writes_trigger;
} }
Status DBImpl::Flush(const FlushOptions& options) { Status DBImpl::Flush(const FlushOptions& options) {
@ -598,34 +639,33 @@ Status DBImpl::FlushMemTable(const FlushOptions& options) {
} }
Status DBImpl::WaitForCompactMemTable() { Status DBImpl::WaitForCompactMemTable() {
Status s; Status s;
// Wait until the compaction completes // Wait until the compaction completes
MutexLock l(&mutex_); MutexLock l(&mutex_);
while (imm_ != NULL && bg_error_.ok()) { while (imm_ != NULL && bg_error_.ok()) {
bg_cv_.Wait(); bg_cv_.Wait();
} }
if (imm_ != NULL) { if (imm_ != NULL) {
s = bg_error_; s = bg_error_;
} }
return s; return s;
} }
Status DBImpl::TEST_CompactMemTable() { Status DBImpl::TEST_CompactMemTable() {
return FlushMemTable(FlushOptions()); return FlushMemTable(FlushOptions());
} }
Status DBImpl::TEST_WaitForCompactMemTable() { Status DBImpl::TEST_WaitForCompactMemTable() {
return WaitForCompactMemTable(); return WaitForCompactMemTable();
} }
Status DBImpl::TEST_WaitForCompact() { Status DBImpl::TEST_WaitForCompact() {
// Wait until the compaction completes // Wait until the compaction completes
MutexLock l(&mutex_); MutexLock l(&mutex_);
while (bg_compaction_scheduled_ && bg_error_.ok()) { while (bg_compaction_scheduled_ && bg_error_.ok()) {
bg_cv_.Wait(); bg_cv_.Wait();
} }
return bg_error_; return bg_error_;
} }
void DBImpl::MaybeScheduleCompaction() { void DBImpl::MaybeScheduleCompaction() {
@ -656,6 +696,8 @@ void DBImpl::BackgroundCall() {
} }
bg_compaction_scheduled_ = false; bg_compaction_scheduled_ = false;
MaybeScheduleLogDBDeployStats();
// Previous compaction may have produced too many files in a level, // Previous compaction may have produced too many files in a level,
// so reschedule another compaction if needed. // so reschedule another compaction if needed.
MaybeScheduleCompaction(); MaybeScheduleCompaction();
@ -868,6 +910,9 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
compact->compaction->level(), compact->compaction->level(),
compact->compaction->num_input_files(1), compact->compaction->num_input_files(1),
compact->compaction->level() + 1); compact->compaction->level() + 1);
char scratch[200];
compact->compaction->Summary(scratch, 256);
Log(options_.info_log, "Compaction start summary: %s\n", scratch);
assert(versions_->NumLevelFiles(compact->compaction->level()) > 0); assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
assert(compact->builder == NULL); assert(compact->builder == NULL);
@ -1457,6 +1502,7 @@ Status DB::Open(const Options& options, const std::string& dbname,
if (s.ok()) { if (s.ok()) {
impl->DeleteObsoleteFiles(); impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction(); impl->MaybeScheduleCompaction();
impl->MaybeScheduleLogDBDeployStats();
} }
} }
impl->mutex_.Unlock(); impl->mutex_.Unlock();
@ -1502,4 +1548,13 @@ Status DestroyDB(const std::string& dbname, const Options& options) {
return result; return result;
} }
//
// A global method that can dump out the build version
void printLeveldbBuildVersion() {
printf("Git sha %s", leveldb_build_git_sha);
printf("Git datetime %s", leveldb_build_git_datetime);
printf("Compile time %s", leveldb_build_compile_time);
printf("Compile date %s", leveldb_build_compile_date);
}
} // namespace leveldb } // namespace leveldb

@ -13,6 +13,13 @@
#include "leveldb/db.h" #include "leveldb/db.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "port/port.h" #include "port/port.h"
#include "util/stats_logger.h"
#ifdef USE_SCRIBE
#include "scribe/scribe_logger.h"
#endif
#include <boost/lexical_cast.hpp>
namespace leveldb { namespace leveldb {
@ -107,6 +114,9 @@ class DBImpl : public DB {
// Wait for memtable compaction // Wait for memtable compaction
Status WaitForCompactMemTable(); Status WaitForCompactMemTable();
void MaybeScheduleLogDBDeployStats();
static void LogDBDeployStats(void* db);
void MaybeScheduleCompaction(); void MaybeScheduleCompaction();
static void BGWork(void* db); static void BGWork(void* db);
void BackgroundCall(); void BackgroundCall();
@ -144,6 +154,8 @@ class DBImpl : public DB {
uint64_t logfile_number_; uint64_t logfile_number_;
log::Writer* log_; log::Writer* log_;
std::string host_name_;
// Queue of writers. // Queue of writers.
std::deque<Writer*> writers_; std::deque<Writer*> writers_;
WriteBatch* tmp_batch_; WriteBatch* tmp_batch_;
@ -172,6 +184,10 @@ class DBImpl : public DB {
// Have we encountered a background error in paranoid mode? // Have we encountered a background error in paranoid mode?
Status bg_error_; Status bg_error_;
StatsLogger* logger_;
int64_t volatile last_log_ts;
// Per level compaction stats. stats_[level] stores the stats for // Per level compaction stats. stats_[level] stores the stats for
// compactions that produced data for the specified "level". // compactions that produced data for the specified "level".
struct CompactionStats { struct CompactionStats {
@ -189,6 +205,8 @@ class DBImpl : public DB {
}; };
CompactionStats* stats_; CompactionStats* stats_;
static const int KEEP_LOG_FILE_NUM = 1000;
// No copying allowed // No copying allowed
DBImpl(const DBImpl&); DBImpl(const DBImpl&);
void operator=(const DBImpl&); void operator=(const DBImpl&);

@ -0,0 +1,72 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl.h"
#include <string>
#include <stdint.h>
#include "db/version_set.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
namespace leveldb {
void DBImpl::MaybeScheduleLogDBDeployStats() {
// There is a lock in the actual logger.
if (!logger_ || options_.db_stats_log_interval < 0
|| host_name_.empty()) {
return;
}
if (shutting_down_.Acquire_Load()) {
// Already scheduled
} else {
int64_t current_ts = 0;
Status st = env_->GetCurrentTime(&current_ts);
if (!st.ok()) {
return;
}
if ((current_ts - last_log_ts) < options_.db_stats_log_interval) {
return;
}
last_log_ts = current_ts;
env_->Schedule(&DBImpl::LogDBDeployStats, this);
}
}
void DBImpl::LogDBDeployStats(void* db) {
DBImpl* db_inst = reinterpret_cast<DBImpl*>(db);
if (db_inst->shutting_down_.Acquire_Load()) {
return;
}
std::string version_info;
version_info += boost::lexical_cast<std::string>(kMajorVersion);
version_info += ".";
version_info += boost::lexical_cast<std::string>(kMinorVersion);
std::string data_dir;
db_inst->env_->GetAbsolutePath(db_inst->dbname_, &data_dir);
uint64_t file_total_size = 0;
uint32_t file_total_num = 0;
for (int i = 0; i < db_inst->versions_->NumberLevels(); i++) {
file_total_num += db_inst->versions_->NumLevelFiles(i);
file_total_size += db_inst->versions_->NumLevelBytes(i);
}
VersionSet::LevelSummaryStorage scratch;
const char* file_num_summary = db_inst->versions_->LevelSummary(&scratch);
std::string file_num_per_level(file_num_summary);
const char* file_size_summary = db_inst->versions_->LevelDataSizeSummary(
&scratch);
std::string data_size_per_level(file_num_summary);
int64_t unix_ts;
db_inst->env_->GetCurrentTime(&unix_ts);
db_inst->logger_->Log_Deploy_Stats(version_info, db_inst->host_name_,
data_dir, file_total_size, file_total_num, file_num_per_level,
data_size_per_level, unix_ts);
}
}

@ -213,6 +213,10 @@ class DBTest {
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
} }
Status PureReopen(Options* options, DB** db) {
return DB::Open(*options, dbname_, db);
}
Status TryReopen(Options* options) { Status TryReopen(Options* options) {
delete db_; delete db_;
db_ = NULL; db_ = NULL;
@ -779,6 +783,22 @@ TEST(DBTest, Recover) {
} while (ChangeOptions()); } while (ChangeOptions());
} }
TEST(DBTest, RollLog) {
do {
ASSERT_OK(Put("foo", "v1"));
ASSERT_OK(Put("baz", "v5"));
Reopen();
for (int i = 0; i < 10; i++) {
Reopen();
}
ASSERT_OK(Put("foo", "v4"));
for (int i = 0; i < 10; i++) {
Reopen();
}
} while (ChangeOptions());
}
TEST(DBTest, WAL) { TEST(DBTest, WAL) {
Options options = CurrentOptions(); Options options = CurrentOptions();
WriteOptions writeOpt = WriteOptions(); WriteOptions writeOpt = WriteOptions();
@ -812,6 +832,13 @@ TEST(DBTest, WAL) {
ASSERT_EQ("v3", Get("foo")); ASSERT_EQ("v3", Get("foo"));
} }
TEST(DBTest, CheckLock) {
DB* localdb;
Options options = CurrentOptions();
ASSERT_TRUE(TryReopen(&options).ok());
ASSERT_TRUE(!(PureReopen(&options, &localdb).ok())); // second open should fail
}
TEST(DBTest, FLUSH) { TEST(DBTest, FLUSH) {
Options options = CurrentOptions(); Options options = CurrentOptions();
WriteOptions writeOpt = WriteOptions(); WriteOptions writeOpt = WriteOptions();

@ -60,8 +60,10 @@ std::string InfoLogFileName(const std::string& dbname) {
} }
// Return the name of the old info log file for "dbname". // Return the name of the old info log file for "dbname".
std::string OldInfoLogFileName(const std::string& dbname) { std::string OldInfoLogFileName(const std::string& dbname, uint64_t ts) {
return dbname + "/LOG.old"; char buf[50];
snprintf(buf, sizeof(buf), "%llu", static_cast<unsigned long long>(ts));
return dbname + "/LOG.old." + buf;
} }
@ -69,7 +71,7 @@ std::string OldInfoLogFileName(const std::string& dbname) {
// dbname/CURRENT // dbname/CURRENT
// dbname/LOCK // dbname/LOCK
// dbname/LOG // dbname/LOG
// dbname/LOG.old // dbname/LOG.old.[0-9]+
// dbname/MANIFEST-[0-9]+ // dbname/MANIFEST-[0-9]+
// dbname/[0-9]+.(log|sst) // dbname/[0-9]+.(log|sst)
bool ParseFileName(const std::string& fname, bool ParseFileName(const std::string& fname,
@ -82,9 +84,17 @@ bool ParseFileName(const std::string& fname,
} else if (rest == "LOCK") { } else if (rest == "LOCK") {
*number = 0; *number = 0;
*type = kDBLockFile; *type = kDBLockFile;
} else if (rest == "LOG" || rest == "LOG.old") { } else if (rest == "LOG") {
*number = 0; *number = 0;
*type = kInfoLogFile; *type = kInfoLogFile;
} else if (rest.starts_with("LOG.old.")) {
uint64_t ts_suffix;
rest.remove_prefix(sizeof("LOG.old."));
if (!ConsumeDecimalNumber(&rest, &ts_suffix)) {
return false;
}
*number = ts_suffix;
*type = kInfoLogFile;
} else if (rest.starts_with("MANIFEST-")) { } else if (rest.starts_with("MANIFEST-")) {
rest.remove_prefix(strlen("MANIFEST-")); rest.remove_prefix(strlen("MANIFEST-"));
uint64_t num; uint64_t num;

@ -60,7 +60,7 @@ extern std::string TempFileName(const std::string& dbname, uint64_t number);
extern std::string InfoLogFileName(const std::string& dbname); extern std::string InfoLogFileName(const std::string& dbname);
// Return the name of the old info log file for "dbname". // Return the name of the old info log file for "dbname".
extern std::string OldInfoLogFileName(const std::string& dbname); extern std::string OldInfoLogFileName(const std::string& dbname, uint64_t ts);
// If filename is a leveldb file, store the type of the file in *type. // If filename is a leveldb file, store the type of the file in *type.
// The number encoded in the filename is stored in *number. If the // The number encoded in the filename is stored in *number. If the

@ -702,15 +702,16 @@ VersionSet::VersionSet(const std::string& dbname,
compact_pointer_ = new std::string[options_->num_levels]; compact_pointer_ = new std::string[options_->num_levels];
max_file_size_ = new uint64_t[options_->num_levels]; max_file_size_ = new uint64_t[options_->num_levels];
level_max_bytes_ = new uint64_t[options->num_levels]; level_max_bytes_ = new uint64_t[options->num_levels];
max_file_size_[0] = options_->target_file_size_base;
level_max_bytes_[0] = options_->max_bytes_for_level_base;
int target_file_size_multiplier = options_->target_file_size_multiplier; int target_file_size_multiplier = options_->target_file_size_multiplier;
int max_bytes_multiplier = options_->max_bytes_for_level_multiplier; int max_bytes_multiplier = options_->max_bytes_for_level_multiplier;
int i = 1; for (int i = 0; i < options_->num_levels; i++) {
while (i < options_->num_levels) { if (i > 1) {
max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier; max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier;
level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier; level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier;
i++; } else {
max_file_size_[i] = options_->target_file_size_base;
level_max_bytes_[i] = options_->max_bytes_for_level_base;
}
} }
AppendVersion(new Version(this)); AppendVersion(new Version(this));
} }
@ -939,6 +940,119 @@ Status VersionSet::Recover() {
return s; return s;
} }
Status VersionSet::DumpManifest(Options& options, std::string& dscname) {
struct LogReporter : public log::Reader::Reporter {
Status* status;
virtual void Corruption(size_t bytes, const Status& s) {
if (this->status->ok()) *this->status = s;
}
};
// Open the specified manifest file.
SequentialFile* file;
Status s = options.env->NewSequentialFile(dscname, &file);
if (!s.ok()) {
return s;
}
bool have_log_number = false;
bool have_prev_log_number = false;
bool have_next_file = false;
bool have_last_sequence = false;
uint64_t next_file = 0;
uint64_t last_sequence = 0;
uint64_t log_number = 0;
uint64_t prev_log_number = 0;
VersionSet::Builder builder(this, current_);
{
LogReporter reporter;
reporter.status = &s;
log::Reader reader(file, &reporter, true/*checksum*/, 0/*initial_offset*/);
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit(NumberLevels());
s = edit.DecodeFrom(record);
if (s.ok()) {
if (edit.has_comparator_ &&
edit.comparator_ != icmp_.user_comparator()->Name()) {
s = Status::InvalidArgument(
edit.comparator_ + "does not match existing comparator ",
icmp_.user_comparator()->Name());
}
}
if (s.ok()) {
builder.Apply(&edit);
}
if (edit.has_log_number_) {
log_number = edit.log_number_;
have_log_number = true;
}
if (edit.has_prev_log_number_) {
prev_log_number = edit.prev_log_number_;
have_prev_log_number = true;
}
if (edit.has_next_file_number_) {
next_file = edit.next_file_number_;
have_next_file = true;
}
if (edit.has_last_sequence_) {
last_sequence = edit.last_sequence_;
have_last_sequence = true;
}
}
}
delete file;
file = NULL;
if (s.ok()) {
if (!have_next_file) {
s = Status::Corruption("no meta-nextfile entry in descriptor");
printf("no meta-nextfile entry in descriptor");
} else if (!have_log_number) {
s = Status::Corruption("no meta-lognumber entry in descriptor");
printf("no meta-lognumber entry in descriptor");
} else if (!have_last_sequence) {
printf("no last-sequence-number entry in descriptor");
s = Status::Corruption("no last-sequence-number entry in descriptor");
}
if (!have_prev_log_number) {
prev_log_number = 0;
}
MarkFileNumberUsed(prev_log_number);
MarkFileNumberUsed(log_number);
}
if (s.ok()) {
Version* v = new Version(this);
builder.SaveTo(v);
// Install recovered version
Finalize(v);
AppendVersion(v);
manifest_file_number_ = next_file;
next_file_number_ = next_file + 1;
last_sequence_ = last_sequence;
log_number_ = log_number;
prev_log_number_ = prev_log_number;
printf("manifest_file_number %d next_file_number %d last_sequence %d log_number %d prev_log_number %d\n",
manifest_file_number_, next_file_number_,
last_sequence, log_number, prev_log_number);
printf("%s \n", v->DebugString().c_str());
}
return s;
}
void VersionSet::MarkFileNumberUsed(uint64_t number) { void VersionSet::MarkFileNumberUsed(uint64_t number) {
if (next_file_number_ <= number) { if (next_file_number_ <= number) {
next_file_number_ = number + 1; next_file_number_ = number + 1;
@ -1032,6 +1146,21 @@ const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
return scratch->buffer; return scratch->buffer;
} }
const char* VersionSet::LevelDataSizeSummary(
LevelSummaryStorage* scratch) const {
int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files_size[");
for (int i = 0; i < NumberLevels(); i++) {
int sz = sizeof(scratch->buffer) - len;
int ret = snprintf(scratch->buffer + len, sz, "%ld ",
NumLevelBytes(i));
if (ret < 0 || ret >= sz)
break;
len += ret;
}
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
return scratch->buffer;
}
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) { uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
uint64_t result = 0; uint64_t result = 0;
for (int level = 0; level < NumberLevels(); level++) { for (int level = 0; level < NumberLevels(); level++) {
@ -1443,4 +1572,37 @@ void Compaction::ReleaseInputs() {
} }
} }
static void InputSummary(std::vector<FileMetaData*>& files,
char* output,
int len) {
int write = 0;
for (int i = 0; i < files.size(); i++) {
int sz = len - write;
int ret = snprintf(output + write, sz, "%llu(%llu) ",
files.at(i)->number,
files.at(i)->file_size);
if (ret < 0 || ret >= sz)
break;
write += ret;
}
}
void Compaction::Summary(char* output, int len) {
int write = snprintf(output, len, "Base level %d, inputs:", level_);
if(write < 0 || write > len)
return;
char level_low_summary[100];
InputSummary(inputs_[0], level_low_summary, 100);
char level_up_summary[100];
if (inputs_[1].size()) {
InputSummary(inputs_[1], level_up_summary, 100);
} else {
level_up_summary[0] = '\0';
}
snprintf(output + write, len - write, "[%s],[%s]",
level_low_summary, level_up_summary);
}
} // namespace leveldb } // namespace leveldb

@ -238,6 +238,13 @@ class VersionSet {
}; };
const char* LevelSummary(LevelSummaryStorage* scratch) const; const char* LevelSummary(LevelSummaryStorage* scratch) const;
// printf contents (for debugging)
Status DumpManifest(Options& options, std::string& manifestFileName);
// Return a human-readable short (single-line) summary of the data size
// of files per level. Uses *scratch as backing store.
const char* LevelDataSizeSummary(LevelSummaryStorage* scratch) const;
private: private:
class Builder; class Builder;
@ -344,6 +351,8 @@ class Compaction {
// is successful. // is successful.
void ReleaseInputs(); void ReleaseInputs();
void Summary(char* output, int len);
private: private:
friend class Version; friend class Version;
friend class VersionSet; friend class VersionSet;

@ -8,6 +8,13 @@ TOOLCHAIN_REV=d28c90311ca14f9f0b2bb720f4e34b285513d4f4
TOOLCHAIN_EXECUTABLES="/mnt/gvfs/third-party/$TOOLCHAIN_REV/centos5.2-native" TOOLCHAIN_EXECUTABLES="/mnt/gvfs/third-party/$TOOLCHAIN_REV/centos5.2-native"
TOOLCHAIN_LIB_BASE="/mnt/gvfs/third-party/$TOOLCHAIN_REV/gcc-4.6.2-glibc-2.13" TOOLCHAIN_LIB_BASE="/mnt/gvfs/third-party/$TOOLCHAIN_REV/gcc-4.6.2-glibc-2.13"
# always build thrift server
export USE_THRIFT=1
if ! test "$NO_SCRIBE"; then
export USE_SCRIBE=1
fi
# location of libhdfs libraries # location of libhdfs libraries
if test "$USE_HDFS"; then if test "$USE_HDFS"; then
JAVA_HOME="/usr/local/jdk-6u22-64" JAVA_HOME="/usr/local/jdk-6u22-64"

@ -122,6 +122,20 @@ class HdfsEnv : public Env {
posixEnv->SleepForMicroseconds(micros); posixEnv->SleepForMicroseconds(micros);
} }
virtual Status GetHostName(char* name, uint len) {
return posixEnv->GetHostName(name, len);
}
virtual Status GetCurrentTime(int64_t* unix_time) {
return posixEnv->NowUnixTime(unix_time);
}
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) {
return posixEnv->GetAbsolutePath(db_path, output_path);
}
static uint64_t gettid() { static uint64_t gettid() {
assert(sizeof(pthread_t) <= sizeof(uint64_t)); assert(sizeof(pthread_t) <= sizeof(uint64_t));
return (uint64_t)pthread_self(); return (uint64_t)pthread_self();
@ -245,6 +259,14 @@ class HdfsEnv : public Env {
virtual uint64_t NowMicros() {} virtual uint64_t NowMicros() {}
virtual void SleepForMicroseconds(int micros) {} virtual void SleepForMicroseconds(int micros) {}
virtual Status GetHostName(char* name, uint len) {}
virtual Status GetCurrentTime(int64_t* unix_time) {}
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* outputpath) {}
}; };
} }

@ -145,6 +145,16 @@ class Env {
// Sleep/delay the thread for the perscribed number of micro-seconds. // Sleep/delay the thread for the perscribed number of micro-seconds.
virtual void SleepForMicroseconds(int micros) = 0; virtual void SleepForMicroseconds(int micros) = 0;
// Get the current host name.
virtual Status GetHostName(char* name, uint len) = 0;
// Get the number of seconds since the Epoch, 1970-01-01 00:00:00 (UTC).
virtual Status GetCurrentTime(int64_t* unix_time) = 0;
// Get full directory name for this db.
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) = 0;
private: private:
// No copying allowed // No copying allowed
Env(const Env&); Env(const Env&);
@ -314,6 +324,17 @@ class EnvWrapper : public Env {
void SleepForMicroseconds(int micros) { void SleepForMicroseconds(int micros) {
target_->SleepForMicroseconds(micros); target_->SleepForMicroseconds(micros);
} }
Status GetHostName(char* name, uint len) {
return target_->GetHostName(name, len);
}
Status GetCurrentTime(int64_t* unix_time) {
return target_->GetCurrentTime(unix_time);
}
Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) {
return target_->GetAbsolutePath(db_path, output_path);
}
private: private:
Env* target_; Env* target_;
}; };

@ -152,17 +152,17 @@ struct Options {
// (max_bytes_for_level_base)^(max_bytes_for_level_multiplier). // (max_bytes_for_level_base)^(max_bytes_for_level_multiplier).
int max_bytes_for_level_base; int max_bytes_for_level_base;
int max_bytes_for_level_multiplier; int max_bytes_for_level_multiplier;
// Maximum number of bytes in all compacted files. We avoid expanding // Maximum number of bytes in all compacted files. We avoid expanding
// the lower level file set of a compaction if it would make the // the lower level file set of a compaction if it would make the
// total compaction cover more than // total compaction cover more than
// (expanded_compaction_factor * targetFileSizeLevel()) many bytes. // (expanded_compaction_factor * targetFileSizeLevel()) many bytes.
int expanded_compaction_factor; int expanded_compaction_factor;
// Control maximum bytes of overlaps in grandparent (i.e., level+2) before we // Control maximum bytes of overlaps in grandparent (i.e., level+2) before we
// stop building a single file in a level->level+1 compaction. // stop building a single file in a level->level+1 compaction.
int max_grandparent_overlap_factor; int max_grandparent_overlap_factor;
// Compress blocks using the specified compression algorithm. This // Compress blocks using the specified compression algorithm. This
// parameter can be changed dynamically. // parameter can be changed dynamically.
@ -198,8 +198,16 @@ struct Options {
// Default: false // Default: false
bool disableDataSync; bool disableDataSync;
// This number controls how often a new scribe log about
// db deploy stats is written out.
// -1 indicates no logging at all.
// Default value is 1800 (half an hour).
int db_stats_log_interval;
// Create an Options object with default values for all fields. // Create an Options object with default values for all fields.
Options(); Options();
void Dump(Logger * log) const;
}; };
// Options that control read operations // Options that control read operations

File diff suppressed because it is too large Load Diff

@ -0,0 +1,593 @@
/**
* Autogenerated by Thrift
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#ifndef _Tleveldb_scribe_H
#define _Tleveldb_scribe_H
#include <TDispatchProcessor.h>
#include "scribe_types.h"
namespace Tleveldb {
class scribeIf {
public:
virtual ~scribeIf() {}
virtual ResultCode Log(const std::vector<LogEntry> & messages) = 0;
virtual void LogMulti(std::vector<ResultCode> & _return, const std::vector<LogEntry> & messages) = 0;
virtual ResultCode LogCompressedMsg(const std::string& compressedMessages) = 0;
};
class scribeIfFactory {
public:
typedef scribeIf Handler;
virtual ~scribeIfFactory() {}
virtual scribeIf* getHandler(::apache::thrift::server::TConnectionContext* ctx) = 0;
virtual void releaseHandler(scribeIf* handler) = 0;
};
class scribeIfSingletonFactory : virtual public scribeIfFactory {
public:
scribeIfSingletonFactory(const boost::shared_ptr<scribeIf>& iface) : iface_(iface) {}
virtual ~scribeIfSingletonFactory() {}
virtual scribeIf* getHandler(::apache::thrift::server::TConnectionContext*) {
return iface_.get();
}
virtual void releaseHandler(scribeIf* handler) {}
protected:
boost::shared_ptr<scribeIf> iface_;
};
class scribeNull : virtual public scribeIf {
public:
virtual ~scribeNull() {}
ResultCode Log(const std::vector<LogEntry> & /* messages */) {
ResultCode _return = (ResultCode)0;
return _return;
}
void LogMulti(std::vector<ResultCode> & /* _return */, const std::vector<LogEntry> & /* messages */) {
return;
}
ResultCode LogCompressedMsg(const std::string& /* compressedMessages */) {
ResultCode _return = (ResultCode)0;
return _return;
}
};
class scribe_Log_args {
public:
static const uint64_t _reflection_id = 5902265217339133004U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
scribe_Log_args() {
}
scribe_Log_args(const scribe_Log_args&) = default;
scribe_Log_args& operator=(const scribe_Log_args&) = default;
scribe_Log_args(scribe_Log_args&&) = default;
scribe_Log_args& operator=(scribe_Log_args&&) = default;
void __clear() {
messages.clear();
__isset.__clear();
}
virtual ~scribe_Log_args() throw() {}
std::vector<LogEntry> messages;
struct __isset {
__isset() { __clear(); }
void __clear() {
messages = false;
}
bool messages;
} __isset;
bool operator == (const scribe_Log_args & rhs) const
{
if (!(this->messages == rhs.messages))
return false;
return true;
}
bool operator != (const scribe_Log_args &rhs) const {
return !(*this == rhs);
}
bool operator < (const scribe_Log_args & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_Log_pargs {
public:
static const uint64_t _reflection_id = 5555604010648986412U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
virtual ~scribe_Log_pargs() throw() {}
const std::vector<LogEntry> * messages;
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_Log_result {
public:
static const uint64_t _reflection_id = 18205781396971565932U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
scribe_Log_result() : success(static_cast<ResultCode>(0)) {
}
scribe_Log_result(const scribe_Log_result&) = default;
scribe_Log_result& operator=(const scribe_Log_result&) = default;
scribe_Log_result(scribe_Log_result&&) = default;
scribe_Log_result& operator=(scribe_Log_result&&) = default;
void __clear() {
success = static_cast<ResultCode>(0);
__isset.__clear();
}
virtual ~scribe_Log_result() throw() {}
ResultCode success;
struct __isset {
__isset() { __clear(); }
void __clear() {
success = false;
}
bool success;
} __isset;
bool operator == (const scribe_Log_result & rhs) const
{
if (!(this->success == rhs.success))
return false;
return true;
}
bool operator != (const scribe_Log_result &rhs) const {
return !(*this == rhs);
}
bool operator < (const scribe_Log_result & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_Log_presult {
public:
static const uint64_t _reflection_id = 12945584136895385836U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
virtual ~scribe_Log_presult() throw() {}
ResultCode* success;
struct __isset {
__isset() { __clear(); }
void __clear() {
success = false;
}
bool success;
} __isset;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
};
class scribe_LogMulti_args {
public:
static const uint64_t _reflection_id = 7590876486278061516U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
scribe_LogMulti_args() {
}
scribe_LogMulti_args(const scribe_LogMulti_args&) = default;
scribe_LogMulti_args& operator=(const scribe_LogMulti_args&) = default;
scribe_LogMulti_args(scribe_LogMulti_args&&) = default;
scribe_LogMulti_args& operator=(scribe_LogMulti_args&&) = default;
void __clear() {
messages.clear();
__isset.__clear();
}
virtual ~scribe_LogMulti_args() throw() {}
std::vector<LogEntry> messages;
struct __isset {
__isset() { __clear(); }
void __clear() {
messages = false;
}
bool messages;
} __isset;
bool operator == (const scribe_LogMulti_args & rhs) const
{
if (!(this->messages == rhs.messages))
return false;
return true;
}
bool operator != (const scribe_LogMulti_args &rhs) const {
return !(*this == rhs);
}
bool operator < (const scribe_LogMulti_args & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_LogMulti_pargs {
public:
static const uint64_t _reflection_id = 9124384543551655628U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
virtual ~scribe_LogMulti_pargs() throw() {}
const std::vector<LogEntry> * messages;
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_LogMulti_result {
public:
static const uint64_t _reflection_id = 4828367046341273164U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
scribe_LogMulti_result() {
}
scribe_LogMulti_result(const scribe_LogMulti_result&) = default;
scribe_LogMulti_result& operator=(const scribe_LogMulti_result&) = default;
scribe_LogMulti_result(scribe_LogMulti_result&&) = default;
scribe_LogMulti_result& operator=(scribe_LogMulti_result&&) = default;
void __clear() {
success.clear();
__isset.__clear();
}
virtual ~scribe_LogMulti_result() throw() {}
std::vector<ResultCode> success;
struct __isset {
__isset() { __clear(); }
void __clear() {
success = false;
}
bool success;
} __isset;
bool operator == (const scribe_LogMulti_result & rhs) const
{
if (!(this->success == rhs.success))
return false;
return true;
}
bool operator != (const scribe_LogMulti_result &rhs) const {
return !(*this == rhs);
}
bool operator < (const scribe_LogMulti_result & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_LogMulti_presult {
public:
static const uint64_t _reflection_id = 5642041737363050316U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
virtual ~scribe_LogMulti_presult() throw() {}
std::vector<ResultCode> * success;
struct __isset {
__isset() { __clear(); }
void __clear() {
success = false;
}
bool success;
} __isset;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
};
class scribe_LogCompressedMsg_args {
public:
static const uint64_t _reflection_id = 12705053036625273964U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
scribe_LogCompressedMsg_args() : compressedMessages("") {
}
scribe_LogCompressedMsg_args(const scribe_LogCompressedMsg_args&) = default;
scribe_LogCompressedMsg_args& operator=(const scribe_LogCompressedMsg_args&) = default;
scribe_LogCompressedMsg_args(scribe_LogCompressedMsg_args&&) = default;
scribe_LogCompressedMsg_args& operator=(scribe_LogCompressedMsg_args&&) = default;
void __clear() {
compressedMessages = "";
__isset.__clear();
}
virtual ~scribe_LogCompressedMsg_args() throw() {}
std::string compressedMessages;
struct __isset {
__isset() { __clear(); }
void __clear() {
compressedMessages = false;
}
bool compressedMessages;
} __isset;
bool operator == (const scribe_LogCompressedMsg_args & rhs) const
{
if (!(this->compressedMessages == rhs.compressedMessages))
return false;
return true;
}
bool operator != (const scribe_LogCompressedMsg_args &rhs) const {
return !(*this == rhs);
}
bool operator < (const scribe_LogCompressedMsg_args & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_LogCompressedMsg_pargs {
public:
static const uint64_t _reflection_id = 13645577436870531500U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
virtual ~scribe_LogCompressedMsg_pargs() throw() {}
const std::string* compressedMessages;
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_LogCompressedMsg_result {
public:
static const uint64_t _reflection_id = 15026639991904524972U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
scribe_LogCompressedMsg_result() : success(static_cast<ResultCode>(0)) {
}
scribe_LogCompressedMsg_result(const scribe_LogCompressedMsg_result&) = default;
scribe_LogCompressedMsg_result& operator=(const scribe_LogCompressedMsg_result&) = default;
scribe_LogCompressedMsg_result(scribe_LogCompressedMsg_result&&) = default;
scribe_LogCompressedMsg_result& operator=(scribe_LogCompressedMsg_result&&) = default;
void __clear() {
success = static_cast<ResultCode>(0);
__isset.__clear();
}
virtual ~scribe_LogCompressedMsg_result() throw() {}
ResultCode success;
struct __isset {
__isset() { __clear(); }
void __clear() {
success = false;
}
bool success;
} __isset;
bool operator == (const scribe_LogCompressedMsg_result & rhs) const
{
if (!(this->success == rhs.success))
return false;
return true;
}
bool operator != (const scribe_LogCompressedMsg_result &rhs) const {
return !(*this == rhs);
}
bool operator < (const scribe_LogCompressedMsg_result & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class scribe_LogCompressedMsg_presult {
public:
static const uint64_t _reflection_id = 5311776576442573772U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
virtual ~scribe_LogCompressedMsg_presult() throw() {}
ResultCode* success;
struct __isset {
__isset() { __clear(); }
void __clear() {
success = false;
}
bool success;
} __isset;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
};
class scribeClient : virtual public scribeIf, virtual public apache::thrift::TClientBase {
public:
scribeClient(boost::shared_ptr<apache::thrift::protocol::TProtocol> prot) :
checkSeqid_(true),
nextSendSequenceId_(1),
nextRecvSequenceId_(1),
piprot_(prot),
poprot_(prot) {
iprot_ = prot.get();
oprot_ = prot.get();
}
scribeClient(boost::shared_ptr<apache::thrift::protocol::TProtocol> iprot, boost::shared_ptr<apache::thrift::protocol::TProtocol> oprot) :
checkSeqid_(true),
nextSendSequenceId_(1),
nextRecvSequenceId_(1),
piprot_(iprot),
poprot_(oprot) {
iprot_ = iprot.get();
oprot_ = oprot.get();
}
boost::shared_ptr<apache::thrift::protocol::TProtocol> getInputProtocol() {
return piprot_;
}
boost::shared_ptr<apache::thrift::protocol::TProtocol> getOutputProtocol() {
return poprot_;
}
ResultCode Log(const std::vector<LogEntry> & messages);
void send_Log(const std::vector<LogEntry> & messages);
ResultCode recv_Log();
void LogMulti(std::vector<ResultCode> & _return, const std::vector<LogEntry> & messages);
void send_LogMulti(const std::vector<LogEntry> & messages);
void recv_LogMulti(std::vector<ResultCode> & _return);
ResultCode LogCompressedMsg(const std::string& compressedMessages);
void send_LogCompressedMsg(const std::string& compressedMessages);
ResultCode recv_LogCompressedMsg();
/**
* Disable checking the seqid field in server responses.
*
* This should only be used with broken servers that return incorrect seqid values.
*/
void _disableSequenceIdChecks() {
checkSeqid_ = false;
}
protected:
bool checkSeqid_;
int32_t nextSendSequenceId_;
int32_t nextRecvSequenceId_;
int32_t getNextSendSequenceId();
int32_t getNextRecvSequenceId();
boost::shared_ptr<apache::thrift::protocol::TProtocol> piprot_;
boost::shared_ptr<apache::thrift::protocol::TProtocol> poprot_;
apache::thrift::protocol::TProtocol* iprot_;
apache::thrift::protocol::TProtocol* oprot_;
};
class scribeProcessor : public ::apache::thrift::TDispatchProcessor {
protected:
boost::shared_ptr<scribeIf> iface_;
virtual bool dispatchCall(apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, apache::thrift::server::TConnectionContext* connectionContext);
private:
typedef void (scribeProcessor::*ProcessFunction)(int32_t, apache::thrift::protocol::TProtocol*, apache::thrift::protocol::TProtocol*, apache::thrift::server::TConnectionContext*);
typedef std::map<std::string, ProcessFunction> ProcessMap;
ProcessMap processMap_;
void process_Log(int32_t seqid, apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, apache::thrift::server::TConnectionContext* connectionContext);
void process_LogMulti(int32_t seqid, apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, apache::thrift::server::TConnectionContext* connectionContext);
void process_LogCompressedMsg(int32_t seqid, apache::thrift::protocol::TProtocol* iprot, apache::thrift::protocol::TProtocol* oprot, apache::thrift::server::TConnectionContext* connectionContext);
public:
scribeProcessor(boost::shared_ptr<scribeIf> iface) :
iface_(iface) {
processMap_["Log"] = &scribeProcessor::process_Log;
processMap_["LogMulti"] = &scribeProcessor::process_LogMulti;
processMap_["LogCompressedMsg"] = &scribeProcessor::process_LogCompressedMsg;
}
virtual ~scribeProcessor() {}
boost::shared_ptr<std::set<std::string> > getProcessFunctions() {
boost::shared_ptr<std::set<std::string> > rSet(new std::set<std::string>());
rSet->insert("scribe.Log");
rSet->insert("scribe.LogMulti");
rSet->insert("scribe.LogCompressedMsg");
return rSet;
}
};
class scribeProcessorFactory : public ::apache::thrift::TProcessorFactory {
public:
scribeProcessorFactory(const ::boost::shared_ptr< scribeIfFactory >& handlerFactory) :
handlerFactory_(handlerFactory) {}
::boost::shared_ptr< ::apache::thrift::TProcessor > getProcessor(::apache::thrift::server::TConnectionContext* ctx);
protected:
::boost::shared_ptr< scribeIfFactory > handlerFactory_;
};
class scribeMultiface : virtual public scribeIf {
public:
scribeMultiface(std::vector<boost::shared_ptr<scribeIf> >& ifaces) : ifaces_(ifaces) {
}
virtual ~scribeMultiface() {}
protected:
std::vector<boost::shared_ptr<scribeIf> > ifaces_;
scribeMultiface() {}
void add(boost::shared_ptr<scribeIf> iface) {
ifaces_.push_back(iface);
}
public:
ResultCode Log(const std::vector<LogEntry> & messages) {
uint32_t i;
uint32_t sz = ifaces_.size();
for (i = 0; i < sz - 1; ++i) {
ifaces_[i]->Log(messages);
}
return ifaces_[i]->Log(messages);
}
void LogMulti(std::vector<ResultCode> & _return, const std::vector<LogEntry> & messages) {
uint32_t i;
uint32_t sz = ifaces_.size();
for (i = 0; i < sz; ++i) {
ifaces_[i]->LogMulti(_return, messages);
}
}
ResultCode LogCompressedMsg(const std::string& compressedMessages) {
uint32_t i;
uint32_t sz = ifaces_.size();
for (i = 0; i < sz - 1; ++i) {
ifaces_[i]->LogCompressedMsg(compressedMessages);
}
return ifaces_[i]->LogCompressedMsg(compressedMessages);
}
};
} // namespace
#endif

@ -0,0 +1,17 @@
/**
* Autogenerated by Thrift
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#include "scribe_constants.h"
namespace Tleveldb {
const scribeConstants g_scribe_constants;
scribeConstants::scribeConstants() {
SCRIBE_MAX_MESSAGE_LENGTH = 26214400;
}
} // namespace

@ -0,0 +1,25 @@
/**
* Autogenerated by Thrift
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#ifndef scribe_CONSTANTS_H
#define scribe_CONSTANTS_H
#include "scribe_types.h"
namespace Tleveldb {
class scribeConstants {
public:
scribeConstants();
int32_t SCRIBE_MAX_MESSAGE_LENGTH;
};
extern const scribeConstants g_scribe_constants;
} // namespace
#endif

@ -0,0 +1,513 @@
/**
* Autogenerated by Thrift
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#include "scribe_types.h"
#include <thrift/lib/cpp/Reflection.h>
#include <algorithm>
#include <string.h>
namespace Tleveldb {
int _kResultCodeValues[] = {
OK,
TRY_LATER,
ERROR_DECOMPRESS
};
const char* _kResultCodeNames[] = {
"OK",
"TRY_LATER",
"ERROR_DECOMPRESS"
};
const std::map<int, const char*> _ResultCode_VALUES_TO_NAMES(apache::thrift::TEnumIterator<int>(3, _kResultCodeValues, _kResultCodeNames), apache::thrift::TEnumIterator<int>(-1, NULL, NULL));
const std::map<const char*, int, apache::thrift::ltstr> _ResultCode_NAMES_TO_VALUES(apache::thrift::TEnumInverseIterator<int>(3, _kResultCodeValues, _kResultCodeNames), apache::thrift::TEnumInverseIterator<int>(-1, NULL, NULL));
} // namespace
namespace apache { namespace thrift {
template<>
const char* TEnumTraits< ::Tleveldb::ResultCode>::findName( ::Tleveldb::ResultCode value) {
return findName( ::Tleveldb::_ResultCode_VALUES_TO_NAMES, value);
}
template<>
bool TEnumTraits< ::Tleveldb::ResultCode>::findValue(const char* name, ::Tleveldb::ResultCode* out) {
return findValue( ::Tleveldb::_ResultCode_NAMES_TO_VALUES, name, out);
}
}} // apache::thrift
namespace Tleveldb {
// Reflection initializer for struct scribe.SourceInfo
namespace {
void reflectionInitializer_16557823557777806572(::apache::thrift::reflection::Schema& schema) {
const uint64_t id = 16557823557777806572U;
if (schema.dataTypes.count(id)) return;
::apache::thrift::reflection::DataType dt;
dt.name = "struct scribe.SourceInfo";
dt.__isset.fields = true;
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 1U;
f.name = "host";
dt.fields[1] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 5U;
f.name = "port";
dt.fields[2] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 6U;
f.name = "timestamp";
dt.fields[3] = f;
}
schema.dataTypes[id] = dt;
schema.names[dt.name] = id;
}
} // namespace
const uint64_t SourceInfo::_reflection_id;
void SourceInfo::_reflection_register(::apache::thrift::reflection::Schema& schema) {
reflectionInitializer_16557823557777806572(schema);
}
uint32_t SourceInfo::read(apache::thrift::protocol::TProtocol* iprot) {
uint32_t xfer = 0;
std::string fname;
apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 1:
if (ftype == apache::thrift::protocol::T_STRING) {
xfer += iprot->readBinary(this->host);
this->__isset.host = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 2:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->port);
this->__isset.port = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 3:
if (ftype == apache::thrift::protocol::T_I64) {
xfer += iprot->readI64(this->timestamp);
this->__isset.timestamp = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
uint32_t SourceInfo::write(apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
xfer += oprot->writeStructBegin("SourceInfo");
xfer += oprot->writeFieldBegin("host", apache::thrift::protocol::T_STRING, 1);
xfer += oprot->writeBinary(this->host);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("port", apache::thrift::protocol::T_I32, 2);
xfer += oprot->writeI32(this->port);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("timestamp", apache::thrift::protocol::T_I64, 3);
xfer += oprot->writeI64(this->timestamp);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
void swap(SourceInfo &a, SourceInfo &b) {
using ::std::swap;
(void)a;
(void)b;
swap(a.host, b.host);
swap(a.port, b.port);
swap(a.timestamp, b.timestamp);
swap(a.__isset, b.__isset);
}
// Reflection initializer for map<string, string>
namespace {
void reflectionInitializer_9246346592659763371(::apache::thrift::reflection::Schema& schema) {
const uint64_t id = 9246346592659763371U;
if (schema.dataTypes.count(id)) return;
::apache::thrift::reflection::DataType dt;
dt.name = "map<string, string>";
dt.__isset.mapKeyType = true;
dt.mapKeyType = 1U;
dt.__isset.valueType = true;
dt.valueType = 1U;
schema.dataTypes[id] = dt;
schema.names[dt.name] = id;
}
} // namespace
// Reflection initializer for struct scribe.LogEntry
namespace {
void reflectionInitializer_15053466696968532300(::apache::thrift::reflection::Schema& schema) {
const uint64_t id = 15053466696968532300U;
if (schema.dataTypes.count(id)) return;
reflectionInitializer_16557823557777806572(schema); // struct scribe.SourceInfo
reflectionInitializer_9246346592659763371(schema); // map<string, string>
::apache::thrift::reflection::DataType dt;
dt.name = "struct scribe.LogEntry";
dt.__isset.fields = true;
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 1U;
f.name = "category";
dt.fields[1] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 1U;
f.name = "message";
dt.fields[2] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = false;
f.type = 9246346592659763371U;
f.name = "metadata";
dt.fields[3] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = false;
f.type = 5U;
f.name = "checksum";
dt.fields[4] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = false;
f.type = 16557823557777806572U;
f.name = "source";
dt.fields[5] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = false;
f.type = 5U;
f.name = "bucket";
dt.fields[6] = f;
}
schema.dataTypes[id] = dt;
schema.names[dt.name] = id;
}
} // namespace
const uint64_t LogEntry::_reflection_id;
void LogEntry::_reflection_register(::apache::thrift::reflection::Schema& schema) {
reflectionInitializer_15053466696968532300(schema);
}
uint32_t LogEntry::read(apache::thrift::protocol::TProtocol* iprot) {
uint32_t xfer = 0;
std::string fname;
apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 1:
if (ftype == apache::thrift::protocol::T_STRING) {
xfer += iprot->readBinary(this->category);
this->__isset.category = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 2:
if (ftype == apache::thrift::protocol::T_STRING) {
xfer += iprot->readBinary(this->message);
this->__isset.message = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 3:
if (ftype == apache::thrift::protocol::T_MAP) {
{
this->metadata.clear();
uint32_t _size0;
apache::thrift::protocol::TType _ktype1;
apache::thrift::protocol::TType _vtype2;
xfer += iprot->readMapBegin(_ktype1, _vtype2, _size0);
uint32_t _i4;
for (_i4 = 0; _i4 < _size0; ++_i4)
{
std::string _key5;
xfer += iprot->readString(_key5);
std::string& _val6 = this->metadata[_key5];
xfer += iprot->readString(_val6);
}
xfer += iprot->readMapEnd();
}
this->__isset.metadata = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 4:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->checksum);
this->__isset.checksum = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 5:
if (ftype == apache::thrift::protocol::T_STRUCT) {
xfer += this->source.read(iprot);
this->__isset.source = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 6:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->bucket);
this->__isset.bucket = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
uint32_t LogEntry::write(apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
xfer += oprot->writeStructBegin("LogEntry");
xfer += oprot->writeFieldBegin("category", apache::thrift::protocol::T_STRING, 1);
xfer += oprot->writeBinary(this->category);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("message", apache::thrift::protocol::T_STRING, 2);
xfer += oprot->writeBinary(this->message);
xfer += oprot->writeFieldEnd();
if (this->__isset.metadata) {
xfer += oprot->writeFieldBegin("metadata", apache::thrift::protocol::T_MAP, 3);
{
xfer += oprot->writeMapBegin(apache::thrift::protocol::T_STRING, apache::thrift::protocol::T_STRING, this->metadata.size());
std::map<std::string, std::string> ::const_iterator _iter7;
for (_iter7 = this->metadata.begin(); _iter7 != this->metadata.end(); ++_iter7)
{
xfer += oprot->writeString(_iter7->first);
xfer += oprot->writeString(_iter7->second);
}
xfer += oprot->writeMapEnd();
}
xfer += oprot->writeFieldEnd();
}
if (this->__isset.checksum) {
xfer += oprot->writeFieldBegin("checksum", apache::thrift::protocol::T_I32, 4);
xfer += oprot->writeI32(this->checksum);
xfer += oprot->writeFieldEnd();
}
if (this->__isset.source) {
xfer += oprot->writeFieldBegin("source", apache::thrift::protocol::T_STRUCT, 5);
xfer += this->source.write(oprot);
xfer += oprot->writeFieldEnd();
}
if (this->__isset.bucket) {
xfer += oprot->writeFieldBegin("bucket", apache::thrift::protocol::T_I32, 6);
xfer += oprot->writeI32(this->bucket);
xfer += oprot->writeFieldEnd();
}
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
void swap(LogEntry &a, LogEntry &b) {
using ::std::swap;
(void)a;
(void)b;
swap(a.category, b.category);
swap(a.message, b.message);
swap(a.metadata, b.metadata);
swap(a.checksum, b.checksum);
swap(a.source, b.source);
swap(a.bucket, b.bucket);
swap(a.__isset, b.__isset);
}
// Reflection initializer for list<struct scribe.LogEntry>
namespace {
void reflectionInitializer_10251729064312664553(::apache::thrift::reflection::Schema& schema) {
const uint64_t id = 10251729064312664553U;
if (schema.dataTypes.count(id)) return;
reflectionInitializer_15053466696968532300(schema); // struct scribe.LogEntry
::apache::thrift::reflection::DataType dt;
dt.name = "list<struct scribe.LogEntry>";
dt.__isset.valueType = true;
dt.valueType = 15053466696968532300U;
schema.dataTypes[id] = dt;
schema.names[dt.name] = id;
}
} // namespace
// Reflection initializer for struct scribe.MessageList
namespace {
void reflectionInitializer_5674270912483072844(::apache::thrift::reflection::Schema& schema) {
const uint64_t id = 5674270912483072844U;
if (schema.dataTypes.count(id)) return;
reflectionInitializer_10251729064312664553(schema); // list<struct scribe.LogEntry>
::apache::thrift::reflection::DataType dt;
dt.name = "struct scribe.MessageList";
dt.__isset.fields = true;
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 10251729064312664553U;
f.name = "messages";
dt.fields[1] = f;
}
schema.dataTypes[id] = dt;
schema.names[dt.name] = id;
}
} // namespace
const uint64_t MessageList::_reflection_id;
void MessageList::_reflection_register(::apache::thrift::reflection::Schema& schema) {
reflectionInitializer_5674270912483072844(schema);
}
uint32_t MessageList::read(apache::thrift::protocol::TProtocol* iprot) {
uint32_t xfer = 0;
std::string fname;
apache::thrift::protocol::TType ftype;
int16_t fid;
xfer += iprot->readStructBegin(fname);
using apache::thrift::protocol::TProtocolException;
while (true)
{
xfer += iprot->readFieldBegin(fname, ftype, fid);
if (ftype == apache::thrift::protocol::T_STOP) {
break;
}
switch (fid)
{
case 1:
if (ftype == apache::thrift::protocol::T_LIST) {
{
this->messages.clear();
uint32_t _size8;
apache::thrift::protocol::TType _etype11;
xfer += iprot->readListBegin(_etype11, _size8);
this->messages.resize(_size8);
uint32_t _i12;
for (_i12 = 0; _i12 < _size8; ++_i12)
{
xfer += this->messages[_i12].read(iprot);
}
xfer += iprot->readListEnd();
}
this->__isset.messages = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default:
xfer += iprot->skip(ftype);
break;
}
xfer += iprot->readFieldEnd();
}
xfer += iprot->readStructEnd();
return xfer;
}
uint32_t MessageList::write(apache::thrift::protocol::TProtocol* oprot) const {
uint32_t xfer = 0;
xfer += oprot->writeStructBegin("MessageList");
xfer += oprot->writeFieldBegin("messages", apache::thrift::protocol::T_LIST, 1);
{
xfer += oprot->writeListBegin(apache::thrift::protocol::T_STRUCT, this->messages.size());
std::vector<LogEntry> ::const_iterator _iter13;
for (_iter13 = this->messages.begin(); _iter13 != this->messages.end(); ++_iter13)
{
xfer += (*_iter13).write(oprot);
}
xfer += oprot->writeListEnd();
}
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd();
return xfer;
}
void swap(MessageList &a, MessageList &b) {
using ::std::swap;
(void)a;
(void)b;
swap(a.messages, b.messages);
swap(a.__isset, b.__isset);
}
} // namespace

@ -0,0 +1,247 @@
/**
* Autogenerated by Thrift
*
* DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
* @generated
*/
#ifndef scribe_TYPES_H
#define scribe_TYPES_H
#include <Thrift.h>
#include <TApplicationException.h>
#include <protocol/TProtocol.h>
#include <transport/TTransport.h>
namespace apache { namespace thrift { namespace reflection {
class Schema;
}}}
namespace Tleveldb {
enum ResultCode {
OK = 0,
TRY_LATER = 1,
ERROR_DECOMPRESS = 2
};
extern const std::map<int, const char*> _ResultCode_VALUES_TO_NAMES;
extern const std::map<const char*, int, apache::thrift::ltstr> _ResultCode_NAMES_TO_VALUES;
} // namespace
namespace apache { namespace thrift {
template<>
inline constexpr ::Tleveldb::ResultCode TEnumTraits< ::Tleveldb::ResultCode>::min() {
return ::Tleveldb::ResultCode::OK;
}
template<>
inline constexpr ::Tleveldb::ResultCode TEnumTraits< ::Tleveldb::ResultCode>::max() {
return ::Tleveldb::ResultCode::ERROR_DECOMPRESS;
}
}} // apache:thrift
namespace Tleveldb {
class SourceInfo {
public:
static const uint64_t _reflection_id = 16557823557777806572U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
SourceInfo() : host(""), port(0), timestamp(0) {
}
SourceInfo(const SourceInfo&) = default;
SourceInfo& operator=(const SourceInfo&) = default;
SourceInfo(SourceInfo&&) = default;
SourceInfo& operator=(SourceInfo&&) = default;
void __clear() {
host = "";
port = 0;
timestamp = 0;
__isset.__clear();
}
virtual ~SourceInfo() throw() {}
std::string host;
int32_t port;
int64_t timestamp;
struct __isset {
__isset() { __clear(); }
void __clear() {
host = false;
port = false;
timestamp = false;
}
bool host;
bool port;
bool timestamp;
} __isset;
bool operator == (const SourceInfo & rhs) const
{
if (!(this->host == rhs.host))
return false;
if (!(this->port == rhs.port))
return false;
if (!(this->timestamp == rhs.timestamp))
return false;
return true;
}
bool operator != (const SourceInfo &rhs) const {
return !(*this == rhs);
}
bool operator < (const SourceInfo & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class SourceInfo;
void swap(SourceInfo &a, SourceInfo &b);
class LogEntry {
public:
static const uint64_t _reflection_id = 15053466696968532300U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
LogEntry() : category(""), message(""), checksum(0), bucket(0) {
}
LogEntry(const LogEntry&) = default;
LogEntry& operator=(const LogEntry&) = default;
LogEntry(LogEntry&&) = default;
LogEntry& operator=(LogEntry&&) = default;
void __clear() {
category = "";
message = "";
metadata.clear();
checksum = 0;
source.__clear();
bucket = 0;
__isset.__clear();
}
virtual ~LogEntry() throw() {}
std::string category;
std::string message;
std::map<std::string, std::string> metadata;
int32_t checksum;
SourceInfo source;
int32_t bucket;
struct __isset {
__isset() { __clear(); }
void __clear() {
category = false;
message = false;
metadata = false;
checksum = false;
source = false;
bucket = false;
}
bool category;
bool message;
bool metadata;
bool checksum;
bool source;
bool bucket;
} __isset;
bool operator == (const LogEntry & rhs) const
{
if (!(this->category == rhs.category))
return false;
if (!(this->message == rhs.message))
return false;
if (__isset.metadata != rhs.__isset.metadata)
return false;
else if (__isset.metadata && !(metadata == rhs.metadata))
return false;
if (__isset.checksum != rhs.__isset.checksum)
return false;
else if (__isset.checksum && !(checksum == rhs.checksum))
return false;
if (__isset.source != rhs.__isset.source)
return false;
else if (__isset.source && !(source == rhs.source))
return false;
if (__isset.bucket != rhs.__isset.bucket)
return false;
else if (__isset.bucket && !(bucket == rhs.bucket))
return false;
return true;
}
bool operator != (const LogEntry &rhs) const {
return !(*this == rhs);
}
bool operator < (const LogEntry & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class LogEntry;
void swap(LogEntry &a, LogEntry &b);
class MessageList {
public:
static const uint64_t _reflection_id = 5674270912483072844U;
static void _reflection_register(::apache::thrift::reflection::Schema&);
MessageList() {
}
MessageList(const MessageList&) = default;
MessageList& operator=(const MessageList&) = default;
MessageList(MessageList&&) = default;
MessageList& operator=(MessageList&&) = default;
void __clear() {
messages.clear();
__isset.__clear();
}
virtual ~MessageList() throw() {}
std::vector<LogEntry> messages;
struct __isset {
__isset() { __clear(); }
void __clear() {
messages = false;
}
bool messages;
} __isset;
bool operator == (const MessageList & rhs) const
{
if (!(this->messages == rhs.messages))
return false;
return true;
}
bool operator != (const MessageList &rhs) const {
return !(*this == rhs);
}
bool operator < (const MessageList & ) const;
uint32_t read(apache::thrift::protocol::TProtocol* iprot);
uint32_t write(apache::thrift::protocol::TProtocol* oprot) const;
};
class MessageList;
void swap(MessageList &a, MessageList &b);
} // namespace
#endif

@ -0,0 +1,82 @@
#!/usr/local/bin/thrift --cpp --php
## Copyright (c) 2007-2012 Facebook
##
## Licensed under the Apache License, Version 2.0 (the "License");
## you may not use this file except in compliance with the License.
## You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
##
## See accompanying file LICENSE or visit the Scribe site at:
## http://developers.facebook.com/scribe/
namespace cpp Tleveldb
namespace java Tleveldb
// Max message length allowed to log through scribe
const i32 SCRIBE_MAX_MESSAGE_LENGTH = 26214400;
enum ResultCode
{
OK,
TRY_LATER,
ERROR_DECOMPRESS
}
struct SourceInfo
{
1: binary host,
2: i32 port,
3: i64 timestamp
}
struct LogEntry
{
1: binary category,
2: binary message,
3: optional map<string, string> metadata,
4: optional i32 checksum,
5: optional SourceInfo source,
6: optional i32 bucket
}
struct MessageList
{
1: list<LogEntry> messages
}
service scribe
{
#
# Delivers a list of LogEntry messages to the Scribe server.
# A returned ResultCode of anything other than OK indicates that the
# whole batch was unable to be delivered to the server.
# If data loss is a concern, the caller should buffer and retry the messages.
#
ResultCode Log(1: list<LogEntry> messages);
#
# NOTE: FOR INTERNAL USE ONLY!
#
# Delivers a list of LogEntry messages to the Scribe server, but
# allows partial successes. A list of ResultCodes will be returned to
# indicate the success or failure of each message at the corresponding index.
# If data loss is a concern, the caller should retry only the failed messages.
#
list<ResultCode> LogMulti(1: list<LogEntry> messages);
#
# NOTE: FOR INTERNAL USE ONLY!
#
# The same as Log(...) except that the list of messages must first be
# serialized and compressed in some internal format.
#
ResultCode LogCompressedMsg(1: binary compressedMessages);
}

@ -0,0 +1,90 @@
#include "scribe_logger.h"
namespace leveldb {
const std::string ScribeLogger::COL_SEPERATOR = "\x1";
const std::string ScribeLogger::DEPLOY_STATS_CATEGORY = "leveldb_deploy_stats";
ScribeLogger::ScribeLogger(const std::string& host, int port,
int retry_times, uint32_t retry_intervals, int batch_size)
: host_(host),
port_(port),
retry_times_(retry_times),
retry_intervals_ (retry_intervals),
batch_size_ (batch_size) {
shared_ptr<TSocket> socket(new TSocket(host_, port_));
shared_ptr<TFramedTransport> framedTransport(new TFramedTransport(socket));
framedTransport->open();
shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(framedTransport));
scribe_client_ = new scribeClient(protocol);
}
void ScribeLogger::Log(const std::string& category,
const std::string& message) {
LogEntry entry;
entry.category = category;
entry.message = message;
logger_mutex_.Lock();
logs_.push_back(entry);
if (logs_.size() >= batch_size_) {
ResultCode ret = scribe_client_->Log(logs_);
int retries_left = retry_times_;
while (ret == TRY_LATER && retries_left > 0) {
Env::Default()->SleepForMicroseconds(retry_intervals_);
ret = scribe_client_->Log(logs_);
retries_left--;
}
// Clear the local messages if either successfully write out
// or has failed in the last 10 calls.
if (ret == OK || logs_.size() > batch_size_ * 5) {
logs_.clear();
}
}
logger_mutex_.Unlock();
}
void ScribeLogger::MakeScribeMessage(std::string& output,
std::vector<std::string>& cols) {
int sz = cols.size();
int i = 0;
for (; i < sz - 1; i++) {
std::string& col = cols.at(i);
output += col;
output += ScribeLogger::COL_SEPERATOR;
}
std::string& col = cols.at(i);
output+=col;
}
void ScribeLogger::Log_Deploy_Stats(
const std::string& db_version,
const std::string& machine_info,
const std::string& data_dir,
const uint64_t data_size,
const uint32_t file_number,
const std::string& data_size_per_level,
const std::string& file_number_per_level,
const int64_t& ts_unix) {
std::string message;
std::vector<std::string> cols;
cols.push_back(db_version);
cols.push_back(machine_info);
cols.push_back(data_dir);
cols.push_back(boost::lexical_cast<std::string>(data_size));
cols.push_back(boost::lexical_cast<std::string>(file_number));
cols.push_back(data_size_per_level);
cols.push_back(file_number_per_level);
cols.push_back(boost::lexical_cast<std::string>(ts_unix));
MakeScribeMessage(message, cols);
return Log(ScribeLogger::DEPLOY_STATS_CATEGORY, message);
}
ScribeLogger::~ScribeLogger(){
delete scribe_client_;
}
}

@ -0,0 +1,71 @@
#ifndef SCRIBE_LOGGER_H_
#define SCRIBE_LOGGER_H_
#include "scribe/if/gen-cpp/scribe.h"
#include "scribe/if/gen-cpp/scribe_types.h"
#include "thrift/lib/cpp/protocol/TProtocol.h"
#include "thrift/lib/cpp/transport/TSocket.h"
#include "thrift/lib/cpp/protocol/TBinaryProtocol.h"
#include "thrift/lib/cpp/transport/TBufferTransports.h"
#include "leveldb/env.h"
#include "port/port.h"
#include "util/stats_logger.h"
#include "boost/lexical_cast.hpp"
using namespace Tleveldb;
using Tleveldb::scribeClient;
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using boost::shared_ptr;
using namespace ::Tleveldb;
namespace leveldb {
class ScribeLogger : public StatsLogger{
private:
std::string host_;
int port_;
int batch_size_;
scribeClient* scribe_client_;
std::vector<LogEntry> logs_;
port::Mutex logger_mutex_;
int retry_times_;
uint32_t retry_intervals_;
void MakeScribeMessage(std::string& output, std::vector<std::string>& cols);
public:
static const std::string COL_SEPERATOR;
static const std::string DEPLOY_STATS_CATEGORY;
ScribeLogger(const std::string& host, int port,
int retry_times=3, uint32_t retry_intervals=1000000,
int batch_size=1);
virtual ~ScribeLogger();
virtual void Log(const std::string& category, const std::string& message);
virtual void Log_Deploy_Stats(
const std::string& db_version,
const std::string& machine_info,
const std::string& data_dir,
const uint64_t data_size,
const uint32_t file_number,
const std::string& data_size_per_level,
const std::string& file_number_per_level,
const int64_t& ts_unix
);
};
}
#endif /* SCRIBE_LOGGER_H_ */

@ -21,5 +21,5 @@ You can run the leveldb server unit tests by
You can regenerate the thrift cpp files by doing the following You can regenerate the thrift cpp files by doing the following
cd ./thrift cd ./thrift
thrift --gen cpp if/leveldb.thrift bin/thrift --gen cpp if/leveldb.thrift

Binary file not shown.

@ -422,6 +422,76 @@ void reflectionInitializer_6731746507948871532(::apache::thrift::reflection::Sch
f.name = "compression"; f.name = "compression";
dt.fields[7] = f; dt.fields[7] = f;
} }
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 5U;
f.name = "num_levels";
dt.fields[8] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 5U;
f.name = "level0_file_num_compaction_trigger";
dt.fields[9] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 5U;
f.name = "level0_slowdown_writes_trigger";
dt.fields[10] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 5U;
f.name = "level0_stop_writes_trigger";
dt.fields[11] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 5U;
f.name = "target_file_size_base";
dt.fields[12] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 5U;
f.name = "target_file_size_multiplier";
dt.fields[13] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 5U;
f.name = "max_bytes_for_level_base";
dt.fields[14] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 5U;
f.name = "max_bytes_for_level_multiplier";
dt.fields[15] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 5U;
f.name = "max_grandparent_overlap_factor";
dt.fields[16] = f;
}
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 2U;
f.name = "disableDataSync";
dt.fields[17] = f;
}
schema.dataTypes[id] = dt; schema.dataTypes[id] = dt;
schema.names[dt.name] = id; schema.names[dt.name] = id;
} }
@ -509,6 +579,86 @@ uint32_t DBOptions::read(apache::thrift::protocol::TProtocol* iprot) {
xfer += iprot->skip(ftype); xfer += iprot->skip(ftype);
} }
break; break;
case 8:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->num_levels);
this->__isset.num_levels = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 9:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->level0_file_num_compaction_trigger);
this->__isset.level0_file_num_compaction_trigger = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 10:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->level0_slowdown_writes_trigger);
this->__isset.level0_slowdown_writes_trigger = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 11:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->level0_stop_writes_trigger);
this->__isset.level0_stop_writes_trigger = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 12:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->target_file_size_base);
this->__isset.target_file_size_base = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 13:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->target_file_size_multiplier);
this->__isset.target_file_size_multiplier = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 14:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->max_bytes_for_level_base);
this->__isset.max_bytes_for_level_base = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 15:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->max_bytes_for_level_multiplier);
this->__isset.max_bytes_for_level_multiplier = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 16:
if (ftype == apache::thrift::protocol::T_I32) {
xfer += iprot->readI32(this->max_grandparent_overlap_factor);
this->__isset.max_grandparent_overlap_factor = true;
} else {
xfer += iprot->skip(ftype);
}
break;
case 17:
if (ftype == apache::thrift::protocol::T_BOOL) {
xfer += iprot->readBool(this->disableDataSync);
this->__isset.disableDataSync = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default: default:
xfer += iprot->skip(ftype); xfer += iprot->skip(ftype);
break; break;
@ -545,6 +695,36 @@ uint32_t DBOptions::write(apache::thrift::protocol::TProtocol* oprot) const {
xfer += oprot->writeFieldBegin("compression", apache::thrift::protocol::T_I32, 7); xfer += oprot->writeFieldBegin("compression", apache::thrift::protocol::T_I32, 7);
xfer += oprot->writeI32((int32_t)this->compression); xfer += oprot->writeI32((int32_t)this->compression);
xfer += oprot->writeFieldEnd(); xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("num_levels", apache::thrift::protocol::T_I32, 8);
xfer += oprot->writeI32(this->num_levels);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("level0_file_num_compaction_trigger", apache::thrift::protocol::T_I32, 9);
xfer += oprot->writeI32(this->level0_file_num_compaction_trigger);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("level0_slowdown_writes_trigger", apache::thrift::protocol::T_I32, 10);
xfer += oprot->writeI32(this->level0_slowdown_writes_trigger);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("level0_stop_writes_trigger", apache::thrift::protocol::T_I32, 11);
xfer += oprot->writeI32(this->level0_stop_writes_trigger);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("target_file_size_base", apache::thrift::protocol::T_I32, 12);
xfer += oprot->writeI32(this->target_file_size_base);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("target_file_size_multiplier", apache::thrift::protocol::T_I32, 13);
xfer += oprot->writeI32(this->target_file_size_multiplier);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("max_bytes_for_level_base", apache::thrift::protocol::T_I32, 14);
xfer += oprot->writeI32(this->max_bytes_for_level_base);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("max_bytes_for_level_multiplier", apache::thrift::protocol::T_I32, 15);
xfer += oprot->writeI32(this->max_bytes_for_level_multiplier);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("max_grandparent_overlap_factor", apache::thrift::protocol::T_I32, 16);
xfer += oprot->writeI32(this->max_grandparent_overlap_factor);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("disableDataSync", apache::thrift::protocol::T_BOOL, 17);
xfer += oprot->writeBool(this->disableDataSync);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop(); xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd(); xfer += oprot->writeStructEnd();
return xfer; return xfer;
@ -561,6 +741,16 @@ void swap(DBOptions &a, DBOptions &b) {
swap(a.block_size, b.block_size); swap(a.block_size, b.block_size);
swap(a.block_restart_interval, b.block_restart_interval); swap(a.block_restart_interval, b.block_restart_interval);
swap(a.compression, b.compression); swap(a.compression, b.compression);
swap(a.num_levels, b.num_levels);
swap(a.level0_file_num_compaction_trigger, b.level0_file_num_compaction_trigger);
swap(a.level0_slowdown_writes_trigger, b.level0_slowdown_writes_trigger);
swap(a.level0_stop_writes_trigger, b.level0_stop_writes_trigger);
swap(a.target_file_size_base, b.target_file_size_base);
swap(a.target_file_size_multiplier, b.target_file_size_multiplier);
swap(a.max_bytes_for_level_base, b.max_bytes_for_level_base);
swap(a.max_bytes_for_level_multiplier, b.max_bytes_for_level_multiplier);
swap(a.max_grandparent_overlap_factor, b.max_grandparent_overlap_factor);
swap(a.disableDataSync, b.disableDataSync);
swap(a.__isset, b.__isset); swap(a.__isset, b.__isset);
} }
@ -579,6 +769,13 @@ void reflectionInitializer_8830325115029814540(::apache::thrift::reflection::Sch
f.name = "sync"; f.name = "sync";
dt.fields[1] = f; dt.fields[1] = f;
} }
{
::apache::thrift::reflection::StructField f;
f.isRequired = true;
f.type = 2U;
f.name = "disableWAL";
dt.fields[2] = f;
}
schema.dataTypes[id] = dt; schema.dataTypes[id] = dt;
schema.names[dt.name] = id; schema.names[dt.name] = id;
} }
@ -616,6 +813,14 @@ uint32_t WriteOptions::read(apache::thrift::protocol::TProtocol* iprot) {
xfer += iprot->skip(ftype); xfer += iprot->skip(ftype);
} }
break; break;
case 2:
if (ftype == apache::thrift::protocol::T_BOOL) {
xfer += iprot->readBool(this->disableWAL);
this->__isset.disableWAL = true;
} else {
xfer += iprot->skip(ftype);
}
break;
default: default:
xfer += iprot->skip(ftype); xfer += iprot->skip(ftype);
break; break;
@ -634,6 +839,9 @@ uint32_t WriteOptions::write(apache::thrift::protocol::TProtocol* oprot) const {
xfer += oprot->writeFieldBegin("sync", apache::thrift::protocol::T_BOOL, 1); xfer += oprot->writeFieldBegin("sync", apache::thrift::protocol::T_BOOL, 1);
xfer += oprot->writeBool(this->sync); xfer += oprot->writeBool(this->sync);
xfer += oprot->writeFieldEnd(); xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldBegin("disableWAL", apache::thrift::protocol::T_BOOL, 2);
xfer += oprot->writeBool(this->disableWAL);
xfer += oprot->writeFieldEnd();
xfer += oprot->writeFieldStop(); xfer += oprot->writeFieldStop();
xfer += oprot->writeStructEnd(); xfer += oprot->writeStructEnd();
return xfer; return xfer;
@ -644,6 +852,7 @@ void swap(WriteOptions &a, WriteOptions &b) {
(void)a; (void)a;
(void)b; (void)b;
swap(a.sync, b.sync); swap(a.sync, b.sync);
swap(a.disableWAL, b.disableWAL);
swap(a.__isset, b.__isset); swap(a.__isset, b.__isset);
} }

@ -238,7 +238,7 @@ class DBOptions {
static const uint64_t _reflection_id = 6731746507948871532U; static const uint64_t _reflection_id = 6731746507948871532U;
static void _reflection_register(::apache::thrift::reflection::Schema&); static void _reflection_register(::apache::thrift::reflection::Schema&);
DBOptions() : create_if_missing(0), error_if_exists(0), write_buffer_size(0), max_open_files(0), block_size(0), block_restart_interval(0), compression(static_cast<CompressionType>(0)) { DBOptions() : create_if_missing(0), error_if_exists(0), write_buffer_size(0), max_open_files(0), block_size(0), block_restart_interval(0), compression(static_cast<CompressionType>(0)), num_levels(0), level0_file_num_compaction_trigger(0), level0_slowdown_writes_trigger(0), level0_stop_writes_trigger(0), target_file_size_base(0), target_file_size_multiplier(0), max_bytes_for_level_base(0), max_bytes_for_level_multiplier(0), max_grandparent_overlap_factor(0), disableDataSync(0) {
} }
DBOptions(const DBOptions&) = default; DBOptions(const DBOptions&) = default;
@ -254,6 +254,16 @@ class DBOptions {
block_size = 0; block_size = 0;
block_restart_interval = 0; block_restart_interval = 0;
compression = static_cast<CompressionType>(0); compression = static_cast<CompressionType>(0);
num_levels = 0;
level0_file_num_compaction_trigger = 0;
level0_slowdown_writes_trigger = 0;
level0_stop_writes_trigger = 0;
target_file_size_base = 0;
target_file_size_multiplier = 0;
max_bytes_for_level_base = 0;
max_bytes_for_level_multiplier = 0;
max_grandparent_overlap_factor = 0;
disableDataSync = 0;
__isset.__clear(); __isset.__clear();
} }
@ -266,6 +276,16 @@ class DBOptions {
int32_t block_size; int32_t block_size;
int32_t block_restart_interval; int32_t block_restart_interval;
CompressionType compression; CompressionType compression;
int32_t num_levels;
int32_t level0_file_num_compaction_trigger;
int32_t level0_slowdown_writes_trigger;
int32_t level0_stop_writes_trigger;
int32_t target_file_size_base;
int32_t target_file_size_multiplier;
int32_t max_bytes_for_level_base;
int32_t max_bytes_for_level_multiplier;
int32_t max_grandparent_overlap_factor;
bool disableDataSync;
struct __isset { struct __isset {
__isset() { __clear(); } __isset() { __clear(); }
@ -277,6 +297,16 @@ class DBOptions {
block_size = false; block_size = false;
block_restart_interval = false; block_restart_interval = false;
compression = false; compression = false;
num_levels = false;
level0_file_num_compaction_trigger = false;
level0_slowdown_writes_trigger = false;
level0_stop_writes_trigger = false;
target_file_size_base = false;
target_file_size_multiplier = false;
max_bytes_for_level_base = false;
max_bytes_for_level_multiplier = false;
max_grandparent_overlap_factor = false;
disableDataSync = false;
} }
bool create_if_missing; bool create_if_missing;
bool error_if_exists; bool error_if_exists;
@ -285,6 +315,16 @@ class DBOptions {
bool block_size; bool block_size;
bool block_restart_interval; bool block_restart_interval;
bool compression; bool compression;
bool num_levels;
bool level0_file_num_compaction_trigger;
bool level0_slowdown_writes_trigger;
bool level0_stop_writes_trigger;
bool target_file_size_base;
bool target_file_size_multiplier;
bool max_bytes_for_level_base;
bool max_bytes_for_level_multiplier;
bool max_grandparent_overlap_factor;
bool disableDataSync;
} __isset; } __isset;
bool operator == (const DBOptions & rhs) const bool operator == (const DBOptions & rhs) const
@ -303,6 +343,26 @@ class DBOptions {
return false; return false;
if (!(this->compression == rhs.compression)) if (!(this->compression == rhs.compression))
return false; return false;
if (!(this->num_levels == rhs.num_levels))
return false;
if (!(this->level0_file_num_compaction_trigger == rhs.level0_file_num_compaction_trigger))
return false;
if (!(this->level0_slowdown_writes_trigger == rhs.level0_slowdown_writes_trigger))
return false;
if (!(this->level0_stop_writes_trigger == rhs.level0_stop_writes_trigger))
return false;
if (!(this->target_file_size_base == rhs.target_file_size_base))
return false;
if (!(this->target_file_size_multiplier == rhs.target_file_size_multiplier))
return false;
if (!(this->max_bytes_for_level_base == rhs.max_bytes_for_level_base))
return false;
if (!(this->max_bytes_for_level_multiplier == rhs.max_bytes_for_level_multiplier))
return false;
if (!(this->max_grandparent_overlap_factor == rhs.max_grandparent_overlap_factor))
return false;
if (!(this->disableDataSync == rhs.disableDataSync))
return false;
return true; return true;
} }
bool operator != (const DBOptions &rhs) const { bool operator != (const DBOptions &rhs) const {
@ -324,7 +384,7 @@ class WriteOptions {
static const uint64_t _reflection_id = 8830325115029814540U; static const uint64_t _reflection_id = 8830325115029814540U;
static void _reflection_register(::apache::thrift::reflection::Schema&); static void _reflection_register(::apache::thrift::reflection::Schema&);
WriteOptions() : sync(0) { WriteOptions() : sync(0), disableWAL(0) {
} }
WriteOptions(const WriteOptions&) = default; WriteOptions(const WriteOptions&) = default;
@ -334,25 +394,31 @@ class WriteOptions {
void __clear() { void __clear() {
sync = 0; sync = 0;
disableWAL = 0;
__isset.__clear(); __isset.__clear();
} }
virtual ~WriteOptions() throw() {} virtual ~WriteOptions() throw() {}
bool sync; bool sync;
bool disableWAL;
struct __isset { struct __isset {
__isset() { __clear(); } __isset() { __clear(); }
void __clear() { void __clear() {
sync = false; sync = false;
disableWAL = false;
} }
bool sync; bool sync;
bool disableWAL;
} __isset; } __isset;
bool operator == (const WriteOptions & rhs) const bool operator == (const WriteOptions & rhs) const
{ {
if (!(this->sync == rhs.sync)) if (!(this->sync == rhs.sync))
return false; return false;
if (!(this->disableWAL == rhs.disableWAL))
return false;
return true; return true;
} }
bool operator != (const WriteOptions &rhs) const { bool operator != (const WriteOptions &rhs) const {

@ -47,12 +47,23 @@ struct DBOptions {
4:i32 max_open_files; 4:i32 max_open_files;
5:i32 block_size; 5:i32 block_size;
6:i32 block_restart_interval; 6:i32 block_restart_interval;
7:CompressionType compression 7:CompressionType compression,
8:i32 num_levels,
9:i32 level0_file_num_compaction_trigger,
10:i32 level0_slowdown_writes_trigger,
11:i32 level0_stop_writes_trigger,
12:i32 target_file_size_base,
13:i32 target_file_size_multiplier,
14:i32 max_bytes_for_level_base,
15:i32 max_bytes_for_level_multiplier,
16:i32 max_grandparent_overlap_factor,
17:bool disableDataSync
} }
// Options for writing // Options for writing
struct WriteOptions { struct WriteOptions {
1:bool sync 1:bool sync,
2:bool disableWAL
} }
struct Snapshot { struct Snapshot {

@ -73,10 +73,10 @@ public:
cache_size_ = l; cache_size_ = l;
} else if (sscanf(argv[i], "--cache_numshardbits=%d%c", &n, &junk) == 1) { } else if (sscanf(argv[i], "--cache_numshardbits=%d%c", &n, &junk) == 1) {
cache_numshardbits_ = n; cache_numshardbits_ = n;
} else if (strncmp(argv[i], "--hostname=", 10) == 0) { } else if (strncmp(argv[i], "--hostname=", 11) == 0) {
hostname_ = argv[i] + 10; hostname_ = argv[i] + 11;
} else if (strncmp(argv[i], "--rootdir=", 9) == 0) { } else if (strncmp(argv[i], "--rootdir=", 10) == 0) {
rootdir_ = argv[i] + 9; rootdir_ = argv[i] + 10;
} else { } else {
fprintf(stderr, "Invalid flag '%s'\n", argv[i]); fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
return false; return false;

@ -75,6 +75,26 @@ class DBHandler : virtual public DBIf {
} else if (dboptions.compression == kSnappyCompression) { } else if (dboptions.compression == kSnappyCompression) {
options.compression = leveldb::kSnappyCompression; options.compression = leveldb::kSnappyCompression;
} }
if (dboptions.num_levels > 0)
options.num_levels = dboptions.num_levels;
if (dboptions.level0_file_num_compaction_trigger > 0)
options.level0_file_num_compaction_trigger = dboptions.level0_file_num_compaction_trigger;
if (dboptions.level0_slowdown_writes_trigger > 0)
options.level0_slowdown_writes_trigger = dboptions.level0_slowdown_writes_trigger;
if (dboptions.level0_stop_writes_trigger)
options.level0_stop_writes_trigger = dboptions.level0_stop_writes_trigger;
if (dboptions.target_file_size_base > 0)
options.target_file_size_base = dboptions.target_file_size_base;
if (dboptions.target_file_size_multiplier > 0)
options.target_file_size_multiplier = dboptions.target_file_size_multiplier;
if (dboptions.max_bytes_for_level_base)
options.max_bytes_for_level_base = dboptions.max_bytes_for_level_base;
if (dboptions.max_bytes_for_level_multiplier)
options.max_bytes_for_level_multiplier = dboptions.max_bytes_for_level_multiplier;
if (dboptions.max_grandparent_overlap_factor)
options.max_grandparent_overlap_factor = dboptions.max_grandparent_overlap_factor;
if (dboptions.disableDataSync)
options.disableDataSync = dboptions.disableDataSync;
openHandles->add(options, dbname, dbdir); openHandles->add(options, dbname, dbdir);
_return.dbname = dbname; _return.dbname = dbname;
} }
@ -91,6 +111,7 @@ class DBHandler : virtual public DBIf {
const WriteOptions& options) { const WriteOptions& options) {
leveldb::WriteOptions woptions; leveldb::WriteOptions woptions;
woptions.sync = options.sync; woptions.sync = options.sync;
woptions.disableWAL = options.disableWAL;
leveldb::Slice key, value; leveldb::Slice key, value;
key.data_ = kv.key.data.data(); key.data_ = kv.key.data.data();
key.size_ = kv.key.size; key.size_ = kv.key.size;
@ -111,6 +132,7 @@ class DBHandler : virtual public DBIf {
const WriteOptions& options) { const WriteOptions& options) {
leveldb::WriteOptions woptions; leveldb::WriteOptions woptions;
woptions.sync = options.sync; woptions.sync = options.sync;
woptions.disableWAL = options.disableWAL;
leveldb::Slice key; leveldb::Slice key;
key.data_ = kv.data.data(); key.data_ = kv.data.data();
key.size_ = kv.size; key.size_ = kv.size;
@ -130,6 +152,7 @@ class DBHandler : virtual public DBIf {
leveldb::WriteOptions woptions; leveldb::WriteOptions woptions;
leveldb::WriteBatch lbatch; leveldb::WriteBatch lbatch;
woptions.sync = options.sync; woptions.sync = options.sync;
woptions.disableWAL = options.disableWAL;
leveldb::Slice key, value; leveldb::Slice key, value;
for (unsigned int i = 0; i < batch.size(); i++) { for (unsigned int i = 0; i < batch.size(); i++) {
kv one = batch[i]; kv one = batch[i];

@ -0,0 +1,71 @@
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/version_set.h"
#include <algorithm>
#include <stdio.h>
#include "db/filename.h"
#include "db/log_reader.h"
#include "db/log_writer.h"
#include "db/memtable.h"
#include "db/table_cache.h"
#include "leveldb/env.h"
#include "leveldb/table_builder.h"
#include "table/merger.h"
#include "table/two_level_iterator.h"
#include "util/coding.h"
#include "util/logging.h"
static int verbose = 0;
using namespace leveldb;
//
// Takes a manifest file and dumps out all metedata
//
int main(int argc, char** argv) {
// parse command line options
int n;
char junk;
int foundfile = 0;
std::string manifestfile;
for (int i = 1; i < argc; i++) {
std::string param(argv[i]);
if ((n = param.find("--file=")) != std::string::npos) {
manifestfile = param.substr(strlen("--file="));
foundfile = 1;
} else if (sscanf(argv[i], "--verbose=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
verbose = n;
}
}
if (!foundfile) {
fprintf(stderr, "%s [--verbose=0|1] [--file=pathname of manifest file\n",
argv[0]);
abort();
}
if (verbose) {
printf("Processing Manifest file %s\n", manifestfile.c_str());
}
Options options;
std::string file(manifestfile);
std::string dbname("dummy");
TableCache* tc = new TableCache(dbname, &options, 10);
const InternalKeyComparator* cmp = new InternalKeyComparator(options.comparator);
VersionSet* versions = new VersionSet(dbname, &options,
tc, cmp);
Status s = versions->DumpManifest(options, file);
if (!s.ok()) {
printf("Error in processing file %s %s\n", manifestfile.c_str(),
s.ToString().c_str());
}
if (verbose) {
printf("Processing Manifest file %s done\n", manifestfile.c_str());
}
}

@ -0,0 +1,14 @@
/*version.h*/
#ifndef VERSION_H_
#define VERSION_H_
// these variables tell us about the git config and time
extern const char* leveldb_build_git_sha;
extern const char* leveldb_build_git_datetime;
// these variables tell us when the compilation occured
extern const char* leveldb_build_compile_time;
extern const char* leveldb_build_compile_date;
#endif /* VERSION_H_ */

@ -3,6 +3,7 @@
// found in the LICENSE file. See the AUTHORS file for names of contributors. // found in the LICENSE file. See the AUTHORS file for names of contributors.
#include <deque> #include <deque>
#include <set>
#include <dirent.h> #include <dirent.h>
#include <errno.h> #include <errno.h>
#include <fcntl.h> #include <fcntl.h>
@ -31,6 +32,10 @@ namespace leveldb {
namespace { namespace {
// list of pathnames that are locked
static std::set<std::string> lockedFiles;
static port::Mutex mutex_lockedFiles;
static Status IOError(const std::string& context, int err_number) { static Status IOError(const std::string& context, int err_number) {
return Status::IOError(context, strerror(err_number)); return Status::IOError(context, strerror(err_number));
} }
@ -291,7 +296,28 @@ class PosixMmapFile : public WritableFile {
} }
}; };
static int LockOrUnlock(int fd, bool lock) { static int LockOrUnlock(const std::string& fname, int fd, bool lock) {
mutex_lockedFiles.Lock();
if (lock) {
// If it already exists in the lockedFiles set, then it is already locked,
// and fail this lock attempt. Otherwise, insert it into lockedFiles.
// This check is needed because fcntl() does not detect lock conflict
// if the fcntl is issued by the same thread that earlier acquired
// this lock.
if (lockedFiles.insert(fname).second == false) {
mutex_lockedFiles.Unlock();
errno = ENOLCK;
return -1;
}
} else {
// If we are unlocking, then verify that we had locked it earlier,
// it should already exist in lockedFiles. Remove it from lockedFiles.
if (lockedFiles.erase(fname) != 1) {
mutex_lockedFiles.Unlock();
errno = ENOLCK;
return -1;
}
}
errno = 0; errno = 0;
struct flock f; struct flock f;
memset(&f, 0, sizeof(f)); memset(&f, 0, sizeof(f));
@ -299,12 +325,19 @@ static int LockOrUnlock(int fd, bool lock) {
f.l_whence = SEEK_SET; f.l_whence = SEEK_SET;
f.l_start = 0; f.l_start = 0;
f.l_len = 0; // Lock/unlock entire file f.l_len = 0; // Lock/unlock entire file
return fcntl(fd, F_SETLK, &f); int value = fcntl(fd, F_SETLK, &f);
if (value == -1 && lock) {
// if there is an error in locking, then remove the pathname from lockedfiles
lockedFiles.erase(fname);
}
mutex_lockedFiles.Unlock();
return value;
} }
class PosixFileLock : public FileLock { class PosixFileLock : public FileLock {
public: public:
int fd_; int fd_;
std::string filename;
}; };
class PosixEnv : public Env { class PosixEnv : public Env {
@ -435,12 +468,13 @@ class PosixEnv : public Env {
int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644); int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
if (fd < 0) { if (fd < 0) {
result = IOError(fname, errno); result = IOError(fname, errno);
} else if (LockOrUnlock(fd, true) == -1) { } else if (LockOrUnlock(fname, fd, true) == -1) {
result = IOError("lock " + fname, errno); result = IOError("lock " + fname, errno);
close(fd); close(fd);
} else { } else {
PosixFileLock* my_lock = new PosixFileLock; PosixFileLock* my_lock = new PosixFileLock;
my_lock->fd_ = fd; my_lock->fd_ = fd;
my_lock->filename = fname;
*lock = my_lock; *lock = my_lock;
} }
return result; return result;
@ -449,7 +483,7 @@ class PosixEnv : public Env {
virtual Status UnlockFile(FileLock* lock) { virtual Status UnlockFile(FileLock* lock) {
PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock); PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
Status result; Status result;
if (LockOrUnlock(my_lock->fd_, false) == -1) { if (LockOrUnlock(my_lock->filename, my_lock->fd_, false) == -1) {
result = IOError("unlock", errno); result = IOError("unlock", errno);
} }
close(my_lock->fd_); close(my_lock->fd_);
@ -503,6 +537,43 @@ class PosixEnv : public Env {
usleep(micros); usleep(micros);
} }
virtual Status GetHostName(char* name, uint len) {
int ret = gethostname(name, len);
if (ret < 0) {
if (errno == EFAULT || errno == EINVAL)
return Status::InvalidArgument(strerror(errno));
else
return IOError("GetHostName", errno);
}
return Status::OK();
}
virtual Status GetCurrentTime(int64_t* unix_time) {
time_t ret = time(NULL);
if (ret == (time_t) -1) {
return IOError("GetCurrentTime", errno);
}
*unix_time = (int64_t) ret;
return Status::OK();
}
virtual Status GetAbsolutePath(const std::string& db_path,
std::string* output_path) {
if (db_path.find('/') == 0) {
*output_path = db_path;
return Status::OK();
}
char the_path[256];
char* ret = getcwd(the_path, 256);
if (ret == NULL) {
return Status::IOError(strerror(errno));
}
*output_path = ret;
return Status::OK();
}
private: private:
void PthreadCall(const char* label, int result) { void PthreadCall(const char* label, int result) {
if (result != 0) { if (result != 0) {

@ -0,0 +1,57 @@
// Copyright (c) 2012 Facebook. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "leveldb/status.h"
#include "leveldb/env.h"
#include <vector>
#include "util/coding.h"
#include "util/testharness.h"
namespace leveldb {
class LockTest {
public:
static LockTest* current_;
std::string file_;
leveldb::Env* env_;
LockTest() : file_(test::TmpDir() + "/db_testlock_file"),
env_(leveldb::Env::Default()) {
current_ = this;
}
~LockTest() {
}
Status LockFile(FileLock** db_lock) {
return env_->LockFile(file_, db_lock);
}
Status UnlockFile(FileLock* db_lock) {
return env_->UnlockFile(db_lock);
}
};
LockTest* LockTest::current_;
TEST(LockTest, LockBySameThread) {
FileLock* lock1;
FileLock* lock2;
// acquire a lock on a file
ASSERT_OK(LockFile(&lock1));
// re-acquire the lock on the same file. This should fail.
ASSERT_TRUE(LockFile(&lock2).IsIOError());
// release the lock
ASSERT_OK(UnlockFile(lock1));
}
} // namespace leveldb
int main(int argc, char** argv) {
return leveldb::test::RunAllTests();
}

@ -6,6 +6,7 @@
#include "leveldb/comparator.h" #include "leveldb/comparator.h"
#include "leveldb/env.h" #include "leveldb/env.h"
#include "leveldb/filter_policy.h"
namespace leveldb { namespace leveldb {
@ -28,14 +29,36 @@ Options::Options()
level0_stop_writes_trigger(12), level0_stop_writes_trigger(12),
max_mem_compaction_level(2), max_mem_compaction_level(2),
target_file_size_base(2 * 1048576), target_file_size_base(2 * 1048576),
target_file_size_multiplier(10), target_file_size_multiplier(1),
max_bytes_for_level_base(10 * 1048576), max_bytes_for_level_base(10 * 1048576),
max_bytes_for_level_multiplier(10), max_bytes_for_level_multiplier(10),
expanded_compaction_factor(25), expanded_compaction_factor(25),
max_grandparent_overlap_factor(10), max_grandparent_overlap_factor(10),
filter_policy(NULL), filter_policy(NULL),
statistics(NULL), statistics(NULL),
disableDataSync(false) { disableDataSync(false),
db_stats_log_interval(1800) {
} }
void
Options::Dump(
Logger * log) const
{
Log(log," Options.comparator: %s", comparator->Name());
Log(log," Options.create_if_missing: %d", create_if_missing);
Log(log," Options.error_if_exists: %d", error_if_exists);
Log(log," Options.paranoid_checks: %d", paranoid_checks);
Log(log," Options.env: %p", env);
Log(log," Options.info_log: %p", info_log);
Log(log," Options.write_buffer_size: %zd", write_buffer_size);
Log(log," Options.max_open_files: %d", max_open_files);
Log(log," Options.block_cache: %p", block_cache);
Log(log," Options.block_size: %zd", block_size);
Log(log,"Options.block_restart_interval: %d", block_restart_interval);
Log(log," Options.compression: %d", compression);
Log(log," Options.filter_policy: %s", filter_policy == NULL ? "NULL" : filter_policy->Name());
} // Options::Dump
} // namespace leveldb } // namespace leveldb

@ -0,0 +1,23 @@
#ifndef STATS_LOGGER_H_
#define STATS_LOGGER_H_
namespace leveldb {
class StatsLogger {
public:
virtual void Log_Deploy_Stats(const std::string& db_version,
const std::string& machine_info,
const std::string& data_dir,
const uint64_t data_size,
const uint32_t file_number,
const std::string& data_size_per_level,
const std::string& file_number_per_level,
const int64_t& ts_unix) = 0;
};
}
#endif
Loading…
Cancel
Save