A number of smaller fixes and performance improvements:

- Implemented Get() directly instead of building on top of a full
  merging iterator stack.  This speeds up the "readrandom" benchmark
  by 15-30%.

- Fixed an open-source compilation problem.

- Added a --db=<name> flag to control where the database is placed.

- Automatically compact a file when we have done enough
  overlapping seeks to that file (see the sketch after this list).

- Fixed a performance bug where we would read from at least one
  file in a level even if none of the files overlapped the key
  being read.

- Makefile fix for Mac OS X installations that have Xcode 4 without Xcode 3.

- Unified the two occurrences of binary search over a file list
  into one routine.

- Found and fixed a bug where we would unnecessarily search the
  last file when looking for a key larger than all data in the
  level.

- Avoided the need for trivial move compactions, which gets rid of
  two of the five syncs in "fillseq".

- Removed the MANIFEST file write when switching to a new
  memtable/log file, for a 10-20% improvement in fill speed on ext4.

- Added a SNAPPY setting in the Makefile for folks who have
  Snappy installed.  Snappy compresses values and speeds up writes.
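
The seek-triggered compaction above keeps a per-file seek budget.  The
following sketch is an editor's paraphrase of the bookkeeping this change
adds in db/version_edit.h and db/version_set.cc (the diff below is
authoritative); it is not code taken verbatim from the commit:

    // Each table file may be probed only so many times before it is
    // scheduled for compaction (see VersionSet::Builder::Apply and
    // Version::UpdateStats in the diff below).
    struct FileMetaData {
      int allowed_seeks;   // Seeks allowed until compaction
      uint64_t file_size;  // File size in bytes
    };

    // Budget: roughly one seek per 16KB of data, with a floor of 100.
    void InitAllowedSeeks(FileMetaData* f) {
      f->allowed_seeks = static_cast<int>(f->file_size / 16384);
      if (f->allowed_seeks < 100) f->allowed_seeks = 100;
    }

    // A Get() that probes a file without finding its key charges that
    // file; an exhausted budget makes the file a compaction candidate.
    bool ChargeSeek(FileMetaData* f) {
      f->allowed_seeks--;
      return f->allowed_seeks <= 0;  // true => consider compacting
    }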



git-svn-id: https://leveldb.googlecode.com/svn/trunk@32 62dab493-f737-651d-591e-8d6aee1b9529
Branch: main
Author: gabor@google.com
Parent: 80e5b0d944
Commit: ccf0fcd5c2
19 files changed:

  Makefile              |  28
  TODO                  |   7
  db/builder.cc         |   6
  db/builder.h          |  10
  db/corruption_test.cc |  31
  db/db_bench.cc        |  19
  db/db_impl.cc         | 135
  db/db_impl.h          |   3
  db/db_test.cc         | 219
  db/dbformat.cc        |  19
  db/dbformat.h         |  40
  db/memtable.cc        |  37
  db/memtable.h         |   6
  db/repair.cc          |   6
  db/version_edit.h     |   3
  db/version_set.cc     | 281
  db/version_set.h      |  49
  port/port_posix.h     |  22
  table/table_test.cc   |  14

--- a/Makefile
+++ b/Makefile
@@ -28,9 +28,22 @@ PLATFORM_CFLAGS = -DLEVELDB_PLATFORM_POSIX -std=c++0x
 PORT_MODULE = port_posix.o
 endif # UNAME
 
-CFLAGS = -c -I. -I./include $(PLATFORM_CFLAGS) $(OPT)
-LDFLAGS=-lpthread
+# Set 'SNAPPY' to 1 if you have the Snappy compression library
+# installed and want to enable its use in LevelDB
+# (see http://code.google.com/p/snappy/)
+SNAPPY=0
+
+ifeq ($(SNAPPY), 0)
+SNAPPY_CFLAGS=
+SNAPPY_LDFLAGS=
+else
+SNAPPY_CFLAGS=-DSNAPPY
+SNAPPY_LDFLAGS=-lsnappy
+endif
+
+CFLAGS = -c -I. -I./include $(PLATFORM_CFLAGS) $(OPT) $(SNAPPY_CFLAGS)
+LDFLAGS=-lpthread $(SNAPPY_LDFLAGS)
 
 LIBOBJECTS = \
 	./db/builder.o \
@@ -85,6 +98,7 @@ TESTS = \
 	skiplist_test \
 	table_test \
 	version_edit_test \
+	version_set_test \
 	write_batch_test
 
 PROGRAMS = db_bench $(TESTS)
@@ -151,17 +165,23 @@ skiplist_test: db/skiplist_test.o $(LIBOBJECTS) $(TESTHARNESS)
 version_edit_test: db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CC) $(LDFLAGS) db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@
 
+version_set_test: db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(CC) $(LDFLAGS) db/version_set_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@
+
 write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CC) $(LDFLAGS) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@
 
 ifeq ($(PLATFORM), IOS)
 # For iOS, create universal object files to be used on both the simulator and
 # a device.
+SIMULATORROOT=/Developer/Platforms/iPhoneSimulator.platform/Developer
+DEVICEROOT=/Developer/Platforms/iPhoneOS.platform/Developer
+IOSVERSION=$(shell defaults read /Developer/Platforms/iPhoneOS.platform/version CFBundleShortVersionString)
 .cc.o:
 	mkdir -p ios-x86/$(dir $@)
-	$(CC) $(CFLAGS) -isysroot /Developer/Platforms/iPhoneSimulator.platform/Developer/SDKs/iPhoneSimulator4.3.sdk -arch i686 $< -o ios-x86/$@
+	$(SIMULATORROOT)/usr/bin/$(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 $< -o ios-x86/$@
 	mkdir -p ios-arm/$(dir $@)
-	$(CC) $(CFLAGS) -isysroot /Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS4.3.sdk -arch armv6 -arch armv7 $< -o ios-arm/$@
+	$(DEVICEROOT)/usr/bin/$(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk -arch armv6 -arch armv7 $< -o ios-arm/$@
 	lipo ios-x86/$@ ios-arm/$@ -create -output $@
 else
 .cc.o:
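
Editor's note: with this Makefile change, enabling Snappy should just be a
matter of overriding the default on the make command line (make SNAPPY=1),
since a command-line assignment takes precedence over the SNAPPY=0 default
set in the file.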

--- a/TODO
+++ b/TODO
@@ -8,7 +8,6 @@ db
 object stores, etc. can be done in the background anyway, so
 probably not that important.
 
-api changes:
-- Make it wrappable
-
-Faster Get implementation
+After a range is completely deleted, what gets rid of the
+corresponding files if we do no future changes to that range.  Make
+the conditions for triggering compactions fire in more situations?

--- a/db/builder.cc
+++ b/db/builder.cc
@@ -19,8 +19,7 @@ Status BuildTable(const std::string& dbname,
                   const Options& options,
                   TableCache* table_cache,
                   Iterator* iter,
-                  FileMetaData* meta,
-                  VersionEdit* edit) {
+                  FileMetaData* meta) {
   Status s;
   meta->file_size = 0;
   iter->SeekToFirst();
@@ -79,8 +78,7 @@ Status BuildTable(const std::string& dbname,
   }
 
   if (s.ok() && meta->file_size > 0) {
-    edit->AddFile(0, meta->number, meta->file_size,
-                  meta->smallest, meta->largest);
+    // Keep it
   } else {
     env->DeleteFile(fname);
   }

--- a/db/builder.h
+++ b/db/builder.h
@@ -19,17 +19,15 @@ class VersionEdit;
 // Build a Table file from the contents of *iter.  The generated file
 // will be named according to meta->number.  On success, the rest of
-// *meta will be filled with metadata about the generated table, and
-// the file information will be added to *edit.  If no data is present
-// in *iter, meta->file_size will be set to zero, and no Table file
-// will be produced.
+// *meta will be filled with metadata about the generated table.
+// If no data is present in *iter, meta->file_size will be set to
+// zero, and no Table file will be produced.
 extern Status BuildTable(const std::string& dbname,
                          Env* env,
                          const Options& options,
                          TableCache* table_cache,
                          Iterator* iter,
-                         FileMetaData* meta,
-                         VersionEdit* edit);
+                         FileMetaData* meta);
 
 }

--- a/db/corruption_test.cc
+++ b/db/corruption_test.cc
@@ -27,13 +27,12 @@ static const int kValueSize = 1000;
 class CorruptionTest {
  public:
   test::ErrorEnv env_;
-  Random rnd_;
   std::string dbname_;
   Cache* tiny_cache_;
   Options options_;
   DB* db_;
 
-  CorruptionTest() : rnd_(test::RandomSeed()) {
+  CorruptionTest() {
     tiny_cache_ = NewLRUCache(100);
     options_.env = &env_;
     dbname_ = test::TmpDir() + "/db_test";
@@ -122,15 +121,17 @@ class CorruptionTest {
     ASSERT_OK(env_.GetChildren(dbname_, &filenames));
     uint64_t number;
     FileType type;
-    std::vector<std::string> candidates;
+    std::string fname;
+    int picked_number = -1;
     for (int i = 0; i < filenames.size(); i++) {
       if (ParseFileName(filenames[i], &number, &type) &&
-          type == filetype) {
-        candidates.push_back(dbname_ + "/" + filenames[i]);
+          type == filetype &&
+          int(number) > picked_number) {  // Pick latest file
+        fname = dbname_ + "/" + filenames[i];
+        picked_number = number;
       }
     }
-    ASSERT_TRUE(!candidates.empty()) << filetype;
-    std::string fname = candidates[rnd_.Uniform(candidates.size())];
+    ASSERT_TRUE(!fname.empty()) << filetype;
 
     struct stat sbuf;
     if (stat(fname.c_str(), &sbuf) != 0) {
@@ -239,8 +240,6 @@ TEST(CorruptionTest, TableFileIndexData) {
   Build(10000);  // Enough to build multiple Tables
   DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
   dbi->TEST_CompactMemTable();
-  dbi->TEST_CompactRange(0, "", "~");
-  dbi->TEST_CompactRange(1, "", "~");
 
   Corrupt(kTableFile, -2000, 500);
   Reopen();
@@ -296,7 +295,8 @@ TEST(CorruptionTest, CompactionInputError) {
   Build(10);
   DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
   dbi->TEST_CompactMemTable();
-  ASSERT_EQ(1, Property("leveldb.num-files-at-level0"));
+  const int last = config::kNumLevels - 1;
+  ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last)));
 
   Corrupt(kTableFile, 100, 1);
   Check(9, 9);
@@ -304,8 +304,6 @@ TEST(CorruptionTest, CompactionInputError) {
   // Force compactions by writing lots of values
   Build(10000);
   Check(10000, 10000);
-  dbi->TEST_CompactRange(0, "", "~");
-  ASSERT_EQ(0, Property("leveldb.num-files-at-level0"));
 }
 
 TEST(CorruptionTest, CompactionInputErrorParanoid) {
@@ -313,9 +311,16 @@ TEST(CorruptionTest, CompactionInputErrorParanoid) {
   options.paranoid_checks = true;
   options.write_buffer_size = 1048576;
   Reopen(&options);
+  DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
+
+  // Fill levels >= 1 so memtable compaction outputs to level 1
+  for (int level = 1; level < config::kNumLevels; level++) {
+    dbi->Put(WriteOptions(), "", "begin");
+    dbi->Put(WriteOptions(), "~", "end");
+    dbi->TEST_CompactMemTable();
+  }
 
   Build(10);
-  DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
   dbi->TEST_CompactMemTable();
   ASSERT_EQ(1, Property("leveldb.num-files-at-level0"));

--- a/db/db_bench.cc
+++ b/db/db_bench.cc
@@ -86,6 +86,9 @@ static int FLAGS_open_files = 0;
 // benchmark will fail.
 static bool FLAGS_use_existing_db = false;
 
+// Use the db with the following name.
+static const char* FLAGS_db = "/tmp/dbbench";
+
 namespace leveldb {
 
 // Helper for quickly generating random data.
@@ -318,14 +321,14 @@ class Benchmark {
       bytes_(0),
       rand_(301) {
     std::vector<std::string> files;
-    Env::Default()->GetChildren("/tmp/dbbench", &files);
+    Env::Default()->GetChildren(FLAGS_db, &files);
     for (int i = 0; i < files.size(); i++) {
       if (Slice(files[i]).starts_with("heap-")) {
-        Env::Default()->DeleteFile("/tmp/dbbench/" + files[i]);
+        Env::Default()->DeleteFile(std::string(FLAGS_db) + "/" + files[i]);
       }
     }
     if (!FLAGS_use_existing_db) {
-      DestroyDB("/tmp/dbbench", Options());
+      DestroyDB(FLAGS_db, Options());
     }
   }
@@ -364,7 +367,7 @@ class Benchmark {
       Write(write_options, RANDOM, EXISTING, num_, FLAGS_value_size, 1);
     } else if (name == Slice("fillsync")) {
       write_options.sync = true;
-      Write(write_options, RANDOM, FRESH, num_ / 100, FLAGS_value_size, 1);
+      Write(write_options, RANDOM, FRESH, num_ / 1000, FLAGS_value_size, 1);
     } else if (name == Slice("fill100K")) {
       Write(write_options, RANDOM, FRESH, num_ / 1000, 100 * 1000, 1);
     } else if (name == Slice("readseq")) {
@@ -490,7 +493,7 @@ class Benchmark {
     options.create_if_missing = !FLAGS_use_existing_db;
     options.block_cache = cache_;
     options.write_buffer_size = FLAGS_write_buffer_size;
-    Status s = DB::Open(options, "/tmp/dbbench", &db_);
+    Status s = DB::Open(options, FLAGS_db, &db_);
     if (!s.ok()) {
       fprintf(stderr, "open error: %s\n", s.ToString().c_str());
       exit(1);
@@ -506,7 +509,7 @@ class Benchmark {
     }
     delete db_;
     db_ = NULL;
-    DestroyDB("/tmp/dbbench", Options());
+    DestroyDB(FLAGS_db, Options());
     Open();
     Start();  // Do not count time taken to destroy/open
   }
@@ -617,7 +620,7 @@ class Benchmark {
   void HeapProfile() {
     char fname[100];
-    snprintf(fname, sizeof(fname), "/tmp/dbbench/heap-%04d", ++heap_counter_);
+    snprintf(fname, sizeof(fname), "%s/heap-%04d", FLAGS_db, ++heap_counter_);
     WritableFile* file;
     Status s = Env::Default()->NewWritableFile(fname, &file);
     if (!s.ok()) {
@@ -665,6 +668,8 @@ int main(int argc, char** argv) {
       FLAGS_cache_size = n;
     } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) {
       FLAGS_open_files = n;
+    } else if (strncmp(argv[i], "--db=", 5) == 0) {
+      FLAGS_db = argv[i] + 5;
     } else {
       fprintf(stderr, "Invalid flag '%s'\n", argv[i]);
       exit(1);
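
Editor's note: with this change the benchmark database location can be
chosen per run, e.g. db_bench --db=/mnt/test/dbbench; the default remains
/tmp/dbbench.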

--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -122,6 +122,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
       mem_(new MemTable(internal_comparator_)),
       imm_(NULL),
       logfile_(NULL),
+      logfile_number_(0),
       log_(NULL),
       bg_compaction_scheduled_(false),
       manual_compaction_(NULL) {
@@ -219,7 +220,7 @@ void DBImpl::DeleteObsoleteFiles() {
       bool keep = true;
       switch (type) {
         case kLogFile:
-          keep = ((number == versions_->LogNumber()) ||
+          keep = ((number >= versions_->LogNumber()) ||
                   (number == versions_->PrevLogNumber()));
           break;
         case kDescriptorFile:
@@ -287,14 +288,39 @@ Status DBImpl::Recover(VersionEdit* edit) {
   s = versions_->Recover();
   if (s.ok()) {
-    // Recover from the log files named in the descriptor
     SequenceNumber max_sequence(0);
-    if (versions_->PrevLogNumber() != 0) { // log#==0 means no prev log
-      s = RecoverLogFile(versions_->PrevLogNumber(), edit, &max_sequence);
+
+    // Recover from all newer log files than the ones named in the
+    // descriptor (new log files may have been added by the previous
+    // incarnation without registering them in the descriptor).
+    //
+    // Note that PrevLogNumber() is no longer used, but we pay
+    // attention to it in case we are recovering a database
+    // produced by an older version of leveldb.
+    const uint64_t min_log = versions_->LogNumber();
+    const uint64_t prev_log = versions_->PrevLogNumber();
+    std::vector<std::string> filenames;
+    s = env_->GetChildren(dbname_, &filenames);
+    if (!s.ok()) {
+      return s;
+    }
+    uint64_t number;
+    FileType type;
+    std::vector<uint64_t> logs;
+    for (size_t i = 0; i < filenames.size(); i++) {
+      if (ParseFileName(filenames[i], &number, &type)
+          && type == kLogFile
+          && ((number >= min_log) || (number == prev_log))) {
+        logs.push_back(number);
+      }
     }
-    if (s.ok() && versions_->LogNumber() != 0) {  // log#==0 for initial state
-      s = RecoverLogFile(versions_->LogNumber(), edit, &max_sequence);
+
+    // Recover in the order in which the logs were generated
+    std::sort(logs.begin(), logs.end());
+    for (size_t i = 0; i < logs.size(); i++) {
+      s = RecoverLogFile(logs[i], edit, &max_sequence);
     }
+
     if (s.ok()) {
       if (versions_->LastSequence() < max_sequence) {
         versions_->SetLastSequence(max_sequence);
@@ -378,7 +404,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
     }
 
     if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
-      status = WriteLevel0Table(mem, edit);
+      status = WriteLevel0Table(mem, edit, NULL);
       if (!status.ok()) {
         // Reflect errors immediately so that conditions like full
         // file-systems cause the DB::Open() to fail.
@@ -390,7 +416,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
   }
 
   if (status.ok() && mem != NULL) {
-    status = WriteLevel0Table(mem, edit);
+    status = WriteLevel0Table(mem, edit, NULL);
     // Reflect errors immediately so that conditions like full
     // file-systems cause the DB::Open() to fail.
   }
@@ -400,7 +426,8 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
   return status;
 }
 
-Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) {
+Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
+                                Version* base) {
   mutex_.AssertHeld();
   const uint64_t start_micros = env_->NowMicros();
   FileMetaData meta;
@@ -413,7 +440,7 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) {
   Status s;
   {
     mutex_.Unlock();
-    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta, edit);
+    s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
     mutex_.Lock();
   }
@@ -424,10 +451,26 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit) {
   delete iter;
   pending_outputs_.erase(meta.number);
 
+  // Note that if file_size is zero, the file has been deleted and
+  // should not be added to the manifest.
+  int level = 0;
+  if (s.ok() && meta.file_size > 0) {
+    if (base != NULL && !base->OverlapInLevel(0, meta.smallest, meta.largest)) {
+      // Push to largest level we can without causing overlaps
+      while (level + 1 < config::kNumLevels &&
+             !base->OverlapInLevel(level + 1, meta.smallest, meta.largest)) {
+        level++;
+      }
+    }
+    edit->AddFile(level, meta.number, meta.file_size,
+                  meta.smallest, meta.largest);
+  }
+
   CompactionStats stats;
   stats.micros = env_->NowMicros() - start_micros;
   stats.bytes_written = meta.file_size;
-  stats_[0].Add(stats);
+  stats_[level].Add(stats);
   return s;
 }
@@ -437,11 +480,19 @@ Status DBImpl::CompactMemTable() {
   // Save the contents of the memtable as a new Table
   VersionEdit edit;
-  Status s = WriteLevel0Table(imm_, &edit);
+  Version* base = versions_->current();
+  base->Ref();
+  Status s = WriteLevel0Table(imm_, &edit, base);
+  base->Unref();
+
+  if (s.ok() && shutting_down_.Acquire_Load()) {
+    s = Status::IOError("Deleting DB during memtable compaction");
+  }
 
   // Replace immutable memtable with the generated Table
   if (s.ok()) {
     edit.SetPrevLogNumber(0);
+    edit.SetLogNumber(logfile_number_);  // Earlier logs no longer needed
     s = versions_->LogAndApply(&edit);
   }
@@ -460,6 +511,9 @@ void DBImpl::TEST_CompactRange(
     int level,
     const std::string& begin,
     const std::string& end) {
+  assert(level >= 0);
+  assert(level + 1 < config::kNumLevels);
+
   MutexLock l(&mutex_);
   while (manual_compaction_ != NULL) {
     bg_cv_.Wait();
@@ -934,22 +988,38 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
 Status DBImpl::Get(const ReadOptions& options,
                    const Slice& key,
                    std::string* value) {
-  // TODO(opt): faster implementation
-  Iterator* iter = NewIterator(options);
-  iter->Seek(key);
-  bool found = false;
-  if (iter->Valid() && user_comparator()->Compare(key, iter->key()) == 0) {
-    Slice v = iter->value();
-    value->assign(v.data(), v.size());
-    found = true;
-  }
-  // Non-OK iterator status trumps everything else
-  Status result = iter->status();
-  if (result.ok() && !found) {
-    result = Status::NotFound(Slice());  // Use an empty error message for speed
+  Status s;
+  MutexLock l(&mutex_);
+  SequenceNumber snapshot;
+  if (options.snapshot != NULL) {
+    snapshot = reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_;
+  } else {
+    snapshot = versions_->LastSequence();
   }
-  delete iter;
-  return result;
+
+  // First look in the memtable, then in the immutable memtable (if any).
+  LookupKey lkey(key, snapshot);
+  if (mem_->Get(lkey, value, &s)) {
+    return s;
+  }
+  if (imm_ != NULL && imm_->Get(lkey, value, &s)) {
+    return s;
+  }
+
+  // Not in memtable(s); try live files in level order
+  Version* current = versions_->current();
+  current->Ref();
+  Version::GetStats stats;
+  {  // Unlock while reading from files
+    mutex_.Unlock();
+    s = current->Get(options, lkey, value, &stats);
+    mutex_.Lock();
+  }
+  if (current->UpdateStats(stats)) {
+    MaybeScheduleCompaction();
+  }
+  current->Unref();
+  return s;
 }
 
 Iterator* DBImpl::NewIterator(const ReadOptions& options) {
@@ -1050,18 +1120,10 @@ Status DBImpl::MakeRoomForWrite(bool force) {
       if (!s.ok()) {
         break;
       }
-      VersionEdit edit;
-      edit.SetPrevLogNumber(versions_->LogNumber());
-      edit.SetLogNumber(new_log_number);
-      s = versions_->LogAndApply(&edit);
-      if (!s.ok()) {
-        delete lfile;
-        env_->DeleteFile(LogFileName(dbname_, new_log_number));
-        break;
-      }
       delete log_;
       delete logfile_;
       logfile_ = lfile;
+      logfile_number_ = new_log_number;
       log_ = new log::Writer(lfile);
       imm_ = mem_;
       has_imm_.Release_Store(imm_);
@@ -1183,6 +1245,7 @@ Status DB::Open(const Options& options, const std::string& dbname,
   if (s.ok()) {
     edit.SetLogNumber(new_log_number);
     impl->logfile_ = lfile;
+    impl->logfile_number_ = new_log_number;
     impl->log_ = new log::Writer(lfile);
     s = impl->versions_->LogAndApply(&edit);
   }

--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -85,7 +85,7 @@ class DBImpl : public DB {
                         VersionEdit* edit,
                         SequenceNumber* max_sequence);
 
-  Status WriteLevel0Table(MemTable* mem, VersionEdit* edit);
+  Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base);
 
   Status MakeRoomForWrite(bool force /* compact even if there is room? */);
@@ -124,6 +124,7 @@ class DBImpl : public DB {
   MemTable* imm_;                // Memtable being compacted
   port::AtomicPointer has_imm_;  // So bg thread can detect non-NULL imm_
   WritableFile* logfile_;
+  uint64_t logfile_number_;
   log::Writer* log_;
   SnapshotList snapshots_;

--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -21,15 +21,57 @@ static std::string RandomString(Random* rnd, int len) {
   return r;
 }
 
+// Special Env used to delay background operations
+class SpecialEnv : public EnvWrapper {
+ public:
+  // sstable Sync() calls are blocked while this pointer is non-NULL.
+  port::AtomicPointer delay_sstable_sync_;
+
+  explicit SpecialEnv(Env* base) : EnvWrapper(base) {
+    delay_sstable_sync_.Release_Store(NULL);
+  }
+
+  Status NewWritableFile(const std::string& f, WritableFile** r) {
+    class SSTableFile : public WritableFile {
+     private:
+      SpecialEnv* env_;
+      WritableFile* base_;
+     public:
+      SSTableFile(SpecialEnv* env, WritableFile* base)
+          : env_(env),
+            base_(base) {
+      }
+      Status Append(const Slice& data) { return base_->Append(data); }
+      Status Close() { return base_->Close(); }
+      Status Flush() { return base_->Flush(); }
+      Status Sync() {
+        while (env_->delay_sstable_sync_.Acquire_Load() != NULL) {
+          env_->SleepForMicroseconds(100000);
+        }
+        return base_->Sync();
+      }
+    };
+
+    Status s = target()->NewWritableFile(f, r);
+    if (s.ok()) {
+      if (strstr(f.c_str(), ".sst") != NULL) {
+        *r = new SSTableFile(this, *r);
+      }
+    }
+    return s;
+  }
+};
+
 class DBTest {
  public:
   std::string dbname_;
-  Env* env_;
+  SpecialEnv* env_;
   DB* db_;
 
   Options last_options_;
 
-  DBTest() : env_(Env::Default()) {
+  DBTest() : env_(new SpecialEnv(Env::Default())) {
     dbname_ = test::TmpDir() + "/db_test";
     DestroyDB(dbname_, Options());
     db_ = NULL;
@@ -39,6 +81,7 @@ class DBTest {
   ~DBTest() {
     delete db_;
     DestroyDB(dbname_, Options());
+    delete env_;
   }
 
   DBImpl* dbfull() {
@@ -142,6 +185,14 @@ class DBTest {
     return atoi(property.c_str());
   }
 
+  int TotalTableFiles() {
+    int result = 0;
+    for (int level = 0; level < config::kNumLevels; level++) {
+      result += NumTableFilesAtLevel(level);
+    }
+    return result;
+  }
+
   uint64_t Size(const Slice& start, const Slice& limit) {
     Range r(start, limit);
     uint64_t size;
@@ -162,6 +213,16 @@ class DBTest {
     }
   }
 
+  // Prevent pushing of new sstables into deeper levels by adding
+  // tables that cover a specified range to all levels.
+  void FillLevels(const std::string& smallest, const std::string& largest) {
+    for (int level = 0; level < config::kNumLevels; level++) {
+      Put(smallest, "begin");
+      Put(largest, "end");
+      dbfull()->TEST_CompactMemTable();
+    }
+  }
+
   void DumpFileCounts(const char* label) {
     fprintf(stderr, "---\n%s:\n", label);
     fprintf(stderr, "maxoverlap: %lld\n",
@@ -209,6 +270,80 @@ TEST(DBTest, PutDeleteGet) {
   ASSERT_EQ("NOT_FOUND", Get("foo"));
 }
 
+TEST(DBTest, GetFromImmutableLayer) {
+  Options options;
+  options.env = env_;
+  options.write_buffer_size = 100000;  // Small write buffer
+  Reopen(&options);
+
+  ASSERT_OK(Put("foo", "v1"));
+  ASSERT_EQ("v1", Get("foo"));
+
+  env_->delay_sstable_sync_.Release_Store(env_);   // Block sync calls
+  Put("k1", std::string(100000, 'x'));             // Fill memtable
+  Put("k2", std::string(100000, 'y'));             // Trigger compaction
+  ASSERT_EQ("v1", Get("foo"));
+  env_->delay_sstable_sync_.Release_Store(NULL);   // Release sync calls
+}
+
+TEST(DBTest, GetFromVersions) {
+  ASSERT_OK(Put("foo", "v1"));
+  dbfull()->TEST_CompactMemTable();
+  ASSERT_EQ("v1", Get("foo"));
+}
+
+TEST(DBTest, GetSnapshot) {
+  // Try with both a short key and a long key
+  for (int i = 0; i < 2; i++) {
+    std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x');
+    ASSERT_OK(Put(key, "v1"));
+    const Snapshot* s1 = db_->GetSnapshot();
+    ASSERT_OK(Put(key, "v2"));
+    ASSERT_EQ("v2", Get(key));
+    ASSERT_EQ("v1", Get(key, s1));
+    dbfull()->TEST_CompactMemTable();
+    ASSERT_EQ("v2", Get(key));
+    ASSERT_EQ("v1", Get(key, s1));
+    db_->ReleaseSnapshot(s1);
+  }
+}
+
+TEST(DBTest, GetLevel0Ordering) {
+  // Check that we process level-0 files in correct order.  The code
+  // below generates two level-0 files where the earlier one comes
+  // before the later one in the level-0 file list since the earlier
+  // one has a smaller "smallest" key.
+  ASSERT_OK(Put("bar", "b"));
+  ASSERT_OK(Put("foo", "v1"));
+  dbfull()->TEST_CompactMemTable();
+  ASSERT_OK(Put("foo", "v2"));
+  dbfull()->TEST_CompactMemTable();
+  ASSERT_EQ("v2", Get("foo"));
+}
+
+TEST(DBTest, GetOrderedByLevels) {
+  ASSERT_OK(Put("foo", "v1"));
+  Compact("a", "z");
+  ASSERT_EQ("v1", Get("foo"));
+  ASSERT_OK(Put("foo", "v2"));
+  ASSERT_EQ("v2", Get("foo"));
+  dbfull()->TEST_CompactMemTable();
+  ASSERT_EQ("v2", Get("foo"));
+}
+
+TEST(DBTest, GetPicksCorrectFile) {
+  // Arrange to have multiple files in a non-level-0 level.
+  ASSERT_OK(Put("a", "va"));
+  Compact("a", "b");
+  ASSERT_OK(Put("x", "vx"));
+  Compact("x", "y");
+  ASSERT_OK(Put("f", "vf"));
+  Compact("f", "g");
+  ASSERT_EQ("va", Get("a"));
+  ASSERT_EQ("vf", Get("f"));
+  ASSERT_EQ("vx", Get("x"));
+}
+
 TEST(DBTest, IterEmpty) {
   Iterator* iter = db_->NewIterator(ReadOptions());
@@ -413,6 +548,27 @@ TEST(DBTest, RecoveryWithEmptyLog) {
   ASSERT_EQ("v3", Get("foo"));
 }
 
+// Check that writes done during a memtable compaction are recovered
+// if the database is shutdown during the memtable compaction.
+TEST(DBTest, RecoverDuringMemtableCompaction) {
+  Options options;
+  options.env = env_;
+  options.write_buffer_size = 1000000;
+  Reopen(&options);
+
+  // Trigger a long memtable compaction and reopen the database during it
+  ASSERT_OK(Put("foo", "v1"));                         // Goes to 1st log file
+  ASSERT_OK(Put("big1", std::string(10000000, 'x')));  // Fills memtable
+  ASSERT_OK(Put("big2", std::string(1000, 'y')));      // Triggers compaction
+  ASSERT_OK(Put("bar", "v2"));                         // Goes to new log file
+
+  Reopen(&options);
+  ASSERT_EQ("v1", Get("foo"));
+  ASSERT_EQ("v2", Get("bar"));
+  ASSERT_EQ(std::string(10000000, 'x'), Get("big1"));
+  ASSERT_EQ(std::string(1000, 'y'), Get("big2"));
+}
+
 static std::string Key(int i) {
   char buf[100];
   snprintf(buf, sizeof(buf), "key%06d", i);
@@ -426,11 +582,11 @@ TEST(DBTest, MinorCompactionsHappen) {
   const int N = 500;
 
-  int starting_num_tables = NumTableFilesAtLevel(0);
+  int starting_num_tables = TotalTableFiles();
   for (int i = 0; i < N; i++) {
     ASSERT_OK(Put(Key(i), Key(i) + std::string(1000, 'v')));
   }
-  int ending_num_tables = NumTableFilesAtLevel(0);
+  int ending_num_tables = TotalTableFiles();
   ASSERT_GT(ending_num_tables, starting_num_tables);
 
   for (int i = 0; i < N; i++) {
@@ -499,6 +655,8 @@ TEST(DBTest, SparseMerge) {
   options.compression = kNoCompression;
   Reopen(&options);
 
+  FillLevels("A", "Z");
+
   // Suppose there is:
   //    small amount of data with prefix A
   //    large amount of data with prefix B
@@ -514,7 +672,8 @@ TEST(DBTest, SparseMerge) {
     Put(key, value);
   }
   Put("C", "vc");
-  Compact("", "z");
+  dbfull()->TEST_CompactMemTable();
+  dbfull()->TEST_CompactRange(0, "A", "Z");
 
   // Make sparse update
   Put("A", "va2");
@@ -675,6 +834,8 @@ TEST(DBTest, Snapshot) {
 TEST(DBTest, HiddenValuesAreRemoved) {
   Random rnd(301);
+  FillLevels("a", "z");
+
   std::string big = RandomString(&rnd, 50000);
   Put("foo", big);
   Put("pastfoo", "v");
@@ -702,40 +863,54 @@ TEST(DBTest, HiddenValuesAreRemoved) {
 TEST(DBTest, DeletionMarkers1) {
   Put("foo", "v1");
   ASSERT_OK(dbfull()->TEST_CompactMemTable());
-  dbfull()->TEST_CompactRange(0, "", "z");
-  dbfull()->TEST_CompactRange(1, "", "z");
-  ASSERT_EQ(NumTableFilesAtLevel(2), 1);  // foo => v1 is now in level 2 file
+  const int last = config::kNumLevels - 1;
+  ASSERT_EQ(NumTableFilesAtLevel(last), 1);  // foo => v1 is now in last level
+
+  // Place a table at level last-1 to prevent merging with preceding mutation
+  Put("a", "begin");
+  Put("z", "end");
+  dbfull()->TEST_CompactMemTable();
+  ASSERT_EQ(NumTableFilesAtLevel(last), 1);
+  ASSERT_EQ(NumTableFilesAtLevel(last-1), 1);
+
   Delete("foo");
   Put("foo", "v2");
   ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
-  ASSERT_OK(dbfull()->TEST_CompactMemTable());
+  ASSERT_OK(dbfull()->TEST_CompactMemTable());  // Moves to level last-2
   ASSERT_EQ(AllEntriesFor("foo"), "[ v2, DEL, v1 ]");
-  dbfull()->TEST_CompactRange(0, "", "z");
+  dbfull()->TEST_CompactRange(last-2, "", "z");
   // DEL eliminated, but v1 remains because we aren't compacting that level
   // (DEL can be eliminated because v2 hides v1).
   ASSERT_EQ(AllEntriesFor("foo"), "[ v2, v1 ]");
-  dbfull()->TEST_CompactRange(1, "", "z");
-  // Merging L1 w/ L2, so we are the base level for "foo", so DEL is removed.
-  // (as is v1).
+  dbfull()->TEST_CompactRange(last-1, "", "z");
+  // Merging last-1 w/ last, so we are the base level for "foo", so
+  // DEL is removed.  (as is v1).
   ASSERT_EQ(AllEntriesFor("foo"), "[ v2 ]");
 }
 
 TEST(DBTest, DeletionMarkers2) {
   Put("foo", "v1");
   ASSERT_OK(dbfull()->TEST_CompactMemTable());
-  dbfull()->TEST_CompactRange(0, "", "z");
-  dbfull()->TEST_CompactRange(1, "", "z");
-  ASSERT_EQ(NumTableFilesAtLevel(2), 1);  // foo => v1 is now in level 2 file
+  const int last = config::kNumLevels - 1;
+  ASSERT_EQ(NumTableFilesAtLevel(last), 1);  // foo => v1 is now in last level
+
+  // Place a table at level last-1 to prevent merging with preceding mutation
+  Put("a", "begin");
+  Put("z", "end");
+  dbfull()->TEST_CompactMemTable();
+  ASSERT_EQ(NumTableFilesAtLevel(last), 1);
+  ASSERT_EQ(NumTableFilesAtLevel(last-1), 1);
+
   Delete("foo");
   ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
-  ASSERT_OK(dbfull()->TEST_CompactMemTable());
+  ASSERT_OK(dbfull()->TEST_CompactMemTable());  // Moves to level last-2
  ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
-  dbfull()->TEST_CompactRange(0, "", "z");
-  // DEL kept: L2 file overlaps
+  dbfull()->TEST_CompactRange(last-2, "", "z");
+  // DEL kept: "last" file overlaps
   ASSERT_EQ(AllEntriesFor("foo"), "[ DEL, v1 ]");
-  dbfull()->TEST_CompactRange(1, "", "z");
-  // Merging L1 w/ L2, so we are the base level for "foo", so DEL is removed.
-  // (as is v1).
+  dbfull()->TEST_CompactRange(last-1, "", "z");
+  // Merging last-1 w/ last, so we are the base level for "foo", so
+  // DEL is removed.  (as is v1).
  ASSERT_EQ(AllEntriesFor("foo"), "[ ]");
 }

--- a/db/dbformat.cc
+++ b/db/dbformat.cc
@@ -84,4 +84,23 @@ void InternalKeyComparator::FindShortSuccessor(std::string* key) const {
   }
 }
 
+LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
+  size_t usize = user_key.size();
+  size_t needed = usize + 13;  // A conservative estimate
+  char* dst;
+  if (needed <= sizeof(space_)) {
+    dst = space_;
+  } else {
+    dst = new char[needed];
+  }
+  start_ = dst;
+  dst = EncodeVarint32(dst, usize + 8);
+  kstart_ = dst;
+  memcpy(dst, user_key.data(), usize);
+  dst += usize;
+  EncodeFixed64(dst, PackSequenceAndType(s, kValueTypeForSeek));
+  dst += 8;
+  end_ = dst;
+}
+
 }
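
Editor's note: a worked example of the layout built above, for a 3-byte
user key "foo" at sequence number s: EncodeVarint32 writes the single byte
11 (klength = 3 + 8), then come the bytes "foo", then the fixed64 tag
(s << 8) | kValueTypeForSeek, 12 bytes in all.  The "usize + 13" estimate
covers the worst case exactly: a varint32 occupies at most 5 bytes, plus
usize bytes of key and 8 bytes of tag.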

--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -160,6 +160,46 @@ inline bool ParseInternalKey(const Slice& internal_key,
   return (c <= static_cast<unsigned char>(kTypeValue));
 }
 
+// A helper class useful for DBImpl::Get()
+class LookupKey {
+ public:
+  // Initialize *this for looking up user_key at a snapshot with
+  // the specified sequence number.
+  LookupKey(const Slice& user_key, SequenceNumber sequence);
+
+  ~LookupKey();
+
+  // Return a key suitable for lookup in a MemTable.
+  Slice memtable_key() const { return Slice(start_, end_ - start_); }
+
+  // Return an internal key (suitable for passing to an internal iterator)
+  Slice internal_key() const { return Slice(kstart_, end_ - kstart_); }
+
+  // Return the user key
+  Slice user_key() const { return Slice(kstart_, end_ - kstart_ - 8); }
+
+ private:
+  // We construct a char array of the form:
+  //    klength  varint32               <-- start_
+  //    userkey  char[klength]          <-- kstart_
+  //    tag      uint64
+  //                                    <-- end_
+  // The array is a suitable MemTable key.
+  // The suffix starting with "userkey" can be used as an InternalKey.
+  const char* start_;
+  const char* kstart_;
+  const char* end_;
+  char space_[200];      // Avoid allocation for short keys
+
+  // No copying allowed
+  LookupKey(const LookupKey&);
+  void operator=(const LookupKey&);
+};
+
+inline LookupKey::~LookupKey() {
+  if (start_ != space_) delete[] start_;
+}
+
 }
 
 #endif  // STORAGE_LEVELDB_DB_FORMAT_H_

--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -105,4 +105,41 @@ void MemTable::Add(SequenceNumber s, ValueType type,
   table_.Insert(buf);
 }
 
+bool MemTable::Get(const LookupKey& key, std::string* value, Status* s) {
+  Slice memkey = key.memtable_key();
+  Table::Iterator iter(&table_);
+  iter.Seek(memkey.data());
+  if (iter.Valid()) {
+    // entry format is:
+    //    klength  varint32
+    //    userkey  char[klength]
+    //    tag      uint64
+    //    vlength  varint32
+    //    value    char[vlength]
+    // Check that it belongs to same user key.  We do not check the
+    // sequence number since the Seek() call above should have skipped
+    // all entries with overly large sequence numbers.
+    const char* entry = iter.key();
+    uint32_t key_length;
+    const char* key_ptr = GetVarint32Ptr(entry, entry+5, &key_length);
+    if (comparator_.comparator.user_comparator()->Compare(
+            Slice(key_ptr, key_length - 8),
+            key.user_key()) == 0) {
+      // Correct user key
+      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
+      switch (static_cast<ValueType>(tag & 0xff)) {
+        case kTypeValue: {
+          Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
+          value->assign(v.data(), v.size());
+          return true;
+        }
+        case kTypeDeletion:
+          *s = Status::NotFound(Slice());
+          return true;
+      }
+    }
+  }
+  return false;
+}
+
 }

--- a/db/memtable.h
+++ b/db/memtable.h
@@ -57,6 +57,12 @@ class MemTable {
            const Slice& key,
            const Slice& value);
 
+  // If memtable contains a value for key, store it in *value and return true.
+  // If memtable contains a deletion for key, store a NotFound() error
+  // in *status and return true.
+  // Else, return false.
+  bool Get(const LookupKey& key, std::string* value, Status* s);
+
  private:
   ~MemTable();  // Private since only Unref() should be used to delete it

--- a/db/repair.cc
+++ b/db/repair.cc
@@ -212,14 +212,12 @@ class Repairer {
     }
     delete lfile;
 
-    // We ignore any version edits generated by the conversion to a Table
+    // Do not record a version edit for this conversion to a Table
     // since ExtractMetaData() will also generate edits.
-    VersionEdit skipped;
     FileMetaData meta;
     meta.number = next_file_number_++;
     Iterator* iter = mem->NewIterator();
-    status = BuildTable(dbname_, env_, options_, table_cache_, iter,
-                        &meta, &skipped);
+    status = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta);
     delete iter;
     mem->Unref();
     mem = NULL;

--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -16,12 +16,13 @@ class VersionSet;
 struct FileMetaData {
   int refs;
+  int allowed_seeks;          // Seeks allowed until compaction
   uint64_t number;
   uint64_t file_size;         // File size in bytes
   InternalKey smallest;       // Smallest internal key served by table
   InternalKey largest;        // Largest internal key served by table
 
-  FileMetaData() : refs(0), file_size(0) { }
+  FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { }
 };
 
 class VersionEdit {

--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -75,6 +75,37 @@ Version::~Version() {
   }
 }
 
+int FindFile(const InternalKeyComparator& icmp,
+             const std::vector<FileMetaData*>& files,
+             const Slice& key) {
+  uint32_t left = 0;
+  uint32_t right = files.size();
+  while (left < right) {
+    uint32_t mid = (left + right) / 2;
+    const FileMetaData* f = files[mid];
+    if (icmp.InternalKeyComparator::Compare(f->largest.Encode(), key) < 0) {
+      // Key at "mid.largest" is < "target".  Therefore all
+      // files at or before "mid" are uninteresting.
+      left = mid + 1;
+    } else {
+      // Key at "mid.largest" is >= "target".  Therefore all files
+      // after "mid" are uninteresting.
+      right = mid;
+    }
+  }
+  return right;
+}
+
+bool SomeFileOverlapsRange(
+    const InternalKeyComparator& icmp,
+    const std::vector<FileMetaData*>& files,
+    const InternalKey& smallest,
+    const InternalKey& largest) {
+  const int index = FindFile(icmp, files, smallest.Encode());
+  return ((index < files.size()) &&
+          icmp.Compare(largest, files[index]->smallest) >= 0);
+}
+
 // An internal iterator.  For a given version/level pair, yields
 // information about the files in the level.  For a given entry, key()
 // is the largest key that occurs in the file, and value() is an
@@ -92,22 +123,7 @@ class Version::LevelFileNumIterator : public Iterator {
     return index_ < flist_->size();
   }
   virtual void Seek(const Slice& target) {
-    uint32_t left = 0;
-    uint32_t right = flist_->size() - 1;
-    while (left < right) {
-      uint32_t mid = (left + right) / 2;
-      int cmp = icmp_.Compare((*flist_)[mid]->largest.Encode(), target);
-      if (cmp < 0) {
-        // Key at "mid.largest" is < than "target".  Therefore all
-        // files at or before "mid" are uninteresting.
-        left = mid + 1;
-      } else {
-        // Key at "mid.largest" is >= "target".  Therefore all files
-        // after "mid" are uninteresting.
-        right = mid;
-      }
-    }
-    index_ = left;
+    index_ = FindFile(icmp_, *flist_, target);
   }
   virtual void SeekToFirst() { index_ = 0; }
   virtual void SeekToLast() {
@@ -185,6 +201,144 @@ void Version::AddIterators(const ReadOptions& options,
   }
 }
 
+// If "*iter" points at a value or deletion for user_key, store
+// either the value, or a NotFound error and return true.
+// Else return false.
+static bool GetValue(Iterator* iter, const Slice& user_key,
+                     std::string* value,
+                     Status* s) {
+  if (!iter->Valid()) {
+    return false;
+  }
+  ParsedInternalKey parsed_key;
+  if (!ParseInternalKey(iter->key(), &parsed_key)) {
+    *s = Status::Corruption("corrupted key for ", user_key);
+    return true;
+  }
+  if (parsed_key.user_key != user_key) {
+    return false;
+  }
+  switch (parsed_key.type) {
+    case kTypeDeletion:
+      *s = Status::NotFound(Slice());  // Use an empty error message for speed
+      break;
+    case kTypeValue: {
+      Slice v = iter->value();
+      value->assign(v.data(), v.size());
+      break;
+    }
+  }
+  return true;
+}
+
+static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
+  return a->number > b->number;
+}
+
+Status Version::Get(const ReadOptions& options,
+                    const LookupKey& k,
+                    std::string* value,
+                    GetStats* stats) {
+  Slice ikey = k.internal_key();
+  Slice user_key = k.user_key();
+  const Comparator* ucmp = vset_->icmp_.user_comparator();
+  Status s;
+
+  stats->seek_file = NULL;
+  stats->seek_file_level = -1;
+  FileMetaData* last_file_read = NULL;
+
+  // We can search level-by-level since entries never hop across
+  // levels.  Therefore we are guaranteed that if we find data
+  // in an smaller level, later levels are irrelevant.
+  std::vector<FileMetaData*> tmp;
+  FileMetaData* tmp2;
+  for (int level = 0; level < config::kNumLevels; level++) {
+    size_t num_files = files_[level].size();
+    if (num_files == 0) continue;
+
+    // Get the list of files to search in this level
+    FileMetaData* const* files = &files_[level][0];
+    if (level == 0) {
+      // Level-0 files may overlap each other.  Find all files that
+      // overlap user_key and process them in order from newest to oldest.
+      tmp.reserve(num_files);
+      for (int i = 0; i < num_files; i++) {
+        FileMetaData* f = files[i];
+        if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
+            ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
+          tmp.push_back(f);
+        }
+      }
+      if (tmp.empty()) continue;
+
+      std::sort(tmp.begin(), tmp.end(), NewestFirst);
+      files = &tmp[0];
+      num_files = tmp.size();
+    } else {
+      // Binary search to find earliest index whose largest key >= ikey.
+      uint32_t index = FindFile(vset_->icmp_, files_[level], ikey);
+      if (index >= num_files) {
+        files = NULL;
+        num_files = 0;
+      } else {
+        tmp2 = files[index];
+        if (ucmp->Compare(user_key, tmp2->smallest.user_key()) < 0) {
+          // All of "tmp2" is past any data for user_key
+          files = NULL;
+          num_files = 0;
+        } else {
+          files = &tmp2;
+          num_files = 1;
+        }
+      }
+    }
+
+    for (int i = 0; i < num_files; ++i) {
+      if (last_file_read != NULL && stats->seek_file == NULL) {
+        // We have had more than one seek for this read.  Charge the 1st file.
+        stats->seek_file = last_file_read;
+        stats->seek_file_level = (i == 0 ? level - 1 : level);
+      }
+
+      FileMetaData* f = files[i];
+      last_file_read = f;
+
+      Iterator* iter = vset_->table_cache_->NewIterator(
+          options,
+          f->number,
+          f->file_size);
+      iter->Seek(ikey);
+      const bool done = GetValue(iter, user_key, value, &s);
+      if (!iter->status().ok()) {
+        s = iter->status();
+        delete iter;
+        return s;
+      } else {
+        delete iter;
+        if (done) {
+          return s;
+        }
+      }
+    }
+  }
+
+  return Status::NotFound(Slice());  // Use an empty error message for speed
+}
+
+bool Version::UpdateStats(const GetStats& stats) {
+  FileMetaData* f = stats.seek_file;
+  if (f != NULL) {
+    f->allowed_seeks--;
+    if (f->allowed_seeks <= 0 && file_to_compact_ == NULL) {
+      file_to_compact_ = f;
+      file_to_compact_level_ = stats.seek_file_level;
+      return true;
+    }
+  }
+  return false;
+}
+
 void Version::Ref() {
   ++refs_;
 }
@@ -198,13 +352,22 @@ void Version::Unref() {
   }
 }
 
+bool Version::OverlapInLevel(int level,
+                             const InternalKey& smallest,
+                             const InternalKey& largest) {
+  return SomeFileOverlapsRange(vset_->icmp_, files_[level], smallest, largest);
+}
+
 std::string Version::DebugString() const {
   std::string r;
   for (int level = 0; level < config::kNumLevels; level++) {
-    // E.g., level 1: 17:123['a' .. 'd'] 20:43['e' .. 'g']
-    r.append("level ");
+    // E.g.,
+    //   --- level 1 ---
+    //   17:123['a' .. 'd']
+    //   20:43['e' .. 'g']
+    r.append("--- level ");
     AppendNumberTo(&r, level);
-    r.push_back(':');
+    r.append(" ---\n");
     const std::vector<FileMetaData*>& files = files_[level];
     for (size_t i = 0; i < files.size(); i++) {
       r.push_back(' ');
@@ -215,9 +378,8 @@ std::string Version::DebugString() const {
       AppendEscapedStringTo(&r, files[i]->smallest.Encode());
       r.append("' .. '");
       AppendEscapedStringTo(&r, files[i]->largest.Encode());
-      r.append("']");
+      r.append("']\n");
     }
-    r.push_back('\n');
   }
   return r;
 }
@@ -305,6 +467,23 @@ class VersionSet::Builder {
       const int level = edit->new_files_[i].first;
       FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
       f->refs = 1;
+
+      // We arrange to automatically compact this file after
+      // a certain number of seeks.  Let's assume:
+      //   (1) One seek costs 10ms
+      //   (2) Writing or reading 1MB costs 10ms (100MB/s)
+      //   (3) A compaction of 1MB does 25MB of IO:
+      //         1MB read from this level
+      //         10-12MB read from next level (boundaries may be misaligned)
+      //         10-12MB written to next level
+      // This implies that 25 seeks cost the same as the compaction
+      // of 1MB of data.  I.e., one seek costs approximately the
+      // same as the compaction of 40KB of data.  We are a little
+      // conservative and allow approximately one seek for every 16KB
+      // of data before triggering a compaction.
+      f->allowed_seeks = (f->file_size / 16384);
+      if (f->allowed_seeks < 100) f->allowed_seeks = 100;
+
       levels_[level].deleted_files.erase(f->number);
       levels_[level].added_files->insert(f);
     }
@@ -363,8 +542,14 @@ class VersionSet::Builder {
         if (levels_[level].deleted_files.count(f->number) > 0) {
           // File is deleted: do nothing
         } else {
+          std::vector<FileMetaData*>* files = &v->files_[level];
+          if (level > 0 && !files->empty()) {
+            // Must not overlap
+            assert(vset_->icmp_.Compare((*files)[files->size()-1]->largest,
+                                        f->smallest) < 0);
+          }
           f->refs++;
-          v->files_[level].push_back(f);
+          files->push_back(f);
        }
      }
  };
@@ -749,7 +934,7 @@ int64_t VersionSet::NumLevelBytes(int level) const {
 int64_t VersionSet::MaxNextLevelOverlappingBytes() {
   int64_t result = 0;
   std::vector<FileMetaData*> overlaps;
-  for (int level = 0; level < config::kNumLevels - 1; level++) {
+  for (int level = 1; level < config::kNumLevels - 1; level++) {
     for (size_t i = 0; i < current_->files_[level].size(); i++) {
       const FileMetaData* f = current_->files_[level][i];
       GetOverlappingInputs(level+1, f->smallest, f->largest, &overlaps);
@@ -854,31 +1039,43 @@ Iterator* VersionSet::MakeInputIterator(Compaction* c) {
 }
 
 Compaction* VersionSet::PickCompaction() {
-  if (!NeedsCompaction()) {
+  Compaction* c;
+  int level;
+
+  // We prefer compactions triggered by too much data in a level over
+  // the compactions triggered by seeks.
+  const bool size_compaction = (current_->compaction_score_ >= 1);
+  const bool seek_compaction = (current_->file_to_compact_ != NULL);
+  if (size_compaction) {
+    level = current_->compaction_level_;
+    assert(level >= 0);
+    assert(level+1 < config::kNumLevels);
+    c = new Compaction(level);
+
+    // Pick the first file that comes after compact_pointer_[level]
+    for (size_t i = 0; i < current_->files_[level].size(); i++) {
+      FileMetaData* f = current_->files_[level][i];
+      if (compact_pointer_[level].empty() ||
+          icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
+        c->inputs_[0].push_back(f);
+        break;
+      }
+    }
+    if (c->inputs_[0].empty()) {
+      // Wrap-around to the beginning of the key space
+      c->inputs_[0].push_back(current_->files_[level][0]);
+    }
+  } else if (seek_compaction) {
+    level = current_->file_to_compact_level_;
+    c = new Compaction(level);
+    c->inputs_[0].push_back(current_->file_to_compact_);
+  } else {
     return NULL;
   }
-  const int level = current_->compaction_level_;
-  assert(level >= 0);
-  assert(level+1 < config::kNumLevels);
-
-  Compaction* c = new Compaction(level);
+
   c->input_version_ = current_;
   c->input_version_->Ref();
 
-  // Pick the first file that comes after compact_pointer_[level]
-  for (size_t i = 0; i < current_->files_[level].size(); i++) {
-    FileMetaData* f = current_->files_[level][i];
-    if (compact_pointer_[level].empty() ||
-        icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) {
-      c->inputs_[0].push_back(f);
-      break;
-    }
-  }
-  if (c->inputs_[0].empty()) {
-    // Wrap-around to the beginning of the key space
-    c->inputs_[0].push_back(current_->files_[level][0]);
-  }
-
   // Files in level 0 may overlap each other, so pick up all overlapping ones
   if (level == 0) {
     InternalKey smallest, largest;
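
Editor's note: checking the arithmetic in the allowed_seeks comment above:
a 1MB compaction doing 25MB of IO at 100MB/s takes 250ms, the same as 25
seeks at 10ms each, so one seek costs about 1MB / 25 = 40KB of compaction
work.  Charging one seek per 16KB (the 16384 divisor) is therefore about
2.5x more eager than break-even, which matches the "a little conservative"
remark in the comment.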

--- a/db/version_set.h
+++ b/db/version_set.h
@@ -35,6 +35,21 @@ class Version;
 class VersionSet;
 class WritableFile;
 
+// Return the smallest index i such that files[i]->largest >= key.
+// Return files.size() if there is no such file.
+// REQUIRES: "files" contains a sorted list of non-overlapping files.
+extern int FindFile(const InternalKeyComparator& icmp,
+                    const std::vector<FileMetaData*>& files,
+                    const Slice& key);
+
+// Returns true iff some file in "files" overlaps some part of
+// [smallest,largest].
+extern bool SomeFileOverlapsRange(
+    const InternalKeyComparator& icmp,
+    const std::vector<FileMetaData*>& files,
+    const InternalKey& smallest,
+    const InternalKey& largest);
+
 class Version {
  public:
   // Append to *iters a sequence of iterators that will
@@ -42,11 +57,34 @@ class Version {
   // REQUIRES: This version has been saved (see VersionSet::SaveTo)
   void AddIterators(const ReadOptions&, std::vector<Iterator*>* iters);
 
+  // Lookup the value for key.  If found, store it in *val and
+  // return OK.  Else return a non-OK status.  Fills *stats.
+  // REQUIRES: lock is not held
+  struct GetStats {
+    FileMetaData* seek_file;
+    int seek_file_level;
+  };
+  Status Get(const ReadOptions&, const LookupKey& key, std::string* val,
+             GetStats* stats);
+
+  // Adds "stats" into the current state.  Returns true if a new
+  // compaction may need to be triggered, false otherwise.
+  // REQUIRES: lock is held
+  bool UpdateStats(const GetStats& stats);
+
   // Reference count management (so Versions do not disappear out from
   // under live iterators)
   void Ref();
   void Unref();
 
+  // Returns true iff some file in the specified level overlaps
+  // some part of [smallest,largest].
+  bool OverlapInLevel(int level,
+                      const InternalKey& smallest,
+                      const InternalKey& largest);
+
+  int NumFiles(int level) const { return files_[level].size(); }
+
   // Return a human readable string that describes this version's contents.
   std::string DebugString() const;
@@ -65,6 +103,10 @@ class Version {
   // List of files per level
   std::vector<FileMetaData*> files_[config::kNumLevels];
 
+  // Next file to compact based on seek stats.
+  FileMetaData* file_to_compact_;
+  int file_to_compact_level_;
+
   // Level that should be compacted next and its compaction score.
   // Score < 1 means compaction is not strictly needed.  These fields
   // are initialized by Finalize().
@@ -73,6 +115,8 @@ class Version {
   explicit Version(VersionSet* vset)
       : vset_(vset), next_(this), prev_(this), refs_(0),
+        file_to_compact_(NULL),
+        file_to_compact_level_(-1),
         compaction_score_(-1),
         compaction_level_(-1) {
   }
@@ -158,7 +202,10 @@ class VersionSet {
   Iterator* MakeInputIterator(Compaction* c);
 
   // Returns true iff some level needs a compaction.
-  bool NeedsCompaction() const { return current_->compaction_score_ >= 1; }
+  bool NeedsCompaction() const {
+    Version* v = current_;
+    return (v->compaction_score_ >= 1) || (v->file_to_compact_ != NULL);
+  }
 
   // Add all files listed in any live version to *live.
   // May also mutate some internal state.

--- a/port/port_posix.h
+++ b/port/port_posix.h
@@ -9,6 +9,9 @@
 #include <endian.h>
 #include <pthread.h>
+#ifdef SNAPPY
+#include <snappy.h>
+#endif
 #include <stdint.h>
 #include <string>
 #include <cstdatomic>
@@ -72,15 +75,30 @@ class AtomicPointer {
   }
 };
 
-// TODO(gabor): Implement actual compress
 inline bool Snappy_Compress(const char* input, size_t input_length,
                             std::string* output) {
+#ifdef SNAPPY
+  output->resize(snappy::MaxCompressedLength(input_length));
+  size_t outlen;
+  snappy::RawCompress(input, input_length, &(*output)[0], &outlen);
+  output->resize(outlen);
+  return true;
+#endif
+
   return false;
 }
 
-// TODO(gabor): Implement actual uncompress
 inline bool Snappy_Uncompress(const char* input_data, size_t input_length,
                               std::string* output) {
+#ifdef SNAPPY
+  size_t ulength;
+  if (!snappy::GetUncompressedLength(input_data, input_length, &ulength)) {
+    return false;
+  }
+  output->resize(ulength);
+  return snappy::RawUncompress(input_data, input_length, &(*output)[0]);
+#endif
+
   return false;
 }

--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -727,11 +727,15 @@ TEST(Harness, RandomizedLongDB) {
   Test(&rnd);
 
   // We must have created enough data to force merging
-  std::string l0_files, l1_files;
-  ASSERT_TRUE(db()->GetProperty("leveldb.num-files-at-level0", &l0_files));
-  ASSERT_TRUE(db()->GetProperty("leveldb.num-files-at-level1", &l1_files));
-  ASSERT_GT(atoi(l0_files.c_str()) + atoi(l1_files.c_str()), 0);
+  int files = 0;
+  for (int level = 0; level < config::kNumLevels; level++) {
+    std::string value;
+    char name[100];
+    snprintf(name, sizeof(name), "leveldb.num-files-at-level%d", level);
+    ASSERT_TRUE(db()->GetProperty(name, &value));
+    files += atoi(value.c_str());
+  }
+  ASSERT_GT(files, 0);
 }
 
 class MemTableTest { };
