This is the mega-patch for multi-threaded compaction,
published in https://reviews.facebook.net/D5997.

Summary:
This patch allows compaction to occur in multiple background threads
concurrently.
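
For context, here is a minimal sketch of how an application might opt in to
concurrent compactions with this patch (option and method names are taken from
the diff below; the values and database path are illustrative):

    #include "leveldb/db.h"
    #include "leveldb/env.h"
    #include "leveldb/options.h"

    leveldb::Options options;
    options.create_if_missing = true;
    options.max_write_buffer_number = 4;      // up to 4 in-memory memtables
    options.max_background_compactions = 4;   // up to 4 concurrent compactions
    // The env needs at least as many background threads as
    // max_background_compactions (see the db_bench change below).
    options.env = leveldb::Env::Default();
    options.env->SetBackgroundThreads(options.max_background_compactions);

    leveldb::DB* db = NULL;
    leveldb::Status s = leveldb::DB::Open(options, "/tmp/testdb", &db);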

If a manual compaction is issued, the system falls back to a
single-compaction-thread model. This is done to ensure correctness
and simplicity of the code. When the manual compaction is finished,
the system resumes its concurrent-compaction mode automatically.

The updates to the manifest are done via a group-commit approach.
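
The batching is done through the new VersionSet::ManifestWriter struct
declared in version_set.cc below. Only its declaration is visible in this
diff, so the following is a sketch of the pattern its fields imply (an
assumption mirroring leveldb's write-batch group commit, not the verbatim
code of this patch; the manifest_writers_ queue is hypothetical):

    // Each thread wraps its VersionEdit in a ManifestWriter and enqueues it.
    ManifestWriter w(&mutex_, edit);
    manifest_writers_.push_back(&w);
    // Wait until this writer is either serviced by a leader or becomes
    // the front of the queue (i.e. the leader itself).
    while (!w.done && &w != manifest_writers_.front()) {
      w.cv.Wait();
    }
    if (!w.done) {
      // Leader: apply all queued edits, write them to the manifest with a
      // single sync, then mark the followers done and signal their cv's.
    }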

Test Plan: run db_bench
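
An example invocation exercising the new knobs added to db_bench in this
patch (flag names are from the diff below; the benchmark choice is
illustrative):

    ./db_bench --benchmarks=fillrandom --max_write_buffer_number=4 \
        --max_background_compactions=4 --level0_file_num_compaction_trigger=4
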
Branch: main
Author: Dhruba Borthakur (12 years ago)
Parent: cd93e82845
Commit: 1ca0584345

Files changed (lines changed):
   34  db/db_bench.cc
  359  db/db_impl.cc
   31  db/db_impl.h
   10  db/memtable.cc
   17  db/memtable.h
  192  db/memtablelist.cc
   96  db/memtablelist.h
    2  db/repair.cc
    6  db/skiplist.h
    4  db/version_edit.h
  461  db/version_set.cc
   74  db/version_set.h
    6  hdfs/env_hdfs.h
    6  include/leveldb/env.h
   13  include/leveldb/options.h
   29  tools/db_stress.cc
   20  util/env_posix.cc
   47  util/options.cc

db/db_bench.cc

@@ -87,6 +87,16 @@ static bool FLAGS_histogram = false;
 // (initialized to default value by "main")
 static int FLAGS_write_buffer_size = 0;
 
+// The number of in-memory memtables.
+// Each memtable is of size FLAGS_write_buffer_size.
+// This is initialized to its default value of 2 in main().
+static int FLAGS_max_write_buffer_number = 0;
+
+// The maximum number of concurrent background compactions
+// that can occur in parallel.
+// This is initialized to its default value of 1 in main().
+static int FLAGS_max_background_compactions = 0;
+
 // Number of bytes to use as a cache of uncompressed data.
 // Negative means use default settings.
 static long FLAGS_cache_size = -1;

@@ -159,6 +169,9 @@ static int FLAGS_level0_stop_writes_trigger = 12;
 // Number of files in level-0 that will slow down writes.
 static int FLAGS_level0_slowdown_writes_trigger = 8;
 
+// Number of files in level-0 at which compactions start.
+static int FLAGS_level0_file_num_compaction_trigger = 4;
+
 // Ratio of reads to writes (expressed as a percentage)
 // for the ReadRandomWriteRandom workload. The default
 // setting is 9 gets for every 1 put.

@@ -326,7 +339,8 @@ class Stats {
     } else {
       double now = FLAGS_env->NowMicros();
       fprintf(stderr,
-              "... thread %d: %ld ops in %.6f seconds and %.2f ops/sec\n",
+              "%s thread %d: %ld ops in %.6f seconds and %.2f ops/sec\n",
+              FLAGS_env->TimeToString((uint64_t) now/1000000).c_str(),
               id_,
               done_ - last_report_done_,
               (now - last_report_finish_) / 1000000.0,

@@ -873,6 +887,8 @@ class Benchmark {
     options.create_if_missing = !FLAGS_use_existing_db;
     options.block_cache = cache_;
     options.write_buffer_size = FLAGS_write_buffer_size;
+    options.max_write_buffer_number = FLAGS_max_write_buffer_number;
+    options.max_background_compactions = FLAGS_max_background_compactions;
     options.block_size = FLAGS_block_size;
     options.filter_policy = filter_policy_;
     options.max_open_files = FLAGS_open_files;

@@ -887,6 +903,8 @@ class Benchmark {
     options.max_bytes_for_level_multiplier =
         FLAGS_max_bytes_for_level_multiplier;
     options.level0_stop_writes_trigger = FLAGS_level0_stop_writes_trigger;
+    options.level0_file_num_compaction_trigger =
+        FLAGS_level0_file_num_compaction_trigger;
     options.level0_slowdown_writes_trigger =
         FLAGS_level0_slowdown_writes_trigger;
     options.compression = FLAGS_compression_type;

@@ -1172,7 +1190,10 @@ class Benchmark {
 int main(int argc, char** argv) {
   FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
+  FLAGS_max_write_buffer_number = leveldb::Options().max_write_buffer_number;
   FLAGS_open_files = leveldb::Options().max_open_files;
+  FLAGS_max_background_compactions =
+      leveldb::Options().max_background_compactions;
   // Compression test code above refers to FLAGS_block_size
   FLAGS_block_size = leveldb::Options().block_size;
   std::string default_db_path;

@@ -1203,6 +1224,10 @@ int main(int argc, char** argv) {
       FLAGS_value_size = n;
     } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
       FLAGS_write_buffer_size = n;
+    } else if (sscanf(argv[i], "--max_write_buffer_number=%d%c", &n, &junk) == 1) {
+      FLAGS_max_write_buffer_number = n;
+    } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) {
+      FLAGS_max_background_compactions = n;
     } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) {
       FLAGS_cache_size = l;
     } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) {

@@ -1281,6 +1306,9 @@ int main(int argc, char** argv) {
     } else if (sscanf(argv[i],"--level0_slowdown_writes_trigger=%d%c",
                       &n, &junk) == 1) {
       FLAGS_level0_slowdown_writes_trigger = n;
+    } else if (sscanf(argv[i],"--level0_file_num_compaction_trigger=%d%c",
+                      &n, &junk) == 1) {
+      FLAGS_level0_file_num_compaction_trigger = n;
     } else if (strncmp(argv[i], "--compression_type=", 19) == 0) {
       const char* ctype = argv[i] + 19;
       if (!strcasecmp(ctype, "none"))

@@ -1309,6 +1337,10 @@ int main(int argc, char** argv) {
     }
   }
 
+  // The number of background threads should be at least as large as the
+  // max number of concurrent compactions.
+  FLAGS_env->SetBackgroundThreads(FLAGS_max_background_compactions);
+
   // Choose a location for the test database if none given with --db=<path>
   if (FLAGS_db == NULL) {
     leveldb::Env::Default()->GetTestDirectory(&default_db_path);

db/db_impl.cc

@@ -18,6 +18,7 @@
 #include "db/log_reader.h"
 #include "db/log_writer.h"
 #include "db/memtable.h"
+#include "db/memtablelist.h"
 #include "db/table_cache.h"
 #include "db/version_set.h"
 #include "db/write_batch_internal.h"

@@ -67,6 +68,7 @@ struct DBImpl::CompactionState {
     InternalKey smallest, largest;
   };
   std::vector<Output> outputs;
+  std::list<uint64_t> allocated_file_numbers;
 
   // State kept for output being generated
   WritableFile* outfile;
@@ -133,20 +135,19 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
       db_lock_(NULL),
       shutting_down_(NULL),
       bg_cv_(&mutex_),
-      mem_(new MemTable(internal_comparator_)),
-      imm_(NULL),
+      mem_(new MemTable(internal_comparator_, NumberLevels())),
       logfile_(NULL),
       logfile_number_(0),
       log_(NULL),
       tmp_batch_(new WriteBatch),
-      bg_compaction_scheduled_(false),
+      bg_compaction_scheduled_(0),
       bg_logstats_scheduled_(false),
       manual_compaction_(NULL),
       logger_(NULL),
       disable_delete_obsolete_files_(false),
-      delete_obsolete_files_last_run_(0) {
+      delete_obsolete_files_last_run_(0),
+      delayed_writes_(0) {
   mem_->Ref();
-  has_imm_.Release_Store(NULL);
 
   env_->GetAbsolutePath(dbname, &db_absolute_path_);
   stats_ = new CompactionStats[options.num_levels];

@@ -190,7 +191,7 @@ DBImpl::~DBImpl() {
   delete versions_;
   if (mem_ != NULL) mem_->Unref();
-  if (imm_ != NULL) imm_->Unref();
+  imm_.UnrefAll();
   delete tmp_batch_;
   delete log_;
   delete logfile_;
@@ -476,7 +477,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
     WriteBatchInternal::SetContents(&batch, record);
     if (mem == NULL) {
-      mem = new MemTable(internal_comparator_);
+      mem = new MemTable(internal_comparator_, NumberLevels());
       mem->Ref();
     }
     status = WriteBatchInternal::InsertInto(&batch, mem);

@@ -492,7 +493,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
     }
     if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
-      status = WriteLevel0Table(mem, edit, NULL);
+      status = WriteLevel0TableForRecovery(mem, edit);
       if (!status.ok()) {
         // Reflect errors immediately so that conditions like full
         // file-systems cause the DB::Open() to fail.

@@ -504,7 +505,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
   }
   if (status.ok() && mem != NULL) {
-    status = WriteLevel0Table(mem, edit, NULL);
+    status = WriteLevel0TableForRecovery(mem, edit);
     // Reflect errors immediately so that conditions like full
     // file-systems cause the DB::Open() to fail.
   }

@@ -514,8 +515,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
   return status;
 }
 
-Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
-                                Version* base) {
+Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
   mutex_.AssertHeld();
   const uint64_t start_micros = env_->NowMicros();
   FileMetaData meta;

@@ -537,8 +537,8 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit)
       (unsigned long long) meta.file_size,
       s.ToString().c_str());
   delete iter;
-  pending_outputs_.erase(meta.number);
 
+  pending_outputs_.erase(meta.number);
   // Note that if file_size is zero, the file has been deleted and
   // should not be added to the manifest.

@@ -546,9 +546,6 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit)
   if (s.ok() && meta.file_size > 0) {
     const Slice min_user_key = meta.smallest.user_key();
     const Slice max_user_key = meta.largest.user_key();
-    if (base != NULL) {
-      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
-    }
     edit->AddFile(level, meta.number, meta.file_size,
                   meta.smallest, meta.largest);
   }
@@ -560,37 +557,113 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
   return s;
 }
 
-Status DBImpl::CompactMemTable() {
-  mutex_.AssertHeld();
-  assert(imm_ != NULL);
-
-  // Save the contents of the memtable as a new Table
-  VersionEdit edit(NumberLevels());
-  Version* base = versions_->current();
-  base->Ref();
-  Status s = WriteLevel0Table(imm_, &edit, base);
-  base->Unref();
-
-  if (s.ok() && shutting_down_.Acquire_Load()) {
-    s = Status::IOError("Deleting DB during memtable compaction");
-  }
-
-  // Replace immutable memtable with the generated Table
-  if (s.ok()) {
-    edit.SetPrevLogNumber(0);
-    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
-    s = versions_->LogAndApply(&edit, &mutex_);
-  }
-
-  if (s.ok()) {
-    // Commit to the new state
-    imm_->Unref();
-    imm_ = NULL;
-    has_imm_.Release_Store(NULL);
-    DeleteObsoleteFiles();
-    MaybeScheduleLogDBDeployStats();
-  }
-
-  return s;
-}
+Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
+                                uint64_t* filenumber) {
+  mutex_.AssertHeld();
+  const uint64_t start_micros = env_->NowMicros();
+  FileMetaData meta;
+  meta.number = versions_->NewFileNumber();
+  *filenumber = meta.number;
+  pending_outputs_.insert(meta.number);
+  Iterator* iter = mem->NewIterator();
+  Log(options_.info_log, "Level-0 flush table #%llu: started",
+      (unsigned long long) meta.number);
+
+  Version* base = versions_->current();
+  base->Ref();
+  Status s;
+  {
+    mutex_.Unlock();
+    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
+    mutex_.Lock();
+  }
+  base->Unref();
+
+  Log(options_.info_log, "Level-0 flush table #%llu: %lld bytes %s",
+      (unsigned long long) meta.number,
+      (unsigned long long) meta.file_size,
+      s.ToString().c_str());
+  delete iter;
+
+  // re-acquire the most current version
+  base = versions_->current();
+
+  // There could be multiple threads writing to their own level-0 files.
+  // The pending_outputs cannot be cleared here, otherwise this newly
+  // created file might not be considered a live file by another
+  // compaction thread that is concurrently deleting obsolete files.
+  // The pending_outputs can be cleared only after the new version is
+  // committed so that other threads can recognize this file as a
+  // valid one.
+  // pending_outputs_.erase(meta.number);
+
+  // Note that if file_size is zero, the file has been deleted and
+  // should not be added to the manifest.
+  int level = 0;
+  if (s.ok() && meta.file_size > 0) {
+    const Slice min_user_key = meta.smallest.user_key();
+    const Slice max_user_key = meta.largest.user_key();
+    // If we have more than one background thread, then we cannot
+    // insert files directly into higher levels because some other
+    // threads could be concurrently producing compacted files for
+    // that key range.
+    if (base != NULL && options_.max_background_compactions <= 1) {
+      level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
+    }
+    edit->AddFile(level, meta.number, meta.file_size,
+                  meta.smallest, meta.largest);
+  }
+
+  CompactionStats stats;
+  stats.micros = env_->NowMicros() - start_micros;
+  stats.bytes_written = meta.file_size;
+  stats_[level].Add(stats);
+  return s;
+}
+
+Status DBImpl::CompactMemTable(bool* madeProgress) {
+  mutex_.AssertHeld();
+  assert(imm_.size() != 0);
+
+  if (!imm_.IsFlushPending()) {
+    Log(options_.info_log, "Memcompaction already in progress");
+    Status s = Status::IOError("Memcompaction already in progress");
+    return s;
+  }
+
+  // Save the contents of the earliest memtable as a new Table.
+  // This will release and re-acquire the mutex.
+  uint64_t file_number;
+  MemTable* m = imm_.PickMemtableToFlush();
+  if (m == NULL) {
+    Log(options_.info_log, "Nothing in memstore to flush");
+    Status s = Status::IOError("Nothing in memstore to flush");
+    return s;
+  }
+
+  // record the logfile_number_ before we release the mutex
+  VersionEdit* edit = m->GetEdits();
+  edit->SetPrevLogNumber(0);
+  edit->SetLogNumber(logfile_number_);  // Earlier logs no longer needed
+  Status s = WriteLevel0Table(m, edit, &file_number);
+
+  if (s.ok() && shutting_down_.Acquire_Load()) {
+    s = Status::IOError("Deleting DB during memtable compaction");
+  }
+
+  // Replace immutable memtable with the generated Table
+  s = imm_.InstallMemtableFlushResults(m, versions_, s, &mutex_,
+                                       options_.info_log, file_number,
+                                       pending_outputs_);
+
+  if (s.ok()) {
+    if (madeProgress) {
+      *madeProgress = 1;
+    }
+    DeleteObsoleteFiles();
+    MaybeScheduleLogDBDeployStats();
+  }
+  return s;
+}
@@ -636,6 +709,7 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin, const Slice* end) {
   ManualCompaction manual;
   manual.level = level;
   manual.done = false;
+  manual.in_progress = false;
   if (begin == NULL) {
     manual.begin = NULL;
   } else {

@@ -650,16 +724,46 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin, const Slice* end) {
   }
 
   MutexLock l(&mutex_);
+
+  // When a manual compaction arrives, temporarily throttle down
+  // the number of background compaction threads to 1. This is
+  // needed to ensure that this manual compaction can compact
+  // any range of keys/files. We artificially increase
+  // bg_compaction_scheduled_ by a large number; this causes
+  // the system to have a single background thread. Now,
+  // this manual compaction can progress without stomping
+  // on any other concurrent compactions.
+  const int LargeNumber = 10000000;
+  const int newvalue = options_.max_background_compactions - 1;
+  bg_compaction_scheduled_ += LargeNumber;
+  while (bg_compaction_scheduled_ > LargeNumber) {
+    Log(options_.info_log, "Manual compaction request waiting for background threads to fall below 1");
+    bg_cv_.Wait();
+  }
+  Log(options_.info_log, "Manual compaction starting");
+
   while (!manual.done) {
     while (manual_compaction_ != NULL) {
       bg_cv_.Wait();
     }
     manual_compaction_ = &manual;
+    if (bg_compaction_scheduled_ == LargeNumber) {
+      bg_compaction_scheduled_ = newvalue;
+    }
     MaybeScheduleCompaction();
     while (manual_compaction_ == &manual) {
       bg_cv_.Wait();
     }
   }
+  assert(!manual.in_progress);
+
+  // wait till there are no background threads scheduled
+  bg_compaction_scheduled_ += LargeNumber;
+  while (bg_compaction_scheduled_ > LargeNumber + newvalue) {
+    Log(options_.info_log, "Manual compaction resetting background threads");
+    bg_cv_.Wait();
+  }
+  bg_compaction_scheduled_ = 0;
 }
 
 Status DBImpl::FlushMemTable(const FlushOptions& options) {

@@ -676,10 +780,10 @@ Status DBImpl::WaitForCompactMemTable() {
   Status s;
   // Wait until the compaction completes
   MutexLock l(&mutex_);
-  while (imm_ != NULL && bg_error_.ok()) {
+  while (imm_.size() > 0 && bg_error_.ok()) {
     bg_cv_.Wait();
   }
-  if (imm_ != NULL) {
+  if (imm_.size() != 0) {
     s = bg_error_;
   }
   return s;
@@ -704,16 +808,16 @@ Status DBImpl::TEST_WaitForCompact() {
 
 void DBImpl::MaybeScheduleCompaction() {
   mutex_.AssertHeld();
-  if (bg_compaction_scheduled_) {
+  if (bg_compaction_scheduled_ >= options_.max_background_compactions) {
     // Already scheduled
   } else if (shutting_down_.Acquire_Load()) {
     // DB is being deleted; no more background compactions
-  } else if (imm_ == NULL &&
+  } else if (!imm_.IsFlushPending() &&
              manual_compaction_ == NULL &&
              !versions_->NeedsCompaction()) {
     // No work to be done
   } else {
-    bg_compaction_scheduled_ = true;
+    bg_compaction_scheduled_++;
     env_->Schedule(&DBImpl::BGWork, this);
   }
 }

@@ -723,10 +827,12 @@ void DBImpl::BGWork(void* db) {
 }
 
 void DBImpl::BackgroundCall() {
+  bool madeProgress;
   MutexLock l(&mutex_);
+  // Log(options_.info_log, "XXX BG Thread %llx process new work item", pthread_self());
   assert(bg_compaction_scheduled_);
   if (!shutting_down_.Acquire_Load()) {
-    Status s = BackgroundCompaction();
+    Status s = BackgroundCompaction(&madeProgress);
     if (!s.ok()) {
       // Wait a little bit before retrying background compaction in
       // case this is an environmental problem and we do not want to
@@ -741,28 +847,41 @@ void DBImpl::BackgroundCall() {
     }
   }
 
-  bg_compaction_scheduled_ = false;
+  bg_compaction_scheduled_--;
 
   MaybeScheduleLogDBDeployStats();
 
   // Previous compaction may have produced too many files in a level,
-  // so reschedule another compaction if needed.
-  MaybeScheduleCompaction();
+  // so reschedule another compaction if we made progress in the
+  // last compaction.
+  if (madeProgress) {
+    MaybeScheduleCompaction();
+  }
   bg_cv_.SignalAll();
 }
 
-Status DBImpl::BackgroundCompaction() {
+Status DBImpl::BackgroundCompaction(bool* madeProgress) {
+  *madeProgress = false;
   mutex_.AssertHeld();
 
-  if (imm_ != NULL) {
-    return CompactMemTable();
+  while (imm_.IsFlushPending()) {
+    Log(options_.info_log,
+        "BackgroundCompaction doing CompactMemTable, compaction slots available %d",
+        options_.max_background_compactions - bg_compaction_scheduled_);
+    Status stat = CompactMemTable(madeProgress);
+    if (!stat.ok()) {
+      return stat;
+    }
   }
 
   Compaction* c;
-  bool is_manual = (manual_compaction_ != NULL);
+  bool is_manual = (manual_compaction_ != NULL) &&
+                   (manual_compaction_->in_progress == false);
   InternalKey manual_end;
   if (is_manual) {
     ManualCompaction* m = manual_compaction_;
+    assert(!m->in_progress);
+    m->in_progress = true;  // another thread cannot pick up the same work
     c = versions_->CompactRange(m->level, m->begin, m->end);
     m->done = (c == NULL);
     if (c != NULL) {
@@ -781,6 +900,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress) {
   Status status;
   if (c == NULL) {
     // Nothing to do
+    Log(options_.info_log, "Compaction nothing to do");
   } else if (!is_manual && c->IsTrivialMove()) {
     // Move file to next level
     assert(c->num_input_files(0) == 1);

@@ -796,12 +916,16 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress) {
         static_cast<unsigned long long>(f->file_size),
         status.ToString().c_str(),
         versions_->LevelSummary(&tmp));
+    versions_->ReleaseCompactionFiles(c);
+    *madeProgress = true;
   } else {
     CompactionState* compact = new CompactionState(c);
     status = DoCompactionWork(compact);
     CleanupCompaction(compact);
+    versions_->ReleaseCompactionFiles(c);
     c->ReleaseInputs();
     DeleteObsoleteFiles();
+    *madeProgress = true;
   }
   delete c;

@@ -828,6 +952,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress) {
       m->tmp_storage = manual_end;
       m->begin = &m->tmp_storage;
     }
+    m->in_progress = false;  // not being processed anymore
     manual_compaction_ = NULL;
   }
   return status;
@@ -850,21 +975,54 @@ void DBImpl::CleanupCompaction(CompactionState* compact) {
   delete compact;
 }
 
+// Allocate the file numbers for the output files. We allocate as
+// many output file numbers as there are files in level+1.
+// Insert them into pending_outputs so that they do not get deleted.
+void DBImpl::AllocateCompactionOutputFileNumbers(CompactionState* compact) {
+  mutex_.AssertHeld();
+  assert(compact != NULL);
+  assert(compact->builder == NULL);
+  int filesNeeded = compact->compaction->num_input_files(1);
+  for (unsigned i = 0; i < filesNeeded; i++) {
+    uint64_t file_number = versions_->NewFileNumber();
+    pending_outputs_.insert(file_number);
+    compact->allocated_file_numbers.push_back(file_number);
+  }
+}
+
+// Frees up unused file numbers.
+void DBImpl::ReleaseCompactionUnusedFileNumbers(CompactionState* compact) {
+  mutex_.AssertHeld();
+  for (std::list<uint64_t>::iterator it =
+           compact->allocated_file_numbers.begin();
+       it != compact->allocated_file_numbers.end(); ++it) {
+    uint64_t file_number = *it;
+    pending_outputs_.erase(file_number);
+    // Log(options_.info_log, "XXX releasing unused file num %d", file_number);
+  }
+}
+
 Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
   assert(compact != NULL);
   assert(compact->builder == NULL);
   uint64_t file_number;
-  {
+  // If we have not yet exhausted the pre-allocated file numbers,
+  // then use the one from the front. Otherwise, we have to acquire
+  // the heavyweight lock and allocate a new file number.
+  if (!compact->allocated_file_numbers.empty()) {
+    file_number = compact->allocated_file_numbers.front();
+    compact->allocated_file_numbers.pop_front();
+  } else {
     mutex_.Lock();
     file_number = versions_->NewFileNumber();
     pending_outputs_.insert(file_number);
-    CompactionState::Output out;
-    out.number = file_number;
-    out.smallest.Clear();
-    out.largest.Clear();
-    compact->outputs.push_back(out);
     mutex_.Unlock();
   }
+  CompactionState::Output out;
+  out.number = file_number;
+  out.smallest.Clear();
+  out.largest.Clear();
+  compact->outputs.push_back(out);
 
   // Make the output file
   std::string fname = TableFileName(dbname_, file_number);
@@ -933,6 +1091,21 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact,
 
 Status DBImpl::InstallCompactionResults(CompactionState* compact) {
   mutex_.AssertHeld();
+
+  // paranoia: verify that the files that we started with
+  // still exist in the current version and in the same original level.
+  // This ensures that a concurrent compaction did not erroneously
+  // pick the same files to compact.
+  if (options_.paranoid_checks &&
+      !versions_->VerifyCompactionFileConsistency(compact->compaction)) {
+    Log(options_.info_log, "Compaction %d@%d + %d@%d files aborted",
+        compact->compaction->num_input_files(0),
+        compact->compaction->level(),
+        compact->compaction->num_input_files(1),
+        compact->compaction->level() + 1);
+    return Status::IOError("Compaction input files inconsistent");
+  }
+
   Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes",
       compact->compaction->num_input_files(0),
       compact->compaction->level(),
@@ -953,14 +1126,15 @@ Status DBImpl::InstallCompactionResults(CompactionState* compact) {
 }
 
 Status DBImpl::DoCompactionWork(CompactionState* compact) {
-  const uint64_t start_micros = env_->NowMicros();
   int64_t imm_micros = 0;  // Micros spent doing imm_ compactions
-  Log(options_.info_log, "Compacting %d@%d + %d@%d files",
+  Log(options_.info_log,
+      "Compacting %d@%d + %d@%d files, compaction slots available %d",
       compact->compaction->num_input_files(0),
       compact->compaction->level(),
       compact->compaction->num_input_files(1),
-      compact->compaction->level() + 1);
+      compact->compaction->level() + 1,
+      options_.max_background_compactions - bg_compaction_scheduled_);
   char scratch[256];
   compact->compaction->Summary(scratch, sizeof(scratch));
   Log(options_.info_log, "Compaction start summary: %s\n", scratch);

@@ -974,9 +1148,13 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
     compact->smallest_snapshot = snapshots_.oldest()->number_;
   }
 
+  // Allocate the output file numbers before we release the lock
+  AllocateCompactionOutputFileNumbers(compact);
+
   // Release mutex while we're actually doing the compaction work
   mutex_.Unlock();
+  const uint64_t start_micros = env_->NowMicros();
 
   Iterator* input = versions_->MakeInputIterator(compact->compaction);
   input->SeekToFirst();
   Status status;
@@ -986,10 +1164,10 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
   for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
     // Prioritize immutable compaction work
-    if (has_imm_.NoBarrier_Load() != NULL) {
+    if (imm_.imm_flush_needed.NoBarrier_Load() != NULL) {
       const uint64_t imm_start = env_->NowMicros();
       mutex_.Lock();
-      if (imm_ != NULL) {
+      if (imm_.IsFlushPending()) {
         CompactMemTable();
         bg_cv_.SignalAll();  // Wakeup MakeRoomForWrite() if necessary
       }
@@ -1101,15 +1279,23 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
     stats.bytes_written += compact->outputs[i].file_size;
   }
 
+  int MBpersec = ((stats.bytes_read + stats.bytes_written))
+                 / stats.micros;
+
   mutex_.Lock();
   stats_[compact->compaction->level() + 1].Add(stats);
 
+  // If there were any unused file numbers (mostly in case of
+  // compaction error), free up the entries from pending_outputs.
+  ReleaseCompactionUnusedFileNumbers(compact);
+
   if (status.ok()) {
     status = InstallCompactionResults(compact);
   }
   VersionSet::LevelSummaryStorage tmp;
   Log(options_.info_log,
-      "compacted to: %s", versions_->LevelSummary(&tmp));
+      "compacted to: %s %d MBytes/sec", versions_->LevelSummary(&tmp),
+      MBpersec);
   return status;
 }
@@ -1117,15 +1303,15 @@ namespace {
 struct IterState {
   port::Mutex* mu;
   Version* version;
-  MemTable* mem;
-  MemTable* imm;
+  std::vector<MemTable*> mem;  // includes both mem_ and imm_
 };
 
 static void CleanupIteratorState(void* arg1, void* arg2) {
   IterState* state = reinterpret_cast<IterState*>(arg1);
   state->mu->Lock();
-  state->mem->Unref();
-  if (state->imm != NULL) state->imm->Unref();
+  for (unsigned int i = 0; i < state->mem.size(); i++) {
+    state->mem[i]->Unref();
+  }
   state->version->Unref();
   state->mu->Unlock();
   delete state;
@@ -1138,22 +1324,29 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
   mutex_.Lock();
   *latest_snapshot = versions_->LastSequence();
 
-  // Collect together all needed child iterators
+  // Collect together all needed child iterators for mem
   std::vector<Iterator*> list;
-  list.push_back(mem_->NewIterator());
   mem_->Ref();
-  if (imm_ != NULL) {
-    list.push_back(imm_->NewIterator());
-    imm_->Ref();
+  list.push_back(mem_->NewIterator());
+  cleanup->mem.push_back(mem_);
+
+  // Collect together all needed child iterators for imm_
+  std::vector<MemTable*> immutables;
+  imm_.GetMemTables(&immutables);
+  for (unsigned int i = 0; i < immutables.size(); i++) {
+    MemTable* m = immutables[i];
+    m->Ref();
+    list.push_back(m->NewIterator());
+    cleanup->mem.push_back(m);
   }
+
+  // Collect iterators for files in L0 - Ln
   versions_->current()->AddIterators(options, &list);
   Iterator* internal_iter =
       NewMergingIterator(&internal_comparator_, &list[0], list.size());
   versions_->current()->Ref();
 
   cleanup->mu = &mutex_;
-  cleanup->mem = mem_;
-  cleanup->imm = imm_;
   cleanup->version = versions_->current();
   internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
@@ -1184,10 +1377,10 @@ Status DBImpl::Get(const ReadOptions& options,
   }
 
   MemTable* mem = mem_;
-  MemTable* imm = imm_;
+  MemTableList imm = imm_;
   Version* current = versions_->current();
   mem->Ref();
-  if (imm != NULL) imm->Ref();
+  imm.RefAll();
   current->Ref();
 
   bool have_stat_update = false;

@@ -1200,7 +1393,7 @@ Status DBImpl::Get(const ReadOptions& options,
     LookupKey lkey(key, snapshot);
     if (mem->Get(lkey, value, &s)) {
       // Done
-    } else if (imm != NULL && imm->Get(lkey, value, &s)) {
+    } else if (imm.Get(lkey, value, &s)) {
       // Done
     } else {
       s = current->Get(options, lkey, value, &stats);

@@ -1214,7 +1407,7 @@ Status DBImpl::Get(const ReadOptions& options,
     MaybeScheduleCompaction();
   }
   mem->Unref();
-  if (imm != NULL) imm->Unref();
+  imm.UnrefAll();
   current->Unref();
   return s;
 }
@@ -1399,24 +1592,30 @@ Status DBImpl::MakeRoomForWrite(bool force) {
       mutex_.Unlock();
       env_->SleepForMicroseconds(1000);
       allow_delay = false;  // Do not delay a single write more than once
-      Log(options_.info_log, "delaying write...\n");
       mutex_.Lock();
+      delayed_writes_++;
     } else if (!force &&
                (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
       // There is room in current memtable
+      if (allow_delay) {
+        DelayLoggingAndReset();
+      }
       break;
-    } else if (imm_ != NULL) {
+    } else if (imm_.size() == options_.max_write_buffer_number - 1) {
       // We have filled up the current memtable, but the previous
-      // one is still being compacted, so we wait.
+      // ones are still being compacted, so we wait.
+      DelayLoggingAndReset();
       Log(options_.info_log, "wait for memtable compaction...\n");
       bg_cv_.Wait();
     } else if (versions_->NumLevelFiles(0) >=
               options_.level0_stop_writes_trigger) {
       // There are too many level-0 files.
-      Log(options_.info_log, "waiting...\n");
+      DelayLoggingAndReset();
+      Log(options_.info_log, "wait for fewer level0 files...\n");
       bg_cv_.Wait();
     } else {
       // Attempt to switch to a new memtable and trigger compaction of old
+      DelayLoggingAndReset();
       assert(versions_->PrevLogNumber() == 0);
       uint64_t new_log_number = versions_->NewFileNumber();
       WritableFile* lfile = NULL;

@@ -1431,9 +1630,8 @@ Status DBImpl::MakeRoomForWrite(bool force) {
       logfile_ = lfile;
       logfile_number_ = new_log_number;
       log_ = new log::Writer(lfile);
-      imm_ = mem_;
-      has_imm_.Release_Store(imm_);
-      mem_ = new MemTable(internal_comparator_);
+      imm_.Add(mem_);
+      mem_ = new MemTable(internal_comparator_, NumberLevels());
       mem_->Ref();
       force = false;  // Do not force another compaction if have room
       MaybeScheduleCompaction();
@@ -1522,6 +1720,13 @@ void DBImpl::GetApproximateSizes(
   }
 }
 
+inline void DBImpl::DelayLoggingAndReset() {
+  if (delayed_writes_ > 0) {
+    Log(options_.info_log, "delayed %d write...\n", delayed_writes_);
+    delayed_writes_ = 0;
+  }
+}
+
 // Default implementations of convenience methods that subclasses of DB
 // can call if they wish
 Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) {
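
Taken together, the memtable flush path above reduces to the following
simplified sketch (names from the diff; error handling and logging elided):

    // DBImpl::CompactMemTable(), called with mutex_ held:
    MemTable* m = imm_.PickMemtableToFlush();  // marks m as flush_in_progress_
    VersionEdit* edit = m->GetEdits();
    edit->SetPrevLogNumber(0);
    edit->SetLogNumber(logfile_number_);       // earlier logs no longer needed
    uint64_t file_number;
    // Releases mutex_ around BuildTable(), so several of these can run
    // in parallel, each writing its own level-0 file:
    Status s = WriteLevel0Table(m, edit, &file_number);
    // Flushes may finish out of order, but InstallMemtableFlushResults()
    // commits them to the manifest in FIFO order:
    s = imm_.InstallMemtableFlushResults(m, versions_, s, &mutex_,
                                         options_.info_log, file_number,
                                         pending_outputs_);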

db/db_impl.h

@@ -14,6 +14,7 @@
 #include "leveldb/env.h"
 #include "port/port.h"
 #include "util/stats_logger.h"
+#include "memtablelist.h"
 
 #ifdef USE_SCRIBE
 #include "scribe/scribe_logger.h"
@@ -99,13 +100,20 @@ class DBImpl : public DB {
   // Compact the in-memory write buffer to disk. Switches to a new
   // log-file/memtable and writes a new descriptor iff successful.
-  Status CompactMemTable();
+  Status CompactMemTable(bool* madeProgress = NULL);
 
   Status RecoverLogFile(uint64_t log_number,
                         VersionEdit* edit,
                         SequenceNumber* max_sequence);
 
-  Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base);
+  // The following two methods are used to flush a memtable to
+  // storage. The first one is used at database recovery time (when the
+  // database is opened) and is heavyweight because it holds the mutex
+  // for the entire period. The second method, WriteLevel0Table, supports
+  // flushing memtables to storage concurrently.
+  Status WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit);
+  Status WriteLevel0Table(MemTable* mem, VersionEdit* edit,
+                          uint64_t* filenumber);
 
   Status MakeRoomForWrite(bool force /* compact even if there is room? */);
   WriteBatch* BuildBatchGroup(Writer** last_writer);
@@ -123,13 +131,16 @@ class DBImpl : public DB {
   void MaybeScheduleCompaction();
   static void BGWork(void* db);
   void BackgroundCall();
-  Status BackgroundCompaction();
+  Status BackgroundCompaction(bool* madeProgress);
   void CleanupCompaction(CompactionState* compact);
   Status DoCompactionWork(CompactionState* compact);
 
   Status OpenCompactionOutputFile(CompactionState* compact);
   Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);
   Status InstallCompactionResults(CompactionState* compact);
+  void AllocateCompactionOutputFileNumbers(CompactionState* compact);
+  void ReleaseCompactionUnusedFileNumbers(CompactionState* compact);
 
   // Constant after construction
   Env* const env_;
@@ -151,8 +162,7 @@ class DBImpl : public DB {
   port::AtomicPointer shutting_down_;
   port::CondVar bg_cv_;  // Signalled when background work finishes
   MemTable* mem_;
-  MemTable* imm_;                // Memtable being compacted
-  port::AtomicPointer has_imm_;  // So bg thread can detect non-NULL imm_
+  MemTableList imm_;             // Memtables that are not changing
   WritableFile* logfile_;
   uint64_t logfile_number_;
   log::Writer* log_;

@@ -169,8 +179,8 @@ class DBImpl : public DB {
   // part of ongoing compactions.
   std::set<uint64_t> pending_outputs_;
 
-  // Has a background compaction been scheduled or is running?
-  bool bg_compaction_scheduled_;
+  // Count of background compactions scheduled or running.
+  int bg_compaction_scheduled_;
 
   // Has a background stats log thread scheduled?
   bool bg_logstats_scheduled_;
@@ -179,6 +189,7 @@ class DBImpl : public DB {
   struct ManualCompaction {
     int level;
     bool done;
+    bool in_progress;           // compaction request being processed?
     const InternalKey* begin;   // NULL means beginning of key range
     const InternalKey* end;     // NULL means end of key range
     InternalKey tmp_storage;    // Used to keep track of compaction progress

@@ -220,6 +231,9 @@ class DBImpl : public DB {
   static const int KEEP_LOG_FILE_NUM = 1000;
   std::string db_absolute_path_;
 
+  // count of the number of contiguous delayed writes
+  int delayed_writes_;
+
   // No copying allowed
   DBImpl(const DBImpl&);
   void operator=(const DBImpl&);

@@ -227,6 +241,9 @@ class DBImpl : public DB {
   const Comparator* user_comparator() const {
     return internal_comparator_.user_comparator();
   }
+
+  // Dump the delayed_writes_ count to the log file and reset the counter.
+  void DelayLoggingAndReset();
 };
 
 // Sanitize db options. The caller should delete result.info_log if

db/memtable.cc

@@ -18,10 +18,14 @@ static Slice GetLengthPrefixedSlice(const char* data) {
   return Slice(p, len);
 }
 
-MemTable::MemTable(const InternalKeyComparator& cmp)
+MemTable::MemTable(const InternalKeyComparator& cmp, int numlevel)
     : comparator_(cmp),
       refs_(0),
-      table_(comparator_, &arena_) {
+      table_(comparator_, &arena_),
+      flush_in_progress_(false),
+      flush_completed_(false),
+      file_number_(0),
+      edit_(numlevel) {
 }
 
 MemTable::~MemTable() {

@@ -101,7 +105,7 @@ void MemTable::Add(SequenceNumber s, ValueType type,
   p += 8;
   p = EncodeVarint32(p, val_size);
   memcpy(p, value.data(), val_size);
-  assert((p + val_size) - buf == encoded_len);
+  assert((p + val_size) - buf == (unsigned)encoded_len);
   table_.Insert(buf);
 }

db/memtable.h

@@ -9,6 +9,7 @@
 #include "leveldb/db.h"
 #include "db/dbformat.h"
 #include "db/skiplist.h"
+#include "db/version_set.h"
 #include "util/arena.h"
 
 namespace leveldb {

@@ -21,7 +22,8 @@ class MemTable {
  public:
   // MemTables are reference counted. The initial reference count
   // is zero and the caller must call Ref() at least once.
-  explicit MemTable(const InternalKeyComparator& comparator);
+  explicit MemTable(const InternalKeyComparator& comparator,
+                    int numlevel = 7);
 
   // Increase reference count.
   void Ref() { ++refs_; }

@@ -63,6 +65,9 @@ class MemTable {
   // Else, return false.
   bool Get(const LookupKey& key, std::string* value, Status* s);
 
+  // Returns the edits area that is needed for flushing the memtable
+  VersionEdit* GetEdits() { return &edit_; }
+
  private:
   ~MemTable();  // Private since only Unref() should be used to delete it

@@ -73,6 +78,7 @@ class MemTable {
   };
   friend class MemTableIterator;
   friend class MemTableBackwardIterator;
+  friend class MemTableList;
 
   typedef SkipList<const char*, KeyComparator> Table;

@@ -81,6 +87,15 @@ class MemTable {
   Arena arena_;
   Table table_;
 
+  // These are used to manage memtable flushes to storage
+  bool flush_in_progress_;  // started the flush
+  bool flush_completed_;    // finished the flush
+  uint64_t file_number_;    // filled in after flush is complete
+
+  // The updates to be applied to the transaction log when this
+  // memtable is flushed to storage.
+  VersionEdit edit_;
+
   // No copying allowed
   MemTable(const MemTable&);
   void operator=(const MemTable&);

db/memtablelist.cc (new file)

@@ -0,0 +1,192 @@
// Copyright (c) 2012 Facebook.

#include <string>
#include "leveldb/db.h"
#include "db/memtable.h"
#include "db/memtablelist.h"
#include "leveldb/env.h"
#include "leveldb/iterator.h"
#include "util/coding.h"

namespace leveldb {

class InternalKeyComparator;
class Mutex;
class MemTableListIterator;
class VersionSet;

using std::list;

// Increase reference count on all underlying memtables
void MemTableList::RefAll() {
  for (list<MemTable*>::iterator it = memlist_.begin();
       it != memlist_.end(); ++it) {
    (*it)->Ref();
  }
}

// Drop reference count on all underlying memtables
void MemTableList::UnrefAll() {
  for (list<MemTable*>::iterator it = memlist_.begin();
       it != memlist_.end(); ++it) {
    (*it)->Unref();
  }
}

// Returns the total number of memtables in the list
int MemTableList::size() {
  assert(num_flush_not_started_ <= size_);
  return size_;
}

// Returns true if there is at least one memtable on which flush has
// not yet started.
bool MemTableList::IsFlushPending() {
  if (num_flush_not_started_ > 0) {
    assert(imm_flush_needed.NoBarrier_Load() != NULL);
    return true;
  }
  return false;
}

// Returns the earliest memtable that needs to be flushed.
// Returns NULL if no such memtable exists.
MemTable* MemTableList::PickMemtableToFlush() {
  for (list<MemTable*>::reverse_iterator it = memlist_.rbegin();
       it != memlist_.rend(); it++) {
    MemTable* m = *it;
    if (!m->flush_in_progress_) {
      assert(!m->flush_completed_);
      num_flush_not_started_--;
      if (num_flush_not_started_ == 0) {
        imm_flush_needed.Release_Store(NULL);
      }
      m->flush_in_progress_ = true;  // flushing will start very soon
      return m;
    }
  }
  return NULL;
}

// Record a successful flush in the manifest file
Status MemTableList::InstallMemtableFlushResults(MemTable* m,
    VersionSet* vset, Status flushStatus,
    port::Mutex* mu, Logger* info_log,
    uint64_t file_number,
    std::set<uint64_t>& pending_outputs) {
  mu->AssertHeld();
  assert(m->flush_in_progress_);
  assert(m->file_number_ == 0);

  // If the flush was not successful, then just reset state.
  // Maybe a succeeding attempt to flush will be successful.
  if (!flushStatus.ok()) {
    m->flush_in_progress_ = false;
    m->flush_completed_ = false;
    m->edit_.Clear();
    num_flush_not_started_++;
    imm_flush_needed.Release_Store((void *)1);
    pending_outputs.erase(file_number);
    return flushStatus;
  }

  // flush was successful
  m->flush_completed_ = true;
  m->file_number_ = file_number;

  // if some other thread is already committing, then return
  Status s;
  if (commit_in_progress_) {
    return s;
  }

  // Only a single thread can be executing this piece of code
  commit_in_progress_ = true;

  // scan all memtables from the earliest, and commit those
  // (in that order) that have finished flushing.
  while (!memlist_.empty()) {
    m = memlist_.back();  // get the last element
    if (!m->flush_completed_) {
      break;
    }
    Log(info_log,
        "Level-0 commit table #%llu: started",
        (unsigned long long)m->file_number_);

    // this can release and reacquire the mutex.
    s = vset->LogAndApply(&m->edit_, mu);

    if (s.ok()) {  // commit new state
      Log(info_log, "Level-0 commit table #%llu: done",
          (unsigned long long)m->file_number_);
      memlist_.remove(m);
      assert(m->file_number_ > 0);

      // pending_outputs can be cleared only after the newly created file
      // has been written to a committed version so that other concurrently
      // executing compaction threads do not mistakenly assume that this
      // file is not live.
      pending_outputs.erase(m->file_number_);
      m->Unref();
      size_--;
    } else {
      // commit failed. Set up state so that we can flush again.
      Log(info_log, "Level-0 commit table #%llu: failed",
          (unsigned long long)m->file_number_);
      m->flush_completed_ = false;
      m->flush_in_progress_ = false;
      m->edit_.Clear();
      num_flush_not_started_++;
      pending_outputs.erase(m->file_number_);
      m->file_number_ = 0;
      imm_flush_needed.Release_Store((void *)1);
      s = Status::IOError("Unable to commit flushed memtable");
      break;
    }
  }
  commit_in_progress_ = false;
  return s;
}

// New memtables are inserted at the front of the list.
void MemTableList::Add(MemTable* m) {
  assert(size_ >= num_flush_not_started_);
  size_++;
  memlist_.push_front(m);
  num_flush_not_started_++;
  if (num_flush_not_started_ == 1) {
    imm_flush_needed.Release_Store((void *)1);
  }
}

// Returns an estimate of the number of bytes of data in use.
size_t MemTableList::ApproximateMemoryUsage() {
  size_t size = 0;
  for (list<MemTable*>::iterator it = memlist_.begin();
       it != memlist_.end(); ++it) {
    size += (*it)->ApproximateMemoryUsage();
  }
  return size;
}

// Search all the memtables starting from the most recent one.
// Return the most recent value found, if any.
bool MemTableList::Get(const LookupKey& key, std::string* value, Status* s) {
  for (list<MemTable*>::iterator it = memlist_.begin();
       it != memlist_.end(); ++it) {
    if ((*it)->Get(key, value, s)) {
      return true;
    }
  }
  return false;
}

// Returns the list of underlying memtables.
void MemTableList::GetMemTables(std::vector<MemTable*>* output) {
  for (list<MemTable*>::iterator it = memlist_.begin();
       it != memlist_.end(); ++it) {
    output->push_back(*it);
  }
}

}  // namespace leveldb
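
A minimal usage sketch of the MemTableList lifecycle implemented above,
split by role (a simplification of what db_impl.cc does; the variable names
are illustrative):

    MemTableList imm;

    // Writer side (MakeRoomForWrite): retire the full memtable.
    imm.Add(mem);  // sets imm_flush_needed so background work gets scheduled

    // Flusher side (CompactMemTable): flush one memtable, then commit.
    if (imm.IsFlushPending()) {
      MemTable* m = imm.PickMemtableToFlush();
      Status s;              // outcome of building the level-0 file for m
      uint64_t file_number;  // number of that file (build step elided here)
      s = imm.InstallMemtableFlushResults(m, versions, s, &mutex,
                                          info_log, file_number,
                                          pending_outputs);
    }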

db/memtablelist.h (new file)

@@ -0,0 +1,96 @@
// Copyright (c) 2012 Facebook.

#ifndef STORAGE_LEVELDB_DB_MEMTABLELIST_H_
#define STORAGE_LEVELDB_DB_MEMTABLELIST_H_

#include <string>
#include <list>
#include "leveldb/db.h"
#include "db/dbformat.h"
#include "db/skiplist.h"
#include "util/arena.h"
#include "memtable.h"

namespace leveldb {

class InternalKeyComparator;
class Mutex;
class MemTableListIterator;

//
// This class stores references to all the immutable memtables.
// The memtables are flushed to L0 as soon as possible and in
// any order. If there is more than one immutable memtable, their
// flushes can occur concurrently. However, they are 'committed'
// to the manifest in FIFO order to maintain correctness and
// recoverability from a crash.
//
class MemTableList {
 public:
  // A list of memtables.
  MemTableList() : size_(0), num_flush_not_started_(0),
                   commit_in_progress_(false) {
    imm_flush_needed.Release_Store(NULL);
  }
  ~MemTableList() {};

  // so that background threads can detect a non-NULL pointer and
  // determine whether there is anything more to start flushing.
  port::AtomicPointer imm_flush_needed;

  // Increase reference count on all underlying memtables
  void RefAll();

  // Drop reference count on all underlying memtables
  void UnrefAll();

  // Returns the total number of memtables in the list
  int size();

  // Returns true if there is at least one memtable on which flush has
  // not yet started.
  bool IsFlushPending();

  // Returns the earliest memtable that needs to be flushed.
  // Returns NULL if no such memtable exists.
  MemTable* PickMemtableToFlush();

  // Commit a successful flush in the manifest file
  Status InstallMemtableFlushResults(MemTable* m,
                                     VersionSet* vset, Status flushStatus,
                                     port::Mutex* mu, Logger* info_log,
                                     uint64_t file_number,
                                     std::set<uint64_t>& pending_outputs);

  // New memtables are inserted at the front of the list.
  void Add(MemTable* m);

  // Returns an estimate of the number of bytes of data in use.
  size_t ApproximateMemoryUsage();

  // Search all the memtables starting from the most recent one.
  // Return the most recent value found, if any.
  bool Get(const LookupKey& key, std::string* value, Status* s);

  // Returns the list of underlying memtables.
  void GetMemTables(std::vector<MemTable*>* list);

  // Copying allowed
  // MemTableList(const MemTableList&);
  // void operator=(const MemTableList&);

 private:
  std::list<MemTable*> memlist_;
  int size_;

  // the number of elements that still need flushing
  int num_flush_not_started_;

  // committing in progress
  bool commit_in_progress_;
};

}  // namespace leveldb
#endif  // STORAGE_LEVELDB_DB_MEMTABLELIST_H_
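
Note that MemTableList is deliberately copyable: DBImpl::Get() in this patch
snapshots the list by value and pins the underlying memtables while the mutex
is released, roughly:

    MemTableList imm = imm_;  // cheap copy of the list of pointers
    imm.RefAll();             // pin the memtables while mutex_ is dropped
    // ... lookups go through imm.Get(lkey, value, &s) ...
    imm.UnrefAll();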

db/repair.cc

@@ -199,7 +199,7 @@ class Repairer {
     std::string scratch;
     Slice record;
     WriteBatch batch;
-    MemTable* mem = new MemTable(icmp_);
+    MemTable* mem = new MemTable(icmp_, options_.num_levels);
     mem->Ref();
     int counter = 0;
     while (reader.ReadRecord(&record, &scratch)) {

db/skiplist.h

@@ -23,6 +23,10 @@
 // more lists.
 //
 // ... prev vs. next pointer ordering ...
+//
+
+#ifndef STORAGE_LEVELDB_DB_SKIPLIST_H_
+#define STORAGE_LEVELDB_DB_SKIPLIST_H_
 
 #include <assert.h>
 #include <stdlib.h>

@@ -390,3 +394,5 @@ bool SkipList<Key,Comparator>::Contains(const Key& key) const {
 }
 
 }  // namespace leveldb
+
+#endif  // STORAGE_LEVELDB_DB_SKIPLIST_H_

db/version_edit.h

@@ -21,8 +21,10 @@ struct FileMetaData {
   uint64_t file_size;     // File size in bytes
   InternalKey smallest;   // Smallest internal key served by table
   InternalKey largest;    // Largest internal key served by table
+  bool being_compacted;   // Is this file undergoing compaction?
 
-  FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { }
+  FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0),
+                   being_compacted(false) { }
 };
 
 class VersionEdit {

db/version_set.cc

@@ -266,13 +266,14 @@ static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
   return a->number > b->number;
 }
 
-Version::Version(VersionSet* vset)
+Version::Version(VersionSet* vset, uint64_t version_number)
     : vset_(vset), next_(this), prev_(this), refs_(0),
       file_to_compact_(NULL),
       file_to_compact_level_(-1),
-      compaction_score_(-1),
-      compaction_level_(-1),
-      offset_manifest_file_(0) {
+      compaction_score_(vset->NumberLevels()),
+      compaction_level_(vset->NumberLevels()),
+      offset_manifest_file_(0),
+      version_number_(version_number) {
   files_ = new std::vector<FileMetaData*>[vset->NumberLevels()];
 }

@@ -503,6 +504,8 @@ std::string Version::DebugString() const {
     // 20:43['e' .. 'g']
     r.append("--- level ");
     AppendNumberTo(&r, level);
+    r.append(" --- version# ");
+    AppendNumberTo(&r, version_number_);
     r.append(" ---\n");
     const std::vector<FileMetaData*>& files = files_[level];
     for (size_t i = 0; i < files.size(); i++) {
@ -520,6 +523,17 @@ std::string Version::DebugString() const {
return r; return r;
} }
// this is used to batch writes to the manifest file
struct VersionSet::ManifestWriter {
Status status;
bool done;
port::CondVar cv;
VersionEdit* edit;
explicit ManifestWriter(port::Mutex* mu, VersionEdit* e) :
done(false), cv(mu), edit(e) {}
};
// A helper class so we can efficiently apply a whole sequence // A helper class so we can efficiently apply a whole sequence
// of edits to a particular state without creating intermediate // of edits to a particular state without creating intermediate
// Versions that contain full copies of the intermediate state. // Versions that contain full copies of the intermediate state.
@ -586,8 +600,30 @@ class VersionSet::Builder {
     base_->Unref();
   }

+  void CheckConsistency(Version* v) {
+#ifndef NDEBUG
+    for (int level = 0; level < vset_->NumberLevels(); level++) {
+      // Make sure there is no overlap in levels > 0
+      if (level > 0) {
+        for (uint32_t i = 1; i < v->files_[level].size(); i++) {
+          const InternalKey& prev_end = v->files_[level][i-1]->largest;
+          const InternalKey& this_begin = v->files_[level][i]->smallest;
+          if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
+            fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
+                    prev_end.DebugString().c_str(),
+                    this_begin.DebugString().c_str());
+            abort();
+          }
+        }
+      }
+    }
+#endif
+  }
+
   // Apply all of the edits in *edit to the current state.
   void Apply(VersionEdit* edit) {
+    CheckConsistency(base_);
+
     // Update compaction pointers
     for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
       const int level = edit->compact_pointers_[i].first;
@ -603,6 +639,18 @@ class VersionSet::Builder {
       const int level = iter->first;
       const uint64_t number = iter->second;
       levels_[level].deleted_files.insert(number);
+#ifndef NDEBUG
+      // none of the files to be deleted could have been added
+      // by a concurrent compaction process
+      const FileSet* added = levels_[level].added_files;
+      for (FileSet::const_iterator added_iter = added->begin();
+           added_iter != added->end();
+           ++added_iter) {
+        FileMetaData* f = *added_iter;
+        assert(f->number != number);
+      }
+#endif
     }

     // Add new files
@ -634,6 +682,8 @@ class VersionSet::Builder {
   // Save the current state in *v.
   void SaveTo(Version* v) {
+    CheckConsistency(base_);
+    CheckConsistency(v);
     BySmallestKey cmp;
     cmp.internal_comparator = &vset_->icmp_;
     for (int level = 0; level < vset_->NumberLevels(); level++) {
@ -662,22 +712,7 @@
       for (; base_iter != base_end; ++base_iter) {
         MaybeAddFile(v, level, *base_iter);
       }
-#ifndef NDEBUG
-      // Make sure there is no overlap in levels > 0
-      if (level > 0) {
-        for (uint32_t i = 1; i < v->files_[level].size(); i++) {
-          const InternalKey& prev_end = v->files_[level][i-1]->largest;
-          const InternalKey& this_begin = v->files_[level][i]->smallest;
-          if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
-            fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
-                    prev_end.DebugString().c_str(),
-                    this_begin.DebugString().c_str());
-            abort();
-          }
-        }
-      }
-#endif
+      CheckConsistency(v);
     }
   }
@ -714,7 +749,9 @@ VersionSet::VersionSet(const std::string& dbname,
       descriptor_file_(NULL),
       descriptor_log_(NULL),
       dummy_versions_(this),
-      current_(NULL) {
+      current_(NULL),
+      compactions_in_progress_(options_->num_levels),
+      current_version_number_(0) {
   compact_pointer_ = new std::string[options_->num_levels];
   max_file_size_ = new uint64_t[options_->num_levels];
   level_max_bytes_ = new uint64_t[options->num_levels];
@ -729,7 +766,7 @@ VersionSet::VersionSet(const std::string& dbname,
       level_max_bytes_[i] = options_->max_bytes_for_level_base;
     }
   }
-  AppendVersion(new Version(this));
+  AppendVersion(new Version(this, current_version_number_++));
 }

 VersionSet::~VersionSet() {
@ -747,6 +784,7 @@ void VersionSet::AppendVersion(Version* v) {
   assert(v->refs_ == 0);
   assert(v != current_);
   if (current_ != NULL) {
+    assert(current_->refs_ > 0);
     current_->Unref();
   }
   current_ = v;
@ -760,27 +798,35 @@ void VersionSet::AppendVersion(Version* v) {
 }
 Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
-  if (edit->has_log_number_) {
-    assert(edit->log_number_ >= log_number_);
-    assert(edit->log_number_ < next_file_number_);
-  } else {
-    edit->SetLogNumber(log_number_);
-  }
-
-  if (!edit->has_prev_log_number_) {
-    edit->SetPrevLogNumber(prev_log_number_);
-  }
-
-  edit->SetNextFile(next_file_number_);
-  edit->SetLastSequence(last_sequence_);
-
-  Version* v = new Version(this);
-  {
-    Builder builder(this, current_);
-    builder.Apply(edit);
-    builder.SaveTo(v);
-  }
-  Finalize(v);
+  mu->AssertHeld();
+
+  // queue our request
+  ManifestWriter w(mu, edit);
+  manifest_writers_.push_back(&w);
+  while (!w.done && &w != manifest_writers_.front()) {
+    w.cv.Wait();
+  }
+  if (w.done) {
+    manifest_lock_.Unlock();
+    return w.status;
+  }
+
+  std::vector<VersionEdit*> batch_edits;
+  Version* v = new Version(this, current_version_number_++);
+  Builder builder(this, current_);
+
+  // process all requests in the queue
+  ManifestWriter* last_writer = &w;
+  ManifestWriter* first = manifest_writers_.front();
+  assert(!manifest_writers_.empty());
+  assert(first == &w);
+  std::deque<ManifestWriter*>::iterator iter = manifest_writers_.begin();
+  for (; iter != manifest_writers_.end(); ++iter) {
+    last_writer = *iter;
+    LogAndApplyHelper(&builder, v, last_writer->edit, mu);
+    batch_edits.push_back(last_writer->edit);
+  }
+  builder.SaveTo(v);

   // Initialize new descriptor log file if necessary by creating
   // a temporary file that contains a snapshot of the current version.
@ -800,15 +846,22 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
     }
   }

-  // Unlock during expensive MANIFEST log write
+  // Unlock during expensive MANIFEST log write. New writes cannot get here
+  // because &w is ensuring that all new writes get queued.
   {
     mu->Unlock();
+    Finalize(v);

     // Write new record to MANIFEST log
     if (s.ok()) {
       std::string record;
-      edit->EncodeTo(&record);
-      s = descriptor_log_->AddRecord(record);
+      for (unsigned int i = 0; i < batch_edits.size(); i++) {
+        batch_edits[i]->EncodeTo(&record);
+        s = descriptor_log_->AddRecord(record);
+        if (!s.ok()) {
+          break;
+        }
+      }
       if (s.ok()) {
         if (options_->use_fsync) {
           s = descriptor_file_->Fsync();
@ -826,7 +879,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
     // find offset in manifest file where this version is stored.
     new_manifest_file_size = descriptor_file_->GetFileSize();

     mu->Lock();
   }
@ -836,7 +889,10 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
     AppendVersion(v);
     log_number_ = edit->log_number_;
     prev_log_number_ = edit->prev_log_number_;
   } else {
+    Log(options_->info_log, "Error in committing version %ld",
+        v->GetVersionNumber());
     delete v;
     if (!new_manifest_file.empty()) {
       delete descriptor_log_;
@ -847,9 +903,46 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
     }
   }
+  // wake up all the waiting writers
+  while (true) {
+    ManifestWriter* ready = manifest_writers_.front();
+    manifest_writers_.pop_front();
+    if (ready != &w) {
+      ready->status = s;
+      ready->done = true;
+      ready->cv.Signal();
+    }
+    if (ready == last_writer) break;
+  }
+  // Notify new head of write queue
+  if (!manifest_writers_.empty()) {
+    manifest_writers_.front()->cv.Signal();
+  }
+
   return s;
 }

+void VersionSet::LogAndApplyHelper(Builder* builder, Version* v,
+                                   VersionEdit* edit, port::Mutex* mu) {
+  manifest_lock_.AssertHeld();
+  mu->AssertHeld();
+
+  if (edit->has_log_number_) {
+    assert(edit->log_number_ >= log_number_);
+    assert(edit->log_number_ < next_file_number_);
+  } else {
+    edit->SetLogNumber(log_number_);
+  }
+
+  if (!edit->has_prev_log_number_) {
+    edit->SetPrevLogNumber(prev_log_number_);
+  }
+
+  edit->SetNextFile(next_file_number_);
+  edit->SetLastSequence(last_sequence_);
+
+  builder->Apply(edit);
+}
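
For review purposes, the queueing protocol above can be restated as a
self-contained sketch with the standard library; the names (Writer, Commit,
WriteManifest) are illustrative, and nothing below is code from this patch:

#include <condition_variable>
#include <deque>
#include <mutex>

// Self-contained sketch of the leader/follower group commit used by
// LogAndApply above.
struct Writer {
  bool done = false;
  std::condition_variable cv;
};

std::mutex mu;
std::deque<Writer*> writers;

void WriteManifest() { /* expensive MANIFEST append + sync */ }

void Commit(Writer* w) {
  std::unique_lock<std::mutex> lock(mu);
  writers.push_back(w);
  // Sleep until we are the queue head (the leader), or until an earlier
  // leader has already committed our edit as part of its batch.
  while (!w->done && w != writers.front()) {
    w->cv.wait(lock);
  }
  if (w->done) {
    return;  // our edit was written by a previous leader
  }

  // Leader: everything queued so far becomes one batch. The mutex is
  // released during the expensive write; new writers can still queue,
  // but cannot become leader because this batch occupies the front.
  std::deque<Writer*> batch(writers.begin(), writers.end());
  lock.unlock();
  WriteManifest();
  lock.lock();

  // Pop the batch, waking each follower, then hand off to the next leader.
  for (Writer* ready : batch) {
    writers.pop_front();
    if (ready != w) {
      ready->done = true;
      ready->cv.notify_one();
    }
  }
  if (!writers.empty()) {
    writers.front()->cv.notify_one();
  }
}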

 Status VersionSet::Recover() {
   struct LogReporter : public log::Reader::Reporter {
     Status* status;
@ -958,7 +1051,7 @@ Status VersionSet::Recover() {
   }

   if (s.ok()) {
-    Version* v = new Version(this);
+    Version* v = new Version(this, current_version_number_++);
     builder.SaveTo(v);
     // Install recovered version
     Finalize(v);
@ -1073,7 +1166,7 @@ Status VersionSet::DumpManifest(Options& options, std::string& dscname) {
   }

   if (s.ok()) {
-    Version* v = new Version(this);
+    Version* v = new Version(this, 0);
     builder.SaveTo(v);
     // Install recovered version
     Finalize(v);
@ -1101,9 +1194,6 @@ void VersionSet::MarkFileNumberUsed(uint64_t number) {
 }

 void VersionSet::Finalize(Version* v) {
-  // Precomputed best level for next compaction
-  int best_level = -1;
-  double best_score = -1;

   for (int level = 0; level < NumberLevels()-1; level++) {
     double score;
@ -1119,22 +1209,54 @@ void VersionSet::Finalize(Version* v) {
       // file size is small (perhaps because of a small write-buffer
       // setting, or very high compression ratios, or lots of
       // overwrites/deletions).
-      score = v->files_[level].size() /
-          static_cast<double>(options_->level0_file_num_compaction_trigger);
+      int numfiles = 0;
+      for (unsigned int i = 0; i < v->files_[level].size(); i++) {
+        if (!v->files_[level][i]->being_compacted) {
+          numfiles++;
+        }
+      }
+
+      // If we are slowing down writes, then we better compact that first
+      if (numfiles >= options_->level0_stop_writes_trigger) {
+        score = 1000000;
+        // Log(options_->info_log, "XXX score l0 = 1000000000 max");
+      } else if (numfiles >= options_->level0_slowdown_writes_trigger) {
+        score = 10000;
+        // Log(options_->info_log, "XXX score l0 = 1000000 medium");
+      } else {
+        score = numfiles /
+            static_cast<double>(options_->level0_file_num_compaction_trigger);
+        if (score >= 1) {
+          // Log(options_->info_log, "XXX score l0 = %d least", (int)score);
+        }
+      }
     } else {
       // Compute the ratio of current size to size limit.
-      const uint64_t level_bytes = TotalFileSize(v->files_[level]);
+      const uint64_t level_bytes = TotalFileSize(v->files_[level]) -
+                                   SizeBeingCompacted(level);
       score = static_cast<double>(level_bytes) / MaxBytesForLevel(level);
+      if (score > 1) {
+        // Log(options_->info_log, "XXX score l%d = %d ", level, (int)score);
+      }
     }
-    if (score > best_score) {
-      best_level = level;
-      best_score = score;
-    }
+    v->compaction_level_[level] = level;
+    v->compaction_score_[level] = score;
   }

-  v->compaction_level_ = best_level;
-  v->compaction_score_ = best_score;
+  // sort all the levels based on their score. Higher scores get listed
+  // first. Use bubble sort because the number of entries is small.
+  for (int i = 0; i < NumberLevels()-2; i++) {
+    for (int j = i+1; j < NumberLevels()-2; j++) {
+      if (v->compaction_score_[i] < v->compaction_score_[j]) {
+        double score = v->compaction_score_[i];
+        int level = v->compaction_level_[i];
+        v->compaction_score_[i] = v->compaction_score_[j];
+        v->compaction_level_[i] = v->compaction_level_[j];
+        v->compaction_score_[j] = score;
+        v->compaction_level_[j] = level;
+      }
+    }
+  }
 }
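
A quick sanity check of the new level-0 arithmetic, restated standalone (the
trigger values 4/8/12 are assumed for illustration; Level0Score is not a
function in this patch):

#include <cassert>

// Restates the level-0 branch of Finalize() above; illustrative only.
double Level0Score(int numfiles, int trigger, int slowdown, int stop) {
  if (numfiles >= stop) return 1000000;    // about to stall writes: compact first
  if (numfiles >= slowdown) return 10000;  // writes are already being throttled
  return numfiles / static_cast<double>(trigger);
}

int main() {
  // With trigger=4, slowdown=8, stop=12 (assumed values):
  assert(Level0Score(5, 4, 8, 12) == 1.25);      // eligible (score >= 1)
  assert(Level0Score(9, 4, 8, 12) == 10000);     // outranks any size-based score
  assert(Level0Score(12, 4, 8, 12) == 1000000);  // highest possible priority
  return 0;
}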
 Status VersionSet::WriteSnapshot(log::Writer* log) {
@ -1379,40 +1501,162 @@ int64_t VersionSet::MaxGrandParentOverlapBytes(int level) {
   return result;
 }
+// verify that the files listed in this compaction are present
+// in the current version
+bool VersionSet::VerifyCompactionFileConsistency(Compaction* c) {
+  if (c->input_version_ != current_) {
+    Log(options_->info_log, "VerifyCompactionFileConsistency version mismatch");
+  }
+
+  // verify files in level
+  int level = c->level();
+  for (int i = 0; i < c->num_input_files(0); i++) {
+    uint64_t number = c->input(0,i)->number;
+
+    // look for this file in the current version
+    bool found = false;
+    for (unsigned int j = 0; j < current_->files_[level].size(); j++) {
+      FileMetaData* f = current_->files_[level][j];
+      if (f->number == number) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      return false; // input files non-existent in current version
+    }
+  }
+
+  // verify level+1 files
+  level++;
+  for (int i = 0; i < c->num_input_files(1); i++) {
+    uint64_t number = c->input(1,i)->number;
+
+    // look for this file in the current version
+    bool found = false;
+    for (unsigned int j = 0; j < current_->files_[level].size(); j++) {
+      FileMetaData* f = current_->files_[level][j];
+      if (f->number == number) {
+        found = true;
+        break;
+      }
+    }
+    if (!found) {
+      return false; // input files non-existent in current version
+    }
+  }
+
+  return true; // everything good
+}
+// Clear all files to indicate that they are not being compacted.
+// Delete this compaction from the list of running compactions.
+void VersionSet::ReleaseCompactionFiles(Compaction* c) {
+  c->MarkFilesBeingCompacted(false);
+  compactions_in_progress_[c->level()].erase(c);
+}
+// The total size of files that are currently being compacted
+uint64_t VersionSet::SizeBeingCompacted(int level) {
+  uint64_t total = 0;
+  for (std::set<Compaction*>::iterator it =
+           compactions_in_progress_[level].begin();
+       it != compactions_in_progress_[level].end();
+       ++it) {
+    Compaction* c = (*it);
+    assert(c->level() == level);
+    for (int i = 0; i < c->num_input_files(0); i++) {
+      total += c->input(0,i)->file_size;
+    }
+  }
+  return total;
+}
+Compaction* VersionSet::PickCompactionBySize(int level) {
+  Compaction* c = NULL;
+
+  // level 0 files are overlapping. So we cannot pick more
+  // than one concurrent compaction at this level. This
+  // could be made better by looking at key-ranges that are
+  // being compacted at level 0.
+  if (level == 0 && compactions_in_progress_[level].size() == 1) {
+    return NULL;
+  }
+
+  assert(level >= 0);
+  assert(level+1 < NumberLevels());
+  c = new Compaction(level, MaxFileSizeForLevel(level),
+                     MaxGrandParentOverlapBytes(level), NumberLevels());
+
+  // remember the first file that is not being compacted
+  int firstIndex = 0;
+
+  // Pick the first file that comes after compact_pointer_[level]
+  for (size_t i = 0; i < current_->files_[level].size(); i++) {
+    FileMetaData* f = current_->files_[level][i];
+
+    // do not pick a file to compact if it is being compacted
+    // from n-1 level.
+    if (f->being_compacted) {
+      continue;
+    }
+    firstIndex = i;
+
+    // Pick a file that has a key-range larger than the range
+    // we picked in the previous call to PickCompaction.
+    if (compact_pointer_[level].empty() ||
+        icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
+      // Do not pick this file if its parents at level+1 are being compacted.
+      // Maybe we can avoid redoing this work in SetupOtherInputs
+      if (ParentFilesInCompaction(f, level)) {
+        continue;
+      }
+      c->inputs_[0].push_back(f);
+      break;
+    }
+  }
+
+  if (c->inputs_[0].empty()) {
+    // Wrap-around to the beginning of the key space
+    FileMetaData* f = current_->files_[level][firstIndex];
+    if (!f->being_compacted && !ParentFilesInCompaction(f, level)) {
+      c->inputs_[0].push_back(f);
+    }
+  }
+  if (c->inputs_[0].empty()) {
+    delete c;
+    c = NULL;
+  }
+  return c;
+}
 Compaction* VersionSet::PickCompaction() {
-  Compaction* c;
+  Compaction* c = NULL;
   int level;

+  // compute the compactions needed. It is better to do it here
+  // and also in LogAndApply(), otherwise the values could be stale.
+  Finalize(current_);
+
   // We prefer compactions triggered by too much data in a level over
   // the compactions triggered by seeks.
-  const bool size_compaction = (current_->compaction_score_ >= 1);
-  const bool seek_compaction = (current_->file_to_compact_ != NULL);
-  if (size_compaction) {
-    level = current_->compaction_level_;
-    assert(level >= 0);
-    assert(level+1 < NumberLevels());
-    c = new Compaction(level, MaxFileSizeForLevel(level),
-                       MaxGrandParentOverlapBytes(level), NumberLevels());
-
-    // Pick the first file that comes after compact_pointer_[level]
-    for (size_t i = 0; i < current_->files_[level].size(); i++) {
-      FileMetaData* f = current_->files_[level][i];
-      if (compact_pointer_[level].empty() ||
-          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
-        c->inputs_[0].push_back(f);
-        break;
-      }
-    }
-    if (c->inputs_[0].empty()) {
-      // Wrap-around to the beginning of the key space
-      c->inputs_[0].push_back(current_->files_[level][0]);
-    }
-  } else if (seek_compaction) {
+  //
+  // Find the compactions by size on all levels.
+  for (int i = 0; i < NumberLevels()-1; i++) {
+    level = current_->compaction_level_[i];
+    if ((current_->compaction_score_[i] >= 1)) {
+      c = PickCompactionBySize(level);
+      if (c != NULL) {
+        break;
+      }
+    }
+  }
+
+  // Find compactions needed by seeks
+  if (c == NULL && (current_->file_to_compact_ != NULL)) {
     level = current_->file_to_compact_level_;
     c = new Compaction(level, MaxFileSizeForLevel(level),
                        MaxGrandParentOverlapBytes(level), NumberLevels());
     c->inputs_[0].push_back(current_->file_to_compact_);
-  } else {
+  }
+
+  if (c == NULL) {
     return NULL;
   }
@ -1426,15 +1670,46 @@ Compaction* VersionSet::PickCompaction() {
     // Note that the next call will discard the file we placed in
     // c->inputs_[0] earlier and replace it with an overlapping set
     // which will include the picked file.
-    current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]);
+    c->inputs_[0].clear();
+    std::vector<FileMetaData*> more;
+    current_->GetOverlappingInputs(0, &smallest, &largest, &more);
+    for (unsigned int i = 0; i < more.size(); i++) {
+      FileMetaData* f = more[i];
+      if (!f->being_compacted && !ParentFilesInCompaction(f, level)) {
+        c->inputs_[0].push_back(f);
+      }
+    }
     assert(!c->inputs_[0].empty());
   }

   SetupOtherInputs(c);

+  // mark all the files that are being compacted
+  c->MarkFilesBeingCompacted(true);
+
+  // remember that this compaction is currently in progress
+  compactions_in_progress_[level].insert(c);
+
   return c;
 }
+// Returns true if any of the parent files is being compacted
+bool VersionSet::ParentFilesInCompaction(FileMetaData* f, int level) {
+  std::vector<FileMetaData*> inputs;
+  current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest, &inputs);
+  return FilesInCompaction(inputs);
+}
+
+// Returns true if any of the specified files is being compacted
+bool VersionSet::FilesInCompaction(std::vector<FileMetaData*>& files) {
+  for (unsigned int i = 0; i < files.size(); i++) {
+    if (files[i]->being_compacted) {
+      return true;
+    }
+  }
+  return false;
+}
 void VersionSet::SetupOtherInputs(Compaction* c) {
   const int level = c->level();
   InternalKey smallest, largest;
@ -1456,13 +1731,15 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
     const int64_t expanded0_size = TotalFileSize(expanded0);
     int64_t limit = ExpandedCompactionByteSizeLimit(level);
     if (expanded0.size() > c->inputs_[0].size() &&
-        inputs1_size + expanded0_size < limit) {
+        inputs1_size + expanded0_size < limit &&
+        !FilesInCompaction(expanded0)) {
       InternalKey new_start, new_limit;
       GetRange(expanded0, &new_start, &new_limit);
       std::vector<FileMetaData*> expanded1;
       current_->GetOverlappingInputs(level+1, &new_start, &new_limit,
                                      &expanded1);
-      if (expanded1.size() == c->inputs_[1].size()) {
+      if (expanded1.size() == c->inputs_[1].size() &&
+          !FilesInCompaction(expanded1)) {
         Log(options_->info_log,
             "Expanding@%d %d+%d (%ld+%ld bytes) to %d+%d (%ld+%ld bytes)\n",
             level,
@ -1531,6 +1808,11 @@ Compaction* VersionSet::CompactRange(
   c->input_version_->Ref();
   c->inputs_[0] = inputs;
   SetupOtherInputs(c);
+
+  // These files that are to be manually compacted do not trample
+  // upon other files because manual compactions are processed when
+  // the system has a max of 1 background compaction thread.
+  c->MarkFilesBeingCompacted(true);
   return c;
 }
@ -1619,6 +1901,18 @@ bool Compaction::ShouldStopBefore(const Slice& internal_key) {
   }
 }

+// Mark (or clear) each file that is being compacted
+void Compaction::MarkFilesBeingCompacted(bool value) {
+  for (int i = 0; i < 2; i++) {
+    for (unsigned int j = 0; j < inputs_[i].size(); j++) {
+      assert(value ? !inputs_[i][j]->being_compacted :
+                      inputs_[i][j]->being_compacted);
+      inputs_[i][j]->being_compacted = value;
+    }
+  }
+}
 void Compaction::ReleaseInputs() {
   if (input_version_ != NULL) {
     input_version_->Unref();
@ -1642,7 +1936,8 @@ static void InputSummary(std::vector<FileMetaData*>& files,
 }

 void Compaction::Summary(char* output, int len) {
-  int write = snprintf(output, len, "Base level %d, inputs:", level_);
+  int write = snprintf(output, len, "Base version %ld Base level %d, inputs:",
+                       input_version_->GetVersionNumber(), level_);
   if(write < 0 || write > len)
     return;

@ -18,6 +18,7 @@
 #include <map>
 #include <set>
 #include <vector>
+#include <deque>
 #include "db/dbformat.h"
 #include "db/version_edit.h"
 #include "port/port.h"
@ -106,6 +107,11 @@ class Version {
   // Return a human readable string that describes this version's contents.
   std::string DebugString() const;

+  // Returns the version number of this version
+  uint64_t GetVersionNumber() {
+    return version_number_;
+  }
+
  private:
   friend class Compaction;
   friend class VersionSet;
@ -128,13 +134,19 @@ class Version {
   // Level that should be compacted next and its compaction score.
   // Score < 1 means compaction is not strictly needed. These fields
   // are initialized by Finalize().
-  double compaction_score_;
-  int compaction_level_;
+  // The most critical level to be compacted is listed first.
+  // These are used to pick the best compaction level.
+  std::vector<double> compaction_score_;
+  std::vector<int> compaction_level_;

   // The offset in the manifest file where this version is stored.
   uint64_t offset_manifest_file_;

-  explicit Version(VersionSet* vset);
+  // A version number that uniquely represents this version. This is
+  // used for debugging and logging purposes only.
+  uint64_t version_number_;
+
+  explicit Version(VersionSet* vset, uint64_t version_number = 0);

   ~Version();
@ -229,10 +241,20 @@ class VersionSet {
   // The caller should delete the iterator when no longer needed.
   Iterator* MakeInputIterator(Compaction* c);

+  // Returns true iff some level needs a compaction because it has
+  // exceeded its target size.
+  bool NeedsSizeCompaction() const {
+    for (int i = 0; i < NumberLevels()-1; i++) {
+      if (current_->compaction_score_[i] >= 1) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   // Returns true iff some level needs a compaction.
   bool NeedsCompaction() const {
-    Version* v = current_;
-    return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
+    return ((current_->file_to_compact_ != NULL) ||
+            NeedsSizeCompaction());
   }
   // Add all files listed in any live version to *live.
@ -263,8 +285,22 @@ class VersionSet {
   // Return the size of the current manifest file
   const uint64_t ManifestFileSize() { return current_->offset_manifest_file_; }

+  // For the specified level, pick a compaction.
+  // Returns NULL if there is no compaction to be done.
+  Compaction* PickCompactionBySize(int level);
+
+  // Free up the files that participated in a compaction.
+  void ReleaseCompactionFiles(Compaction* c);
+
+  // verify that the files that we started with for a compaction
+  // still exist in the current version and in the same original level.
+  // This ensures that a concurrent compaction did not erroneously
+  // pick the same files to compact.
+  bool VerifyCompactionFileConsistency(Compaction* c);
+
  private:
   class Builder;
+  struct ManifestWriter;

   friend class Compaction;
   friend class Version;
@ -322,9 +358,34 @@ class VersionSet {
   // Per-level max bytes
   uint64_t* level_max_bytes_;

+  // record all the ongoing compactions for all levels
+  std::vector<std::set<Compaction*> > compactions_in_progress_;
+
+  // A lock that serializes writes to the manifest
+  port::Mutex manifest_lock_;
+
+  // generates an increasing version number for every new version
+  uint64_t current_version_number_;
+
+  // Queue of writers to the manifest file
+  std::deque<ManifestWriter*> manifest_writers_;
+
   // No copying allowed
   VersionSet(const VersionSet&);
   void operator=(const VersionSet&);

+  // Return the total amount of data that is undergoing
+  // compactions at this level
+  uint64_t SizeBeingCompacted(int level);
+
+  // Returns true if any of the parent files is being compacted
+  bool ParentFilesInCompaction(FileMetaData* f, int level);
+
+  // Returns true if any of the specified files is being compacted
+  bool FilesInCompaction(std::vector<FileMetaData*>& files);
+
+  void LogAndApplyHelper(Builder* b, Version* v,
+                         VersionEdit* edit, port::Mutex* mu);
 };
 // A Compaction encapsulates information about a compaction.
@ -403,6 +464,9 @@ class Compaction {
   // higher level than the ones involved in this compaction (i.e. for
   // all L >= level_ + 2).
   size_t* level_ptrs_;

+  // mark (or clear) all files that are being compacted
+  void MarkFilesBeingCompacted(bool);
 };

 }  // namespace leveldb

@ -139,6 +139,10 @@ class HdfsEnv : public Env {
     posixEnv->SetBackgroundThreads(number);
   }

+  virtual std::string TimeToString(uint64_t number) {
+    return posixEnv->TimeToString(number);
+  }
+
   static uint64_t gettid() {
     assert(sizeof(pthread_t) <= sizeof(uint64_t));
     return (uint64_t)pthread_self();
@ -273,6 +277,8 @@ class HdfsEnv : public Env {
                              std::string* outputpath) {return notsup;}

   virtual void SetBackgroundThreads(int number) {}
+
+  virtual std::string TimeToString(uint64_t number) { return ""; }
 };

 }

@ -159,6 +159,9 @@ class Env {
   // default: 1
   virtual void SetBackgroundThreads(int number) = 0;

+  // Converts seconds-since-Jan-01-1970 to a printable string
+  virtual std::string TimeToString(uint64_t time) = 0;
+
  private:
   // No copying allowed
   Env(const Env&);
@ -358,6 +361,9 @@ class EnvWrapper : public Env {
   void SetBackgroundThreads(int num) {
     return target_->SetBackgroundThreads(num);
   }
+  std::string TimeToString(uint64_t time) {
+    return target_->TimeToString(time);
+  }
  private:
   Env* target_;

@ -78,7 +78,8 @@ struct Options {
   // on disk) before converting to a sorted on-disk file.
   //
   // Larger values increase performance, especially during bulk loads.
-  // Up to two write buffers may be held in memory at the same time,
+  // Up to max_write_buffer_number write buffers may be held in memory
+  // at the same time,
   // so you may wish to adjust this parameter to control memory usage.
   // Also, a larger write buffer will result in a longer recovery time
   // the next time the database is opened.
@ -86,6 +87,12 @@ struct Options {
   // Default: 4MB
   size_t write_buffer_size;

+  // The maximum number of write buffers that are built up in memory.
+  // The default is 2, so that when 1 write buffer is being flushed to
+  // storage, new writes can continue to the other write buffer.
+  // Default: 2
+  int max_write_buffer_number;
+
   // Number of open files that can be used by the DB. You may need to
   // increase this if your database has a large working set (budget
   // one open file per 2MB of working set).
@ -244,6 +251,10 @@ struct Options {
   // value is 0 which means that obsolete files get removed after
   // every compaction run.
   uint64_t delete_obsolete_files_period_micros;

+  // Maximum number of concurrent background compactions.
+  // Default: 1
+  int max_background_compactions;
+
   // Create an Options object with default values for all fields.
   Options();
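
A sketch of how an application might set the two new options together (values
are illustrative; pairing max_background_compactions with
Env::SetBackgroundThreads is an assumption about intended use):

#include "leveldb/db.h"
#include "leveldb/options.h"

// Illustrative configuration of the new knobs; values are examples only.
int main() {
  leveldb::Options options;
  options.create_if_missing = true;
  options.write_buffer_size = 4 << 20;     // 4MB per memtable
  options.max_write_buffer_number = 4;     // up to 4 memtables buffered
  options.max_background_compactions = 3;  // allow 3 concurrent compactions
  // The Env thread pool may also need to grow to match:
  options.env->SetBackgroundThreads(3);

  leveldb::DB* db;
  leveldb::Status s = leveldb::DB::Open(options, "/tmp/ldb_example", &db);
  if (s.ok()) {
    delete db;
  }
  return 0;
}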

@ -46,6 +46,16 @@ static bool FLAGS_verbose = false;
 // (initialized to default value by "main")
 static int FLAGS_write_buffer_size = 0;

+// The number of in-memory memtables.
+// Each memtable is of size FLAGS_write_buffer_size.
+// This is initialized to default value of 2 in "main" function.
+static int FLAGS_max_write_buffer_number = 0;
+
+// The maximum number of concurrent background compactions
+// that can occur in parallel.
+// This is initialized to default value of 1 in "main" function.
+static int FLAGS_max_background_compactions = 0;
+
 // Number of bytes to use as a cache of uncompressed data.
 static long FLAGS_cache_size = 2 * KB * KB * KB;
@ -104,6 +114,11 @@ static int FLAGS_readwritepercent = 10;
 // Option to disable compaction triggered by read.
 static int FLAGS_disable_seek_compaction = false;

+// Option to delete obsolete files periodically
+// Default: 0 which means that obsolete files are
+// deleted after every compaction run.
+static uint64_t FLAGS_delete_obsolete_files_period_micros = 0;
+
 // Algorithm to use to compress the database
 static enum leveldb::CompressionType FLAGS_compression_type =
     leveldb::kSnappyCompression;
@ -626,6 +641,8 @@ class StressTest {
     Options options;
     options.block_cache = cache_;
     options.write_buffer_size = FLAGS_write_buffer_size;
+    options.max_write_buffer_number = FLAGS_max_write_buffer_number;
+    options.max_background_compactions = FLAGS_max_background_compactions;
     options.block_size = FLAGS_block_size;
     options.filter_policy = filter_policy_;
     options.max_open_files = FLAGS_open_files;
@ -644,6 +661,8 @@ class StressTest {
     options.compression = FLAGS_compression_type;
     options.create_if_missing = true;
     options.disable_seek_compaction = FLAGS_disable_seek_compaction;
+    options.delete_obsolete_files_period_micros =
+        FLAGS_delete_obsolete_files_period_micros;
     Status s = DB::Open(options, FLAGS_db, &db_);
     if (!s.ok()) {
       fprintf(stderr, "open error: %s\n", s.ToString().c_str());
@ -670,7 +689,10 @@ class StressTest {
 int main(int argc, char** argv) {
   FLAGS_write_buffer_size = leveldb::Options().write_buffer_size;
+  FLAGS_max_write_buffer_number = leveldb::Options().max_write_buffer_number;
   FLAGS_open_files = leveldb::Options().max_open_files;
+  FLAGS_max_background_compactions =
+      leveldb::Options().max_background_compactions;
   // Compression test code above refers to FLAGS_block_size
   FLAGS_block_size = leveldb::Options().block_size;
   std::string default_db_path;
@ -706,6 +728,10 @@ int main(int argc, char** argv) {
       FLAGS_value_size_mult = n;
     } else if (sscanf(argv[i], "--write_buffer_size=%d%c", &n, &junk) == 1) {
       FLAGS_write_buffer_size = n;
+    } else if (sscanf(argv[i], "--max_write_buffer_number=%d%c", &n, &junk) == 1) {
+      FLAGS_max_write_buffer_number = n;
+    } else if (sscanf(argv[i], "--max_background_compactions=%d%c", &n, &junk) == 1) {
+      FLAGS_max_background_compactions = n;
     } else if (sscanf(argv[i], "--cache_size=%ld%c", &l, &junk) == 1) {
       FLAGS_cache_size = l;
     } else if (sscanf(argv[i], "--block_size=%d%c", &n, &junk) == 1) {
@ -784,6 +810,9 @@ int main(int argc, char** argv) {
     } else if (sscanf(argv[i], "--disable_seek_compaction=%d%c", &n, &junk) == 1
                && (n == 0 || n == 1)) {
       FLAGS_disable_seek_compaction = n;
+    } else if (sscanf(argv[i], "--delete_obsolete_files_period_micros=%ld%c",
+                      &l, &junk) == 1) {
+      FLAGS_delete_obsolete_files_period_micros = l;
     } else {
       fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
       exit(1);

@ -749,6 +749,26 @@ class PosixEnv : public Env {
     }
   }

+  virtual std::string TimeToString(uint64_t secondsSince1970) {
+    const time_t seconds = (time_t)secondsSince1970;
+    struct tm t;
+    int maxsize = 64;
+    std::string dummy;
+    dummy.reserve(maxsize);
+    dummy.resize(maxsize);
+    char* p = &dummy[0];
+    localtime_r(&seconds, &t);
+    snprintf(p, maxsize,
+             "%04d/%02d/%02d-%02d:%02d:%02d ",
+             t.tm_year + 1900,
+             t.tm_mon + 1,
+             t.tm_mday,
+             t.tm_hour,
+             t.tm_min,
+             t.tm_sec);
+    return dummy;
+  }
+
  private:
   void PthreadCall(const char* label, int result) {
     if (result != 0) {

@ -19,6 +19,7 @@ Options::Options()
       env(Env::Default()),
       info_log(NULL),
       write_buffer_size(4<<20),
+      max_write_buffer_number(2),
       max_open_files(1000),
       block_cache(NULL),
       block_size(4096),
@ -42,32 +43,34 @@ Options::Options()
       db_stats_log_interval(1800),
       db_log_dir(""),
       disable_seek_compaction(false),
-      delete_obsolete_files_period_micros(0) {
+      delete_obsolete_files_period_micros(0),
+      max_background_compactions(1) {
 }

 void
 Options::Dump(
     Logger * log) const
 {
   Log(log," Options.comparator: %s", comparator->Name());
   Log(log," Options.create_if_missing: %d", create_if_missing);
   Log(log," Options.error_if_exists: %d", error_if_exists);
   Log(log," Options.paranoid_checks: %d", paranoid_checks);
   Log(log," Options.env: %p", env);
   Log(log," Options.info_log: %p", info_log);
   Log(log," Options.write_buffer_size: %zd", write_buffer_size);
+  Log(log," Options.max_write_buffer_number: %d", max_write_buffer_number);
   Log(log," Options.max_open_files: %d", max_open_files);
   Log(log," Options.block_cache: %p", block_cache);
   Log(log," Options.block_cache_size: %zd", block_cache->GetCapacity());
   Log(log," Options.block_size: %zd", block_size);
   Log(log," Options.block_restart_interval: %d", block_restart_interval);
   Log(log," Options.compression: %d", compression);
   Log(log," Options.filter_policy: %s",
       filter_policy == NULL ? "NULL" : filter_policy->Name());
   Log(log," Options.num_levels: %d", num_levels);
   Log(log," Options.disableDataSync: %d", disableDataSync);
   Log(log," Options.use_fsync: %d", use_fsync);
   Log(log," Options.db_stats_log_interval: %d",
       db_stats_log_interval);
   Log(log," Options.level0_file_num_compaction_trigger: %d",
       level0_file_num_compaction_trigger);
@ -89,10 +92,12 @@ Options::Dump(
       expanded_compaction_factor);
   Log(log," Options.max_grandparent_overlap_factor: %d",
       max_grandparent_overlap_factor);
   Log(log," Options.db_log_dir: %s",
       db_log_dir.c_str());
   Log(log," Options.disable_seek_compaction: %d",
       disable_seek_compaction);
+  Log(log," Options.max_background_compactions: %d",
+      max_background_compactions);
 }  // Options::Dump
