Make some variables configurable for each db instance

Summary:
Make configurable 'targetFileSize', 'targetFileSizeMultiplier',
'maxBytesForLevelBase', 'maxBytesForLevelMultiplier',
'expandedCompactionFactor', 'maxGrandParentOverlapFactor'

Test Plan: N/A

Reviewers: dhruba

Reviewed By: dhruba

Differential Revision: https://reviews.facebook.net/D3801
main
heyongqiang 13 years ago
parent 2067d22038
commit 4e4b6812ff
  1. 54
      db/c.cc
  2. 4
      db/corruption_test.cc
  3. 64
      db/db_impl.cc
  4. 11
      db/db_impl.h
  5. 81
      db/db_test.cc
  6. 24
      db/dbformat.h
  7. 16
      db/repair.cc
  8. 4
      db/version_edit.cc
  9. 8
      db/version_edit.h
  10. 4
      db/version_edit_test.cc
  11. 186
      db/version_set.cc
  12. 39
      db/version_set.h
  13. 10
      include/leveldb/db.h
  14. 49
      include/leveldb/options.h
  15. 2
      table/table_test.cc
  16. 14
      util/options.cc

@ -444,6 +444,60 @@ void leveldb_options_set_block_restart_interval(leveldb_options_t* opt, int n) {
opt->rep.block_restart_interval = n;
}
void leveldb_options_set_target_file_size_base(
leveldb_options_t* opt, uint64_t n) {
opt->rep.target_file_size_base = n;
}
void leveldb_options_set_target_file_size_multiplier(
leveldb_options_t* opt, int n) {
opt->rep.target_file_size_multiplier = n;
}
void leveldb_options_set_max_bytes_for_level_base(
leveldb_options_t* opt, uint64_t n) {
opt->rep.max_bytes_for_level_base = n;
}
void leveldb_options_set_max_bytes_for_level_multiplier(
leveldb_options_t* opt, int n) {
opt->rep.max_bytes_for_level_multiplier = n;
}
void leveldb_options_set_expanded_compaction_factor(
leveldb_options_t* opt, int n) {
opt->rep.expanded_compaction_factor = n;
}
void leveldb_options_set_max_grandparent_overlap_factor(
leveldb_options_t* opt, int n) {
opt->rep.max_grandparent_overlap_factor = n;
}
void leveldb_options_set_num_levels(leveldb_options_t* opt, int n) {
opt->rep.num_levels = n;
}
void leveldb_options_set_level0_file_num_compaction_trigger(
leveldb_options_t* opt, int n) {
opt->rep.level0_file_num_compaction_trigger = n;
}
void leveldb_options_set_level0_slowdown_writes_trigger(
leveldb_options_t* opt, int n) {
opt->rep.level0_slowdown_writes_trigger = n;
}
void leveldb_options_set_level0_stop_writes_trigger(
leveldb_options_t* opt, int n) {
opt->rep.level0_stop_writes_trigger = n;
}
void leveldb_options_set_max_mem_compaction_level(
leveldb_options_t* opt, int n) {
opt->rep.max_mem_compaction_level = n;
}
void leveldb_options_set_compression(leveldb_options_t* opt, int t) {
opt->rep.compression = static_cast<CompressionType>(t);
}

@ -295,7 +295,7 @@ TEST(CorruptionTest, CompactionInputError) {
Build(10);
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
dbi->TEST_CompactMemTable();
const int last = config::kMaxMemCompactLevel;
const int last = dbi->MaxMemCompactionLevel();
ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last)));
Corrupt(kTableFile, 100, 1);
@ -314,7 +314,7 @@ TEST(CorruptionTest, CompactionInputErrorParanoid) {
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
// Fill levels >= 1 so memtable compaction outputs to level 1
for (int level = 1; level < config::kNumLevels; level++) {
for (int level = 1; level < dbi->NumberLevels(); level++) {
dbi->Put(WriteOptions(), "", "begin");
dbi->Put(WriteOptions(), "~", "end");
dbi->TEST_CompactMemTable();

@ -134,6 +134,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
mem_->Ref();
has_imm_.Release_Store(NULL);
stats_ = new CompactionStats[options.num_levels];
// Reserve ten files or so for other uses and give the rest to TableCache.
const int table_cache_size = options.max_open_files - 10;
table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
@ -162,6 +163,7 @@ DBImpl::~DBImpl() {
delete log_;
delete logfile_;
delete table_cache_;
delete[] stats_;
if (owns_info_log_) {
delete options_.info_log;
@ -172,7 +174,7 @@ DBImpl::~DBImpl() {
}
Status DBImpl::NewDB() {
VersionEdit new_db;
VersionEdit new_db(NumberLevels());
new_db.SetComparatorName(user_comparator()->Name());
new_db.SetLogNumber(0);
new_db.SetNextFile(2);
@ -488,7 +490,7 @@ Status DBImpl::CompactMemTable() {
assert(imm_ != NULL);
// Save the contents of the memtable as a new Table
VersionEdit edit;
VersionEdit edit(NumberLevels());
Version* base = versions_->current();
base->Ref();
Status s = WriteLevel0Table(imm_, &edit, base);
@ -521,7 +523,7 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
{
MutexLock l(&mutex_);
Version* base = versions_->current();
for (int level = 1; level < config::kNumLevels; level++) {
for (int level = 1; level < NumberLevels(); level++) {
if (base->OverlapInLevel(level, begin, end)) {
max_level_with_files = level;
}
@ -533,9 +535,20 @@ void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
}
}
int DBImpl::NumberLevels() {
return options_.num_levels;
}
int DBImpl::MaxMemCompactionLevel() {
return options_.max_mem_compaction_level;
}
int DBImpl::Level0StopWriteTrigger() {
return options_.level0_stop_writes_trigger;
}
void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
InternalKey begin_storage, end_storage;
@ -573,17 +586,33 @@ Status DBImpl::TEST_CompactMemTable() {
Status s = Write(WriteOptions(), NULL);
if (s.ok()) {
// Wait until the compaction completes
MutexLock l(&mutex_);
while (imm_ != NULL && bg_error_.ok()) {
bg_cv_.Wait();
}
if (imm_ != NULL) {
s = bg_error_;
}
s = TEST_WaitForCompactMemTable();
}
return s;
}
Status DBImpl::TEST_WaitForCompactMemTable() {
Status s;
// Wait until the compaction completes
MutexLock l(&mutex_);
while (imm_ != NULL && bg_error_.ok()) {
bg_cv_.Wait();
}
if (imm_ != NULL) {
s = bg_error_;
}
return s;
}
Status DBImpl::TEST_WaitForCompact() {
// Wait until the compaction completes
MutexLock l(&mutex_);
while (bg_compaction_scheduled_ && bg_error_.ok()) {
bg_cv_.Wait();
}
return bg_error_;
}
void DBImpl::MaybeScheduleCompaction() {
mutex_.AssertHeld();
if (bg_compaction_scheduled_) {
@ -1226,6 +1255,7 @@ Status DBImpl::MakeRoomForWrite(bool force) {
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
@ -1233,7 +1263,8 @@ Status DBImpl::MakeRoomForWrite(bool force) {
break;
} else if (
allow_delay &&
versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
versions_->NumLevelFiles(0) >=
options_.level0_slowdown_writes_trigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
@ -1253,7 +1284,8 @@ Status DBImpl::MakeRoomForWrite(bool force) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
bg_cv_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
} else if (versions_->NumLevelFiles(0) >=
options_.level0_stop_writes_trigger) {
// There are too many level-0 files.
Log(options_.info_log, "waiting...\n");
bg_cv_.Wait();
@ -1295,7 +1327,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
in.remove_prefix(strlen("num-files-at-level"));
uint64_t level;
bool ok = ConsumeDecimalNumber(&in, &level) && in.empty();
if (!ok || level >= config::kNumLevels) {
if (!ok || level >= NumberLevels()) {
return false;
} else {
char buf[100];
@ -1312,7 +1344,7 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
"--------------------------------------------------\n"
);
value->append(buf);
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < NumberLevels(); level++) {
int files = versions_->NumLevelFiles(level);
if (stats_[level].micros > 0 || files > 0) {
snprintf(
@ -1384,7 +1416,7 @@ Status DB::Open(const Options& options, const std::string& dbname,
DBImpl* impl = new DBImpl(options, dbname);
impl->mutex_.Lock();
VersionEdit edit;
VersionEdit edit(impl->NumberLevels());
Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
if (s.ok()) {
uint64_t new_log_number = impl->versions_->NewFileNumber();

@ -40,6 +40,9 @@ class DBImpl : public DB {
virtual bool GetProperty(const Slice& property, std::string* value);
virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes);
virtual void CompactRange(const Slice* begin, const Slice* end);
virtual int NumberLevels();
virtual int MaxMemCompactionLevel();
virtual int Level0StopWriteTrigger();
// Extra methods (for testing) that are not in the public DB interface
@ -49,6 +52,12 @@ class DBImpl : public DB {
// Force current memtable contents to be compacted.
Status TEST_CompactMemTable();
// Wait for memtable compaction
Status TEST_WaitForCompactMemTable();
// Wait for any compaction
Status TEST_WaitForCompact();
// Return an internal iterator over the current state of the database.
// The keys of this iterator are internal keys (see format.h).
// The returned iterator should be deleted when no longer needed.
@ -171,7 +180,7 @@ class DBImpl : public DB {
this->bytes_written += c.bytes_written;
}
};
CompactionStats stats_[config::kNumLevels];
CompactionStats* stats_;
// No copying allowed
DBImpl(const DBImpl&);

@ -137,6 +137,7 @@ class DBTest {
enum OptionConfig {
kDefault,
kFilter,
kNumLevel_3,
kEnd
};
int option_config_;
@ -183,6 +184,9 @@ class DBTest {
case kFilter:
options.filter_policy = filter_policy_;
break;
case kNumLevel_3:
options.num_levels = 3;
break;
default:
break;
}
@ -324,7 +328,7 @@ class DBTest {
int TotalTableFiles() {
int result = 0;
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < db_->NumberLevels(); level++) {
result += NumTableFilesAtLevel(level);
}
return result;
@ -334,7 +338,7 @@ class DBTest {
std::string FilesPerLevel() {
std::string result;
int last_non_zero_offset = 0;
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < db_->NumberLevels(); level++) {
int f = NumTableFilesAtLevel(level);
char buf[100];
snprintf(buf, sizeof(buf), "%s%d", (level ? "," : ""), f);
@ -377,7 +381,7 @@ class DBTest {
// Prevent pushing of new sstables into deeper levels by adding
// tables that cover a specified range to all levels.
void FillLevels(const std::string& smallest, const std::string& largest) {
MakeTables(config::kNumLevels, smallest, largest);
MakeTables(db_->NumberLevels(), smallest, largest);
}
void DumpFileCounts(const char* label) {
@ -385,7 +389,7 @@ class DBTest {
fprintf(stderr, "maxoverlap: %lld\n",
static_cast<long long>(
dbfull()->TEST_MaxNextLevelOverlappingBytes()));
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < db_->NumberLevels(); level++) {
int num = NumTableFilesAtLevel(level);
if (num > 0) {
fprintf(stderr, " level %3d : %d files\n", level, num);
@ -891,6 +895,42 @@ TEST(DBTest, CompactionsGenerateMultipleFiles) {
}
}
TEST(DBTest, CompactionTrigger) {
Options options = CurrentOptions();
options.write_buffer_size = 100<<10; //100KB
options.num_levels = 3;
options.max_mem_compaction_level = 0;
options.level0_file_num_compaction_trigger = 3;
Reopen(&options);
Random rnd(301);
for (int num = 0;
num < options.level0_file_num_compaction_trigger - 1;
num++)
{
std::vector<std::string> values;
// Write 120KB (12 values, each 10K)
for (int i = 0; i < 12; i++) {
values.push_back(RandomString(&rnd, 10000));
ASSERT_OK(Put(Key(i), values[i]));
}
dbfull()->TEST_WaitForCompactMemTable();
ASSERT_EQ(NumTableFilesAtLevel(0), num + 1);
}
//generate one more file in level-0, and should trigger level-0 compaction
std::vector<std::string> values;
for (int i = 0; i < 12; i++) {
values.push_back(RandomString(&rnd, 10000));
ASSERT_OK(Put(Key(i), values[i]));
}
dbfull()->TEST_WaitForCompact();
ASSERT_EQ(NumTableFilesAtLevel(0), 0);
ASSERT_EQ(NumTableFilesAtLevel(1), 1);
}
TEST(DBTest, RepeatedWritesToSameKey) {
Options options = CurrentOptions();
options.env = env_;
@ -899,7 +939,8 @@ TEST(DBTest, RepeatedWritesToSameKey) {
// We must have at most one file per level except for level-0,
// which may have up to kL0_StopWritesTrigger files.
const int kMaxFiles = config::kNumLevels + config::kL0_StopWritesTrigger;
const int kMaxFiles = dbfull()->NumberLevels() +
dbfull()->Level0StopWriteTrigger();
Random rnd(301);
std::string value = RandomString(&rnd, 2 * options.write_buffer_size);
@ -1134,7 +1175,7 @@ TEST(DBTest, HiddenValuesAreRemoved) {
TEST(DBTest, DeletionMarkers1) {
Put("foo", "v1");
ASSERT_OK(dbfull()->TEST_CompactMemTable());
const int last = config::kMaxMemCompactLevel;
const int last = dbfull()->MaxMemCompactionLevel();
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo => v1 is now in last level
// Place a table at level last-1 to prevent merging with preceding mutation
@ -1163,7 +1204,7 @@ TEST(DBTest, DeletionMarkers1) {
TEST(DBTest, DeletionMarkers2) {
Put("foo", "v1");
ASSERT_OK(dbfull()->TEST_CompactMemTable());
const int last = config::kMaxMemCompactLevel;
const int last = dbfull()->MaxMemCompactionLevel();
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo => v1 is now in last level
// Place a table at level last-1 to prevent merging with preceding mutation
@ -1188,7 +1229,8 @@ TEST(DBTest, DeletionMarkers2) {
TEST(DBTest, OverlapInLevel0) {
do {
ASSERT_EQ(config::kMaxMemCompactLevel, 2) << "Fix test to match config";
int tmp = dbfull()->MaxMemCompactionLevel();
ASSERT_EQ(tmp, 2) << "Fix test to match config";
// Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0.
ASSERT_OK(Put("100", "v100"));
@ -1349,7 +1391,7 @@ TEST(DBTest, CustomComparator) {
}
TEST(DBTest, ManualCompaction) {
ASSERT_EQ(config::kMaxMemCompactLevel, 2)
ASSERT_EQ(dbfull()->MaxMemCompactionLevel(), 2)
<< "Need to update this test to match kMaxMemCompactLevel";
MakeTables(3, "p", "q");
@ -1433,7 +1475,7 @@ TEST(DBTest, NoSpace) {
const int num_files = CountFiles();
env_->no_space_.Release_Store(env_); // Force out-of-space errors
for (int i = 0; i < 10; i++) {
for (int level = 0; level < config::kNumLevels-1; level++) {
for (int level = 0; level < dbfull()->NumberLevels()-1; level++) {
dbfull()->TEST_CompactRange(level, NULL, NULL);
}
}
@ -1668,6 +1710,21 @@ class ModelDB: public DB {
virtual void CompactRange(const Slice* start, const Slice* end) {
}
virtual int NumberLevels()
{
return 1;
}
virtual int MaxMemCompactionLevel()
{
return 1;
}
virtual int Level0StopWriteTrigger()
{
return -1;
}
private:
class ModelIter: public Iterator {
public:
@ -1858,7 +1915,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
Options options;
VersionSet vset(dbname, &options, NULL, &cmp);
ASSERT_OK(vset.Recover());
VersionEdit vbase;
VersionEdit vbase(vset.NumberLevels());
uint64_t fnum = 1;
for (int i = 0; i < num_base_files; i++) {
InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
@ -1870,7 +1927,7 @@ void BM_LogAndApply(int iters, int num_base_files) {
uint64_t start_micros = env->NowMicros();
for (int i = 0; i < iters; i++) {
VersionEdit vedit;
VersionEdit vedit(vset.NumberLevels());
vedit.DeleteFile(2, fnum);
InternalKey start(MakeKey(2*fnum), 1, kTypeValue);
InternalKey limit(MakeKey(2*fnum+1), 1, kTypeDeletion);

@ -16,30 +16,6 @@
namespace leveldb {
// Grouping of constants. We may want to make some of these
// parameters set via options.
namespace config {
static const int kNumLevels = 7;
// Level-0 compaction is started when we hit this many files.
static const int kL0_CompactionTrigger = 4;
// Soft limit on number of level-0 files. We slow down writes at this point.
static const int kL0_SlowdownWritesTrigger = 8;
// Maximum number of level-0 files. We stop writes at this point.
static const int kL0_StopWritesTrigger = 12;
// Maximum level to which a new compacted memtable is pushed if it
// does not create overlap. We try to push to level 2 to avoid the
// relatively expensive level 0=>1 compactions and to avoid some
// expensive manifest file operations. We do not push all the way to
// the largest level since that can generate a lot of wasted disk
// space if the same key space is being repeatedly overwritten.
static const int kMaxMemCompactLevel = 2;
} // namespace config
class InternalKey;
// Value types encoded as the last component of internal keys.

@ -55,10 +55,12 @@ class Repairer {
next_file_number_(1) {
// TableCache can be small since we expect each table to be opened once.
table_cache_ = new TableCache(dbname_, &options_, 10);
edit_ = new VersionEdit(options.num_levels);
}
~Repairer() {
delete table_cache_;
delete edit_;
if (owns_info_log_) {
delete options_.info_log;
}
@ -105,7 +107,7 @@ class Repairer {
bool owns_info_log_;
bool owns_cache_;
TableCache* table_cache_;
VersionEdit edit_;
VersionEdit* edit_;
std::vector<std::string> manifests_;
std::vector<uint64_t> table_numbers_;
@ -315,15 +317,15 @@ class Repairer {
}
}
edit_.SetComparatorName(icmp_.user_comparator()->Name());
edit_.SetLogNumber(0);
edit_.SetNextFile(next_file_number_);
edit_.SetLastSequence(max_sequence);
edit_->SetComparatorName(icmp_.user_comparator()->Name());
edit_->SetLogNumber(0);
edit_->SetNextFile(next_file_number_);
edit_->SetLastSequence(max_sequence);
for (size_t i = 0; i < tables_.size(); i++) {
// TODO(opt): separate out into multiple levels
const TableInfo& t = tables_[i];
edit_.AddFile(0, t.meta.number, t.meta.file_size,
edit_->AddFile(0, t.meta.number, t.meta.file_size,
t.meta.smallest, t.meta.largest);
}
@ -331,7 +333,7 @@ class Repairer {
{
log::Writer log(file);
std::string record;
edit_.EncodeTo(&record);
edit_->EncodeTo(&record);
status = log.AddRecord(record);
}
if (status.ok()) {

@ -95,10 +95,10 @@ static bool GetInternalKey(Slice* input, InternalKey* dst) {
}
}
static bool GetLevel(Slice* input, int* level) {
bool VersionEdit::GetLevel(Slice* input, int* level) {
uint32_t v;
if (GetVarint32(input, &v) &&
v < config::kNumLevels) {
v < number_levels_) {
*level = v;
return true;
} else {

@ -27,7 +27,10 @@ struct FileMetaData {
class VersionEdit {
public:
VersionEdit() { Clear(); }
VersionEdit(int number_levels) :
number_levels_(number_levels) {
Clear();
}
~VersionEdit() { }
void Clear();
@ -86,6 +89,9 @@ class VersionEdit {
typedef std::set< std::pair<int, uint64_t> > DeletedFileSet;
bool GetLevel(Slice* input, int* level);
int number_levels_;
std::string comparator_;
uint64_t log_number_;
uint64_t prev_log_number_;

@ -10,7 +10,7 @@ namespace leveldb {
static void TestEncodeDecode(const VersionEdit& edit) {
std::string encoded, encoded2;
edit.EncodeTo(&encoded);
VersionEdit parsed;
VersionEdit parsed(7);
Status s = parsed.DecodeFrom(encoded);
ASSERT_TRUE(s.ok()) << s.ToString();
parsed.EncodeTo(&encoded2);
@ -22,7 +22,7 @@ class VersionEditTest { };
TEST(VersionEditTest, EncodeDecode) {
static const uint64_t kBig = 1ull << 50;
VersionEdit edit;
VersionEdit edit(7);
for (int i = 0; i < 4; i++) {
TestEncodeDecode(edit);
edit.AddFile(3, kBig + 300 + i, kBig + 400 + i,

@ -20,32 +20,6 @@
namespace leveldb {
static const int kTargetFileSize = 2 * 1048576;
// Maximum bytes of overlaps in grandparent (i.e., level+2) before we
// stop building a single file in a level->level+1 compaction.
static const int64_t kMaxGrandParentOverlapBytes = 10 * kTargetFileSize;
// Maximum number of bytes in all compacted files. We avoid expanding
// the lower level file set of a compaction if it would make the
// total compaction cover more than this many bytes.
static const int64_t kExpandedCompactionByteSizeLimit = 25 * kTargetFileSize;
static double MaxBytesForLevel(int level) {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.
double result = 10 * 1048576.0; // Result for both level-0 and level-1
while (level > 1) {
result *= 10;
level--;
}
return result;
}
static uint64_t MaxFileSizeForLevel(int level) {
return kTargetFileSize; // We could vary per level to reduce number of files?
}
static int64_t TotalFileSize(const std::vector<FileMetaData*>& files) {
int64_t sum = 0;
for (size_t i = 0; i < files.size(); i++) {
@ -76,7 +50,7 @@ Version::~Version() {
next_->prev_ = prev_;
// Drop references to files
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < vset_->NumberLevels(); level++) {
for (size_t i = 0; i < files_[level].size(); i++) {
FileMetaData* f = files_[level][i];
assert(f->refs > 0);
@ -86,6 +60,7 @@ Version::~Version() {
}
}
}
delete[] files_;
}
int FindFile(const InternalKeyComparator& icmp,
@ -248,7 +223,7 @@ void Version::AddIterators(const ReadOptions& options,
// For levels > 0, we can use a concatenating iterator that sequentially
// walks through the non-overlapping files in the level, opening them
// lazily.
for (int level = 1; level < config::kNumLevels; level++) {
for (int level = 1; level < vset_->NumberLevels(); level++) {
if (!files_[level].empty()) {
iters->push_back(NewConcatenatingIterator(options, level));
}
@ -289,6 +264,15 @@ static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
return a->number > b->number;
}
Version::Version(VersionSet* vset)
: vset_(vset), next_(this), prev_(this), refs_(0),
file_to_compact_(NULL),
file_to_compact_level_(-1),
compaction_score_(-1),
compaction_level_(-1) {
files_ = new std::vector<FileMetaData*>[vset->NumberLevels()];
}
Status Version::Get(const ReadOptions& options,
const LookupKey& k,
std::string* value,
@ -308,7 +292,7 @@ Status Version::Get(const ReadOptions& options,
// in an smaller level, later levels are irrelevant.
std::vector<FileMetaData*> tmp;
FileMetaData* tmp2;
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < vset_->NumberLevels(); level++) {
size_t num_files = files_[level].size();
if (num_files == 0) continue;
@ -431,18 +415,24 @@ int Version::PickLevelForMemTableOutput(
InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
std::vector<FileMetaData*> overlaps;
while (level < config::kMaxMemCompactLevel) {
int max_mem_compact_level = vset_->options_->max_mem_compaction_level;
while (max_mem_compact_level > 0 && level < max_mem_compact_level) {
if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
break;
}
if (level + 2 >= vset_->NumberLevels()) {
level++;
break;
}
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
const int64_t sum = TotalFileSize(overlaps);
if (sum > kMaxGrandParentOverlapBytes) {
if (sum > vset_->MaxGrandParentOverlapBytes(level)) {
break;
}
level++;
}
}
return level;
}
@ -490,7 +480,7 @@ void Version::GetOverlappingInputs(
std::string Version::DebugString() const {
std::string r;
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < vset_->NumberLevels(); level++) {
// E.g.,
// --- level 1 ---
// 17:123['a' .. 'd']
@ -542,7 +532,7 @@ class VersionSet::Builder {
VersionSet* vset_;
Version* base_;
LevelState levels_[config::kNumLevels];
LevelState* levels_;
public:
// Initialize a builder with the files from *base and other info from *vset
@ -550,15 +540,16 @@ class VersionSet::Builder {
: vset_(vset),
base_(base) {
base_->Ref();
levels_ = new LevelState[vset_->NumberLevels()];
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < vset_->NumberLevels(); level++) {
levels_[level].added_files = new FileSet(cmp);
}
}
~Builder() {
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < vset_->NumberLevels(); level++) {
const FileSet* added = levels_[level].added_files;
std::vector<FileMetaData*> to_unref;
to_unref.reserve(added->size());
@ -575,6 +566,7 @@ class VersionSet::Builder {
}
}
}
delete[] levels_;
base_->Unref();
}
@ -628,7 +620,7 @@ class VersionSet::Builder {
void SaveTo(Version* v) {
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < vset_->NumberLevels(); level++) {
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
const std::vector<FileMetaData*>& base_files = base_->files_[level];
@ -707,12 +699,28 @@ VersionSet::VersionSet(const std::string& dbname,
descriptor_log_(NULL),
dummy_versions_(this),
current_(NULL) {
compact_pointer_ = new std::string[options_->num_levels];
max_file_size_ = new uint64_t[options_->num_levels];
level_max_bytes_ = new uint64_t[options->num_levels];
max_file_size_[0] = options_->target_file_size_base;
level_max_bytes_[0] = options_->max_bytes_for_level_base;
int target_file_size_multiplier = options_->target_file_size_multiplier;
int max_bytes_multiplier = options_->max_bytes_for_level_multiplier;
int i = 1;
while (i < options_->num_levels) {
max_file_size_[i] = max_file_size_[i-1] * target_file_size_multiplier;
level_max_bytes_[i] = level_max_bytes_[i-1] * max_bytes_multiplier;
i++;
}
AppendVersion(new Version(this));
}
VersionSet::~VersionSet() {
current_->Unref();
assert(dummy_versions_.next_ == &dummy_versions_); // List must be empty
delete[] compact_pointer_;
delete[] max_file_size_;
delete[] level_max_bytes_;
delete descriptor_log_;
delete descriptor_file_;
}
@ -859,7 +867,7 @@ Status VersionSet::Recover() {
Slice record;
std::string scratch;
while (reader.ReadRecord(&record, &scratch) && s.ok()) {
VersionEdit edit;
VersionEdit edit(NumberLevels());
s = edit.DecodeFrom(record);
if (s.ok()) {
if (edit.has_comparator_ &&
@ -942,7 +950,7 @@ void VersionSet::Finalize(Version* v) {
int best_level = -1;
double best_score = -1;
for (int level = 0; level < config::kNumLevels-1; level++) {
for (int level = 0; level < NumberLevels()-1; level++) {
double score;
if (level == 0) {
// We treat level-0 specially by bounding the number of files
@ -957,7 +965,7 @@ void VersionSet::Finalize(Version* v) {
// setting, or very high compression ratios, or lots of
// overwrites/deletions).
score = v->files_[level].size() /
static_cast<double>(config::kL0_CompactionTrigger);
static_cast<double>(options_->level0_file_num_compaction_trigger);
} else {
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
@ -978,11 +986,11 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
// TODO: Break up into multiple records to reduce memory usage on recovery?
// Save metadata
VersionEdit edit;
VersionEdit edit(NumberLevels());
edit.SetComparatorName(icmp_.user_comparator()->Name());
// Save compaction pointers
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < NumberLevels(); level++) {
if (!compact_pointer_[level].empty()) {
InternalKey key;
key.DecodeFrom(compact_pointer_[level]);
@ -991,7 +999,7 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
}
// Save files
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = current_->files_[level];
for (size_t i = 0; i < files.size(); i++) {
const FileMetaData* f = files[i];
@ -1006,28 +1014,27 @@ Status VersionSet::WriteSnapshot(log::Writer* log) {
int VersionSet::NumLevelFiles(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);
assert(level < NumberLevels());
return current_->files_[level].size();
}
const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
// Update code if kNumLevels changes
assert(config::kNumLevels == 7);
snprintf(scratch->buffer, sizeof(scratch->buffer),
"files[ %d %d %d %d %d %d %d ]",
int(current_->files_[0].size()),
int(current_->files_[1].size()),
int(current_->files_[2].size()),
int(current_->files_[3].size()),
int(current_->files_[4].size()),
int(current_->files_[5].size()),
int(current_->files_[6].size()));
int len = snprintf(scratch->buffer, sizeof(scratch->buffer), "files[");
for (int i = 0; i < NumberLevels(); i++) {
int sz = sizeof(scratch->buffer) - len;
int ret = snprintf(scratch->buffer + len, sz, "%d ",
int(current_->files_[i].size()));
if (ret < 0 || ret >= sz)
break;
len += ret;
}
snprintf(scratch->buffer + len, sizeof(scratch->buffer) - len, "]");
return scratch->buffer;
}
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
uint64_t result = 0;
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = v->files_[level];
for (size_t i = 0; i < files.size(); i++) {
if (icmp_.Compare(files[i]->largest, ikey) <= 0) {
@ -1061,7 +1068,7 @@ void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
for (Version* v = dummy_versions_.next_;
v != &dummy_versions_;
v = v->next_) {
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < NumberLevels(); level++) {
const std::vector<FileMetaData*>& files = v->files_[level];
for (size_t i = 0; i < files.size(); i++) {
live->insert(files[i]->number);
@ -1072,14 +1079,14 @@ void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
int64_t VersionSet::NumLevelBytes(int level) const {
assert(level >= 0);
assert(level < config::kNumLevels);
assert(level < NumberLevels());
return TotalFileSize(current_->files_[level]);
}
int64_t VersionSet::MaxNextLevelOverlappingBytes() {
int64_t result = 0;
std::vector<FileMetaData*> overlaps;
for (int level = 1; level < config::kNumLevels - 1; level++) {
for (int level = 1; level < NumberLevels() - 1; level++) {
for (size_t i = 0; i < current_->files_[level].size(); i++) {
const FileMetaData* f = current_->files_[level][i];
current_->GetOverlappingInputs(level+1, &f->smallest, &f->largest,
@ -1163,6 +1170,32 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
return result;
}
double VersionSet::MaxBytesForLevel(int level) {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.
assert(level >= 0);
assert(level < NumberLevels());
return level_max_bytes_[level];
}
uint64_t VersionSet::MaxFileSizeForLevel(int level) {
assert(level >= 0);
assert(level < NumberLevels());
return max_file_size_[level];
}
uint64_t VersionSet::ExpandedCompactionByteSizeLimit(int level) {
uint64_t result = MaxFileSizeForLevel(level);
result *= options_->expanded_compaction_factor;
return result;
}
uint64_t VersionSet::MaxGrandParentOverlapBytes(int level) {
uint64_t result = MaxFileSizeForLevel(level);
result *= options_->max_grandparent_overlap_factor;
return result;
}
Compaction* VersionSet::PickCompaction() {
Compaction* c;
int level;
@ -1174,8 +1207,9 @@ Compaction* VersionSet::PickCompaction() {
if (size_compaction) {
level = current_->compaction_level_;
assert(level >= 0);
assert(level+1 < config::kNumLevels);
c = new Compaction(level);
assert(level+1 < NumberLevels());
c = new Compaction(level, MaxFileSizeForLevel(level),
MaxGrandParentOverlapBytes(level), NumberLevels());
// Pick the first file that comes after compact_pointer_[level]
for (size_t i = 0; i < current_->files_[level].size(); i++) {
@ -1192,7 +1226,8 @@ Compaction* VersionSet::PickCompaction() {
}
} else if (seek_compaction) {
level = current_->file_to_compact_level_;
c = new Compaction(level);
c = new Compaction(level, MaxFileSizeForLevel(level),
MaxGrandParentOverlapBytes(level), NumberLevels());
c->inputs_[0].push_back(current_->file_to_compact_);
} else {
return NULL;
@ -1236,8 +1271,9 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
const int64_t inputs0_size = TotalFileSize(c->inputs_[0]);
const int64_t inputs1_size = TotalFileSize(c->inputs_[1]);
const int64_t expanded0_size = TotalFileSize(expanded0);
uint64_t limit = ExpandedCompactionByteSizeLimit(level);
if (expanded0.size() > c->inputs_[0].size() &&
inputs1_size + expanded0_size < kExpandedCompactionByteSizeLimit) {
inputs1_size + expanded0_size < limit) {
InternalKey new_start, new_limit;
GetRange(expanded0, &new_start, &new_limit);
std::vector<FileMetaData*> expanded1;
@ -1264,7 +1300,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
if (level + 2 < config::kNumLevels) {
if (level + 2 < NumberLevels()) {
current_->GetOverlappingInputs(level + 2, &all_start, &all_limit,
&c->grandparents_);
}
@ -1281,7 +1317,7 @@ void VersionSet::SetupOtherInputs(Compaction* c) {
// to be applied so that if the compaction fails, we will try a different
// key range next time.
compact_pointer_[level] = largest.Encode().ToString();
c->edit_.SetCompactPointer(level, largest);
c->edit_->SetCompactPointer(level, largest);
}
Compaction* VersionSet::CompactRange(
@ -1306,7 +1342,8 @@ Compaction* VersionSet::CompactRange(
}
}
Compaction* c = new Compaction(level);
Compaction* c = new Compaction(level, limit,
MaxGrandParentOverlapBytes(level), NumberLevels());
c->input_version_ = current_;
c->input_version_->Ref();
c->inputs_[0] = inputs;
@ -1314,19 +1351,26 @@ Compaction* VersionSet::CompactRange(
return c;
}
Compaction::Compaction(int level)
Compaction::Compaction(int level, uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes, int number_levels)
: level_(level),
max_output_file_size_(MaxFileSizeForLevel(level)),
max_output_file_size_(target_file_size),
maxGrandParentOverlapBytes_(max_grandparent_overlap_bytes),
number_levels_(number_levels),
input_version_(NULL),
grandparent_index_(0),
seen_key_(false),
overlapped_bytes_(0) {
for (int i = 0; i < config::kNumLevels; i++) {
edit_ = new VersionEdit(number_levels_);
level_ptrs_ = new size_t[number_levels_];
for (int i = 0; i < number_levels_; i++) {
level_ptrs_[i] = 0;
}
}
Compaction::~Compaction() {
delete[] level_ptrs_;
delete edit_;
if (input_version_ != NULL) {
input_version_->Unref();
}
@ -1338,7 +1382,7 @@ bool Compaction::IsTrivialMove() const {
// a very expensive merge later on.
return (num_input_files(0) == 1 &&
num_input_files(1) == 0 &&
TotalFileSize(grandparents_) <= kMaxGrandParentOverlapBytes);
TotalFileSize(grandparents_) <= maxGrandParentOverlapBytes_);
}
void Compaction::AddInputDeletions(VersionEdit* edit) {
@ -1352,7 +1396,7 @@ void Compaction::AddInputDeletions(VersionEdit* edit) {
bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
// Maybe use binary search to find right entry instead of linear search?
const Comparator* user_cmp = input_version_->vset_->icmp_.user_comparator();
for (int lvl = level_ + 2; lvl < config::kNumLevels; lvl++) {
for (int lvl = level_ + 2; lvl < number_levels_; lvl++) {
const std::vector<FileMetaData*>& files = input_version_->files_[lvl];
for (; level_ptrs_[lvl] < files.size(); ) {
FileMetaData* f = files[level_ptrs_[lvl]];
@ -1383,7 +1427,7 @@ bool Compaction::ShouldStopBefore(const Slice& internal_key) {
}
seen_key_ = true;
if (overlapped_bytes_ > kMaxGrandParentOverlapBytes) {
if (overlapped_bytes_ > maxGrandParentOverlapBytes_) {
// Too much overlap for current output; start new output
overlapped_bytes_ = 0;
return true;

@ -119,7 +119,7 @@ class Version {
int refs_; // Number of live refs to this version
// List of files per level
std::vector<FileMetaData*> files_[config::kNumLevels];
std::vector<FileMetaData*>* files_;
// Next file to compact based on seek stats.
FileMetaData* file_to_compact_;
@ -131,13 +131,7 @@ class Version {
double compaction_score_;
int compaction_level_;
explicit Version(VersionSet* vset)
: vset_(vset), next_(this), prev_(this), refs_(0),
file_to_compact_(NULL),
file_to_compact_level_(-1),
compaction_score_(-1),
compaction_level_(-1) {
}
explicit Version(VersionSet* vset);
~Version();
@ -198,6 +192,8 @@ class VersionSet {
// being compacted, or zero if there is no such log file.
uint64_t PrevLogNumber() const { return prev_log_number_; }
int NumberLevels() const { return options_->num_levels; }
// Pick level and inputs for a new compaction.
// Returns NULL if there is no compaction to be done.
// Otherwise returns a pointer to a heap-allocated object that
@ -266,6 +262,14 @@ class VersionSet {
void AppendVersion(Version* v);
double MaxBytesForLevel(int level);
uint64_t MaxFileSizeForLevel(int level);
uint64_t ExpandedCompactionByteSizeLimit(int level);
uint64_t MaxGrandParentOverlapBytes(int level);
Env* const env_;
const std::string dbname_;
const Options* const options_;
@ -285,7 +289,13 @@ class VersionSet {
// Per-level key at which the next compaction at that level should start.
// Either an empty string, or a valid InternalKey.
std::string compact_pointer_[config::kNumLevels];
std::string* compact_pointer_;
// Per-level target file size.
uint64_t* max_file_size_;
// Per-level max bytes
uint64_t* level_max_bytes_;
// No copying allowed
VersionSet(const VersionSet&);
@ -303,7 +313,7 @@ class Compaction {
// Return the object that holds the edits to the descriptor done
// by this compaction.
VersionEdit* edit() { return &edit_; }
VersionEdit* edit() { return edit_; }
// "which" must be either 0 or 1
int num_input_files(int which) const { return inputs_[which].size(); }
@ -338,12 +348,15 @@ class Compaction {
friend class Version;
friend class VersionSet;
explicit Compaction(int level);
explicit Compaction(int level, uint64_t target_file_size,
uint64_t max_grandparent_overlap_bytes, int number_levels);
int level_;
uint64_t max_output_file_size_;
uint64_t maxGrandParentOverlapBytes_;
Version* input_version_;
VersionEdit edit_;
VersionEdit* edit_;
int number_levels_;
// Each compaction reads inputs from "level_" and "level_+1"
std::vector<FileMetaData*> inputs_[2]; // The two sets of inputs
@ -362,7 +375,7 @@ class Compaction {
// is that we are positioned at one of the file ranges for each
// higher level than the ones involved in this compaction (i.e. for
// all L >= level_ + 2).
size_t level_ptrs_[config::kNumLevels];
size_t* level_ptrs_;
};
} // namespace leveldb

@ -140,6 +140,16 @@ class DB {
// db->CompactRange(NULL, NULL);
virtual void CompactRange(const Slice* begin, const Slice* end) = 0;
// Number of levels used for this DB.
virtual int NumberLevels() = 0;
// Maximum level to which a new compacted memtable is pushed if it
// does not create overlap.
virtual int MaxMemCompactionLevel() = 0;
// Number of files in level-0 that would stop writes.
virtual int Level0StopWriteTrigger() = 0;
private:
// No copying allowed
DB(const DB&);

@ -113,6 +113,55 @@ struct Options {
// Default: 16
int block_restart_interval;
// Number of levels for this database
int num_levels;
// Number of files to trigger level-0 compaction. A value <0 means that
// level-0 compaction will not be triggered by number of files at all.
int level0_file_num_compaction_trigger;
// Soft limit on number of level-0 files. We slow down writes at this point.
// A value <0 means that no writing slow down will be triggered by number
// of files in level-0.
int level0_slowdown_writes_trigger;
// Maximum number of level-0 files. We stop writes at this point.
int level0_stop_writes_trigger;
// Maximum level to which a new compacted memtable is pushed if it
// does not create overlap. We try to push to level 2 to avoid the
// relatively expensive level 0=>1 compactions and to avoid some
// expensive manifest file operations. We do not push all the way to
// the largest level since that can generate a lot of wasted disk
// space if the same key space is being repeatedly overwritten.
int max_mem_compaction_level;
// Target file size for compaction. Target file size for level L is
// (target_file_size_base)^(target_file_size_multiplier).
// For example, if target_file_size_base is 20MB and
// target_file_size_multiplier is 2^10, then target file size on level 1
// will be 200MB, and wiil be 2GB on level 2.
int target_file_size_base;
int target_file_size_multiplier;
// Control maximum number of bytes in all compacted files for one level.
// Maximum number of bytes for level L is
// (max_bytes_for_level_base)^(max_bytes_for_level_multiplier).
int max_bytes_for_level_base;
int max_bytes_for_level_multiplier;
// Maximum number of bytes in all compacted files. We avoid expanding
// the lower level file set of a compaction if it would make the
// total compaction cover more than
// (expanded_compaction_factor * targetFileSizeLevel()) many bytes.
int expanded_compaction_factor;
// Control maximum bytes of overlaps in grandparent (i.e., level+2) before we
// stop building a single file in a level->level+1 compaction.
int max_grandparent_overlap_factor;
// Compress blocks using the specified compression algorithm. This
// parameter can be changed dynamically.
//

@ -717,7 +717,7 @@ TEST(Harness, RandomizedLongDB) {
// We must have created enough data to force merging
int files = 0;
for (int level = 0; level < config::kNumLevels; level++) {
for (int level = 0; level < db()->NumberLevels(); level++) {
std::string value;
char name[100];
snprintf(name, sizeof(name), "leveldb.num-files-at-level%d", level);

@ -22,7 +22,19 @@ Options::Options()
block_size(4096),
block_restart_interval(16),
compression(kSnappyCompression),
filter_policy(NULL) {
num_levels(7),
level0_file_num_compaction_trigger(4),
level0_slowdown_writes_trigger(8),
level0_stop_writes_trigger(12),
max_mem_compaction_level(2),
target_file_size_base(2 * 1048576),
target_file_size_multiplier(10),
max_bytes_for_level_base(10 * 1048576.0),
max_bytes_for_level_multiplier(10),
expanded_compaction_factor(25),
max_grandparent_overlap_factor(10),
filter_policy(NULL),
statistics(NULL) {
}

Loading…
Cancel
Save