More changes from upstream.

git-svn-id: https://leveldb.googlecode.com/svn/trunk@12 62dab493-f737-651d-591e-8d6aee1b9529
Branch: main
Author: jorlow@chromium.org (14 years ago)
Parent: 9df5aa892e
Commit: 13b72af77b
Changed files:

  1. db/db_bench.cc (264)
  2. db/db_impl.cc (20)
  3. db/db_impl.h (4)
  4. db/db_test.cc (74)
  5. db/version_set.cc (122)
  6. db/version_set.h (25)
  7. doc/impl.html (10)
  8. include/db.h (3)
  9. leveldb.gyp (2)

--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -17,11 +17,14 @@
 // Comma-separated list of operations to run in the specified order
 // Actual benchmarks:
-//   writeseq -- write N values in sequential key order
-//   writerandom -- write N values in random key order
-//   writebig -- write N/1000 100K values in random order
-//   readseq -- read N values sequentially
-//   readrandom -- read N values in random order
+//   fillseq -- write N values in sequential key order in async mode
+//   fillrandom -- write N values in random key order in async mode
+//   overwrite -- overwrite N values in random key order in async mode
+//   fillsync -- write N/100 values in random key order in sync mode
+//   fill100K -- write N/1000 100K values in random order in async mode
+//   readseq -- read N values sequentially
+//   readreverse -- read N values in reverse order
+//   readrandom -- read N values in random order
 // Meta operations:
 //   compact -- Compact the entire DB
 //   heapprofile -- Dump a heap profile (if supported by this port)
@@ -30,10 +33,10 @@
 //   tenth -- divide N by 10 (i.e., following benchmarks are smaller)
 //   normal -- reset N back to its normal value (1000000)
 static const char* FLAGS_benchmarks =
-    "writeseq,"
-    "writeseq,"
-    "writerandom,"
-    "sync,tenth,tenth,writerandom,nosync,normal,"
+    "fillseq,"
+    "fillrandom,"
+    "overwrite,"
+    "fillsync,"
     "readseq,"
     "readreverse,"
     "readrandom,"
@@ -41,7 +44,7 @@ static const char* FLAGS_benchmarks =
     "readseq,"
    "readreverse,"
     "readrandom,"
-    "writebig";
+    "fill100K";

 // Number of key/values to place in database
 static int FLAGS_num = 1000000;
@@ -51,7 +54,7 @@ static int FLAGS_value_size = 100;
 // Arrange to generate values that shrink to this fraction of
 // their original size after compression
-static double FLAGS_compression_ratio = 0.25;
+static double FLAGS_compression_ratio = 0.5;

 // Print histogram of operation timings
 static bool FLAGS_histogram = false;
@@ -93,6 +96,19 @@ class RandomGenerator {
     return Slice(data_.data() + pos_ - len, len);
   }
 };
+
+static Slice TrimSpace(Slice s) {
+  int start = 0;
+  while (start < s.size() && isspace(s[start])) {
+    start++;
+  }
+  int limit = s.size();
+  while (limit > start && isspace(s[limit-1])) {
+    limit--;
+  }
+  return Slice(s.data() + start, limit - start);
+}
+
 }

 class Benchmark {
@@ -100,7 +116,6 @@ class Benchmark {
   Cache* cache_;
   DB* db_;
   int num_;
-  bool sync_;
   int heap_counter_;
   double start_;
   double last_op_finish_;
@@ -114,6 +129,70 @@ class Benchmark {
   int done_;
   int next_report_;  // When to report next

+  void PrintHeader() {
+    const int kKeySize = 16;
+    PrintEnvironment();
+    fprintf(stdout, "Keys: %d bytes each\n", kKeySize);
+    fprintf(stdout, "Values: %d bytes each (%d bytes after compression)\n",
+            FLAGS_value_size,
+            static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5));
+    fprintf(stdout, "Entries: %d\n", num_);
+    fprintf(stdout, "RawSize: %.1f MB (estimated)\n",
+            (((kKeySize + FLAGS_value_size) * num_) / 1048576.0));
+    fprintf(stdout, "FileSize: %.1f MB (estimated)\n",
+            (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_)
+             / 1048576.0));
+    PrintWarnings();
+    fprintf(stdout, "------------------------------------------------\n");
+  }
+
+  void PrintWarnings() {
+#if defined(__GNUC__) && !defined(__OPTIMIZE__)
+    fprintf(stdout,
+            "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n"
+            );
+#endif
+#ifndef NDEBUG
+    fprintf(stdout,
+            "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
+#endif
+  }
+
+  void PrintEnvironment() {
+    fprintf(stderr, "LevelDB: version %d.%d\n",
+            kMajorVersion, kMinorVersion);
+
+#if defined(__linux)
+    time_t now = time(NULL);
+    fprintf(stderr, "Date: %s", ctime(&now));  // ctime() adds newline
+
+    FILE* cpuinfo = fopen("/proc/cpuinfo", "r");
+    if (cpuinfo != NULL) {
+      char line[1000];
+      int num_cpus = 0;
+      std::string cpu_type;
+      std::string cache_size;
+      while (fgets(line, sizeof(line), cpuinfo) != NULL) {
+        const char* sep = strchr(line, ':');
+        if (sep == NULL) {
+          continue;
+        }
+        Slice key = TrimSpace(Slice(line, sep - 1 - line));
+        Slice val = TrimSpace(Slice(sep + 1));
+        if (key == "model name") {
+          ++num_cpus;
+          cpu_type = val.ToString();
+        } else if (key == "cache size") {
+          cache_size = val.ToString();
+        }
+      }
+      fclose(cpuinfo);
+      fprintf(stderr, "CPU: %d * %s\n", num_cpus, cpu_type.c_str());
+      fprintf(stderr, "CPUCache: %s\n", cache_size.c_str());
+    }
+#endif
+  }
+
   void Start() {
     start_ = Env::Default()->NowMicros() * 1e-6;
     bytes_ = 0;
@@ -164,9 +243,10 @@ class Benchmark {
       snprintf(rate, sizeof(rate), "%5.1f MB/s",
                (bytes_ / 1048576.0) / (finish - start_));
       if (!message_.empty()) {
-        message_.push_back(' ');
+        message_ = std::string(rate) + " " + message_;
+      } else {
+        message_ = rate;
       }
-      message_.append(rate);
     }

     fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n",
@@ -183,14 +263,16 @@ class Benchmark {
  public:
   enum Order {
     SEQUENTIAL,
-    REVERSE,  // Currently only supported for reads
     RANDOM
   };
+  enum DBState {
+    FRESH,
+    EXISTING
+  };

   Benchmark() : cache_(NewLRUCache(200<<20)),
                 db_(NULL),
                 num_(FLAGS_num),
-                sync_(false),
                 heap_counter_(0),
                 bytes_(0),
                 rand_(301) {
@@ -210,19 +292,8 @@ class Benchmark {
   }

   void Run() {
-    Options options;
-    options.create_if_missing = true;
-    options.max_open_files = 10000;
-    options.block_cache = cache_;
-    options.write_buffer_size = FLAGS_write_buffer_size;
-    Start();
-    Status s = DB::Open(options, "/tmp/dbbench", &db_);
-    Stop("open");
-    if (!s.ok()) {
-      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
-      exit(1);
-    }
+    PrintHeader();
+    Open();

     const char* benchmarks = FLAGS_benchmarks;
     while (benchmarks != NULL) {
@@ -237,30 +308,30 @@
       }

       Start();
-      if (name == Slice("writeseq")) {
-        Write(SEQUENTIAL, num_, FLAGS_value_size);
-      } else if (name == Slice("writerandom")) {
-        Write(RANDOM, num_, FLAGS_value_size);
-      } else if (name == Slice("writebig")) {
-        Write(RANDOM, num_ / 1000, 100 * 1000);
+
+      WriteOptions write_options;
+      write_options.sync = false;
+      if (name == Slice("fillseq")) {
+        Write(write_options, SEQUENTIAL, FRESH, num_, FLAGS_value_size);
+      } else if (name == Slice("fillrandom")) {
+        Write(write_options, RANDOM, FRESH, num_, FLAGS_value_size);
+      } else if (name == Slice("overwrite")) {
+        Write(write_options, RANDOM, EXISTING, num_, FLAGS_value_size);
+      } else if (name == Slice("fillsync")) {
+        write_options.sync = true;
+        Write(write_options, RANDOM, FRESH, num_ / 100, FLAGS_value_size);
+      } else if (name == Slice("fill100K")) {
+        Write(write_options, RANDOM, FRESH, num_ / 1000, 100 * 1000);
       } else if (name == Slice("readseq")) {
-        Read(SEQUENTIAL);
+        ReadSequential();
       } else if (name == Slice("readreverse")) {
-        Read(REVERSE);
+        ReadReverse();
       } else if (name == Slice("readrandom")) {
-        Read(RANDOM);
+        ReadRandom();
       } else if (name == Slice("compact")) {
         Compact();
       } else if (name == Slice("heapprofile")) {
         HeapProfile();
-      } else if (name == Slice("sync")) {
-        sync_ = true;
-      } else if (name == Slice("nosync")) {
-        sync_ = false;
-      } else if (name == Slice("tenth")) {
-        num_ = num_ / 10;
-      } else if (name == Slice("normal")) {
-        num_ = FLAGS_num;
       } else {
         fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str());
       }
@@ -268,16 +339,44 @@ class Benchmark {
     }
   }

-  void Write(Order order, int num_entries, int value_size) {
+ private:
+  void Open() {
+    assert(db_ == NULL);
+    Options options;
+    options.create_if_missing = true;
+    options.max_open_files = 10000;
+    options.block_cache = cache_;
+    options.write_buffer_size = FLAGS_write_buffer_size;
+    Status s = DB::Open(options, "/tmp/dbbench", &db_);
+    if (!s.ok()) {
+      fprintf(stderr, "open error: %s\n", s.ToString().c_str());
+      exit(1);
+    }
+  }
+
+  void Write(const WriteOptions& options, Order order, DBState state,
+             int num_entries, int value_size) {
+    if (state == FRESH) {
+      delete db_;
+      db_ = NULL;
+      DestroyDB("/tmp/dbbench", Options());
+      Open();
+      Start();  // Do not count time taken to destroy/open
+    }
+
+    if (num_entries != num_) {
+      char msg[100];
+      snprintf(msg, sizeof(msg), "(%d ops)", num_entries);
+      message_ = msg;
+    }
+
     WriteBatch batch;
     Status s;
     std::string val;
-    WriteOptions options;
-    options.sync = sync_;
     for (int i = 0; i < num_entries; i++) {
       const int k = (order == SEQUENTIAL) ? i : (rand_.Next() % FLAGS_num);
       char key[100];
-      snprintf(key, sizeof(key), "%012d", k);
+      snprintf(key, sizeof(key), "%016d", k);
       batch.Clear();
       batch.Put(key, gen_.Generate(value_size));
       s = db_->Write(options, &batch);
@@ -290,42 +389,37 @@ class Benchmark {
     }
   }

-  void Read(Order order) {
-    ReadOptions options;
-    switch (order) {
-      case SEQUENTIAL: {
-        Iterator* iter = db_->NewIterator(options);
-        int i = 0;
-        for (iter->SeekToFirst(); i < num_ && iter->Valid(); iter->Next()) {
-          bytes_ += iter->key().size() + iter->value().size();
-          FinishedSingleOp();
-          ++i;
-        }
-        delete iter;
-        break;
-      }
-      case REVERSE: {
-        Iterator* iter = db_->NewIterator(options);
-        int i = 0;
-        for (iter->SeekToLast(); i < num_ && iter->Valid(); iter->Prev()) {
-          bytes_ += iter->key().size() + iter->value().size();
-          FinishedSingleOp();
-          ++i;
-        }
-        delete iter;
-        break;
-      }
-      case RANDOM: {
-        std::string value;
-        for (int i = 0; i < num_; i++) {
-          char key[100];
-          const int k = (order == SEQUENTIAL) ? i : (rand_.Next() % FLAGS_num);
-          snprintf(key, sizeof(key), "%012d", k);
-          db_->Get(options, key, &value);
-          FinishedSingleOp();
-        }
-        break;
-      }
-    }
-  }
+  void ReadSequential() {
+    Iterator* iter = db_->NewIterator(ReadOptions());
+    int i = 0;
+    for (iter->SeekToFirst(); i < num_ && iter->Valid(); iter->Next()) {
+      bytes_ += iter->key().size() + iter->value().size();
+      FinishedSingleOp();
+      ++i;
+    }
+    delete iter;
+  }
+
+  void ReadReverse() {
+    Iterator* iter = db_->NewIterator(ReadOptions());
+    int i = 0;
+    for (iter->SeekToLast(); i < num_ && iter->Valid(); iter->Prev()) {
+      bytes_ += iter->key().size() + iter->value().size();
+      FinishedSingleOp();
+      ++i;
+    }
+    delete iter;
+  }
+
+  void ReadRandom() {
+    ReadOptions options;
+    std::string value;
+    for (int i = 0; i < num_; i++) {
+      char key[100];
+      const int k = rand_.Next() % FLAGS_num;
+      snprintf(key, sizeof(key), "%016d", k);
+      db_->Get(options, key, &value);
+      FinishedSingleOp();
+    }
+  }

--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -532,8 +532,9 @@ void DBImpl::BackgroundCompaction() {
   }

   Status status;
-  if (c->num_input_files(0) == 1 && c->num_input_files(1) == 0) {
+  if (c->IsTrivialMove()) {
     // Move file to next level
+    assert(c->num_input_files(0) == 1);
     FileMetaData* f = c->input(0, 0);
     c->edit()->DeleteFile(c->level(), f->number);
     c->edit()->AddFile(c->level() + 1, f->number, f->file_size,
@@ -718,8 +719,18 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) {
   bool has_current_user_key = false;
   SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
   for (; input->Valid() && !shutting_down_.Acquire_Load(); ) {
-    // Handle key/value, add to state, etc.
     Slice key = input->key();
+    InternalKey tmp_internal_key;
+    tmp_internal_key.DecodeFrom(key);
+    if (compact->compaction->ShouldStopBefore(tmp_internal_key) &&
+        compact->builder != NULL) {
+      status = FinishCompactionOutputFile(compact, input);
+      if (!status.ok()) {
+        break;
+      }
+    }
+
+    // Handle key/value, add to state, etc.
     bool drop = false;
     if (!ParseInternalKey(key, &ikey)) {
       // Do not hide error keys
@@ -855,6 +866,11 @@ Iterator* DBImpl::TEST_NewInternalIterator() {
   return NewInternalIterator(ReadOptions(), &ignored);
 }

+int64 DBImpl::TEST_MaxNextLevelOverlappingBytes() {
+  MutexLock l(&mutex_);
+  return versions_->MaxNextLevelOverlappingBytes();
+}
+
 Status DBImpl::Get(const ReadOptions& options,
                    const Slice& key,
                    std::string* value) {

--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -55,6 +55,10 @@ class DBImpl : public DB {
   // The returned iterator should be deleted when no longer needed.
   Iterator* TEST_NewInternalIterator();

+  // Return the maximum overlapping data (in bytes) at next level for any
+  // file at a level >= 1.
+  int64 TEST_MaxNextLevelOverlappingBytes();
+
  private:
   friend class DB;

--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -72,15 +72,19 @@ class DBTest {
   }

   Status Put(const std::string& k, const std::string& v) {
+    WriteOptions options;
+    options.sync = false;
     WriteBatch batch;
     batch.Put(k, v);
-    return db_->Write(WriteOptions(), &batch);
+    return db_->Write(options, &batch);
   }

   Status Delete(const std::string& k) {
+    WriteOptions options;
+    options.sync = false;
     WriteBatch batch;
     batch.Delete(k);
-    return db_->Write(WriteOptions(), &batch);
+    return db_->Write(options, &batch);
   }

   std::string Get(const std::string& k, const Snapshot* snapshot = NULL) {
@@ -176,6 +180,35 @@ class DBTest {
     fprintf(stderr, "Found %d live large value files\n", (int)live.size());
     return live;
   }
+
+  void Compact(const Slice& start, const Slice& limit) {
+    dbfull()->TEST_CompactMemTable();
+    int max_level_with_files = 1;
+    for (int level = 1; level < config::kNumLevels; level++) {
+      uint64_t v;
+      char name[100];
+      snprintf(name, sizeof(name), "leveldb.num-files-at-level%d", level);
+      if (dbfull()->GetProperty(name, &v) && v > 0) {
+        max_level_with_files = level;
+      }
+    }
+    for (int level = 0; level < max_level_with_files; level++) {
+      dbfull()->TEST_CompactRange(level, "", "~");
+    }
+  }
+
+  void DumpFileCounts(const char* label) {
+    fprintf(stderr, "---\n%s:\n", label);
+    fprintf(stderr, "maxoverlap: %lld\n",
+            static_cast<long long>(
+                dbfull()->TEST_MaxNextLevelOverlappingBytes()));
+    for (int level = 0; level < config::kNumLevels; level++) {
+      int num = NumTableFilesAtLevel(level);
+      if (num > 0) {
+        fprintf(stderr, "  level %3d : %d files\n", level, num);
+      }
+    }
+  }
 };

 TEST(DBTest, Empty) {
@@ -315,6 +348,43 @@ TEST(DBTest, CompactionsGenerateMultipleFiles) {
   }
 }

+TEST(DBTest, SparseMerge) {
+  Options options;
+  options.compression = kNoCompression;
+  Reopen(&options);
+
+  // Suppose there is:
+  //    small amount of data with prefix A
+  //    large amount of data with prefix B
+  //    small amount of data with prefix C
+  // and that recent updates have made small changes to all three prefixes.
+  // Check that we do not do a compaction that merges all of B in one shot.
+  const std::string value(1000, 'x');
+  Put("A", "va");
+  // Write approximately 100MB of "B" values
+  for (int i = 0; i < 100000; i++) {
+    char key[100];
+    snprintf(key, sizeof(key), "B%010d", i);
+    Put(key, value);
+  }
+  Put("C", "vc");
+  Compact("", "z");
+
+  // Make sparse update
+  Put("A", "va2");
+  Put("B100", "bvalue2");
+  Put("C", "vc2");
+  dbfull()->TEST_CompactMemTable();
+
+  // Compactions should not cause us to create a situation where
+  // a file overlaps too much data at the next level.
+  ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
+  dbfull()->TEST_CompactRange(0, "", "z");
+  ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
+  dbfull()->TEST_CompactRange(1, "", "z");
+  ASSERT_LE(dbfull()->TEST_MaxNextLevelOverlappingBytes(), 20*1048576);
+}
+
 static bool Between(uint64_t val, uint64_t low, uint64_t high) {
   bool result = (val >= low) && (val <= high);
   if (!result) {

--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -20,6 +20,10 @@
 namespace leveldb {

+// Maximum number of overlaps in grandparent (i.e., level+2) before we
+// stop building a single file in a level->level+1 compaction.
+static const int kMaxGrandParentFiles = 10;
+
 static double MaxBytesForLevel(int level) {
   if (level == 0) {
     return 4 * 1048576.0;
@@ -509,7 +513,7 @@ Status VersionSet::Finalize(Version* v) {
   double best_score = -1;
   Status s;
-  for (int level = 0; s.ok() && level < config::kNumLevels; level++) {
+  for (int level = 0; s.ok() && level < config::kNumLevels-1; level++) {
     s = SortLevel(v, level);

     // Compute the ratio of current size to size limit.
@@ -751,6 +755,25 @@ void VersionSet::AddLiveFiles(std::set<uint64_t>* live) {
   }
 }

+int64 VersionSet::MaxNextLevelOverlappingBytes() {
+  int64 result = 0;
+  std::vector<FileMetaData*> overlaps;
+  for (int level = 0; level < config::kNumLevels - 1; level++) {
+    for (int i = 0; i < current_->files_[level].size(); i++) {
+      const FileMetaData* f = current_->files_[level][i];
+      GetOverlappingInputs(level+1, f->smallest, f->largest, &overlaps);
+      int64 sum = 0;
+      for (int j = 0; j < overlaps.size(); j++) {
+        sum += overlaps[j]->file_size;
+      }
+      if (sum > result) {
+        result = sum;
+      }
+    }
+  }
+  return result;
+}
+
 // Store in "*inputs" all files in "level" that overlap [begin,end]
 void VersionSet::GetOverlappingInputs(
     int level,
@@ -797,6 +820,18 @@ void VersionSet::GetRange(const std::vector<FileMetaData*>& inputs,
   }
 }

+// Stores the minimal range that covers all entries in inputs1 and inputs2
+// in *smallest, *largest.
+// REQUIRES: inputs is not empty
+void VersionSet::GetRange2(const std::vector<FileMetaData*>& inputs1,
+                           const std::vector<FileMetaData*>& inputs2,
+                           InternalKey* smallest,
+                           InternalKey* largest) {
+  std::vector<FileMetaData*> all = inputs1;
+  all.insert(all.end(), inputs2.begin(), inputs2.end());
+  GetRange(all, smallest, largest);
+}
+
 Iterator* VersionSet::MakeInputIterator(Compaction* c) {
   ReadOptions options;
   options.verify_checksums = options_->paranoid_checks;
@@ -836,6 +871,7 @@ Compaction* VersionSet::PickCompaction() {
   }
   const int level = current_->compaction_level_;
   assert(level >= 0);
+  assert(level+1 < config::kNumLevels);
   Compaction* c = new Compaction(level);
   c->input_version_ = current_;
@@ -855,31 +891,36 @@ Compaction* VersionSet::PickCompaction() {
     c->inputs_[0].push_back(current_->files_[level][0]);
   }

-  // Find the range we are compacting
-  InternalKey smallest, largest;
-  GetRange(c->inputs_[0], &smallest, &largest);
-
   // Files in level 0 may overlap each other, so pick up all overlapping ones
   if (level == 0) {
+    InternalKey smallest, largest;
+    GetRange(c->inputs_[0], &smallest, &largest);
     // Note that the next call will discard the file we placed in
     // c->inputs_[0] earlier and replace it with an overlapping set
     // which will include the picked file.
     GetOverlappingInputs(0, smallest, largest, &c->inputs_[0]);
     assert(!c->inputs_[0].empty());
-    GetRange(c->inputs_[0], &smallest, &largest);
   }

+  SetupOtherInputs(c);
+
+  return c;
+}
+
+void VersionSet::SetupOtherInputs(Compaction* c) {
+  const int level = c->level();
+  InternalKey smallest, largest;
+  GetRange(c->inputs_[0], &smallest, &largest);
+
   GetOverlappingInputs(level+1, smallest, largest, &c->inputs_[1]);

+  // Get entire range covered by compaction
+  InternalKey all_start, all_limit;
+  GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
+
   // See if we can grow the number of inputs in "level" without
   // changing the number of "level+1" files we pick up.
   if (!c->inputs_[1].empty()) {
-    // Get entire range covered by compaction
-    std::vector<FileMetaData*> all = c->inputs_[0];
-    all.insert(all.end(), c->inputs_[1].begin(), c->inputs_[1].end());
-    InternalKey all_start, all_limit;
-    GetRange(all, &all_start, &all_limit);
-
     std::vector<FileMetaData*> expanded0;
     GetOverlappingInputs(level, all_start, all_limit, &expanded0);
     if (expanded0.size() > c->inputs_[0].size()) {
@@ -899,10 +940,17 @@ Compaction* VersionSet::PickCompaction() {
         largest = new_limit;
         c->inputs_[0] = expanded0;
         c->inputs_[1] = expanded1;
+        GetRange2(c->inputs_[0], c->inputs_[1], &all_start, &all_limit);
       }
     }
   }

+  // Compute the set of grandparent files that overlap this compaction
+  // (parent == level+1; grandparent == level+2)
+  if (level + 2 < config::kNumLevels) {
+    GetOverlappingInputs(level + 2, all_start, all_limit, &c->grandparents_);
+  }
+
   if (false) {
     Log(env_, options_->info_log, "Compacting %d '%s' .. '%s'",
         level,
@@ -916,8 +964,6 @@ Compaction* VersionSet::PickCompaction() {
   // key range next time.
   compact_pointer_[level] = largest.Encode().ToString();
   c->edit_.SetCompactPointer(level, largest);
-
-  return c;
 }

 Compaction* VersionSet::CompactRange(
@@ -934,25 +980,16 @@ Compaction* VersionSet::CompactRange(
   c->input_version_ = current_;
   c->input_version_->Ref();
   c->inputs_[0] = inputs;
-
-  // Find the range we are compacting
-  InternalKey smallest, largest;
-  GetRange(c->inputs_[0], &smallest, &largest);
-
-  GetOverlappingInputs(level+1, smallest, largest, &c->inputs_[1]);
-
-  if (false) {
-    Log(env_, options_->info_log, "Compacting %d '%s' .. '%s'",
-        level,
-        EscapeString(smallest.Encode()).c_str(),
-        EscapeString(largest.Encode()).c_str());
-  }
-
+  SetupOtherInputs(c);
   return c;
 }

 Compaction::Compaction(int level)
     : level_(level),
       max_output_file_size_(MaxFileSizeForLevel(level)),
-      input_version_(NULL) {
+      input_version_(NULL),
+      grandparent_index_(0),
+      output_start_(-1) {
   for (int i = 0; i < config::kNumLevels; i++) {
     level_ptrs_[i] = 0;
   }
@@ -964,6 +1001,15 @@ Compaction::~Compaction() {
   }
 }

+bool Compaction::IsTrivialMove() const {
+  // Avoid a move if there are lots of overlapping grandparent files.
+  // Otherwise, the move could create a parent file that will require
+  // a very expensive merge later on.
+  return (num_input_files(0) == 1
+          && num_input_files(1) == 0
+          && grandparents_.size() <= kMaxGrandParentFiles);
+}
+
 void Compaction::AddInputDeletions(VersionEdit* edit) {
   for (int which = 0; which < 2; which++) {
     for (int i = 0; i < inputs_[which].size(); i++) {
@@ -993,6 +1039,28 @@ bool Compaction::IsBaseLevelForKey(const Slice& user_key) {
   return true;
 }

+bool Compaction::ShouldStopBefore(const InternalKey& key) {
+  // Scan to find earliest grandparent file that contains key.
+  const InternalKeyComparator* icmp = &input_version_->vset_->icmp_;
+  while (grandparent_index_ < grandparents_.size() &&
+         icmp->Compare(key, grandparents_[grandparent_index_]->largest) > 0) {
+    grandparent_index_++;
+  }
+
+  // First call?
+  if (output_start_ < 0) {
+    output_start_ = grandparent_index_;
+  }
+
+  if (grandparent_index_ - output_start_ + 1 > kMaxGrandParentFiles) {
+    // Too many overlaps for current output; start new output
+    output_start_ = grandparent_index_;
+    return true;
+  } else {
+    return false;
+  }
+}
+
 void Compaction::ReleaseInputs() {
   if (input_version_ != NULL) {
     input_version_->Unref();

--- a/db/version_set.h
+++ b/db/version_set.h
@@ -139,6 +139,10 @@ class VersionSet {
                             const InternalKey& begin,
                             const InternalKey& end);

+  // Return the maximum overlapping data (in bytes) at next level for any
+  // file at a level >= 1.
+  int64 MaxNextLevelOverlappingBytes();
+
   // Create an iterator that reads over the compaction inputs for "*c".
   // The caller should delete the iterator when no longer needed.
   Iterator* MakeInputIterator(Compaction* c);
@@ -195,6 +199,13 @@
                 InternalKey* smallest,
                 InternalKey* largest);

+  void GetRange2(const std::vector<FileMetaData*>& inputs1,
+                 const std::vector<FileMetaData*>& inputs2,
+                 InternalKey* smallest,
+                 InternalKey* largest);
+
+  void SetupOtherInputs(Compaction* c);
+
   Env* const env_;
   const std::string dbname_;
   const Options* const options_;
@@ -250,6 +261,10 @@ class Compaction {
   // Maximum size of files to build during this compaction.
   uint64_t MaxOutputFileSize() const { return max_output_file_size_; }

+  // Is this a trivial compaction that can be implemented by just
+  // moving a single input file to the next level (no merging or splitting)
+  bool IsTrivialMove() const;
+
   // Add all inputs to this compaction as delete operations to *edit.
   void AddInputDeletions(VersionEdit* edit);
@@ -258,6 +273,10 @@
   // in levels greater than "level+1".
   bool IsBaseLevelForKey(const Slice& user_key);

+  // Returns true iff we should stop building the current output
+  // before processing "key".
+  bool ShouldStopBefore(const InternalKey& key);
+
   // Release the input version for the compaction, once the compaction
   // is successful.
   void ReleaseInputs();
@@ -276,6 +295,12 @@
   // Each compaction reads inputs from "level_" and "level_+1"
   std::vector<FileMetaData*> inputs_[2];  // The two sets of inputs

+  // State used to check for number of overlapping grandparent files
+  // (parent == level_ + 1, grandparent == level_ + 2)
+  std::vector<FileMetaData*> grandparents_;
+  int grandparent_index_;  // Index in grandparent_starts_
+  int output_start_;       // Index in grandparent_starts_ where output started
+
   // State for implementing IsBaseLevelForKey

   // level_ptrs_ holds indices into input_version_->levels_: our state

--- a/doc/impl.html
+++ b/doc/impl.html
@@ -123,8 +123,14 @@ one level-0 file in case some of these files overlap each other.
 A compaction merges the contents of the picked files to produce a
 sequence of level-(L+1) files.  We switch to producing a new
 level-(L+1) file after the current output file has reached the target
-file size (2MB).  The old files are discarded and the new files are
-added to the serving state.
+file size (2MB).  We also switch to a new output file when the key
+range of the current output file has grown enough to overlap more than
+ten level-(L+2) files.  This last rule ensures that a later compaction
+of a level-(L+1) file will not pick up too much data from level-(L+2).
+<p>
+The old files are discarded and the new files are added to the serving
+state.

 <p>
 Compactions for a particular level rotate through the key space.  In
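
Editor's note: the ten-file rule described above corresponds to Compaction::ShouldStopBefore() in the db/version_set.cc hunk earlier in this commit. A minimal standalone sketch of the cutoff check (the names and the free-function form below are illustrative, not the patch's):

// Sketch only: the grandparent-overlap cutoff in its simplest form.
// output_start is the index of the first level-(L+2) file overlapped by the
// current output file; grandparent_index is the index of the level-(L+2)
// file containing the key about to be appended.
static bool ShouldCutOutputFile(int output_start,
                                int grandparent_index,
                                int max_grandparent_files = 10) {
  // How many level-(L+2) files the current output already spans.
  const int spanned = grandparent_index - output_start + 1;
  return spanned > max_grandparent_files;
}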

--- a/include/db.h
+++ b/include/db.h
@@ -12,6 +12,9 @@
 namespace leveldb {

+static const int kMajorVersion = 1;
+static const int kMinorVersion = 0;
+
 struct Options;
 struct ReadOptions;
 struct WriteOptions;
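
Editor's note: the two new version constants are plain integers in the leveldb namespace; PrintEnvironment() in the db/db_bench.cc hunk above reads them the same way. A minimal usage sketch (the include path is an assumption based on this tree's layout, not taken from the patch):

#include <cstdio>

#include "include/db.h"  // assumed path for this tree; adjust to the local include setup

int main() {
  // Print the library version constants introduced by this change.
  std::printf("LevelDB version %d.%d\n",
              leveldb::kMajorVersion, leveldb::kMinorVersion);
  return 0;
}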

--- a/leveldb.gyp
+++ b/leveldb.gyp
@@ -4,7 +4,7 @@
 {
   'variables': {
-    'use_snappy%': 1,
+    'use_snappy%': 0,
   },
   'target_defaults': {
     'defines': [
